Skip to content

icebox: polling-daemon rewrite (replaces HTTP/cycle service)#72

Merged
jghoman merged 8 commits into
mainfrom
icebox-polling-daemon
Jun 8, 2026
Merged

icebox: polling-daemon rewrite (replaces HTTP/cycle service)#72
jghoman merged 8 commits into
mainfrom
icebox-polling-daemon

Conversation

@jghoman

@jghoman jghoman commented Jun 5, 2026

Copy link
Copy Markdown
Collaborator

Summary

Replaces the icebox HTTP service + cycle state machine with a polling-daemon committer per (Iceberg namespace, table). Net ~9000 lines deleted. Driven by the docs/icebox-self-healing-recovery.md v6 design after the 02:51 incident where coordinated bounce of all six `*-icebox-coord` pods wedged in `recover_in_flight_cycles`.

Architecture:

```
32× millpond writers ──INSERT───▶ icebox_files (PG)
(Parquet to S3) │

icebox daemon

├──tick──▶ Lakekeeper
└──tick──▶ Kafka offset commit (post-tx)
```

Writers INSERT file metadata directly into a per-deployment PG schema's `icebox_files` table. The daemon polls every 60s, batches eligible rows (older than the age filter), commits to Iceberg via PyIceberg, and advances Kafka offsets AFTER the PG transaction commits AND after the pool connection is returned — preserving the invariant "Kafka offset committed iff PG knows the file's fate".

Key design decisions (full discussion in the v6 doc):

  • SKIP LOCKED + chart-level `replicas:1 + Recreate` replaces the cycle-era PG advisory lock.
  • PyIceberg silently accepts duplicate file_paths (verified); downstream UUID dedup absorbs the rare crash-after-Iceberg-before-PG window.
  • Per-row `build_data_file` partitioning: one bad row no longer poisons the batch.
  • `with_timeout` wraps `commit_data_files` because PyIceberg has no native timeout — bounds row-lock hold time during Lakekeeper degradation.
  • Boot-seeds the heartbeat before the daemon thread starts so a slow first tick doesn't race the kubelet probe.

What's deleted: `icebox/api.py`, `icebox/committer.py` (cycle state machine + recover_in_flight_cycles), `icebox/postgres_async.py`, `icebox/schema_cache.py`, `fastapi`, `uvicorn`, `httpx`, `asyncpg` deps.

What's new: `icebox/daemon.py` (tick + loop + DaemonDeps), `icebox/timeout.py` (with_timeout wrapper), stdlib `ThreadingHTTPServer` in `icebox/main.py` for `/healthz` + `/metrics` (no FastAPI/uvicorn).

Commits (logical units, reviewable in order):

  1. `3cd7242` — Add `icebox_files` schema + polling-daemon helpers (foundation)
  2. `8462ef9` — `with_timeout` + `commit_data_files` cycle_id optional
  3. `266f431` — Polling daemon tick + loop + v6 metrics
  4. `b812c8e` — Rip out HTTP/cycle machinery (the big diff)
  5. `1f353e4` — Address PE+QE review findings + integration test suite
  6. `4fe0e7f` — README / AGENT / icebox-README updates

Test plan

  • Full unit suite: 880 passed, 2 xfailed (pre-existing), 0 regressions
  • Integration suite (testcontainers PG + mocked Lakekeeper): 49 passed
    • 9 daemon tick tests (happy path, age filter, SKIP LOCKED, transport/batch failure, crash mid-tick, heartbeat-on-every-exit)
    • 4 writer-side INSERT/replay tests
    • 3 shutdown/drain tests (incl. post-tx Kafka commit wiring)
    • 6 probe HTTP server tests (200, 503 stale/NULL/PG-unreachable, /metrics, 404)
    • 3 migration idempotency tests
    • 2 boot-sequence heartbeat-seed tests
  • PE + QE reviews completed (two rounds each); all BLOCKERs and convergent MAJORs addressed in `1f353e4`
  • Doc-vs-code review verified all concrete claims (env vars, defaults, metric names, failure model rows)
  • Chart-side update (drop `/v1/files` ingress, drop degraded-mode 503 config, point writers at `ICEBOX_PG_*`, update probe timing) — separate PR in PostHog/charts
  • mw-dev rollout (drop existing schemas, redeploy)
  • mw-prod-us rollout after mw-dev soak

Out of scope (filed as follow-up)

  • Real-Lakekeeper docker-compose integration harness (deferred per v6 doc "Out of scope")
  • Snapshot expiration (`expire_snapshots` job) — separate operational concern
  • Kafka AdminClient wire-shape pin canary
  • Probe-tick concurrency starvation load test under `pool_max=4`

jghoman added 6 commits June 5, 2026 15:05
Foundation commit for the polling-daemon rewrite (see
docs/icebox-self-healing-recovery.md v6). Adds the new icebox_files
table and helpers alongside the existing cycle-era code; no behavior
change yet. The cycle machinery will be removed in a follow-up once
the new daemon is in place.

icebox/schema.py:
  - CREATE_ICEBOX_FILES with result enum ('pending'|'committed'|'failed'
    via CHECK), UNIQUE(file_path), no cycle/staged/committed columns.
  - CREATE_ICEBOX_FILES_PENDING_IDX partial on inserted_at where
    result='pending' — daemon hot SELECT is bounded by O(pending).
  - Both DDL entries appended to ALL_DDL.
  - IceboxPendingFileRow Pydantic model (distinct from cycle-era
    IceboxFileRow until that's removed).

icebox/postgres_sync.py:
  - claim_pending_batch: SELECT ... FOR UPDATE SKIP LOCKED with age
    filter and LIMIT, returns list[IceboxPendingFileRow].
  - mark_committed: UPDATE result='committed', result_at=now(),
    iceberg_snapshot_id=...; no-op on empty ids.
  - mark_failed: UPDATE result='failed', result_at=now(); leaves
    iceberg_snapshot_id NULL as audit signal.

Tests:
  - SQL structural assertions (SKIP LOCKED, filters, ORDER BY,
    CHECK constraint, UNIQUE, partial-index predicate).
  - Call-shape mocks for each helper, including empty-ids no-ops.
  - Negative tests guarding the rewrite invariants: no cycle_id in
    new SQL, no cycle-era columns in new DDL, no FK to commit_cycles.
  - Pydantic boundary tests (string-keyed kafka_offsets).
Adds the thread-based timeout wrapper the polling daemon will use to
defend against a wedged Lakekeeper (PyIceberg has no native timeout on
commit_data_files) and makes commit_data_files's cycle_id parameter
optional so the new daemon can call it without a UUID.

icebox/timeout.py (new):
  - with_timeout(seconds, fn): runs fn on a daemon thread, joins with
    a wall-clock budget, raises TimeoutError if exceeded. Re-raises
    any exception from fn verbatim (including BaseException).
  - Documents the can't-kill-Python-threads caveat: a timed-out call
    keeps running in the background and is reclaimed when the process
    exits (k8s does this whenever the heartbeat goes stale).

icebox/iceberg.py:
  - commit_data_files: cycle_id is now optional (UUID | None = None).
    None means no posthog.icebox.cycle_id stamp in snapshot summary;
    the kwarg goes away entirely once cycle code is removed.
  - Empty-data-files ValueError no longer references cycle_id.
  - Producer-snapshot-id-None RuntimeError similarly cycle_id-free.

Tests:
  - test_icebox_timeout.py: 8 tests — happy path (incl. None return),
    Exception and BaseException propagation, sub-second budgets,
    TimeoutError on over-budget call, daemon-thread flag, fast call
    doesn't add measurable overhead.
  - test_icebox_iceberg.py: 3 new tests for cycle_id=None path —
    empty snapshot_properties, kwarg default is None, error message
    no longer references cycle_id. All 11 existing cycle_id=cycle
    tests still pass.

Cycle code in committer.py keeps calling commit_data_files with
cycle_id=...; that path goes away in the cycle-cleanup commit.
Adds the v6 polling daemon alongside the cycle code. The new daemon
reads icebox_files via SELECT FOR UPDATE SKIP LOCKED, commits batches
to Iceberg, and advances Kafka offsets — with the heartbeat-on-every-
exit-path semantic ("icebox stays up no matter what"). Cycle code is
untouched and will be removed in a follow-up; main.py wiring also
follows.

icebox/daemon.py (new):
  - daemon_tick: claim_pending_batch → build_data_file → commit_data_files
    wrapped in with_timeout → mark_committed → kafka commit → heartbeat.
  - Transient bucket (requests + TimeoutError + CommitFailedException +
    CommitStateUnknownException): rows stay pending, no Kafka commit,
    heartbeat fires, return OUTCOME_TRANSPORT_FAILURE.
  - Non-transport bucket (anything else, incl. build_data_file errors):
    mark rows failed, ADVANCE Kafka offsets past the batch ("make
    progress" — duplicate-tolerance covers replay), heartbeat,
    OUTCOME_BATCH_FAILURE.
  - Vacuous: heartbeat, OUTCOME_VACUOUS.
  - daemon_loop: opens a PG tx per cadence, calls daemon_tick, then
    refresh_state_gauges (read-only). PG errors bump
    icebox_pg_unreachable_total + loop continues.
  - No advisory lock. SKIP LOCKED + chart-level singleton replaces it;
    multi-replica is correctness-safe (offset commits may briefly
    reorder at coordinator — writer replay handles).

icebox/config.py:
  - iceberg_timeout_s (default 5.0, env ICEBOX_ICEBERG_TIMEOUT_S):
    wall-clock budget for commit_data_files via with_timeout.
  - age_filter_seconds (default 60.0, env ICEBOX_AGE_FILTER_SECONDS):
    pending rows younger than this aren't eligible (batching).
  - Both placed after the existing defaulted fields so the dataclass
    field-order constraint holds.

icebox/metrics.py:
  - State gauges (icebox_files_count{result}, icebox_files_bytes{result},
    icebox_files_oldest_pending_seconds).
  - Per-tick histograms (icebox_tick_duration_seconds{outcome},
    icebox_iceberg_commit_duration_seconds, icebox_kafka_commit_duration_seconds,
    icebox_batch_size).
  - Throughput counters (icebox_files_committed_total_v6 — v6 suffix
    during the rollout to avoid collision, icebox_files_failed_total,
    icebox_records_committed_total).
  - Liveness (icebox_last_success_at, icebox_ticks_total{outcome}).
  - Failure-mode counters (icebox_lakekeeper_failures_total,
    icebox_batch_failures_total, icebox_pg_unreachable_total,
    icebox_iceberg_timeout_total).
  - initialize_outcome_counters() instantiates per-label zero
    observations so /metrics exports every (outcome, result) label
    from boot — Grafana queries against unseen labels otherwise
    return no data and look like 'metric not implemented'.

Tests:
  - test_icebox_daemon.py: 22 tests — vacuous, success path with
    mark_committed + kafka offsets, transport failure parametrized
    across requests.{Timeout,ConnectionError,HTTPError,RequestException}
    + CommitFailedException + CommitStateUnknownException, with_timeout
    firing → transport classification + iceberg_timeout counter,
    batch failure marks failed + advances offsets, build_data_file
    error treated as batch failure, kafka commit swallowed on both
    success + batch-failure branches, kafka helper edges,
    refresh_state_gauges (incl. resets-to-zero and -1 sentinel),
    loop PG error → counter + keeps going, loop interleaves tick +
    refresh_state_gauges, initialize_outcome_counters called on
    startup, _TRANSIENT_EXCEPTIONS guard.
  - test_icebox_metrics.py: 7 new tests covering the v6 surface.

Test results: 1069 passed (+40), 2 xfailed, 0 regressions.

Cycle code (committer.py, recover_in_flight_cycles, advisory lock,
api.py, etc.) is unchanged; main.py still launches committer_loop.
The cleanup commit will swap main.py to daemon_loop and delete the
cycle machinery.
Completes the polling-daemon rewrite started in 3cd7242 / 8462ef9 /
266f431. Deletes the HTTP service surface, the cycle state machine,
the advisory lock, the schema-fingerprint cache, the async PG pool,
and every supporting test. Writers now INSERT directly into icebox_files;
the daemon polls, commits to Iceberg, and advances Kafka offsets.

Net: 9682 lines deleted, 738 added. ~9000-line reduction.

Deleted:
  - icebox/api.py (FastAPI service)
  - icebox/committer.py (cycle state machine, recover_in_flight_cycles,
    _recover_one, A/B/C branches)
  - icebox/postgres_async.py (asyncpg pool used by HTTP handlers)
  - icebox/schema_cache.py (fingerprint cache for HTTP perimeter)
  - tests/unit/test_icebox_{api,committer,postgres_async,schema_cache}.py
  - tests/integration/{conftest.py,test_icebox_docker.py,test_icebox_e2e.py}
  - Dependencies: httpx, fastapi, uvicorn, asyncpg

Renamed:
  - IceboxPgClient -> IceboxClient (only client now)
  - ICEBOX_FILES_COMMITTED_TOTAL_V6 -> ICEBOX_FILES_COMMITTED_TOTAL

Modified:
  - icebox/main.py: stdlib ThreadingHTTPServer serving /healthz +
    /metrics; daemon thread instead of committer thread. /healthz
    reads status.last_committer_heartbeat directly.
  - icebox/iceberg.py: commit_data_files's cycle_id arg gone;
    CYCLE_ID_SUMMARY_KEY and find_snapshot_for_cycle deleted.
  - icebox/postgres_sync.py: collapses to ~5 helpers
    (claim_pending_batch, mark_committed, mark_failed,
    update_heartbeat, migrations + bootstrap).
  - icebox/schema.py: commit_cycles + old files DDL gone. status row
    simplified to (id, last_committer_heartbeat).
  - icebox/metrics.py: all cycle-era metrics gone. The 6 Iceberg
    table-state gauges (ICEBERG_TABLE_*) kept and now updated by the
    daemon's success path (ported _update_table_state_gauges into
    daemon.py).
  - icebox/structured_logging.py: cycle_id_var ContextVar + filter
    gone. Uvicorn silencing gone (no uvicorn).
  - icebox/config.py: dropped asyncpg_pool_*,
    committer_degraded_failure_threshold,
    schema_fingerprint_cache_ttl_seconds. Default
    committer_max_pending_files is now 100 (was 1000) - caps bad-
    batch blast radius per v6 doc.
  - millpond/icebox_sink.py: HTTP IceboxClient + IceboxResponseError +
    IceboxBackpressureExhausted deleted. The (renamed) IceboxClient
    is psycopg-only.
  - millpond/config.py: dropped icebox_url, icebox_max_attempts,
    icebox_max_backoff_s, icebox_timeout_s. PG fields required when
    destination='icebox'.
  - millpond/sink.py: icebox dispatch is one branch, no client choice.
  - millpond/logging_config.py: millpond.icebox.url resource attr
    replaced with millpond.icebox.pg.host.

Behavior preserved:
  - Iceberg table-state gauges still advance on every successful tick.
  - Heartbeat / liveness semantics unchanged (probe checks
    now() - last_committer_heartbeat against
    cadence * stale_multiple).
  - Graceful shutdown drain budget min(cadence * 5, 600s).

Tests: 877 passed, 2 xfailed, 0 regressions.

Charts repo update (drop /v1/files ingress, drop degraded-mode 503
config, point writers at ICEBOX_PG_*, update probe timing) is a
separate PR.
Fixes 2 BLOCKERs and 3 MAJORs from the principal-engineer review plus
2 BLOCKERs and 5 MAJORs from the QE lead review. Adds a testcontainers-
backed integration suite covering daemon tick semantics, SKIP LOCKED
concurrency, writer-side INSERT, shutdown drain, probe HTTP server,
migrations, and the boot sequence.

PE-B1 (boot heartbeat never seeded):
  icebox/main.py stamps an initial heartbeat AFTER apply_migrations
  and BEFORE the daemon thread starts. Without this, a slow first
  tick on a fresh pod races the kubelet probe and yields
  CrashLoopBackOff on a degraded Lakekeeper.

PE-B2 (commit_kafka_offsets ran inside conn.transaction()):
  TickResult now carries `rows_to_commit_offsets`. daemon_loop runs
  the Kafka commit AFTER the PG tx commits AND after the pool conn
  is returned to the pool. Holds the doc invariant "Kafka offset
  committed iff PG knows the file's fate" even on crash between
  PG COMMIT and Kafka commit (cumulative-offset recovery on the
  next tick). Returning the conn before the AdminClient RPC also
  keeps the pool-sizing rationale honest — a 30s Kafka stall no
  longer pins 1 of 4 pool conns idle.

PE-M1 (one bad row poisoned the whole batch):
  Per-row try/except in daemon_tick partitions rows into good/bad.
  Good rows get committed with the Iceberg snapshot id; bad rows
  get marked failed inline (same tx); rows_to_commit_offsets covers
  both so the writer advances past all of them.

PE-M2 (daemon_loop hot-spin on PoolClosed):
  Explicit `except psycopg_pool.PoolClosed: break`. Distinct from
  `psycopg.Error` handler (which still increments
  ICEBOX_PG_UNREACHABLE_TOTAL and continues for legitimate PG
  outages).

PE-M3 (probe handler starved on a 2-slot pool):
  ICEBOX_PSYCOPG_POOL_MAX default 4 (was 2), floor 3. The daemon's
  tick conn + refresh_state_gauges conn + concurrent /healthz +
  /metrics scrape need at least 4 slots; floor 3 is the documented
  minimum (tick + one probe).

QE-B3 (SKIP LOCKED test passed by serialization):
  test_two_concurrent_ticks_disjoint_via_skip_locked rewritten with
  an event-gated commit_data_files mock. Thread A signals
  `a_holding_locks` after its SELECT FOR UPDATE has acquired locks
  but BEFORE returning from commit_data_files; the test waits on
  that event, starts thread B, joins B (asserting B did NOT block —
  which would prove SKIP LOCKED is broken), then releases A.

QE-B4 + QE-M1 (probe HTTP server untested incl. staleness boundary):
  New tests/integration/test_icebox_probe_integration.py. Six tests
  cover /healthz 200 (fresh heartbeat), 503 (NULL heartbeat), 503
  (stale heartbeat — 10s old vs 3s threshold), 503 (PG unreachable
  via closed pool), /metrics 200 with prometheus content-type, and
  404 fall-through. Probe-server helpers bind on port 0 (OS-assigned)
  to avoid the bind-then-rebind race.

QE-M2 (migration runner idempotency untested):
  New tests/integration/test_icebox_migrations_integration.py. Three
  tests: double-apply against real PG, partial-rerun recovery (drop
  the index, re-apply, verify recreated), and a string-level
  ALL_DDL idempotency guard catching any non-idempotent DDL added
  later.

Boot-sequence verification (QE follow-up):
  New tests/integration/test_icebox_boot_integration.py. Source-
  inspection test asserts main.main() contains apply_migrations,
  ps.update_heartbeat, and daemon_thread.start() in that order — the
  cheapest way to catch deletion of the seed step. End-to-end PG
  test clears the heartbeat and verifies update_heartbeat stamps
  it non-NULL.

Loop-wires-Kafka-commit integration test (QE follow-up):
  test_daemon_loop_invokes_kafka_commit_after_tx_commits in the
  shutdown integration file runs daemon_loop for one cadence interval
  against real PG, spies on the Kafka commit, and verifies the
  merged max-offsets dict, group_id, and topic match the rows the
  tick surfaced via rows_to_commit_offsets.

Integration suite:
  - tests/integration/conftest.py: session-scoped Postgres container
    via testcontainers; per-test isolated schema with apply_migrations
    run against it; helpers (insert_pending_row, select_result,
    heartbeat_age_seconds) for tick assertions.
  - tests/integration/test_icebox_daemon_integration.py: 9 tests
    covering happy path, age filter, SKIP LOCKED disjoint, transport
    failure (rows stay pending), batch failure (rows marked failed +
    rows_to_commit_offsets surfaced), crash mid-tick (PG-side
    recovery), heartbeat stamped on every tick exit path.
  - tests/integration/test_icebox_writer_integration.py: 4 tests
    covering writer INSERT 201, replay 409 returning the same row id,
    distinct paths get distinct rows, INSERTed row visible to daemon
    claim_pending_batch.
  - tests/integration/test_icebox_shutdown_integration.py: 3 tests
    covering daemon_loop drain within budget, already-open pool,
    Kafka commit wired post-tx.
  - tests/integration/test_icebox_probe_integration.py: 6 tests for
    /healthz + /metrics.
  - tests/integration/test_icebox_migrations_integration.py: 3 tests
    for migration idempotency.
  - tests/integration/test_icebox_boot_integration.py: 2 tests for
    the boot sequence's heartbeat seed.

Tests: 880 unit, 49 integration, 2 xfailed, 0 regressions.

Deferred (filed as follow-up):
  - Probe-tick concurrency starvation under pool=4 (load test).
  - Kafka AdminClient wire-shape pin canary (paired with the
    Lakekeeper-harness deferral).
  - Partial-rerun depth for table/seed loss (covered by string-level
    ALL_DDL idempotency check + dropped-index recovery test).
  - Multi-replica Kafka offset regression (writer replay absorbs it;
    documented in design doc).
  - SIGTERM-before-serve_forever race (kubelet kill-after-grace
    covers it).
Updates the three top-of-tree docs to describe the v6 polling-daemon
architecture; removes ~9000 lines of stale references to the deleted
HTTP/cycle service (FastAPI, uvicorn, httpx, asyncpg, /v1/files, cycle
state machine, advisory lock, cycle_id ContextVar, etc.).

icebox/README.md:
  - Rewrites the architecture diagram (writers INSERT into icebox_files;
    daemon polls and commits to Iceberg + advances Kafka offsets).
  - Adds an explicit failure-model table covering all branches of
    daemon_tick / daemon_loop (transport vs batch failure, crash mid-
    tick, crash after Iceberg before PG UPDATE, crash after PG before
    Kafka, Lakekeeper unreachable, PG unreachable / PoolClosed).
  - Documents the probe HTTP server surface (/healthz with stale +
    NULL + PG-unreachable conditions, /metrics).
  - Lists all metrics actually defined in icebox/metrics.py, including
    the ICEBERG_TABLE_* gauges the daemon updates on success.
  - Replaces the cycle-era config table with the daemon's actual env
    vars and defaults verified against icebox/config.py:
    - ICEBOX_PG_USERNAME defaults to "lakekeeper" (reuses Lakekeeper's
      role); ICEBOX_ICEBERG_WAREHOUSE defaults to "ingest";
      ICEBOX_KAFKA_EXTRA_CONFIG defaults to "{}".
    - Adds ICEBOX_API_HOST/_PORT and ICEBOX_PSYCOPG_POOL_MIN to the
      config table (previously missing).
  - Drops the cycle-era operational notes (advisory lock + Aurora
    failover) and adds the v6 design-property note that SKIP LOCKED
    plus chart-level replicas:1 replaces the advisory lock.

README.md:
  - Reframes the icebox bullet around direct PG INSERT.
  - Updates the writer-side icebox env-var table to ICEBOX_PG_HOST/_*
    (replaces ICEBOX_URL/_MAX_ATTEMPTS/_MAX_BACKOFF_S/_TIMEOUT_S).
  - Notes that icebox_files lives in a per-deployment PG schema (one
    schema per (namespace, table)), not a globally shared table.
  - Fixes the write-retry-loop prose to catch psycopg.Error raised by
    IceboxClient.register_file (instead of the deleted
    IceboxResponseError / IceboxBackpressureExhausted / httpx errors).

AGENT.md:
  - Rewrites the "icebox - polling-daemon committer" section: writers
    INSERT directly, daemon polls + commits + advances offsets after
    the PG tx commits AND after the pool conn is returned.
  - Removes fastapi / uvicorn / httpx / asyncpg from the dependency
    table; calls out their explicit removal in the polling-daemon
    rewrite.
  - Updates the icebox/ source tree (api.py / committer.py /
    postgres_async.py / schema_cache.py deleted; daemon.py + timeout.py
    added).
  - Adds shared/structured_logging.py to the shared/ tree (was missing).

Code-vs-doc review fed each concrete claim through the actual source.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR rewrites the Icebox architecture from an HTTP/cycle-based committer service to a polling daemon that commits directly from a Postgres icebox_files table per (Iceberg namespace, table), reducing surface area (removing FastAPI/uvicorn/httpx/asyncpg) while preserving the writer/committer split and Kafka offset commit invariants.

Changes:

  • Replace Icebox HTTP API + cycle state machine with a polling daemon (daemon_loop/daemon_tick) and a minimal stdlib probe server (/healthz, /metrics).
  • Switch writer-side Icebox integration from HTTP POSTs to direct psycopg INSERTs into icebox_files, with updated config/env and tests.
  • Add/reshape DDL, metrics, and integration test coverage around polling, SKIP LOCKED behavior, probe behavior, migrations idempotency, and shutdown/drain.

Reviewed changes

Copilot reviewed 50 out of 51 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tests/unit/test_sink.py Updates sink config fixtures and adds assertions for PG-backed Icebox client wiring.
tests/unit/test_pyiceberg_pin.py Removes cycle_id snapshot summary pinning; updates references to daemon.
tests/unit/test_millpond_logging_config.py Updates resource attributes for Icebox from URL to PG host.
tests/unit/test_icebox_timeout.py Adds unit tests for icebox.timeout.with_timeout.
tests/unit/test_icebox_structured_logging.py Updates structured logging tests after removing cycle_id contextvar behavior.
tests/unit/test_icebox_sink.py Replaces HTTP retry tests with psycopg INSERT-path tests and SQL pins.
tests/unit/test_icebox_schema.py Updates DDL/model tests for icebox_files polling-daemon schema.
tests/unit/test_icebox_schema_cache.py Removes tests for deleted schema fingerprint cache.
tests/unit/test_icebox_postgres_async.py Removes tests for deleted asyncpg-based API helpers.
tests/unit/test_icebox_metrics.py Updates metrics tests for daemon/tick-oriented metric set.
tests/unit/test_icebox_iceberg.py Removes cycle-based recovery scan; updates commit call-shape tests.
tests/unit/test_icebox_config.py Updates icebox config defaults/validation for daemon + pool sizing.
tests/unit/test_consumer.py Updates config fixtures for new Icebox PG fields.
tests/unit/test_config.py Updates millpond config env handling for Icebox PG mode.
tests/integration/test_icebox_writer_integration.py Adds integration tests for writer-side PG INSERT contract into icebox_files.
tests/integration/test_icebox_shutdown_integration.py Adds integration tests for daemon loop drain/shutdown and Kafka commit ordering.
tests/integration/test_icebox_probe_integration.py Adds integration tests for stdlib probe server /healthz and /metrics.
tests/integration/test_icebox_migrations_integration.py Adds integration tests for migration idempotency and partial-state recovery.
tests/integration/test_icebox_daemon_integration.py Adds integration tests for daemon tick outcomes, SKIP LOCKED, failure modes, heartbeat.
tests/integration/test_icebox_boot_integration.py Adds integration tests for boot heartbeat seeding ordering.
tests/integration/test_iceberg_integration.py Updates config fixtures for new Icebox PG fields.
tests/integration/conftest.py Reworks integration fixtures to use testcontainers Postgres with per-test schemas/pools.
shared/structured_logging.py Removes Icebox-specific cycle_id mention from shared logging docs.
README.md Updates docs for PG-writer + polling-daemon Icebox model and env vars.
pyproject.toml Removes asyncpg/fastapi/uvicorn/httpx deps; keeps psycopg + pyiceberg.
millpond/sink.py Switches Icebox sink construction from HTTP client params to PG params.
millpond/logging_config.py Updates OTLP resource attrs for Icebox from URL to PG host.
millpond/icebox_sink.py Replaces HTTP Icebox client with psycopg pool + INSERT/lookup flow.
millpond/config.py Adds Icebox PG config fields and env var loading.
icebox/timeout.py Adds thread-based timeout wrapper used to bound Lakekeeper call time.
icebox/structured_logging.py Removes cycle_id stamping and uvicorn-specific silencing; keeps JSON formatter + OTLP wiring.
icebox/schema.py Replaces cycle-era tables with icebox_files + status heartbeat schema and row model.
icebox/schema_cache.py Deletes schema fingerprint cache (API perimeter removed).
icebox/postgres_sync.py Replaces cycle-state SQL helpers with polling-daemon helpers (claim_pending_batch, mark_committed, etc.).
icebox/postgres_async.py Deletes asyncpg API module (FastAPI surface removed).
icebox/metrics.py Replaces cycle/perimeter metrics with daemon tick/state/failure-mode metrics.
icebox/main.py Replaces uvicorn/FastAPI entrypoint with daemon thread + stdlib probe server.
icebox/iceberg.py Removes cycle_id snapshot tagging and recovery scan; keeps commit + datafile builder.
icebox/config.py Removes asyncpg/api config; adds daemon knobs (timeouts, age filter) and pool sizing rules.
icebox/api.py Deletes FastAPI API surface (POST/status/readyz/metrics/healthz).
AGENT.md Updates operational/architecture documentation to the polling-daemon design.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread icebox/main.py
Comment thread icebox/postgres_sync.py Outdated
Comment thread icebox/postgres_sync.py
Comment thread millpond/icebox_sink.py
jghoman added 2 commits June 5, 2026 17:24
ICEBOX_PG_SCHEMA flows into the libpq conninfo
`options=-csearch_path=<schema>` interpolation in
millpond/icebox_sink.py — whitespace or a stray `-c` could redirect
writes to a different schema. The icebox daemon already validates
this at config load; the writer didn't.

Hoist the validation rule (regex + reserved-name + reserved-keyword
sets + a `validate_pg_schema()` helper) into shared/pg_identifier.py
so writer + icebox can't disagree on what's safe. icebox/config.py
now delegates schema validation to the shared helper; the regex
stays imported for pg_database which has a slightly different rule
(no reserved-name check).

Addresses Copilot review comment on PR #72.
icebox/main.py: ThreadingMixIn defaults daemon_threads=False, which
keeps the interpreter alive past serve_forever()'s return until any
in-flight /healthz or /metrics handler thread finishes. Set
daemon_threads=True so process exit is bounded by the daemon thread's
drain budget instead. Also call server.server_close() right after
serve_forever() so the listening socket is released — kubelet probes
fast-fail with connection-refused rather than connect-then-hang.

icebox/postgres_sync.py: two docstring fixes flagged by the PR review.
- Module docstring claimed psycopg_pool defaults to max=2; reality is
  default 4 with a config.py-enforced floor of 3 (probe handler reads
  PG on every /healthz + /metrics scrape, so a tick holding 1 conn
  across the Iceberg commit needs ≥2 headroom).
- update_heartbeat docstring still described the deleted async API +
  POST-503 backpressure path. Rewrite to describe what actually reads
  the heartbeat now — the /healthz handler — plus the two call sites
  (boot seed + tick-end).

Addresses remaining Copilot review comments on PR #72.
@jghoman jghoman merged commit 0a91064 into main Jun 8, 2026
16 of 18 checks passed
@jghoman jghoman deleted the icebox-polling-daemon branch June 8, 2026 15:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants