Skip to content

PostHog/millpond

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

88 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Millpond — Kafka to DuckLake or Iceberg

A standalone Python app that consumes from a Kafka topic and writes to a lake table. Single thread, single loop, no Kafka Connect. One deployment writes to exactly one destination — either DuckLake, Apache Iceberg via direct PyIceberg commit, or Iceberg via the bundled icebox writer/committer split (production-scale Iceberg). Selected via MILLPOND_DESTINATION (ducklake | iceberg | icebox).

Contents: Naming | Why | Architecture | Destinations | icebox | Record Handling | Adaptive Backpressure | Performance | Resource Footprint | Setup | Development | Configuration | Releases | Deployment | Partitioning | Object Sizing | Error Handling | Multiple Pipelines | AWS Credential Isolation | Operational Notes | tools

Naming

A mill pond

millpond (noun): a pond created by damming a stream to produce a head of water for operating a mill. — Merriam-Webster

Millpond accumulates a stream of Kafka records until a threshold is reached, then releases them into a downstream lake. Like a mill pond feeding a lake.

Why

Kafka Connect imposes ~1100 lines of lock management, scheduled executors, and rebalance handling to work around its lack of backpressure and explicit offset control. Millpond replaces all of that with:

loop:
  consume() → JSON → Arrow → accumulate
  when buffer full or time elapsed:
    write to lake → commit offsets

Single thread, single loop. Kafka is the buffer. Offset commit is explicit (after successful write only). No data loss window.

Architecture

K8s StatefulSet (N replicas)
  └─ Pod (ordinal 0..N-1)
       └─ Single loop: consume → convert → [filter] → accumulate → [sort] → flush → commit
  • One topic and one table per deployment
  • Static partition assignment via pod ordinal — no consumer groups
  • If a pod dies, its partitions stop being consumed until K8s restarts it
  • Optional filter and sort stages — see Record Handling below

Destinations

Millpond writes to one of two lake formats, selected at startup by MILLPOND_DESTINATION (default ducklake). A single deployment writes to exactly one — there is no per-batch routing. Switching destinations requires redeploying with different env vars; the at-rest data is not portable between the two without a separate migration.

DuckLake Iceberg
Catalog Postgres (via DuckDB ducklake extension) REST catalog (e.g. Polaris, Tabular, AWS Glue REST adapter)
Storage S3 / S3-compatible S3 / S3-compatible
Reader ecosystem DuckDB-native; growing third-party support Broad (Spark, Trino, Athena, Snowflake, DuckDB ≥1.5)
Partitioning Caller-supplied via DUCKLAKE_PARTITION_BY; arbitrary DDL expression Hardcoded identity transforms on derived year/month/day/hour int32 columns; Hive-style layout
Schema evolution DuckDB DDL (ADD COLUMN IF NOT EXISTS, ALTER COLUMN SET DATA TYPE with widening enforcement) PyIceberg update_schema() transaction (single commit per flush)
Maintenance tooling Bundled (tools/ducklake_maintenance.py CronJob, tools/ducklake_metrics.py daemon) Not bundled — use your catalog's native compaction/expiry
_inserted_at column Added at INSERT via DuckDB NOW() (per-row, microsecond drift possible within a flush) Added at write time in Python (single timestamp shared by every row in a flush)
Multi-pod concurrent writes Native; idempotent DDL handles races Native; PyIceberg optimistic concurrency + retry loop handles races

The selection is a thin Protocol-based abstraction (millpond/sink.py) — main.py only sees Sink.write(batch), reset_caches(), close(). Both implementations are in their own module (ducklake.py, iceberg.py).

For Iceberg at production scale, writers do not commit to the catalog directly. They emit parquet to S3 and POST /v1/files to icebox — a writer/committer split that serializes commits from many concurrent writer pods through a single committer thread per (namespace, table), eliminating PyIceberg's REST-catalog optimistic-concurrency contention. See icebox/README.md for the full design, endpoint contract, and operational notes.

Record Handling

Two optional stages sit between Kafka conversion and the sink. Both are disabled when their env vars are unset.

Allowlist filter

Drops records whose value in a configured field is not in a configured allowlist. Applied immediately after JSON→Arrow conversion, before records enter the pending buffer.

MILLPOND_FILTER_KEEP_FIELD_NAME=team_id
MILLPOND_FILTER_VALUES=2,4,1956,69

Values auto-detect: tokens that all parse as integers become an int allowlist; otherwise the whole list is treated as strings.

Two skip reasons are tracked on millpond_records_skipped_total:

  • filter_field_missing — column absent from this batch's schema, null for that row, or column type is not filterable (only integer and string columns are supported; bool, float, timestamp, struct, list, etc. are rejected explicitly to avoid silent surprising matches under PyArrow's safe=True cast semantics).
  • filter_excluded — column present and non-null but value not in the allowlist. Expected steady-state drop reason.

MILLPOND_FILTER_DROP_FIELD_NAME is reserved at the config layer (mutex with keep) and currently rejected at startup. It will become a denylist filter in a future release without env-var churn.

Pre-write sort

Sorts the consolidated batch by one or more columns ascending, right before sink.write(). Both DuckLake and Iceberg sinks see pre-sorted data, which improves Parquet compression (especially for low-cardinality keys like team_id) and downstream reader predicate pushdown.

MILLPOND_SORT_BY=team_id,timestamp

Sort order is left-to-right (team_id primary, timestamp secondary). Direction is ascending only today; if you need descending, file an issue. PyArrow's sort is stable, so equal-key rows preserve their consume order.

If any sort field is missing from a batch's schema, the sort is skipped (records still flow through, just unsorted), millpond_sort_skipped_total{reason="field_missing"} increments by the record count, and a warning logs once per distinct missing-fields pattern (per pod lifetime — prevents log floods under sustained misconfiguration).

Per-flush cost is ~50–200 ms on a 256 MB / 30k-row batch. Peak memory roughly doubles during the sort because pa.Table.take() rewrites a fresh copy of every column; budget accordingly relative to the pod's memory limit.

Adaptive Backpressure

The consume batch size automatically scales based on how full the pending buffer is relative to the flush threshold. When the buffer is empty, millpond consumes at full speed. As the buffer approaches the flush size, the batch size drops proportionally, smoothing throughput during catchup and traffic spikes. OOM prevention comes from bounding librdkafka's internal fetch buffer via queued.max.messages.kbytes (16MB per partition).

fullness = pending_bytes / flush_size
batch_size = max(10, int(CONSUME_BATCH_SIZE * (1.0 - fullness)))

Metrics: millpond_buffer_fullness and millpond_consume_batch_size_current.

Performance

The hot path is all C/C++: librdkafka → orjson → PyArrow → DuckDB (zero-copy Arrow scan). Python is glue.

Resource Footprint

Kafka Connect worker Millpond pod
Memory request 4-8Gi (JVM heap) 256Mi
Memory limit 8-16Gi 512Mi
Steady-state ~4GB (JVM + framework + GC headroom) ~250-300MB

No JVM, no framework, no GC heap overhead. ~16x less memory per pod. The entire runtime is C/C++ libraries with a Python glue layer.

Setup

Requires Flox:

flox activate
just sync
just run

Development

just fmt               # format code
just lint              # lint code
just test              # run unit tests (includes both backends' suites + cross-backend equivalence)
just test-integration  # run integration tests (local DuckDB + MinIO/iceberg-rest via testcontainers)
just test-e2e          # run E2E tests (docker-compose, builds stack automatically)
just ci                # format check + lint + unit tests
just up                # start docker-compose stack (DuckLake — plaintext Kafka)
just up-ssl            # start docker-compose stack (DuckLake — SSL Kafka, closer to prod)
just down              # stop docker-compose stack
just down-ssl          # stop SSL docker-compose stack

The just up / just up-ssl dev stacks are DuckLake-only. For Iceberg local dev, the integration test fixture in tests/integration/compose.yaml brings up MinIO + a tabulario/iceberg-rest catalog; that stack is what the iceberg integration tests use and what to point at for ad-hoc Iceberg work.

SSL Kafka Testing

The just up-ssl recipe generates self-signed certs and runs Kafka with SSL listeners, matching the production MSK configuration. This exercises the KAFKA_CONSUMER_* env var override path that isn't tested with plaintext Kafka.

Requires Docker (uses keytool from the Kafka container image for cert generation).

DuckLake Maintenance and state metrics

The tools/ directory ships two DuckLake-only operational binaries inside the same image as the writer:

  • tools/ducklake_maintenance.py — CLI for snapshot expiry, file cleanup, orphan recovery, tiered compaction, fsck. Runs as a K8s CronJob.
  • tools/ducklake_metrics.py — Long-running Prometheus-exposition daemon for catalog-side lake-state metrics. Runs as a single-replica Deployment.

Subcommand and YAML schema reference, full env-var contract, and the just recipe inventory live in tools/README.md. Both binaries reuse the writer's DUCKLAKE_RDS_* / DUCKDB_S3_* / DUCKLAKE_DATA_PATH env vars.

Configuration

All configuration via environment variables.

Shared (always required)

Variable Required Default Description
KAFKA_BOOTSTRAP_SERVERS yes Kafka broker addresses
KAFKA_TOPIC yes Topic to consume
REPLICA_COUNT yes Number of StatefulSet replicas (must match spec.replicas)
MILLPOND_DESTINATION no ducklake Destination: ducklake, iceberg (direct PyIceberg commit), or icebox (Iceberg via the bundled writer/committer split — see icebox/README.md). Case-insensitive; empty/whitespace falls back to ducklake.
FLUSH_SIZE no 104857600 Flush after this many bytes of accumulated Arrow data (default 100MB)
FLUSH_INTERVAL_MS no 60000 Flush after this many ms
GROUP_ID no millpond-{topic}-{table_label_part} Kafka group.id — used for offset storage in __consumer_offsets only, no consumer group semantics. Changing this loses committed offsets and triggers full replay. {table_label_part} is derived from the destination table identifier in millpond/config.py (the namespace prefix only shows up in metrics/client.id, not group.id).
CONSUME_BATCH_SIZE no 1000 Max messages per consume() call — amortizes Python↔C boundary cost
FETCH_MIN_BYTES no 1048576 Broker accumulates at least this many bytes before responding (1MB)
FETCH_MAX_WAIT_MS no 500 Max broker wait when fetch.min.bytes not yet satisfied
STATS_INTERVAL_MS no 5000 librdkafka internal stats emission interval (0 to disable)
LOG_LEVEL no INFO Python log level (DEBUG, INFO, WARNING, ERROR)

DuckLake (required when MILLPOND_DESTINATION=ducklake)

Variable Required Default Description
DUCKLAKE_TABLE yes Target DuckLake table name
DUCKLAKE_DATA_PATH yes S3 path for DuckLake data files
DUCKLAKE_CONNECTION yes DuckDB connection string
DUCKLAKE_RDS_HOST yes Postgres host for DuckLake metadata
DUCKLAKE_RDS_PORT no 5432 Postgres port
DUCKLAKE_RDS_DATABASE no ducklake Postgres database name
DUCKLAKE_RDS_USERNAME no ducklake Postgres username
DUCKLAKE_RDS_PASSWORD yes Postgres password
DUCKLAKE_PARTITION_BY no Hive-style partition expression (e.g. year(_inserted_at),month(_inserted_at),day(_inserted_at),hour(_inserted_at)). Applied via ALTER TABLE SET PARTITIONED BY on first write.
DUCKDB_S3_ACCESS_KEY_ID yes Static S3 access key for DuckDB
DUCKDB_S3_SECRET_ACCESS_KEY yes Static S3 secret for DuckDB
DUCKDB_S3_REGION no S3 region
DUCKDB_S3_ENDPOINT no S3 endpoint override (MinIO, etc.)
DUCKDB_S3_USE_SSL no true / false
DUCKDB_S3_URL_STYLE no vhost / path

Iceberg (required when MILLPOND_DESTINATION=iceberg)

Variable Required Default Description
ICEBERG_CATALOG_URI yes REST catalog endpoint (e.g. https://catalog.example.com)
ICEBERG_WAREHOUSE yes Warehouse identifier, typically the S3 root (s3://warehouse/)
ICEBERG_NAMESPACE yes Catalog namespace (validated as a safe identifier)
ICEBERG_TABLE yes Target table name within the namespace
ICEBERG_TABLE_LOCATION no Explicit s3://... table location; if unset, catalog picks
ICEBERG_CATALOG_TOKEN no Bearer / OAuth token for the REST catalog
MILLPOND_S3_ACCESS_KEY_ID yes Static S3 access key for PyIceberg's PyArrow S3 filesystem
MILLPOND_S3_SECRET_ACCESS_KEY yes Static S3 secret
MILLPOND_S3_REGION yes S3 region
MILLPOND_S3_ENDPOINT no S3 endpoint override (MinIO, etc.)

MILLPOND_S3_* is a separate env var family from DUCKDB_S3_* deliberately — they target different client libraries, and a deployment switch from DuckLake to Iceberg should be a clean swap of env vars rather than re-using the DuckDB-specific names.

icebox (required when MILLPOND_DESTINATION=icebox)

The icebox path reuses the entire Iceberg env-var block above (writers still need to know the catalog URI, warehouse, namespace, table, and S3 credentials so the catalog/file metadata they POST to the icebox is correctly addressed) and adds the writer-to-icebox transport variables:

Variable Required Default Description
ICEBOX_URL yes In-cluster base URL of the icebox service (e.g. http://icebox-events.megaberg.svc:8000)
ICEBOX_BUCKET yes S3 bucket the writer drops parquet into (the icebox reads the path from the POST body; it does not re-derive it)
ICEBOX_WAREHOUSE_PREFIX yes Prefix under ICEBOX_BUCKET for staged parquet files (e.g. kafka/events/data)
ICEBOX_MAX_ATTEMPTS no 6 Max attempts for POST /v1/files against the icebox (the writer's flush-side retry loop, separate from the lake-write retry)
ICEBOX_MAX_BACKOFF_S no 30 Max backoff between icebox POST retries (exponential, capped at this value)
ICEBOX_TIMEOUT_S no 10 Per-request timeout against the icebox API

The icebox itself has its own env-var contract (ICEBOX_* consumed by icebox/main.py — Postgres connection, committer cadence, etc.). Those are documented in icebox/README.md and live in the icebox deployment, not the writer.

Optional record handling

See Record Handling for context. All four variables below are optional; unset means the corresponding stage is disabled.

Variable Required Default Description
MILLPOND_FILTER_KEEP_FIELD_NAME no Column name to check against the allowlist. Must be set with MILLPOND_FILTER_VALUES. Validated as a safe identifier.
MILLPOND_FILTER_DROP_FIELD_NAME no Reserved for a future denylist filter; setting it today raises at startup. Mutually exclusive with MILLPOND_FILTER_KEEP_FIELD_NAME.
MILLPOND_FILTER_VALUES no Comma-separated allowed values. Auto-detected as int if every token parses as an integer, string otherwise. Required when either filter field name is set.
MILLPOND_SORT_BY no Comma-separated column names; the batch is sorted ascending by these in tuple order before each write. Missing fields cause the sort to be skipped (records still flow).

Releases

Every merge to main automatically:

  1. Bumps the patch version (v0.0.1v0.0.2)
  2. Builds and pushes a Docker image to ghcr.io/posthog/millpond:<tag>
  3. Creates a GitHub release with changelog

Images: ghcr.io/posthog/millpond:v0.0.X or ghcr.io/posthog/millpond:latest

Deployment

kubectl apply -f k8s/service.yaml
kubectl apply -f k8s/pdb.yaml
kubectl apply -f k8s/statefulset.yaml

Partition count is discovered at startup via admin.list_topics(topic=cfg.topic, timeout=30) (an AdminClient, not the consumer instance). Each pod computes its partition assignment from its ordinal:

my_partitions = [p for p in range(partition_count) if p % replica_count == ordinal]

Updating

Rolling updates are a poor fit — pods with different REPLICA_COUNT values cause double-assignment or gaps. Since Kafka is the durable buffer:

  1. Canary: Deploy one pod with the new version, verify metrics
  2. Graceful shutdown: Scale to 0 (pods flush and commit)
  3. Full redeploy: Update image/config, scale back up from committed offsets

Downtime = drain time + startup time (~2-3 min). Kafka buffers trivially.

Never kubectl scale without updating REPLICA_COUNT. Use Helm to manage both atomically.

Partitioning

Partitioning is per-destination — DuckLake takes a caller-supplied expression, Iceberg is hardcoded.

DuckLake

Set DUCKLAKE_PARTITION_BY to enable Hive-style partitioning on S3. Files are written into key=value/ directories (e.g. year=2026/month=3/day=23/hour=21/*.parquet), enabling S3 prefix filtering, bulk lifecycle rules, and partition discovery by external tools.

DUCKLAKE_PARTITION_BY="year(_inserted_at),month(_inserted_at),day(_inserted_at),hour(_inserted_at)"

Partition on _inserted_at (always a real TIMESTAMP), not source timestamp fields (typically VARCHAR). Applied via ALTER TABLE SET PARTITIONED BY on first write — idempotent, safe for multiple pods and restarts. If added to an existing unpartitioned table, new files get HSP layout while old files remain flat; DuckLake queries both transparently via metadata.

Iceberg

The partition spec is hardcoded: identity transforms on four int32 columns (year, month, day, hour) derived from _inserted_at at write time. This produces the same Hive-style layout as DuckLake — year=2026/month=3/day=23/hour=21/*.parquet — for the same S3-prefix-filter and lifecycle reasons. There is no env var; every Iceberg deployment gets the same spec.

Trade-off: Iceberg doesn't know the four columns are derived from _inserted_at, so reader queries need to filter on the partition columns explicitly to get pruning. A future spec evolution can layer hidden partitioning on top without rewriting data if reader ergonomics start to matter; not needed today.

Object Sizing

S3 throughput scales with object size — small objects (<1MB) waste per-request overhead, while larger objects (128MB+) maximize GET/PUT throughput. Millpond flushes are triggered by whichever comes first: FLUSH_SIZE (Arrow bytes in memory) or FLUSH_INTERVAL_MS (wall clock). The resulting Parquet file is typically 3-4x smaller than the Arrow representation due to columnar encoding and compression.

At steady state with moderate volume, most flushes are time-triggered — the interval expires before the size ceiling is hit. Object size is therefore driven by: (msgs/s per pod) × (bytes/msg as Parquet) × (flush interval).

Sizing by volume

Assuming ~366 bytes/row in Parquet (7-column event schema), 512 partitions, 8 replicas (64 partitions/pod):

Per-partition msg/s Total msg/s Per-pod msg/s Parquet/file @60s Parquet/file @90s Memory/pod @90s
500 256K 32K ~11MB ~17MB 512Mi
1K 512K 64K ~23MB ~34MB 512Mi
2K 1M 128K ~45MB ~68MB 512Mi
4K 2M 256K ~90MB ~135MB 640Mi
9.5K (peak) 4.9M 608K ~213MB ~320MB 1Gi

Recommended settings for ~128MB target objects

For a pipeline averaging 4K msg/s per partition with 512 partitions and 8 replicas:

FLUSH_SIZE: "1073741824"       # 1GB Arrow ceiling (safety valve for burst/catchup)
FLUSH_INTERVAL_MS: "90000"     # 90s — produces ~135MB Parquet at mean volume

Memory limit: 640Mi (90s × 256K msg/s × ~1KB Arrow/msg ≈ ~230MB Arrow + DuckDB + librdkafka overhead).

At peak (9.5K/partition), the size trigger fires at ~35s producing ~320MB objects — acceptable, and the pod stays within 1Gi.

When to add a merge job

If your volume is low enough that time-triggered flushes produce <10MB objects, run periodic compaction. The ducklake_maintenance.py compact subcommand implements a tiered strategy: small files merge frequently into medium files, medium files merge less often into large files. Tier ranges, target_file_size save/restore semantics, and the --threads / --memory-limit knobs are documented in tools/README.md. This is an out-of-band maintenance operation, not part of the hot path.

See the sizing calculator for interactive estimates.

Error Handling and Retries

The flush path has two failure points, each with its own retry policy:

Operation Attempts Backoff between failures On exhaustion
Lake write 3 1s, 2s (last attempt raises immediately) Re-raise → pod crashes, K8s restarts, replays from last committed offset
Offset commit 3 0.5s, 1s (last attempt raises immediately) Re-raise → pod crashes, replays from last committed offset (duplicates bounded by one flush batch)

Both use errors_total{type="write_retry"} and errors_total{type="offset_commit"} counters so transient vs persistent failures are distinguishable in dashboards.

The write-retry loop catches Exception broadly to cover every backend's failure modes — duckdb.Error for DuckLake; pyiceberg.exceptions.CommitFailedException, CommitStateUnknownException, ServerError, ServiceUnavailableError for direct-Iceberg REST catalog 5xx; httpx.HTTPError + the IceboxResponseError / IceboxBackpressureExhausted raised by millpond/icebox_sink.py for the icebox-path POST failures; OSError for S3; KafkaException for broker disconnects. Each retry invokes sink.reset_caches() to drop cached table/schema state so the next attempt re-checks the catalog (covers the case where another pod evolved the schema or recreated the table between attempts).

Why crash after exhausting retries? A persistent write failure means S3 or the catalog is down — continuing would just accumulate pending data in memory until OOM. A persistent commit failure means the Kafka coordinator is unreachable — the write already succeeded, but without committed offsets the next restart will replay the batch (at-least-once duplicates). In both cases, crashing lets K8s apply its restart backoff, and Kafka holds the data safely until the dependency recovers.

Multiple Pipelines

Each topic→table mapping is a separate StatefulSet. The application doesn't change — just the env vars. Template with Helm:

# values.yaml
pipelines:
  events:
    topic: clickhouse_events_json
    table: events
    partitions: 512
    replicas: 8
  sessions:
    topic: clickhouse_sessions_json
    table: sessions
    partitions: 64
    replicas: 4
  logs:
    topic: app_logs
    table: logs
    partitions: 128
    replicas: 8

One range over pipelines in the StatefulSet template produces N independent StatefulSets. Adding a pipeline is adding a block to values.yaml and running helm upgrade.

AWS Credential Isolation

Millpond uses two separate AWS credential paths that must not interfere with each other:

Component Auth Credential source
Kafka (MSK) SASL/OAUTHBEARER IRSA (standard AWS credential chain)
S3 (lake data files) Static IAM keys DUCKDB_S3_* (DuckLake) or MILLPOND_S3_* (Iceberg)

Neither backend uses the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars — those take precedence in the credential chain and would shadow the IRSA role used for Kafka authentication. DuckDB-specific names for DuckLake, Millpond-specific names for Iceberg. The two families are deliberately separate so a deployment switch between destinations is a clean env-var swap rather than a re-use.

DuckDB's aws extension does not support IRSA — it cannot perform the AssumeRoleWithWebIdentity token exchange that IRSA requires. PyIceberg's S3 access is similarly handled via static credentials passed through catalog properties (s3.access-key-id etc.) to keep the IRSA token out of the S3 client's credential resolution. Same isolation pattern, different transport.

Operational Notes

Periodic MSK IAM auth errors

When using MSK IAM authentication (SASL/OAUTHBEARER), you will see periodic bursts of connection reset by peer and SASL OAUTHBEARER mechanism handshake failed errors in the logs every ~48 minutes. These are expected and harmless.

librdkafka does not re-authenticate on existing connections when the OAUTHBEARER token refreshes (KIP-255). Instead, the MSK broker closes the connection when the old token expires (~15 min lifetime), and librdkafka reconnects with the refreshed token. The ~48 minute interval corresponds to the IRSA projected token refresh (80% of the default 1-hour TTL).

The errors come from librdkafka's internal logger (the %3|...|FAIL| lines) and bypass Python's log formatting. They auto-resolve within seconds with no data loss.

Related issues:

Note

This project should absolutely be called TableFowl, but that would be an SEO and linguistic palaver.


Photo: Public Domain, Wikimedia Commons

About

Purpose-built Kafka to Ducklake or Iceberg ingestion pipeline

Topics

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages