icebox: polling-daemon rewrite (replaces HTTP/cycle service)#72
Merged
Conversation
Foundation commit for the polling-daemon rewrite (see
docs/icebox-self-healing-recovery.md v6). Adds the new icebox_files
table and helpers alongside the existing cycle-era code; no behavior
change yet. The cycle machinery will be removed in a follow-up once
the new daemon is in place.
icebox/schema.py:
- CREATE_ICEBOX_FILES with result enum ('pending'|'committed'|'failed'
via CHECK), UNIQUE(file_path), no cycle/staged/committed columns.
- CREATE_ICEBOX_FILES_PENDING_IDX partial on inserted_at where
result='pending' — daemon hot SELECT is bounded by O(pending).
- Both DDL entries appended to ALL_DDL.
- IceboxPendingFileRow Pydantic model (distinct from cycle-era
IceboxFileRow until that's removed).
icebox/postgres_sync.py:
- claim_pending_batch: SELECT ... FOR UPDATE SKIP LOCKED with age
filter and LIMIT, returns list[IceboxPendingFileRow].
- mark_committed: UPDATE result='committed', result_at=now(),
iceberg_snapshot_id=...; no-op on empty ids.
- mark_failed: UPDATE result='failed', result_at=now(); leaves
iceberg_snapshot_id NULL as audit signal.
Tests:
- SQL structural assertions (SKIP LOCKED, filters, ORDER BY,
CHECK constraint, UNIQUE, partial-index predicate).
- Call-shape mocks for each helper, including empty-ids no-ops.
- Negative tests guarding the rewrite invariants: no cycle_id in
new SQL, no cycle-era columns in new DDL, no FK to commit_cycles.
- Pydantic boundary tests (string-keyed kafka_offsets).
Adds the thread-based timeout wrapper the polling daemon will use to
defend against a wedged Lakekeeper (PyIceberg has no native timeout on
commit_data_files) and makes commit_data_files's cycle_id parameter
optional so the new daemon can call it without a UUID.
icebox/timeout.py (new):
- with_timeout(seconds, fn): runs fn on a daemon thread, joins with
a wall-clock budget, raises TimeoutError if exceeded. Re-raises
any exception from fn verbatim (including BaseException).
- Documents the can't-kill-Python-threads caveat: a timed-out call
keeps running in the background and is reclaimed when the process
exits (k8s does this whenever the heartbeat goes stale).
icebox/iceberg.py:
- commit_data_files: cycle_id is now optional (UUID | None = None).
None means no posthog.icebox.cycle_id stamp in snapshot summary;
the kwarg goes away entirely once cycle code is removed.
- Empty-data-files ValueError no longer references cycle_id.
- Producer-snapshot-id-None RuntimeError similarly cycle_id-free.
Tests:
- test_icebox_timeout.py: 8 tests — happy path (incl. None return),
Exception and BaseException propagation, sub-second budgets,
TimeoutError on over-budget call, daemon-thread flag, fast call
doesn't add measurable overhead.
- test_icebox_iceberg.py: 3 new tests for cycle_id=None path —
empty snapshot_properties, kwarg default is None, error message
no longer references cycle_id. All 11 existing cycle_id=cycle
tests still pass.
Cycle code in committer.py keeps calling commit_data_files with
cycle_id=...; that path goes away in the cycle-cleanup commit.
Adds the v6 polling daemon alongside the cycle code. The new daemon
reads icebox_files via SELECT FOR UPDATE SKIP LOCKED, commits batches
to Iceberg, and advances Kafka offsets — with the heartbeat-on-every-
exit-path semantic ("icebox stays up no matter what"). Cycle code is
untouched and will be removed in a follow-up; main.py wiring also
follows.
icebox/daemon.py (new):
- daemon_tick: claim_pending_batch → build_data_file → commit_data_files
wrapped in with_timeout → mark_committed → kafka commit → heartbeat.
- Transient bucket (requests + TimeoutError + CommitFailedException +
CommitStateUnknownException): rows stay pending, no Kafka commit,
heartbeat fires, return OUTCOME_TRANSPORT_FAILURE.
- Non-transport bucket (anything else, incl. build_data_file errors):
mark rows failed, ADVANCE Kafka offsets past the batch ("make
progress" — duplicate-tolerance covers replay), heartbeat,
OUTCOME_BATCH_FAILURE.
- Vacuous: heartbeat, OUTCOME_VACUOUS.
- daemon_loop: opens a PG tx per cadence, calls daemon_tick, then
refresh_state_gauges (read-only). PG errors bump
icebox_pg_unreachable_total + loop continues.
- No advisory lock. SKIP LOCKED + chart-level singleton replaces it;
multi-replica is correctness-safe (offset commits may briefly
reorder at coordinator — writer replay handles).
icebox/config.py:
- iceberg_timeout_s (default 5.0, env ICEBOX_ICEBERG_TIMEOUT_S):
wall-clock budget for commit_data_files via with_timeout.
- age_filter_seconds (default 60.0, env ICEBOX_AGE_FILTER_SECONDS):
pending rows younger than this aren't eligible (batching).
- Both placed after the existing defaulted fields so the dataclass
field-order constraint holds.
icebox/metrics.py:
- State gauges (icebox_files_count{result}, icebox_files_bytes{result},
icebox_files_oldest_pending_seconds).
- Per-tick histograms (icebox_tick_duration_seconds{outcome},
icebox_iceberg_commit_duration_seconds, icebox_kafka_commit_duration_seconds,
icebox_batch_size).
- Throughput counters (icebox_files_committed_total_v6 — v6 suffix
during the rollout to avoid collision, icebox_files_failed_total,
icebox_records_committed_total).
- Liveness (icebox_last_success_at, icebox_ticks_total{outcome}).
- Failure-mode counters (icebox_lakekeeper_failures_total,
icebox_batch_failures_total, icebox_pg_unreachable_total,
icebox_iceberg_timeout_total).
- initialize_outcome_counters() instantiates per-label zero
observations so /metrics exports every (outcome, result) label
from boot — Grafana queries against unseen labels otherwise
return no data and look like 'metric not implemented'.
Tests:
- test_icebox_daemon.py: 22 tests — vacuous, success path with
mark_committed + kafka offsets, transport failure parametrized
across requests.{Timeout,ConnectionError,HTTPError,RequestException}
+ CommitFailedException + CommitStateUnknownException, with_timeout
firing → transport classification + iceberg_timeout counter,
batch failure marks failed + advances offsets, build_data_file
error treated as batch failure, kafka commit swallowed on both
success + batch-failure branches, kafka helper edges,
refresh_state_gauges (incl. resets-to-zero and -1 sentinel),
loop PG error → counter + keeps going, loop interleaves tick +
refresh_state_gauges, initialize_outcome_counters called on
startup, _TRANSIENT_EXCEPTIONS guard.
- test_icebox_metrics.py: 7 new tests covering the v6 surface.
Test results: 1069 passed (+40), 2 xfailed, 0 regressions.
Cycle code (committer.py, recover_in_flight_cycles, advisory lock,
api.py, etc.) is unchanged; main.py still launches committer_loop.
The cleanup commit will swap main.py to daemon_loop and delete the
cycle machinery.
Completes the polling-daemon rewrite started in 3cd7242 / 8462ef9 / 266f431. Deletes the HTTP service surface, the cycle state machine, the advisory lock, the schema-fingerprint cache, the async PG pool, and every supporting test. Writers now INSERT directly into icebox_files; the daemon polls, commits to Iceberg, and advances Kafka offsets. Net: 9682 lines deleted, 738 added. ~9000-line reduction. Deleted: - icebox/api.py (FastAPI service) - icebox/committer.py (cycle state machine, recover_in_flight_cycles, _recover_one, A/B/C branches) - icebox/postgres_async.py (asyncpg pool used by HTTP handlers) - icebox/schema_cache.py (fingerprint cache for HTTP perimeter) - tests/unit/test_icebox_{api,committer,postgres_async,schema_cache}.py - tests/integration/{conftest.py,test_icebox_docker.py,test_icebox_e2e.py} - Dependencies: httpx, fastapi, uvicorn, asyncpg Renamed: - IceboxPgClient -> IceboxClient (only client now) - ICEBOX_FILES_COMMITTED_TOTAL_V6 -> ICEBOX_FILES_COMMITTED_TOTAL Modified: - icebox/main.py: stdlib ThreadingHTTPServer serving /healthz + /metrics; daemon thread instead of committer thread. /healthz reads status.last_committer_heartbeat directly. - icebox/iceberg.py: commit_data_files's cycle_id arg gone; CYCLE_ID_SUMMARY_KEY and find_snapshot_for_cycle deleted. - icebox/postgres_sync.py: collapses to ~5 helpers (claim_pending_batch, mark_committed, mark_failed, update_heartbeat, migrations + bootstrap). - icebox/schema.py: commit_cycles + old files DDL gone. status row simplified to (id, last_committer_heartbeat). - icebox/metrics.py: all cycle-era metrics gone. The 6 Iceberg table-state gauges (ICEBERG_TABLE_*) kept and now updated by the daemon's success path (ported _update_table_state_gauges into daemon.py). - icebox/structured_logging.py: cycle_id_var ContextVar + filter gone. Uvicorn silencing gone (no uvicorn). - icebox/config.py: dropped asyncpg_pool_*, committer_degraded_failure_threshold, schema_fingerprint_cache_ttl_seconds. Default committer_max_pending_files is now 100 (was 1000) - caps bad- batch blast radius per v6 doc. - millpond/icebox_sink.py: HTTP IceboxClient + IceboxResponseError + IceboxBackpressureExhausted deleted. The (renamed) IceboxClient is psycopg-only. - millpond/config.py: dropped icebox_url, icebox_max_attempts, icebox_max_backoff_s, icebox_timeout_s. PG fields required when destination='icebox'. - millpond/sink.py: icebox dispatch is one branch, no client choice. - millpond/logging_config.py: millpond.icebox.url resource attr replaced with millpond.icebox.pg.host. Behavior preserved: - Iceberg table-state gauges still advance on every successful tick. - Heartbeat / liveness semantics unchanged (probe checks now() - last_committer_heartbeat against cadence * stale_multiple). - Graceful shutdown drain budget min(cadence * 5, 600s). Tests: 877 passed, 2 xfailed, 0 regressions. Charts repo update (drop /v1/files ingress, drop degraded-mode 503 config, point writers at ICEBOX_PG_*, update probe timing) is a separate PR.
Fixes 2 BLOCKERs and 3 MAJORs from the principal-engineer review plus
2 BLOCKERs and 5 MAJORs from the QE lead review. Adds a testcontainers-
backed integration suite covering daemon tick semantics, SKIP LOCKED
concurrency, writer-side INSERT, shutdown drain, probe HTTP server,
migrations, and the boot sequence.
PE-B1 (boot heartbeat never seeded):
icebox/main.py stamps an initial heartbeat AFTER apply_migrations
and BEFORE the daemon thread starts. Without this, a slow first
tick on a fresh pod races the kubelet probe and yields
CrashLoopBackOff on a degraded Lakekeeper.
PE-B2 (commit_kafka_offsets ran inside conn.transaction()):
TickResult now carries `rows_to_commit_offsets`. daemon_loop runs
the Kafka commit AFTER the PG tx commits AND after the pool conn
is returned to the pool. Holds the doc invariant "Kafka offset
committed iff PG knows the file's fate" even on crash between
PG COMMIT and Kafka commit (cumulative-offset recovery on the
next tick). Returning the conn before the AdminClient RPC also
keeps the pool-sizing rationale honest — a 30s Kafka stall no
longer pins 1 of 4 pool conns idle.
PE-M1 (one bad row poisoned the whole batch):
Per-row try/except in daemon_tick partitions rows into good/bad.
Good rows get committed with the Iceberg snapshot id; bad rows
get marked failed inline (same tx); rows_to_commit_offsets covers
both so the writer advances past all of them.
PE-M2 (daemon_loop hot-spin on PoolClosed):
Explicit `except psycopg_pool.PoolClosed: break`. Distinct from
`psycopg.Error` handler (which still increments
ICEBOX_PG_UNREACHABLE_TOTAL and continues for legitimate PG
outages).
PE-M3 (probe handler starved on a 2-slot pool):
ICEBOX_PSYCOPG_POOL_MAX default 4 (was 2), floor 3. The daemon's
tick conn + refresh_state_gauges conn + concurrent /healthz +
/metrics scrape need at least 4 slots; floor 3 is the documented
minimum (tick + one probe).
QE-B3 (SKIP LOCKED test passed by serialization):
test_two_concurrent_ticks_disjoint_via_skip_locked rewritten with
an event-gated commit_data_files mock. Thread A signals
`a_holding_locks` after its SELECT FOR UPDATE has acquired locks
but BEFORE returning from commit_data_files; the test waits on
that event, starts thread B, joins B (asserting B did NOT block —
which would prove SKIP LOCKED is broken), then releases A.
QE-B4 + QE-M1 (probe HTTP server untested incl. staleness boundary):
New tests/integration/test_icebox_probe_integration.py. Six tests
cover /healthz 200 (fresh heartbeat), 503 (NULL heartbeat), 503
(stale heartbeat — 10s old vs 3s threshold), 503 (PG unreachable
via closed pool), /metrics 200 with prometheus content-type, and
404 fall-through. Probe-server helpers bind on port 0 (OS-assigned)
to avoid the bind-then-rebind race.
QE-M2 (migration runner idempotency untested):
New tests/integration/test_icebox_migrations_integration.py. Three
tests: double-apply against real PG, partial-rerun recovery (drop
the index, re-apply, verify recreated), and a string-level
ALL_DDL idempotency guard catching any non-idempotent DDL added
later.
Boot-sequence verification (QE follow-up):
New tests/integration/test_icebox_boot_integration.py. Source-
inspection test asserts main.main() contains apply_migrations,
ps.update_heartbeat, and daemon_thread.start() in that order — the
cheapest way to catch deletion of the seed step. End-to-end PG
test clears the heartbeat and verifies update_heartbeat stamps
it non-NULL.
Loop-wires-Kafka-commit integration test (QE follow-up):
test_daemon_loop_invokes_kafka_commit_after_tx_commits in the
shutdown integration file runs daemon_loop for one cadence interval
against real PG, spies on the Kafka commit, and verifies the
merged max-offsets dict, group_id, and topic match the rows the
tick surfaced via rows_to_commit_offsets.
Integration suite:
- tests/integration/conftest.py: session-scoped Postgres container
via testcontainers; per-test isolated schema with apply_migrations
run against it; helpers (insert_pending_row, select_result,
heartbeat_age_seconds) for tick assertions.
- tests/integration/test_icebox_daemon_integration.py: 9 tests
covering happy path, age filter, SKIP LOCKED disjoint, transport
failure (rows stay pending), batch failure (rows marked failed +
rows_to_commit_offsets surfaced), crash mid-tick (PG-side
recovery), heartbeat stamped on every tick exit path.
- tests/integration/test_icebox_writer_integration.py: 4 tests
covering writer INSERT 201, replay 409 returning the same row id,
distinct paths get distinct rows, INSERTed row visible to daemon
claim_pending_batch.
- tests/integration/test_icebox_shutdown_integration.py: 3 tests
covering daemon_loop drain within budget, already-open pool,
Kafka commit wired post-tx.
- tests/integration/test_icebox_probe_integration.py: 6 tests for
/healthz + /metrics.
- tests/integration/test_icebox_migrations_integration.py: 3 tests
for migration idempotency.
- tests/integration/test_icebox_boot_integration.py: 2 tests for
the boot sequence's heartbeat seed.
Tests: 880 unit, 49 integration, 2 xfailed, 0 regressions.
Deferred (filed as follow-up):
- Probe-tick concurrency starvation under pool=4 (load test).
- Kafka AdminClient wire-shape pin canary (paired with the
Lakekeeper-harness deferral).
- Partial-rerun depth for table/seed loss (covered by string-level
ALL_DDL idempotency check + dropped-index recovery test).
- Multi-replica Kafka offset regression (writer replay absorbs it;
documented in design doc).
- SIGTERM-before-serve_forever race (kubelet kill-after-grace
covers it).
Updates the three top-of-tree docs to describe the v6 polling-daemon
architecture; removes ~9000 lines of stale references to the deleted
HTTP/cycle service (FastAPI, uvicorn, httpx, asyncpg, /v1/files, cycle
state machine, advisory lock, cycle_id ContextVar, etc.).
icebox/README.md:
- Rewrites the architecture diagram (writers INSERT into icebox_files;
daemon polls and commits to Iceberg + advances Kafka offsets).
- Adds an explicit failure-model table covering all branches of
daemon_tick / daemon_loop (transport vs batch failure, crash mid-
tick, crash after Iceberg before PG UPDATE, crash after PG before
Kafka, Lakekeeper unreachable, PG unreachable / PoolClosed).
- Documents the probe HTTP server surface (/healthz with stale +
NULL + PG-unreachable conditions, /metrics).
- Lists all metrics actually defined in icebox/metrics.py, including
the ICEBERG_TABLE_* gauges the daemon updates on success.
- Replaces the cycle-era config table with the daemon's actual env
vars and defaults verified against icebox/config.py:
- ICEBOX_PG_USERNAME defaults to "lakekeeper" (reuses Lakekeeper's
role); ICEBOX_ICEBERG_WAREHOUSE defaults to "ingest";
ICEBOX_KAFKA_EXTRA_CONFIG defaults to "{}".
- Adds ICEBOX_API_HOST/_PORT and ICEBOX_PSYCOPG_POOL_MIN to the
config table (previously missing).
- Drops the cycle-era operational notes (advisory lock + Aurora
failover) and adds the v6 design-property note that SKIP LOCKED
plus chart-level replicas:1 replaces the advisory lock.
README.md:
- Reframes the icebox bullet around direct PG INSERT.
- Updates the writer-side icebox env-var table to ICEBOX_PG_HOST/_*
(replaces ICEBOX_URL/_MAX_ATTEMPTS/_MAX_BACKOFF_S/_TIMEOUT_S).
- Notes that icebox_files lives in a per-deployment PG schema (one
schema per (namespace, table)), not a globally shared table.
- Fixes the write-retry-loop prose to catch psycopg.Error raised by
IceboxClient.register_file (instead of the deleted
IceboxResponseError / IceboxBackpressureExhausted / httpx errors).
AGENT.md:
- Rewrites the "icebox - polling-daemon committer" section: writers
INSERT directly, daemon polls + commits + advances offsets after
the PG tx commits AND after the pool conn is returned.
- Removes fastapi / uvicorn / httpx / asyncpg from the dependency
table; calls out their explicit removal in the polling-daemon
rewrite.
- Updates the icebox/ source tree (api.py / committer.py /
postgres_async.py / schema_cache.py deleted; daemon.py + timeout.py
added).
- Adds shared/structured_logging.py to the shared/ tree (was missing).
Code-vs-doc review fed each concrete claim through the actual source.
There was a problem hiding this comment.
Pull request overview
This PR rewrites the Icebox architecture from an HTTP/cycle-based committer service to a polling daemon that commits directly from a Postgres icebox_files table per (Iceberg namespace, table), reducing surface area (removing FastAPI/uvicorn/httpx/asyncpg) while preserving the writer/committer split and Kafka offset commit invariants.
Changes:
- Replace Icebox HTTP API + cycle state machine with a polling daemon (
daemon_loop/daemon_tick) and a minimal stdlib probe server (/healthz,/metrics). - Switch writer-side Icebox integration from HTTP POSTs to direct
psycopgINSERTs intoicebox_files, with updated config/env and tests. - Add/reshape DDL, metrics, and integration test coverage around polling, SKIP LOCKED behavior, probe behavior, migrations idempotency, and shutdown/drain.
Reviewed changes
Copilot reviewed 50 out of 51 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/test_sink.py | Updates sink config fixtures and adds assertions for PG-backed Icebox client wiring. |
| tests/unit/test_pyiceberg_pin.py | Removes cycle_id snapshot summary pinning; updates references to daemon. |
| tests/unit/test_millpond_logging_config.py | Updates resource attributes for Icebox from URL to PG host. |
| tests/unit/test_icebox_timeout.py | Adds unit tests for icebox.timeout.with_timeout. |
| tests/unit/test_icebox_structured_logging.py | Updates structured logging tests after removing cycle_id contextvar behavior. |
| tests/unit/test_icebox_sink.py | Replaces HTTP retry tests with psycopg INSERT-path tests and SQL pins. |
| tests/unit/test_icebox_schema.py | Updates DDL/model tests for icebox_files polling-daemon schema. |
| tests/unit/test_icebox_schema_cache.py | Removes tests for deleted schema fingerprint cache. |
| tests/unit/test_icebox_postgres_async.py | Removes tests for deleted asyncpg-based API helpers. |
| tests/unit/test_icebox_metrics.py | Updates metrics tests for daemon/tick-oriented metric set. |
| tests/unit/test_icebox_iceberg.py | Removes cycle-based recovery scan; updates commit call-shape tests. |
| tests/unit/test_icebox_config.py | Updates icebox config defaults/validation for daemon + pool sizing. |
| tests/unit/test_consumer.py | Updates config fixtures for new Icebox PG fields. |
| tests/unit/test_config.py | Updates millpond config env handling for Icebox PG mode. |
| tests/integration/test_icebox_writer_integration.py | Adds integration tests for writer-side PG INSERT contract into icebox_files. |
| tests/integration/test_icebox_shutdown_integration.py | Adds integration tests for daemon loop drain/shutdown and Kafka commit ordering. |
| tests/integration/test_icebox_probe_integration.py | Adds integration tests for stdlib probe server /healthz and /metrics. |
| tests/integration/test_icebox_migrations_integration.py | Adds integration tests for migration idempotency and partial-state recovery. |
| tests/integration/test_icebox_daemon_integration.py | Adds integration tests for daemon tick outcomes, SKIP LOCKED, failure modes, heartbeat. |
| tests/integration/test_icebox_boot_integration.py | Adds integration tests for boot heartbeat seeding ordering. |
| tests/integration/test_iceberg_integration.py | Updates config fixtures for new Icebox PG fields. |
| tests/integration/conftest.py | Reworks integration fixtures to use testcontainers Postgres with per-test schemas/pools. |
| shared/structured_logging.py | Removes Icebox-specific cycle_id mention from shared logging docs. |
| README.md | Updates docs for PG-writer + polling-daemon Icebox model and env vars. |
| pyproject.toml | Removes asyncpg/fastapi/uvicorn/httpx deps; keeps psycopg + pyiceberg. |
| millpond/sink.py | Switches Icebox sink construction from HTTP client params to PG params. |
| millpond/logging_config.py | Updates OTLP resource attrs for Icebox from URL to PG host. |
| millpond/icebox_sink.py | Replaces HTTP Icebox client with psycopg pool + INSERT/lookup flow. |
| millpond/config.py | Adds Icebox PG config fields and env var loading. |
| icebox/timeout.py | Adds thread-based timeout wrapper used to bound Lakekeeper call time. |
| icebox/structured_logging.py | Removes cycle_id stamping and uvicorn-specific silencing; keeps JSON formatter + OTLP wiring. |
| icebox/schema.py | Replaces cycle-era tables with icebox_files + status heartbeat schema and row model. |
| icebox/schema_cache.py | Deletes schema fingerprint cache (API perimeter removed). |
| icebox/postgres_sync.py | Replaces cycle-state SQL helpers with polling-daemon helpers (claim_pending_batch, mark_committed, etc.). |
| icebox/postgres_async.py | Deletes asyncpg API module (FastAPI surface removed). |
| icebox/metrics.py | Replaces cycle/perimeter metrics with daemon tick/state/failure-mode metrics. |
| icebox/main.py | Replaces uvicorn/FastAPI entrypoint with daemon thread + stdlib probe server. |
| icebox/iceberg.py | Removes cycle_id snapshot tagging and recovery scan; keeps commit + datafile builder. |
| icebox/config.py | Removes asyncpg/api config; adds daemon knobs (timeouts, age filter) and pool sizing rules. |
| icebox/api.py | Deletes FastAPI API surface (POST/status/readyz/metrics/healthz). |
| AGENT.md | Updates operational/architecture documentation to the polling-daemon design. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
ICEBOX_PG_SCHEMA flows into the libpq conninfo `options=-csearch_path=<schema>` interpolation in millpond/icebox_sink.py — whitespace or a stray `-c` could redirect writes to a different schema. The icebox daemon already validates this at config load; the writer didn't. Hoist the validation rule (regex + reserved-name + reserved-keyword sets + a `validate_pg_schema()` helper) into shared/pg_identifier.py so writer + icebox can't disagree on what's safe. icebox/config.py now delegates schema validation to the shared helper; the regex stays imported for pg_database which has a slightly different rule (no reserved-name check). Addresses Copilot review comment on PR #72.
icebox/main.py: ThreadingMixIn defaults daemon_threads=False, which keeps the interpreter alive past serve_forever()'s return until any in-flight /healthz or /metrics handler thread finishes. Set daemon_threads=True so process exit is bounded by the daemon thread's drain budget instead. Also call server.server_close() right after serve_forever() so the listening socket is released — kubelet probes fast-fail with connection-refused rather than connect-then-hang. icebox/postgres_sync.py: two docstring fixes flagged by the PR review. - Module docstring claimed psycopg_pool defaults to max=2; reality is default 4 with a config.py-enforced floor of 3 (probe handler reads PG on every /healthz + /metrics scrape, so a tick holding 1 conn across the Iceberg commit needs ≥2 headroom). - update_heartbeat docstring still described the deleted async API + POST-503 backpressure path. Rewrite to describe what actually reads the heartbeat now — the /healthz handler — plus the two call sites (boot seed + tick-end). Addresses remaining Copilot review comments on PR #72.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Replaces the icebox HTTP service + cycle state machine with a polling-daemon committer per
(Iceberg namespace, table). Net ~9000 lines deleted. Driven by the docs/icebox-self-healing-recovery.md v6 design after the 02:51 incident where coordinated bounce of all six `*-icebox-coord` pods wedged in `recover_in_flight_cycles`.Architecture:
```
32× millpond writers ──INSERT───▶ icebox_files (PG)
(Parquet to S3) │
▼
icebox daemon
│
├──tick──▶ Lakekeeper
└──tick──▶ Kafka offset commit (post-tx)
```
Writers INSERT file metadata directly into a per-deployment PG schema's `icebox_files` table. The daemon polls every 60s, batches eligible rows (older than the age filter), commits to Iceberg via PyIceberg, and advances Kafka offsets AFTER the PG transaction commits AND after the pool connection is returned — preserving the invariant "Kafka offset committed iff PG knows the file's fate".
Key design decisions (full discussion in the v6 doc):
What's deleted: `icebox/api.py`, `icebox/committer.py` (cycle state machine + recover_in_flight_cycles), `icebox/postgres_async.py`, `icebox/schema_cache.py`, `fastapi`, `uvicorn`, `httpx`, `asyncpg` deps.
What's new: `icebox/daemon.py` (tick + loop + DaemonDeps), `icebox/timeout.py` (with_timeout wrapper), stdlib `ThreadingHTTPServer` in `icebox/main.py` for `/healthz` + `/metrics` (no FastAPI/uvicorn).
Commits (logical units, reviewable in order):
Test plan
Out of scope (filed as follow-up)