diff --git a/AGENTS.md b/AGENTS.md index 12f46d349..d01b40fa8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -4,3 +4,33 @@ Read `CLAUDE.md` and `.claude/skills/rust/SKILL.md`. ## Tool usage Prefer the dedicated Read, Grep, and Glob tools for file searching and reading. Bash is permitted as a fallback when dedicated tools can't accomplish the task, but excessive Bash tool calls for search/read operations will be denied. + + +## Running tests + +Unit and integration tests use `cargo-nextest`: + +```sh +cargo nextest run --profile dev +``` + +If a single test fails, run it directly by name to isolate the failure. + +Integration tests live under `integration/<suite>/`. The harness is multi-language; +`integration/run.sh` only dispatches the `python`, `ruby`, `java`, and `sql` suites. +Other suites (`go`, `rust`, `dry_run`, `pgbench`, `toxi`, `two_pc`, `plugins`, +`schema_sync`, `complex`, `mirror`, `load_balancer`, `copy_data`, ...) must be run +directly via their own `run.sh`. All suites require a running Postgres instance +configured by `bash integration/setup.sh`. + +```sh +bash integration/run.sh # python + ruby + java + sql +``` + +Or a specific suite: + +```sh +bash integration/go/run.sh +``` + +Always run `cargo fmt` before submitting changes. Run `cargo clippy` where practical. \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md index dfac2c16b..fa110cf83 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -5,3 +5,27 @@ PgDog is a connection pooler, load balancer and database sharder for PostgreSQL. ## Workflow Load the `/rust` skill. Don't follow C/C++/Go norms. + + +## Running tests + +Unit tests: + +```sh +cargo nextest run --profile dev +``` + +If a test fails, run it directly by name. Integration tests require Postgres configured via `bash integration/setup.sh` first. + +The integration harness is multi-language: each suite lives in `integration/<suite>/`. +`integration/run.sh` only dispatches the `python`, `ruby`, `java`, and `sql` suites. 
+Other suites (`go`, `rust`, `dry_run`, `pgbench`, `toxi`, `two_pc`, `plugins`, +`schema_sync`, `complex`, `mirror`, `load_balancer`, `copy_data`, ...) must be run +directly via their own `run.sh`. + +```sh +bash integration/run.sh # python + ruby + java + sql +bash integration/go/run.sh # Go suite +``` + +Format before committing: `cargo fmt`. Run `cargo clippy` where practical. \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 61004877a..1b00f89d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1465,6 +1465,7 @@ dependencies = [ "quote", "rustc_version", "syn 2.0.117", + "unicode-xid", ] [[package]] @@ -3166,6 +3167,7 @@ dependencies = [ "csv-core", "dashmap", "derive_builder", + "derive_more", "fnv", "futures", "hickory-resolver", diff --git a/docs/REPLICATION.md b/docs/REPLICATION.md index d16798a67..698a7f5fc 100644 --- a/docs/REPLICATION.md +++ b/docs/REPLICATION.md @@ -54,7 +54,7 @@ flowchart TD SS -->|Commit| COM ``` -Every `Insert`, `Update`, and `Delete` passes through the same three-step path before reaching +Every `Insert`, `Update`, and `Delete` passes through a three-step gate before reaching the destination: ```mermaid @@ -71,6 +71,181 @@ flowchart LR ROUTE --> EXEC ``` +The execution path after that gate depends on the table's **replica identity**, described in +the sections below. + +--- + +## REPLICA IDENTITY DEFAULT and INDEX + +`REPLICA IDENTITY DEFAULT` uses the primary key columns as the row identity. +`REPLICA IDENTITY USING INDEX` uses the columns of a nominated unique index. +In both cases, PostgreSQL writes only those identity columns into UPDATE and DELETE WAL +records — not the full row. + +### What it means for row matching + +The identity columns are guaranteed NOT NULL — primary key columns by definition, and +`REPLICA IDENTITY USING INDEX` requires NOT NULL on every indexed column. + +This has a direct consequence for statement shape: + +- The WHERE clause in UPDATE and DELETE uses plain `=` predicates. 
NULL can never appear + in identity columns, so `=` always behaves correctly. +- The sharding key is extracted directly from the event tuple: identity columns for + UPDATE/DELETE, all columns for INSERT. +- Each event routes to exactly one destination shard — no broadcast. + +### Statement shapes + +`Table` in [`publisher/table.rs`](../pgdog/src/backend/replication/logical/publisher/table.rs) +prepares the following statement shapes at `relation()` time: + +| Operation | Statement shape | +|---|---| +| INSERT (sharded) | Plain `INSERT` | +| INSERT (omni) | `INSERT … ON CONFLICT (identity_cols) DO UPDATE SET …` | +| UPDATE | `UPDATE … SET non_identity_cols = $N WHERE identity_cols = $M` | +| UPDATE (partial) | Same WHERE, SET restricted to non-Toasted columns — shape-cached | +| DELETE | `DELETE … WHERE identity_cols = $N` | + +The partial UPDATE path is covered in detail in +[Handling unchanged-TOAST columns](#handling-unchanged-toast-columns). + +--- + +## REPLICA IDENTITY FULL + +`REPLICA IDENTITY FULL` is used on tables with no primary key or suitable unique index. +Instead of writing only key columns into UPDATE and DELETE WAL records, PostgreSQL writes +the entire pre-change row. PgDog detects this at `relation()` time +(`ReplicaIdentity.identity == "f"`) and sets a per-OID `full_identity` flag; all subsequent +events for that table are dispatched to dedicated handlers in `subscriber/stream.rs`. + +### Sharded vs omni + +The behavior splits on whether the table has a sharding column: + +- *Sharded FULL* tables route via the sharding-key value in the OLD tuple (UPDATE/DELETE) + or the NEW tuple (INSERT). No broadcast is needed. +- *Omni FULL* tables replicate to every shard. Because omni INSERTs fan out during the + bulk-copy/replication overlap window, duplicates are possible. PgDog requires a + **unique index that prevents NULL-keyed duplicates** on every destination shard. 
Either + declare every key column `NOT NULL`, or use a PG15+ `NULLS NOT DISTINCT` unique index. + A standard nullable unique index allows two `NULL`-keyed rows to coexist, which corrupts + the destination row count during the overlap window. If any shard lacks a usable index, + `connect()` rejects the stream with a clear error before streaming begins. + +### INSERT + +A plain `INSERT` for sharded tables. Omni tables use `INSERT … ON CONFLICT DO NOTHING` +(`upsert_full_identity` in `publisher/table.rs`), silently skipping rows that already +landed from the COPY phase. + +### UPDATE + +OLD and NEW tuples are routed through the query router independently to detect whether the +sharding key changed: + +```mermaid +flowchart TD + GUARD{"identity == Old?"} + ERR1["FullIdentityMissingOld"] + ROUTE["resolve shard for old tuple
resolve shard for new tuple"] + XSHARD{"shard key changed?"} + FANOUT["fill 'u' cols in new from old
DELETE on old shard
INSERT on new shard"] + FASTCHK{"new has
toasted cols?"} + FAST["fast path: full UPDATE"] + SLOW["slow path: partial UPDATE
(shape cache)"] + NOOP["skip — nothing changed"] + + GUARD -->|yes| ROUTE + GUARD -->|no| ERR1 + ROUTE --> XSHARD + XSHARD -->|yes| FANOUT + XSHARD -->|no| FASTCHK + FASTCHK -->|no| FAST + FASTCHK -->|yes| SLOW + SLOW -->|no non-key cols present| NOOP +``` + +#### Single-row targeting with `(tableoid, ctid)` + +FULL identity DELETE and UPDATE use a `(tableoid, ctid)` subquery instead of a bare predicate: + +```sql +-- what we emit +WHERE (tableoid, ctid) = (SELECT tableoid, ctid FROM t WHERE col IS NOT DISTINCT FROM $1 AND … LIMIT 1) + +-- what we avoid +WHERE col1 IS NOT DISTINCT FROM $1 AND col2 IS NOT DISTINCT FROM $2 AND … +``` + +**`ctid` is a physical address `(block, offset)` within a heap.** On a plain table this is +globally unique: one file, one address space. On a partitioned destination table each partition +is its own heap, and every heap starts its address space at `(0,1)`. Two partitions can both have +a live row at `(0,1)`. A bare `WHERE ctid = (SELECT ctid … LIMIT 1)` would expand across all +partitions in the outer DML and delete or update every partition that happens to have a row at +that address — not the one row we wanted. + +**`tableoid`** is the OID of the heap the row physically lives in — for a partitioned parent, +the specific leaf partition. `(tableoid, ctid)` is unique across the entire partition tree. +The outer `WHERE (tableoid, ctid) = (…)` Postgres row-constructor comparison pins both the +partition and the physical slot simultaneously, so only the single identified row is touched. +On a non-partitioned table `tableoid` is constant for the whole relation; the extra column adds +no overhead and does not change behaviour. + +**Why the subquery also helps with duplicates.** `LIMIT 1` lets the sequential scan stop at the +first matching row (expected depth N/2 for N rows), and the outer Tid Scan fetches that exact +page by physical address in a single buffer pin. 
Without the subquery a bare predicate is a full +table scan that modifies *every* matching row — incorrect when the destination contains +byte-for-byte duplicate rows (possible during the copy–stream overlap window). + +**When the destination has an index.** Omni FULL tables require a unique index on every +destination shard. Because that index must have all key columns `NOT NULL` (or use +`NULLS NOT DISTINCT`), the planner can treat `col IS NOT DISTINCT FROM $1` as plain `=` +for those columns and use the index for the inner subquery — changing the inner scan from +O(N) sequential to O(log N). Sharded FULL tables have no such requirement and typically +no index, so the scan remains sequential with `LIMIT 1` stopping it early. + +**Sharded vs omni.** For sharded tables the statement is sent to exactly one shard. +For omni tables the same bind is sent to every shard; each shard runs the subquery +independently against its own copy of the table and gets back its own `(tableoid, ctid)`. +The approach is correct on both: `tableoid` and `ctid` are local to the shard that produced them. + +`IS NOT DISTINCT FROM` is used rather than `=` because FULL identity columns have no NOT NULL +guarantee — `col = NULL` is always unknown in SQL and would silently match zero rows. +DEFAULT/INDEX identity uses plain `=`; its identity columns are guaranteed NOT NULL. + +#### Cross-shard key change + +When OLD and NEW route to different shards, a single `UPDATE` cannot span both. +PgDog detects this by comparing the shard resolved from each tuple. + +This has direct consequences for how the event is applied: + +- PgDog falls back to a DELETE on the old shard followed by an INSERT on the new shard. +- If `update.new` contains unchanged-TOAST (`'u'`) columns, PgDog fills them in from + `old_full` before constructing the INSERT bind. With FULL identity, `old_full` is always + fully materialised, so every `'u'` column has its value available at the same position. 
+ +### DELETE + +PostgreSQL fetches TOAST values before writing DELETE WAL records, so `delete.old` always +carries fully materialized column values — `'u'` markers never appear. + +The WHERE clause uses the same `(tableoid, ctid)` subquery pattern as UPDATE: `IS NOT DISTINCT FROM` +on all columns, wrapped in `(SELECT tableoid, ctid … LIMIT 1)` to delete exactly one row even when +the destination contains duplicate rows that are byte-for-byte identical, or when the destination +is a partitioned table where bare `ctid` values are not unique across partitions. + +### Errors + +| Error | When | Recovery | +|---|---|---| +| `FullIdentityMissingOld` | UPDATE/DELETE arrives without a full OLD tuple | source replica identity changed; re-validate and restart | +| `FullIdentityOmniNoUniqueIndex` | Omni table missing a qualifying unique index on any shard | add a unique index with all key columns `NOT NULL`, or a PG15+ `NULLS NOT DISTINCT` unique index | + --- ## Handling unchanged-TOAST columns @@ -160,11 +335,29 @@ statement and never enter the partial path. A table whose large columns are freq unchanged will accumulate one cached statement per observed presence pattern — in practice a small number, since most applications update columns in a consistent subset. -`Update::partial_new()` produces the bind tuple by filtering `'u'` columns out of the WAL tuple -directly — no schema metadata needed, because PostgreSQL guarantees identity (primary-key) columns -are never marked `'u'`. The parameter sequence in the SQL and the bind tuple match by construction. +`Update::partial_new()` produces the SET bind tuple by filtering `'u'` columns out of the WAL +NEW tuple — no schema metadata needed, because PostgreSQL guarantees identity (primary-key) +columns are never marked `'u'`. For `REPLICA IDENTITY FULL`, PostgreSQL passes OLD through +`ExtractReplicaIdentity` → `toast_flatten_tuple` before WAL-logging, so OLD always has every +column present. 
The slow path keeps OLD intact for the WHERE clause (all `n` columns at +`$1..$n`) and uses `partial_new` for SET (the `k` non-Toasted columns at `$n+1..$n+k`). +`Update::full_identity_bind_tuple(&old_full, &partial_new)` then concatenates them into a +single `n+k`-parameter bind that matches the SQL emitted by `Table::update_full_identity_partial_set`. + +--- -Re-fetching the missing value from the source is not an option: a query issued outside the -replication stream's transaction context could reflect a newer write, breaking ordering guarantees. -It is also unnecessary — the destination already holds the correct value from the initial bulk -COPY or a prior full UPDATE, so skipping the column in the SET clause is exactly right. +## Error rollback + +WAL events dispatch `Bind/Execute/Flush` (no `Sync`), leaving Postgres in an implicit +transaction that holds row locks. On `Err`, `StreamSubscriber::handle` in +[`subscriber/stream.rs`](../pgdog/src/backend/replication/logical/subscriber/stream.rs) +clears `self.connections` and resets per-session state (`relations`, `statements`, `keys`, +`changed_tables`, `in_transaction`). Dropping each `Server` closes its TCP socket; +Postgres FATALs the backend and rolls back the implicit transaction. The next call to +`handle()` lazily reconnects and rebuilds prepared statements from the Relation messages +Postgres re-emits after reconnect. + +`Sync` would not work: it commits when Postgres saw no error, but PgDog raises some errors +(e.g. `FullIdentityMissingOld` on a missing OLD tuple) *after* a successful `CommandComplete`. +FATAL disconnect is the only signal that rolls back regardless. Connections come from +`Pool::standalone`, so dropping them closes the socket instead of returning to a pool. 
\ No newline at end of file diff --git a/docs/issues/914-pool-shutdown.md b/docs/issues/914-pool-shutdown.md deleted file mode 100644 index 855b5be87..000000000 --- a/docs/issues/914-pool-shutdown.md +++ /dev/null @@ -1,149 +0,0 @@ -# Issue #914: `pool is shut down` during resharding COPY - -[github.com/pgdogdev/pgdog/issues/914](https://github.com/pgdogdev/pgdog/issues/914) - -A large-scale reshard (276 tables, ~180 GB) completed its parallel COPY phase -successfully then failed to start replication: - -``` -2026-04-18T02:31:14.592Z ERROR [task: 1] pool: pool is shut down -``` - -The replication slot remained inactive with a 2855 GB WAL lag on the source. - -A second problem: two tables used `REPLICA IDENTITY FULL` instead of a primary -key. No error was raised before the copy; it would only have appeared after the -multi-hour copy finished. - ---- - -## Why COPY survived but replication didn't - -The long-running copy phase of `data_sync` uses only pool-bypassing connections: - -- **Source** — [`ReplicationSlot::data_sync`](../pgdog/src/backend/replication/logical/publisher/slot.rs) connects via raw `Address` → - `Server::connect(...)` directly. The pool is not consulted. -- **Destination** — [`CopySubscriber::connect()`](../pgdog/src/backend/replication/logical/subscriber/copy.rs) calls `pool.standalone()`, - which uses the pool as a config source only (address, credentials, TLS) and - opens a raw `Server` connection. No `Guard` is issued, `pool.online` is never - checked, and pool shutdown cannot reach these connections. - -The metadata prelude of `data_sync` (`sync_tables` and `create_slots`) does call -`pool.get()` via `shard.primary()`, but this completes in seconds before the copy -loop begins. A RELOAD racing that prelude could still produce `Error::Offline`, -though in practice both operations are fast enough that the window is negligible. 
- -When `replace_databases()` fires mid-copy — triggered by any client DDL, -passthrough auth event, or admin reload — it marks the old pool generation -offline (`guard.online = false`) and atomically swaps in a new one. The -already-open TCP connections used by the copy tasks are invisible to this; -they keep streaming rows. - -The first `pool.get()` call in the post-copy path is inside `replicate()` → -`sync_tables()` → `shard.primary().await?`. If the cluster reference passed in -points to a now-offline pool generation, this is where `Error::Offline` hits. - ---- - -## Root causes - -**Bug 1 — stale cluster reference at replication start** - -`Publisher` previously stored a `cluster: Cluster` field set at construction -time during `schema_sync_pre()`. Any pool reload during the multi-hour copy -created a new pool generation and marked the stored one offline. When -`replicate()` was called after the copy, it used the stale offline reference -for its first `pool.get()`. - -**Bug 2 — `valid()` called too late** - -`Table::valid()` checks that at least one column carries a replica identity. -Before this fix it was only called from `stream.rs` when the replication stream -processed the first WAL row — after the entire COPY had already completed. -`REPLICA IDENTITY FULL` tables always fail `valid()` because -`pg_get_replica_identity_index()` returns `NULL` for FULL mode, leaving all -columns with `identity=false`. The error was always going to appear; it just -appeared at the worst possible time. - -A secondary ordering bug was also present: `create_slots()` ran before -`valid()`, so a `TableValidation(NoIdentityColumns)` failure left orphaned replication slots on the -source that required manual cleanup. 
- ---- - -## Fix - -**Fix 1 — Remove the `cluster` field from `Publisher` entirely** - -Source and destination clusters are now held exclusively on `Orchestrator` and -passed as `&Cluster` parameters to `publisher.data_sync(...)` and -`publisher.replicate(...)` on each call. The publisher cannot hold a stale -reference because it no longer holds one at all. - -[`Orchestrator::refresh()`](../pgdog/src/backend/replication/logical/orchestrator.rs) re-fetches both clusters from `databases()` — -the live atomic store — immediately before `replicate()` is called. This is -done at two call sites: [`copy_data.rs`](../pgdog/src/admin/copy_data.rs) (the `COPY_DATA` admin command path) -and `replicate_and_cutover()` in [`orchestrator.rs`](../pgdog/src/backend/replication/logical/orchestrator.rs) (the `RESHARD` path). - -`refresh_publisher()` is intentionally kept separate. It recreates the -`Publisher` entirely, discarding the per-table LSN watermarks accumulated -during the copy. Discarding those watermarks would cause the replication stream -to re-apply all WAL from slot creation and produce duplicates. -`refresh_publisher()` is called at three points: construction, inside `schema_sync_pre()` -after the destination schema is reloaded, and after cutover. In all three cases the publisher -is reset to a clean slate with no per-table LSN watermarks, which is correct because no data -has been copied yet (or the cutover has already completed and a fresh stream is starting). - -**Fix 2 — Validate tables before the copy starts** - -[`Publisher::data_sync()`](../pgdog/src/backend/replication/logical/publisher/publisher_impl.rs) -now runs `sync_tables()` first (column metadata that `valid()` reads), then -`valid()` (rejects any table with no replica identity before touching resources), -then `create_slots()`. The ordering matters: a `TableValidation` error raised -before slot creation leaves no orphaned slots on the source that would require -manual cleanup. 
`Error::TableValidation(TableValidationErrors([NoIdentityColumns(...)]))` is returned with no data moved. ---- - -## Future risks - -**`pool.get()` inside the copy path will silently break on pool reload.** -The long-running copy loop of `data_sync` is immune to pool reloads because -it uses only `pool.standalone()` or raw `Address` connections. The short -metadata prelude (`sync_tables`, `create_slots`) does call `pool.get()` but -completes in seconds. Any future code that calls `pool.get()` inside the copy -loop will work under normal conditions and fail only when a pool reload races -a long copy in production. All connections opened during the copy loop must -use `pool.standalone()` or an existing `Server` handle. - -**`refresh()` is a point-in-time snapshot with a residual race window.** -A pool reload between `refresh()` returning and the first `shard.primary()` -call inside `replicate()` would produce the same error. The window is -microseconds in practice. The structural fix is to fetch from `databases()` at -each `pool.get()` call site rather than caching clusters on `Orchestrator` at -all — but that is a larger refactor. - -**`standalone()` connections have no reconnect.** -A dropped `standalone()` connection mid-copy fails the entire copy task with no -retry. The `lsn` field on `Table` (`publisher/table.rs`) records the WAL -position reached by the copy; resume-from-LSN is structurally possible without -restarting the full run, but is not yet implemented. - ---- - -## Guidance - -**Can a primary key be added after schema sync but before the copy?** -Yes. The safe window is between `schema_sync_pre()` completing and -`data_sync()` starting. Issue `ALTER TABLE … ADD PRIMARY KEY` on each -destination shard. The source publication must expose a compatible replica -identity — a matching PK or a unique index via -`ALTER TABLE … REPLICA IDENTITY USING INDEX`. - -**Is a unique index sufficient?** -Yes, with `ALTER TABLE t REPLICA IDENTITY USING INDEX unique_idx`. 
The indexed -columns become the WAL replica identity, `valid()` passes, and the upsert -target is deterministic. - -**`REPLICA IDENTITY FULL` is not sufficient.** `pg_get_replica_identity_index()` -returns `NULL` for FULL mode; no columns get `identity=true` and `valid()` -fails with `NoIdentityColumns`. Use `REPLICA IDENTITY USING INDEX` instead. diff --git a/integration/copy_data/data_sync/run.sh b/integration/copy_data/data_sync/run.sh index 965b5e072..f592581eb 100755 --- a/integration/copy_data/data_sync/run.sh +++ b/integration/copy_data/data_sync/run.sh @@ -55,8 +55,35 @@ stop_pgbench() { fi } -SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity copy_data.posts" -OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories" +# Database name constants for the three roles in this test. +SRC_DB="pgdog" # single source database +DST_DB1="pgdog1" # destination shard 0 (0→2 target) +DST_DB2="pgdog2" # destination shard 1 (0→2 target) +DST2_DB1="shard_0" # destination2 shard 0 (2→2 target) +DST2_DB2="shard_1" # destination2 shard 1 (2→2 target) + +# sum_shards DB1 DB2 SQL [FALLBACK] +# Runs SQL on DB1 and DB2 independently and returns their integer sum. +# FALLBACK (default 0) is substituted when a query fails, e.g. while a table +# is still being created or when polling before replication catches up. +sum_shards() { + local db1=$1 db2=$2 sql=$3 fallback=${4:-0} + echo $(( + $(psql -d "$db1" -tAc "$sql" 2>/dev/null || echo "$fallback") + + $(psql -d "$db2" -tAc "$sql" 2>/dev/null || echo "$fallback") + )) +} + +# query_one DB SQL +# Runs SQL on a single database and returns the result with whitespace stripped. +# Errors are not suppressed — a failure here aborts the script. 
+query_one() { + local db=$1 sql=$2 + psql -d "$db" -tAc "$sql" | tr -d '\n\r' +} + +SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity copy_data.posts copy_data.full_identity_events" +OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories copy_data.event_types" pushd ${SCRIPT_DIR} @@ -70,6 +97,11 @@ psql -f "${SCRIPT_DIR}/../setup.sql" # ${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ schema-sync --from-database source --to-database destination --publication pgdog +# event_types has REPLICA IDENTITY FULL (omni). The unique index on `code` is +# PostData and is not synced by schema-sync pre-data, so create it explicitly on +# each destination shard before data-sync so tables_missing_unique_index() finds it. +psql -d "${DST_DB1}" -c "CREATE UNIQUE INDEX IF NOT EXISTS event_types_code_idx ON copy_data.event_types (code)" +psql -d "${DST_DB2}" -c "CREATE UNIQUE INDEX IF NOT EXISTS event_types_code_idx ON copy_data.event_types (code)" start_pgbench ${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ data-sync --from-database source --to-database destination --publication pgdog & @@ -94,27 +126,66 @@ sleep 15 # PostgreSQL emits a 'u' (unchanged-TOAST) marker for `body` in the WAL record. # The subscriber must issue a filtered UPDATE that skips `body` entirely; # if it instead writes an empty string the body sum check below will catch it. -psql -d pgdog -c "UPDATE copy_data.posts SET title = title || '_updated' WHERE id BETWEEN 1 AND 50" +psql -d "${SRC_DB}" -c "UPDATE copy_data.posts SET title = title || '_updated' WHERE id BETWEEN 1 AND 50" + +# REPLICA IDENTITY FULL test: UPDATE and DELETE on a sharded no-PK table. +# seq 1..50 → UPDATE (label only; body unchanged → PG emits 'u' for body in WAL). +# seq 51..100 → DELETE. +# WAL shape: OLD = 4 columns inline (toast_flatten_tuple); NEW = 3 present + 1 'u'. 
+# Subscriber must bind the 3 present columns; binding all 4 from OLD causes 08P01. +psql -d "${SRC_DB}" -c "UPDATE copy_data.full_identity_events SET label = 'updated_' || seq WHERE seq BETWEEN 1 AND 50" +psql -d "${SRC_DB}" -c "DELETE FROM copy_data.full_identity_events WHERE seq BETWEEN 51 AND 100" +SQL_FIE_COUNT="SELECT COUNT(*) FROM copy_data.full_identity_events" +SQL_FIE_UPDATED="SELECT COUNT(*) FROM copy_data.full_identity_events WHERE label LIKE 'updated_%'" +SQL_FIE_BODY="SELECT COALESCE(SUM(octet_length(body)),0) FROM copy_data.full_identity_events" +FULL_EVENTS_EXPECTED=$(query_one "${SRC_DB}" "${SQL_FIE_COUNT}") +FULL_UPDATED_SRC=$(query_one "${SRC_DB}" "${SQL_FIE_UPDATED}") +FULL_EVENTS_BODY_SRC=$(query_one "${SRC_DB}" "${SQL_FIE_BODY}") + +# REPLICA IDENTITY FULL test: UPDATE on an omni no-PK table with unique index. +# The subscriber uses ON CONFLICT DO NOTHING for INSERT; plain UPDATE for WAL UPDATE. +psql -d "${SRC_DB}" -c "UPDATE copy_data.event_types SET label = 'Click Updated' WHERE code = 'click'" +# IS NOT DISTINCT FROM test: the null_desc row has description = NULL. +# The FULL identity WHERE clause for this UPDATE includes: +# description IS NOT DISTINCT FROM NULL +# A plain = predicate would match zero rows; this UPDATE would silently not propagate. +psql -d "${SRC_DB}" -c "UPDATE copy_data.event_types SET label = 'Null Desc Updated' WHERE code = 'null_desc'" +# Duplicate-row UPDATE test (ctid single-row targeting). +# Two identical rows (tenant_id=1, seq=200, label='dup_label') were seeded. Update exactly +# one on the source by targeting its ctid. PgDog receives one WAL UPDATE event whose OLD +# tuple matches both destination rows; ctid targeting must touch only one. 
+psql -d "${SRC_DB}" -c "UPDATE copy_data.full_identity_events SET label = 'dup_changed' + WHERE ctid = (SELECT ctid FROM copy_data.full_identity_events WHERE seq = 200 AND label = 'dup_label' LIMIT 1)" + +# Cross-shard FULL identity UPDATE test (fill_toasted_from + routing from filled tuple). +# Move seq=1 from tenant_id=1 (shard 0) to tenant_id=3 (shard 1). +# hash(1)%2=0, hash(3)%2=1 — verified cross-shard with PgDog's bigint hash. +# body is STORAGE EXTERNAL so the WAL new tuple carries 'u' for body. +# PgDog must fill body from old_full before routing (P1 fix) and before building the INSERT. +psql -d "${SRC_DB}" -c "UPDATE copy_data.full_identity_events SET tenant_id = 3 WHERE seq = 1" + +# REPLICATION SENTINEL — must be the last DML issued against the source. +# Updating this row to 'sentinel_done' produces a WAL record that is downstream of +# every preceding change. The poll loop below waits for it to land on the destination. +psql -d "${SRC_DB}" -c "UPDATE copy_data.full_identity_events SET label = 'sentinel_done' WHERE seq = 999" stop_pgbench -# Poll the destination until the expected data is replicated -echo "Waiting for title update to reach destination (timeout 120s)..." +# Wait for the replication sentinel to land on the destination. +# seq=999 is dedicated solely to this purpose — see setup.sql. +# WAL is ordered: once the sentinel row has propagated, every preceding change has too. +echo "Waiting for streaming changes to reach destination (timeout 120s)..." 
DEADLINE=$((SECONDS + 120)) while true; do - UPDATED_DST=$(( - $(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM copy_data.posts WHERE title LIKE '%_updated'" 2>/dev/null || echo 0) + - $(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM copy_data.posts WHERE title LIKE '%_updated'" 2>/dev/null || echo 0) - )) - if [ "${UPDATED_DST}" -ge 50 ]; then - break - fi + SENTINEL=$(sum_shards "${DST_DB1}" "${DST_DB2}" \ + "SELECT COUNT(*) FROM copy_data.full_identity_events WHERE seq = 999 AND label = 'sentinel_done'" 0) + [ "${SENTINEL}" -eq 1 ] && break if ! kill -0 "${REPL_PID}" 2>/dev/null; then - echo "ERROR: replication process exited before delivering title update (${UPDATED_DST}/50)" + echo "ERROR: replication process exited before the sentinel (seq=999 label=sentinel_done) was delivered" exit 1 fi if [ "${SECONDS}" -ge "${DEADLINE}" ]; then - echo "ERROR: title update did not reach destination within 120s (${UPDATED_DST}/50)" + echo "ERROR: streaming changes did not reach destination within 120s" exit 1 fi sleep 1 @@ -147,26 +218,23 @@ ${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ ${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover -# Check row counts: destination (pgdog1 + pgdog2) vs destination2 (shard_0 + shard_1) -echo "Checking row counts: destination -> destination2..." +# Check row counts: source (pgdog) vs destination2 (shard_0 + shard_1). +# Both resharding hops must be lossless end-to-end. +echo "Checking row counts: source -> destination2..." 
for TABLE in ${SHARDED_TABLES}; do - SRC1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") - SRC2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") - SRC=$((SRC1 + SRC2)) - DST1=$(psql -d shard_0 -tAc "SELECT COUNT(*) FROM ${TABLE}") - DST2=$(psql -d shard_1 -tAc "SELECT COUNT(*) FROM ${TABLE}") - DST=$((DST1 + DST2)) + SRC=$(query_one "${SRC_DB}" "SELECT COUNT(*) FROM ${TABLE}") + DST=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "SELECT COUNT(*) FROM ${TABLE}") if [ "${SRC}" -ne "${DST}" ]; then - echo "MISMATCH ${TABLE}: source=${SRC} destination=${DST} (shard0=${DST1} shard1=${DST2})" + echo "MISMATCH ${TABLE}: source=${SRC} destination2=${DST}" exit 1 fi echo "OK ${TABLE}: ${SRC} rows" done for TABLE in ${OMNI_TABLES}; do - SRC=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") - DST1=$(psql -d shard_0 -tAc "SELECT COUNT(*) FROM ${TABLE}") - DST2=$(psql -d shard_1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + SRC=$(query_one "${SRC_DB}" "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(query_one "${DST2_DB1}" "SELECT COUNT(*) FROM ${TABLE}") + DST2=$(query_one "${DST2_DB2}" "SELECT COUNT(*) FROM ${TABLE}") if [ "${SRC}" -ne "${DST1}" ] || [ "${SRC}" -ne "${DST2}" ]; then echo "MISMATCH ${TABLE}: source=${SRC} shard0=${DST1} shard1=${DST2} (expected ${SRC} on each shard)" exit 1 @@ -177,10 +245,9 @@ done # TOAST invariant: destination body bytes must equal source body bytes. # If the subscriber wrote an empty string instead of skipping the column, # the destination sum will be far smaller than the source. 
-BODY_SRC=$(psql -d pgdog -tAc "SELECT COALESCE(SUM(octet_length(body)),0) FROM copy_data.posts") -BODY_DST1=$(psql -d shard_0 -tAc "SELECT COALESCE(SUM(octet_length(body)),0) FROM copy_data.posts" 2>/dev/null || echo 0) -BODY_DST2=$(psql -d shard_1 -tAc "SELECT COALESCE(SUM(octet_length(body)),0) FROM copy_data.posts" 2>/dev/null || echo 0) -BODY_DST=$((BODY_DST1 + BODY_DST2)) +sql="SELECT COALESCE(SUM(octet_length(body)),0) FROM copy_data.posts" +BODY_SRC=$(query_one "${SRC_DB}" "${sql}") +BODY_DST=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "${sql}") if [ "${BODY_SRC}" -ne "${BODY_DST}" ]; then echo "ERROR unchanged-TOAST: source body sum=${BODY_SRC}, destination sum=${BODY_DST}" exit 1 @@ -188,16 +255,88 @@ fi echo "OK unchanged-TOAST body preserved: ${BODY_DST} bytes" # Title update must have propagated through both resharding hops. -UPDATED_DST=$(( - $(psql -d shard_0 -tAc "SELECT COUNT(*) FROM copy_data.posts WHERE title LIKE '%_updated'" 2>/dev/null || echo 0) + - $(psql -d shard_1 -tAc "SELECT COUNT(*) FROM copy_data.posts WHERE title LIKE '%_updated'" 2>/dev/null || echo 0) -)) +UPDATED_DST=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "SELECT COUNT(*) FROM copy_data.posts WHERE title LIKE '%_updated'") if [ "${UPDATED_DST}" -lt 50 ]; then echo "ERROR: title update did not propagate (${UPDATED_DST}/50 rows updated on destination)" exit 1 fi echo "OK title propagated: ${UPDATED_DST}/50 rows updated" +# REPLICA IDENTITY FULL: sharded table UPDATE propagated through 2→2 sync. +FULL_UPDATED_2=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "${SQL_FIE_UPDATED}") +EXPECTED=$(query_one "${SRC_DB}" "${SQL_FIE_UPDATED}") +if [ "${FULL_UPDATED_2}" -ne "${EXPECTED}" ]; then + echo "ERROR REPLICA IDENTITY FULL UPDATE: expected ${EXPECTED} 'updated_*' rows, got ${FULL_UPDATED_2} on destination2" + exit 1 +fi +echo "OK REPLICA IDENTITY FULL sharded UPDATE: ${FULL_UPDATED_2} rows propagated" + +# REPLICA IDENTITY FULL: sharded table DELETE propagated — total count matches. 
+FULL_TOTAL_2=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "${SQL_FIE_COUNT}" -1) +EXPECTED=$(query_one "${SRC_DB}" "${SQL_FIE_COUNT}") +if [ "${FULL_TOTAL_2}" -ne "${EXPECTED}" ]; then + echo "ERROR REPLICA IDENTITY FULL DELETE: expected ${EXPECTED} rows, got ${FULL_TOTAL_2} on destination2" + exit 1 +fi +echo "OK REPLICA IDENTITY FULL sharded DELETE: ${FULL_TOTAL_2} rows on destination2" + +# REPLICA IDENTITY FULL: omni table UPDATE propagated to both shards of destination2. +sql="SELECT label FROM copy_data.event_types WHERE code = 'click'" +OMNI_LABEL_0=$(query_one "${DST2_DB1}" "${sql}") +OMNI_LABEL_1=$(query_one "${DST2_DB2}" "${sql}") +if [ "${OMNI_LABEL_0}" != "Click Updated" ] || [ "${OMNI_LABEL_1}" != "Click Updated" ]; then + echo "ERROR REPLICA IDENTITY FULL omni UPDATE: shard_0='${OMNI_LABEL_0}' shard_1='${OMNI_LABEL_1}' (expected 'Click Updated')" + exit 1 +fi +echo "OK REPLICA IDENTITY FULL omni UPDATE: label='Click Updated' on both shards" + +# IS NOT DISTINCT FROM: the null_desc row (description IS NULL) must have propagated +# through both resharding hops. A plain = predicate would have matched zero rows on the +# source and left the destination label unchanged. +sql="SELECT label FROM copy_data.event_types WHERE code = 'null_desc'" +NULL_LABEL_0=$(query_one "${DST2_DB1}" "${sql}") +NULL_LABEL_1=$(query_one "${DST2_DB2}" "${sql}") +if [ "${NULL_LABEL_0}" != "Null Desc Updated" ] || [ "${NULL_LABEL_1}" != "Null Desc Updated" ]; then + echo "ERROR IS NOT DISTINCT FROM: shard_0='${NULL_LABEL_0}' shard_1='${NULL_LABEL_1}' (expected 'Null Desc Updated')" + exit 1 +fi +echo "OK IS NOT DISTINCT FROM: null_desc label='Null Desc Updated' on both shards" +# REPLICA IDENTITY FULL + TOAST slow-path: full_identity_events.body must be preserved. +# The seq 1..50 label UPDATE leaves body unchanged; PG emits 'u' for body in the WAL. +# OLD carries all 4 columns inline (toast_flatten_tuple); NEW carries 3 present + 1 'u'. 
+# If the subscriber bound all 4 from OLD, PG would have rejected with 08P01 and +# replication would have stalled before the poll completed. If it bound correctly but +# zeroed body in the SET clause, the byte sum collapses here. +FULL_EVENTS_BODY_DST=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "${SQL_FIE_BODY}") +if [ "${FULL_EVENTS_BODY_DST}" -ne "${FULL_EVENTS_BODY_SRC}" ]; then + echo "ERROR FULL identity TOAST slow-path: source body sum=${FULL_EVENTS_BODY_SRC}, destination2 sum=${FULL_EVENTS_BODY_DST}" + echo " slow-path UPDATE corrupted or dropped the unchanged body column" + exit 1 +fi +echo "OK FULL identity TOAST slow-path: body preserved (${FULL_EVENTS_BODY_DST} bytes)" + +# Duplicate-row UPDATE: ctid targeting must have changed exactly one of the two identical rows. +# Both resharding hops must preserve this invariant. +DUP_CHANGED_2=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "SELECT COUNT(*) FROM copy_data.full_identity_events WHERE seq = 200 AND label = 'dup_changed'") +DUP_TOTAL_2=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "SELECT COUNT(*) FROM copy_data.full_identity_events WHERE seq = 200") +if [ "${DUP_CHANGED_2}" -ne 1 ] || [ "${DUP_TOTAL_2}" -ne 2 ]; then + echo "ERROR ctid duplicate-row UPDATE: changed=${DUP_CHANGED_2} (expected 1), total=${DUP_TOTAL_2} (expected 2)" + echo " ctid targeting either modified both rows or none" + exit 1 +fi +echo "OK ctid duplicate-row UPDATE: exactly 1 row changed, ${DUP_TOTAL_2} rows total" + +# Cross-shard FULL UPDATE: seq=1 moved from tenant_id=1 to tenant_id=3. +# body was STORAGE EXTERNAL, so WAL new tuple carried 'u' for body — exercises fill_toasted_from. +# The row must exist exactly once across both shards after both resharding hops. 
+XSHARD_TOTAL_2=$(sum_shards "${DST2_DB1}" "${DST2_DB2}" "SELECT COUNT(*) FROM copy_data.full_identity_events WHERE seq = 1") +if [ "${XSHARD_TOTAL_2}" -ne 1 ]; then + echo "ERROR cross-shard FULL UPDATE: found ${XSHARD_TOTAL_2} copies of seq=1 (expected 1)" + echo " row was either duplicated or lost during the shard move" + exit 1 +fi +echo "OK cross-shard FULL UPDATE: seq=1 exists exactly once after shard move" + psql -f "${SCRIPT_DIR}/init.sql" popd diff --git a/integration/copy_data/retry_test/run.sh b/integration/copy_data/retry_test/run.sh index be3e89ef6..42cbf4bdd 100755 --- a/integration/copy_data/retry_test/run.sh +++ b/integration/copy_data/retry_test/run.sh @@ -71,6 +71,11 @@ src_psql -f "${SCRIPT_DIR}/init.sql" "${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" \ schema-sync --from-database source --to-database destination --publication pgdog +# event_types has REPLICA IDENTITY FULL (omni). The unique index on `code` is +# PostData and is not synced by schema-sync pre-data, so create it explicitly on +# each destination shard before COPY_DATA so tables_missing_unique_index() finds it. +shard0_psql -c "CREATE UNIQUE INDEX IF NOT EXISTS event_types_code_idx ON copy_data.event_types (code)" +shard1_psql -c "CREATE UNIQUE INDEX IF NOT EXISTS event_types_code_idx ON copy_data.event_types (code)" echo "[retry_test] Starting pgdog server on port ${PGDOG_PORT}..." PGDOG_PORT="${PGDOG_PORT}" "${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" & PGDOG_PID=$! diff --git a/integration/copy_data/setup.sql b/integration/copy_data/setup.sql index ecf1c9386..ab3575e9e 100644 --- a/integration/copy_data/setup.sql +++ b/integration/copy_data/setup.sql @@ -24,13 +24,7 @@ CREATE TABLE IF NOT EXISTS copy_data.orders ( refunded_at TIMESTAMPTZ ); --- TODO: remove the surrogate `id` column once pgdog supports replicating tables --- that have no primary key and no `REPLICA IDENTITY USING INDEX`. 
Without a PK --- (or unique index promoted via REPLICA IDENTITY USING INDEX), no column carries --- the replica identity flag, Table::valid() fails with NoIdentityColumns, and the --- table is rejected before the copy starts. See docs/FIX_ISSUE_914.md, Fix 2. CREATE TABLE IF NOT EXISTS copy_data.order_items ( - id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL, tenant_id BIGINT NOT NULL, order_id BIGINT NOT NULL REFERENCES copy_data.orders(id), @@ -39,6 +33,9 @@ CREATE TABLE IF NOT EXISTS copy_data.order_items ( refunded_at TIMESTAMPTZ ); +-- No primary key: replica identity is carried by all columns. +ALTER TABLE copy_data.order_items REPLICA IDENTITY FULL; + CREATE TABLE IF NOT EXISTS copy_data.log_actions( id BIGSERIAL PRIMARY KEY, tenant_id BIGINT, @@ -230,3 +227,70 @@ SELECT -- ~32 KB per row: large enough to guarantee out-of-line TOAST storage. repeat(md5(gs.id::text), 1024) FROM generate_series(1, 50) AS gs(id); + + +-- Sharded table with no primary key and REPLICA IDENTITY FULL. +-- Rows are uniquely identifiable by (tenant_id, seq) for controlled UPDATE/DELETE testing. +-- seq 1..50 will be UPDATEd (label set to 'updated_N'); seq 51..100 will be DELETEd. +-- +-- `body` is forced to EXTERNAL storage so that the seq 1..50 UPDATE (which only +-- changes `label`) produces 'u' in NEW for `body`, exercising the subscriber +-- slow-path. PG materialises OLD fully inline (toast_flatten_tuple) so OLD carries +-- all 4 columns; NEW carries 3 present + 1 'u'. The slow path must bind only the 3 +-- present columns (not all 4 from OLD) or PG rejects with a param-count mismatch. +CREATE TABLE IF NOT EXISTS copy_data.full_identity_events ( + tenant_id BIGINT NOT NULL, + seq INT NOT NULL, + label VARCHAR NOT NULL, + body TEXT NOT NULL +); + +ALTER TABLE copy_data.full_identity_events REPLICA IDENTITY FULL; +-- Force out-of-line storage: any UPDATE that leaves `body` unchanged emits 'u' in NEW. 
+ALTER TABLE copy_data.full_identity_events ALTER COLUMN body SET STORAGE EXTERNAL; + +INSERT INTO copy_data.full_identity_events (tenant_id, seq, label, body) +SELECT + ((gs.id - 1) % 10) + 1 AS tenant_id, + gs.id AS seq, + 'event_' || gs.id AS label, + repeat(md5(gs.id::text), 1024) AS body -- ~32 KB per row +FROM generate_series(1, 100) AS gs(id); + +-- Two identical rows for the duplicate-row ctid test. +-- seq=200 is outside the 1..100 range used by the UPDATE/DELETE DML. +-- Both rows are byte-for-byte identical; one WAL UPDATE must change exactly one. +INSERT INTO copy_data.full_identity_events (tenant_id, seq, label, body) VALUES + (1, 200, 'dup_label', repeat(md5('dup_label'), 1024)), + (1, 200, 'dup_label', repeat(md5('dup_label'), 1024)); + +-- REPLICATION SENTINEL — not a test assertion target. +-- seq=999 is updated last in run.sh to 'sentinel_done'. The poll loop waits for that +-- label to appear on the destination before asserting anything. WAL ordering guarantees +-- all preceding changes have propagated once this row has landed. +INSERT INTO copy_data.full_identity_events (tenant_id, seq, label, body) VALUES + (1, 999, 'sentinel', repeat(md5('sentinel'), 1024)); + +-- Omni (non-sharded) table with REPLICA IDENTITY FULL. +-- The standalone unique index is PostData and is not copied to the destination +-- by schema-sync pre-data. run.sh creates it explicitly on each destination shard +-- after schema-sync and before data-sync, satisfying tables_missing_unique_index(). +-- 'click' row will be UPDATEd (label set to 'Click Updated') during the test run. 
+CREATE TABLE IF NOT EXISTS copy_data.event_types ( + code VARCHAR NOT NULL, + label VARCHAR NOT NULL, + description TEXT +); + +CREATE UNIQUE INDEX ON copy_data.event_types (code); +ALTER TABLE copy_data.event_types REPLICA IDENTITY FULL; + +INSERT INTO copy_data.event_types (code, label, description) VALUES + ('click', 'Click', 'User clicked an element'), + ('view', 'View', 'User viewed a page'), + ('purchase', 'Purchase', 'User made a purchase'), + ('login', 'Login', 'User logged in'), + ('logout', 'Logout', 'User logged out'), + -- NULL description: exercises IS NOT DISTINCT FROM NULL in the FULL identity WHERE clause. + -- A plain = predicate would silently skip this row on UPDATE/DELETE. + ('null_desc', 'Null Description', NULL); \ No newline at end of file diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 497249088..63a81998e 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -24,6 +24,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "std"] tracing-throttle = "0.4" parking_lot = "0.12" thiserror = "2" +derive_more = { version = "2", features = ["display", "error"] } bytes = "1" clap = { version = "4", features = ["derive"] } serde = { version = "1", features = ["derive"] } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 8ae50d5c5..1f2bd6cdb 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -760,6 +760,20 @@ mod test { hasher: Hasher::Postgres, ..Default::default() }, + // Duplicate-row table for FULL identity ctid-targeting tests. + // No primary key on destination — allows identical rows. 
+ ShardedTable { + database: "pgdog".into(), + name: Some("full_dup_rows".into()), + column: "id".into(), + primary: true, + centroids: vec![], + data_type: DataType::Bigint, + centroids_path: None, + centroid_probes: 1, + hasher: Hasher::Postgres, + ..Default::default() + }, ], vec![ OmnishardedTable { diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 2a9597768..9948bfc36 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -1,23 +1,39 @@ use std::fmt; use std::num::ParseIntError; -use thiserror::Error; +use derive_more::{Display, Error}; use crate::{ backend::replication::publisher::PublicationTable, net::{CommandComplete, ErrorResponse}, }; +/// The kind of validation failure, decoupled from which table it occurred on. +#[derive(Debug, Display)] +pub enum TableValidationErrorKind { + #[display("has no replica identity columns")] + NoIdentityColumns, + #[display( + "REPLICA IDENTITY NOTHING, UPDATE/DELETE carry no row identity and cannot be replicated; set it to DEFAULT, INDEX, or FULL" + )] + ReplicaIdentityNothing, + #[display( + "REPLICA IDENTITY FULL on a non-sharded table requires a unique index on the destination; add a unique index on the source or destination, use REPLICA IDENTITY USING INDEX on the source, or shard the table" + )] + FullIdentityOmniNoUniqueIndex, +} + /// A single table-level validation failure. -#[derive(Debug, Error)] -pub enum TableValidationError { - #[error("table {0} has no replica identity columns")] - NoIdentityColumns(PublicationTable), +#[derive(Debug, Display, Error)] +#[display("table {table_name}: {kind}")] +pub struct TableValidationError { + pub table_name: String, + pub kind: TableValidationErrorKind, } /// Newtype that `Display`s a slice of `TableValidationError` as a human-readable list. 
-#[derive(Debug)] -pub struct TableValidationErrors(pub Vec<TableValidationError>); +#[derive(Debug, Error)] +pub struct TableValidationErrors(#[error(ignore)] pub Vec<TableValidationError>); impl fmt::Display for TableValidationErrors { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -29,9 +45,28 @@ impl fmt::Display for TableValidationErrors { } } -impl std::error::Error for TableValidationErrors {} +/// Sort `errors` by table display name and return `Err(TableValidation)` if non-empty, +/// otherwise continue. Mirrors `anyhow::ensure!` — uses `return` to exit the calling fn. +/// +/// Only valid inside a function that returns `Result<_, Error>`. +macro_rules! ensure_validation { + ($errors:expr) => {{ + let mut __errors = $errors; + if !__errors.is_empty() { + __errors.sort_by_key(|e| e.table_name.clone()); + __errors.dedup_by(|a, b| a.table_name == b.table_name); + return Err( + $crate::backend::replication::logical::Error::TableValidation( + $crate::backend::replication::logical::TableValidationErrors(__errors), + ), + ); + } + }}; +} +// export macro +pub(crate) use ensure_validation; -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] pub enum Error { #[error("backend: {0}")] Backend(#[from] crate::backend::Error), @@ -170,6 +205,17 @@ pub enum Error { table: String, oid: pgdog_postgres_types::Oid, }, + + /// Source replica identity changed mid-stream: an UPDATE or DELETE arrived without an OLD pre-image + /// while the destination expected one. Re-sync the table to recover. 
+ #[error( + "FULL identity {op} on {table} (oid {oid}): missing OLD pre-image; source replica identity changed mid-stream" + )] + FullIdentityMissingOld { + table: PublicationTable, + oid: pgdog_postgres_types::Oid, + op: &'static str, + }, } impl From for Error { @@ -268,9 +314,16 @@ mod tests { fn not_retryable() { assert!(!Error::CopyAborted(PublicationTable::default()).is_retryable()); assert!(!Error::DataSyncAborted.is_retryable()); - assert!(!Error::from(TableValidationError::NoIdentityColumns( - PublicationTable::default() - )) + assert!(!Error::from(TableValidationError { + table_name: String::new(), + kind: TableValidationErrorKind::NoIdentityColumns, + }) + .is_retryable()); + assert!(!Error::FullIdentityMissingOld { + table: PublicationTable::default(), + oid: pgdog_postgres_types::Oid::from(1234u32), + op: "UPDATE", + } .is_retryable()); assert!(!Error::NoReplicaIdentity("s".into(), "t".into()).is_retryable()); } @@ -278,32 +331,29 @@ mod tests { #[test] fn table_validation_error_display() { // Single error: header + one indented entry. - let single = Error::from(TableValidationError::NoIdentityColumns(PublicationTable { - schema: "public".into(), - name: "orders".into(), - ..Default::default() - })); + let single = Error::from(TableValidationError { + table_name: "\"public\".\"orders\"".into(), + kind: TableValidationErrorKind::NoIdentityColumns, + }); assert_eq!( single.to_string(), - "Table validation failed:\n\ttable \"public\".\"orders\" has no replica identity columns", + "Table validation failed:\n\ttable \"public\".\"orders\": has no replica identity columns", ); // Multiple errors: header + one indented line per entry. 
let multi = Error::TableValidation(TableValidationErrors(vec![ - TableValidationError::NoIdentityColumns(PublicationTable { - schema: "public".into(), - name: "orders".into(), - ..Default::default() - }), - TableValidationError::NoIdentityColumns(PublicationTable { - schema: "public".into(), - name: "items".into(), - ..Default::default() - }), + TableValidationError { + table_name: "\"public\".\"orders\"".into(), + kind: TableValidationErrorKind::NoIdentityColumns, + }, + TableValidationError { + table_name: "\"public\".\"items\"".into(), + kind: TableValidationErrorKind::ReplicaIdentityNothing, + }, ])); assert_eq!( multi.to_string(), - "Table validation failed:\n\ttable \"public\".\"orders\" has no replica identity columns\n\ttable \"public\".\"items\" has no replica identity columns", + "Table validation failed:\n\ttable \"public\".\"orders\": has no replica identity columns\n\ttable \"public\".\"items\": REPLICA IDENTITY NOTHING, UPDATE/DELETE carry no row identity and cannot be replicated; set it to DEFAULT, INDEX, or FULL", ); } } diff --git a/pgdog/src/backend/replication/logical/mod.rs b/pgdog/src/backend/replication/logical/mod.rs index 39a0e563a..f8cbf5445 100644 --- a/pgdog/src/backend/replication/logical/mod.rs +++ b/pgdog/src/backend/replication/logical/mod.rs @@ -9,7 +9,7 @@ pub mod subscriber; pub use admin::*; pub use copy_statement::CopyStatement; -pub use error::{Error, TableValidationError, TableValidationErrors}; +pub use error::*; use ee::*; use orchestrator::*; diff --git a/pgdog/src/backend/replication/logical/publisher/non_identity_columns_presence.rs b/pgdog/src/backend/replication/logical/publisher/non_identity_columns_presence.rs index a03c81b53..63b3f8359 100644 --- a/pgdog/src/backend/replication/logical/publisher/non_identity_columns_presence.rs +++ b/pgdog/src/backend/replication/logical/publisher/non_identity_columns_presence.rs @@ -21,6 +21,8 @@ use crate::net::messages::replication::logical::tuple_data::Identifier; #[derive(Debug, 
Clone, PartialEq, Eq, Hash)] pub struct NonIdentityColumnsPresence { mask: BitVec, + /// Cached count of present (non-Toasted) non-identity columns + count_present: usize, } impl NonIdentityColumnsPresence { @@ -37,16 +39,22 @@ impl NonIdentityColumnsPresence { let non_id_count = table.columns.iter().filter(|c| !c.identity).count(); let mut mask = BitVec::from_elem(non_id_count, false); let mut idx = 0usize; + let mut count_present = 0usize; for (tcol, col) in tuple.columns.iter().zip(table.columns.iter()) { if col.identity { + // identity columns always carry a value — skip continue; } if tcol.identifier != Identifier::Toasted { mask.set(idx, true); + count_present += 1; } idx += 1; } - Ok(Self { mask }) + Ok(Self { + mask, + count_present, + }) } /// Whether the `idx`-th non-identity column is present. @@ -58,19 +66,21 @@ impl NonIdentityColumnsPresence { pub fn no_non_identity_present(&self) -> bool { self.mask.none() } + + /// Count of present (non-Toasted) non-identity columns. O(1) — cached at construction. + pub fn count_present(&self) -> usize { + self.count_present + } } #[cfg(test)] impl NonIdentityColumnsPresence { - /// Every non-identity column marked present. Test helper. - /// - /// Uses the same allocation size as [`Self::from_tuple`] (non-identity - /// count only) so that two presences with identical logical shapes compare - /// equal under `Eq`/`Hash` and can be used as cache keys. + /// Every non-identity column marked present. 
pub(crate) fn all(table: &Table) -> Self { let non_id_count = table.columns.iter().filter(|c| !c.identity).count(); Self { mask: BitVec::from_elem(non_id_count, true), + count_present: non_id_count, } } diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index db5ccec65..8b25df02a 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -10,7 +10,7 @@ use tokio::time::Instant; use tokio::{select, spawn, time::interval}; use tracing::{debug, info, warn}; -use super::super::{publisher::Table, Error, TableValidationError, TableValidationErrors}; +use super::super::{ensure_validation, publisher::Table, Error}; use super::ReplicationSlot; use crate::backend::replication::logical::subscriber::stream::StreamSubscriber; @@ -320,22 +320,14 @@ impl Publisher { // Validate all tables support replication before committing to // what can be a multi-hour copy. A table with no primary key or // unique replica-identity index cannot be replicated correctly. - let mut validation_errors: Vec<_> = self + let validation_errors: Vec<_> = self .tables .values() .flat_map(|t| t.iter()) .filter_map(|t| t.valid().err()) .collect(); - if !validation_errors.is_empty() { - validation_errors.sort_by_key(|e| match e { - TableValidationError::NoIdentityColumns(table) => table.name.clone(), - }); - - return Err(Error::TableValidation(TableValidationErrors( - validation_errors, - ))); - } + ensure_validation!(validation_errors); // Create replication slots only after validation passes — a slot // created before valid() would be orphaned on a NoIdentityColumns error. 
@@ -576,9 +568,9 @@ mod test { assert_eq!( err.to_string(), "Table validation failed:\n\ - \ttable \"pgdog\".\"publication_test_no_pk\" has no replica identity columns\n\ - \ttable \"pgdog\".\"publication_test_no_pk_2\" has no replica identity columns\n\ - \ttable \"pgdog\".\"publication_test_no_pk_3\" has no replica identity columns", + \ttable \"pgdog\".\"publication_test_no_pk\": has no replica identity columns\n\ + \ttable \"pgdog\".\"publication_test_no_pk_2\": has no replica identity columns\n\ + \ttable \"pgdog\".\"publication_test_no_pk_3\": has no replica identity columns", ); assert!( @@ -596,4 +588,52 @@ mod test { server.execute(*ddl).await.unwrap(); } } + + /// `REPLICA IDENTITY NOTHING` must be rejected at `data_sync` time, + /// before any replication slot is created. This test executes against + /// a real Postgres instance so it validates the full metadata-fetch + valid() path. + #[tokio::test] + async fn data_sync_rejects_replica_identity_nothing() { + crate::logger(); + + let mut server = test_replication_server().await; + for ddl in &[ + "CREATE TABLE IF NOT EXISTS pub_test_nothing (data TEXT NOT NULL)", + "ALTER TABLE pub_test_nothing REPLICA IDENTITY NOTHING", + "DROP PUBLICATION IF EXISTS pub_full_identity_nothing_test", + "CREATE PUBLICATION pub_full_identity_nothing_test FOR TABLE pub_test_nothing", + ] { + server.execute(*ddl).await.unwrap(); + } + + let source = Cluster::new_test(&config()); + source.launch(); + let dest = Cluster::new_test(&config()); + + let mut publisher = Publisher::new( + "pub_full_identity_nothing_test", + QueryParserEngine::default(), + "pub_full_identity_nothing_slot".into(), + ); + + let result = publisher.data_sync(&source, &dest).await; + + let err = result.expect_err("data_sync must fail for REPLICA IDENTITY NOTHING table"); + assert!( + err.to_string().contains("REPLICA IDENTITY NOTHING"), + "expected NOTHING in error message, got: {err}" + ); + assert!( + publisher.slots.is_empty(), + "no replication slot 
must be created when NOTHING table is present" ); + + source.shutdown(); + for ddl in &[ + "DROP PUBLICATION IF EXISTS pub_full_identity_nothing_test", + "DROP TABLE IF EXISTS pub_test_nothing", + ] { + server.execute(*ddl).await.unwrap(); + } + } } diff --git a/pgdog/src/backend/replication/logical/publisher/queries.rs b/pgdog/src/backend/replication/logical/publisher/queries.rs index bfa250db6..f1b341bbf 100644 --- a/pgdog/src/backend/replication/logical/publisher/queries.rs +++ b/pgdog/src/backend/replication/logical/publisher/queries.rs @@ -3,7 +3,7 @@ //! TODO: I think these are Postgres-version specific, so we need to handle that //! later. These were fetched from CREATE SUBSCRIPTION ran on Postgres 17. //! -use std::fmt::Display; +use std::{collections::HashSet, fmt::Display}; use pgdog_postgres_types::Oid; @@ -14,6 +14,10 @@ use crate::{ use super::super::Error; +fn quote_literal(s: &str) -> String { + format!("'{}'", s.replace('\'', "''")) +} + /// Get list of tables in publication. static TABLES: &str = "SELECT DISTINCT n.nspname, @@ -55,8 +59,10 @@ impl PublicationTable { publication: &str, server: &mut Server, ) -> Result<Vec<Self>, Error> { + // fetch_all (simple query protocol) is required: replication connections + // do not support the extended query protocol (error 08P01). Ok(server - .fetch_all(TABLES.replace("$1", &format!("'{}'", publication))) + .fetch_all(TABLES.replace("$1", &quote_literal(publication))) .await?) } @@ -113,8 +119,8 @@ impl ReplicaIdentity { let identity: ReplicaIdentity = server .fetch_all( REPLICA_IDENTIFY - .replace("$1", &format!("'{}'", &table.schema)) - .replace("$2", &format!("'{}'", &table.name)), + .replace("$1", &quote_literal(&table.schema)) + .replace("$2", &quote_literal(&table.name)), ) .await? .pop() @@ -163,7 +169,7 @@ impl PublicationTableColumn { Ok(server .fetch_all( COLUMNS - .replace("$1", &identity.oid.to_string()) // Don't feel like using prepared statements. 
+ .replace("$1", &identity.oid.to_string()) .replace("$2", &identity.oid.to_string()), ) .await?) @@ -181,6 +187,92 @@ impl From for PublicationTableColumn { } } +/// Returns the subset of `tables` that have no qualifying unique index on `server`. +/// +/// A qualifying index must satisfy: +/// - `indisunique`, `indisvalid`, `indisready`, `indislive`: skip indexes mid-build or mid-drop. +/// - `indpred IS NULL`: skip partial indexes (predicate rows are not constrained). +/// - `indexprs IS NULL`: skip expression indexes (constraint is on computed values). +/// - NULL-safety: either `indnullsnotdistinct = true` (PG15+, NULLs treated as equal in the +/// unique constraint) or every indexed attribute has `attnotnull = true` (NULLs impossible). +/// A plain nullable unique index allows two NULL-keyed rows to coexist during the copy–stream +/// overlap window, leaving the destination with more rows than the source. +/// Requires PostgreSQL 15+ when an `indnullsnotdistinct` index is present; the column does +/// not exist on older servers and the query will return an error rather than silently accept. +/// +/// If `tables` is empty, no query is issued and an empty vec is returned. 
+static UNIQUE_INDEX: &str = "SELECT DISTINCT n.nspname, c.relname +FROM pg_catalog.pg_index i +JOIN pg_catalog.pg_class c ON c.oid = i.indrelid +JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace +WHERE (n.nspname, c.relname) IN ($1) + AND i.indisunique + AND i.indisvalid + AND i.indisready + AND i.indislive + AND i.indpred IS NULL + AND i.indexprs IS NULL + AND ( + i.indnullsnotdistinct + OR NOT EXISTS ( + SELECT 1 + FROM unnest(i.indkey) AS k(attnum) + JOIN pg_catalog.pg_attribute a + ON a.attrelid = i.indrelid AND a.attnum = k.attnum + WHERE NOT a.attnotnull + ) + )"; + +pub async fn tables_missing_unique_index<'a>( + tables: impl IntoIterator<Item = &'a PublicationTable>, + server: &mut Server, +) -> Result<Vec<String>, Error> { + let tables: Vec<&PublicationTable> = tables.into_iter().collect(); + + // Build `(nspname, relname) IN (('schema1','table1'), ...)` as literal string substitution. + // `fetch_all` uses the simple query protocol (no extended protocol on replication connections), + // so we cannot use real bind parameters here. + let in_list = tables + .iter() + .map(|t| { + format!( + "({}, {})", + quote_literal(t.destination_schema()), + quote_literal(t.destination_name()) + ) + }) + .collect::<Vec<_>>() + .join(", "); + + if in_list.is_empty() { + return Ok(Vec::new()); + } + + let rows: Vec<DataRow> = server + .fetch_all(UNIQUE_INDEX.replace("$1", &in_list)) + .await?; + let found: HashSet<(String, String)> = rows + .into_iter() + .map(|row| { + Ok(( + row.get(0, Format::Text).ok_or(Error::MissingData)?, + row.get(1, Format::Text).ok_or(Error::MissingData)?, + )) + }) + .collect::<Result<HashSet<_>, Error>>()?; + + Ok(tables + .into_iter() + .filter(|t| { + !found.contains(&( + t.destination_schema().to_string(), + t.destination_name().to_string(), + )) + }) + .map(|t| t.to_string()) + .collect()) +} + #[cfg(test)] mod test { use crate::backend::server::test::test_server; @@ -301,4 +393,138 @@ mod test { } server.execute("ROLLBACK").await.unwrap(); } + + /// Table with no unique index: must appear in missing set. 
+ #[tokio::test] + async fn test_has_unique_index_no_index() { + let mut server = test_server().await; + server.execute("BEGIN").await.unwrap(); + server + .execute( + "CREATE TABLE huidx_no_index ( + a TEXT, + b INTEGER + )", + ) + .await + .unwrap(); + let table = PublicationTable { + schema: "pgdog".to_string(), + name: "huidx_no_index".to_string(), + ..Default::default() + }; + let result = tables_missing_unique_index(std::iter::once(&table), &mut server) + .await + .unwrap(); + assert_eq!( + result.len(), + 1, + "expected missing: table has no unique index" + ); + server.execute("ROLLBACK").await.unwrap(); + } + + /// Unique index on a NOT NULL column: must not appear in missing set. + #[tokio::test] + async fn test_has_unique_index_with_index() { + let mut server = test_server().await; + server.execute("BEGIN").await.unwrap(); + server + .execute( + "CREATE TABLE huidx_with_index ( + a TEXT NOT NULL, + b INTEGER + )", + ) + .await + .unwrap(); + server + .execute("CREATE UNIQUE INDEX ON huidx_with_index (a)") + .await + .unwrap(); + let table = PublicationTable { + schema: "pgdog".to_string(), + name: "huidx_with_index".to_string(), + ..Default::default() + }; + let result = tables_missing_unique_index(std::iter::once(&table), &mut server) + .await + .unwrap(); + assert!( + result.is_empty(), + "expected not missing: NOT NULL unique key column is safe for ON CONFLICT dedup" + ); + server.execute("ROLLBACK").await.unwrap(); + } + + /// Nullable unique index: must appear in missing set (NULLs are distinct by default). + /// PG unique indexes treat NULLs as distinct, so nullable columns allow duplicate NULL rows + /// to be inserted during the copy–stream overlap window, corrupting the destination row count. 
+ #[tokio::test] + async fn test_has_unique_index_rejects_nullable_unique_column() { + let mut server = test_server().await; + server.execute("BEGIN").await.unwrap(); + server + .execute( + "CREATE TABLE huidx_nullable ( + a TEXT, + b INTEGER + )", + ) + .await + .unwrap(); + server + .execute("CREATE UNIQUE INDEX ON huidx_nullable (a)") + .await + .unwrap(); + let table = PublicationTable { + schema: "pgdog".to_string(), + name: "huidx_nullable".to_string(), + ..Default::default() + }; + let result = tables_missing_unique_index(std::iter::once(&table), &mut server) + .await + .unwrap(); + assert_eq!( + result.len(), + 1, + "nullable unique column does not enforce NULL-vs-NULL conflict; must be missing" + ); + server.execute("ROLLBACK").await.unwrap(); + } + + /// `NULLS NOT DISTINCT` unique index on nullable column: must not appear in missing set (PG15+). + /// `indnullsnotdistinct = true` makes NULLs compare as equal, so two NULL-keyed rows + /// cannot coexist — the index is safe for `ON CONFLICT DO NOTHING` deduplication. 
+ #[tokio::test] + async fn test_has_unique_index_accepts_nulls_not_distinct() { + let mut server = test_server().await; + server.execute("BEGIN").await.unwrap(); + server + .execute( + "CREATE TABLE huidx_nulls_not_distinct ( + a TEXT, + b INTEGER + )", + ) + .await + .unwrap(); + server + .execute("CREATE UNIQUE INDEX ON huidx_nulls_not_distinct (a) NULLS NOT DISTINCT") + .await + .unwrap(); + let table = PublicationTable { + schema: "pgdog".to_string(), + name: "huidx_nulls_not_distinct".to_string(), + ..Default::default() + }; + let result = tables_missing_unique_index(std::iter::once(&table), &mut server) + .await + .unwrap(); + assert!( + result.is_empty(), + "NULLS NOT DISTINCT index prevents NULL-keyed duplicates; must not be missing" + ); + server.execute("ROLLBACK").await.unwrap(); + } } diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index 7d17ef0c3..ec4dfe544 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -19,7 +19,9 @@ use crate::net::messages::Protocol; use crate::net::replication::StatusUpdate; use crate::util::escape_identifier; -use super::super::{subscriber::CopySubscriber, Error, TableValidationError}; +use super::super::{ + subscriber::CopySubscriber, Error, TableValidationError, TableValidationErrorKind, +}; use super::non_identity_columns_presence::NonIdentityColumnsPresence; use super::{ AbortSignal, Copy, PublicationTable, PublicationTableColumn, ReplicaIdentity, ReplicationSlot, @@ -79,11 +81,8 @@ where Columns::new(self.inner.filter(|(_, c)| !c.identity)) } - /// Drop original indices and assign sequential ones starting from 0. - /// - /// Use when only this subset's values are bound as parameters, so `$1` - /// refers to the first column in the subset, not its position in the - /// full row (e.g. DELETE binds only identity column values). 
+ /// Reindex columns sequentially from 0, discarding original positions. + /// Use when binding only this subset's values ($1 = first column in subset). fn reindexed(self) -> Columns<'a, impl Iterator> { Columns::new(self.inner.map(|(_, c)| c).enumerate()) } @@ -119,6 +118,29 @@ where .collect::>() .join(" AND ") } + + /// Shift every column's parameter index by `offset`. + /// Use when a preceding clause has already consumed `$1..$offset`. + fn with_offset( + self, + offset: usize, + ) -> Columns<'a, impl Iterator> { + Columns::new(self.inner.map(move |(i, c)| (i + offset, c))) + } + + /// `"col" IS NOT DISTINCT FROM $pos` predicates joined by ` AND `. + fn is_not_distinct_from_predicates(self) -> String { + self.inner + .map(|(i, c)| { + format!( + "\"{}\" IS NOT DISTINCT FROM ${}", + escape_identifier(&c.name), + i + 1 + ) + }) + .collect::>() + .join(" AND ") + } } impl Table { @@ -154,14 +176,26 @@ impl Table { /// Check that the table supports replication. /// - /// Requires at least one column with a replica identity flag. Tables with - /// REPLICA IDENTITY FULL or NOTHING have no identity columns and fail here - /// with NoIdentityColumns. + /// - FULL (`"f"`): valid — identity comes from `update.old`/`delete.old`, not column metadata. + /// - NOTHING (`"n"`): permanently invalid — UPDATE/DELETE carry no row identity. + /// - DEFAULT/INDEX: valid only when at least one column carries `identity = true`. 
pub fn valid(&self) -> Result<(), TableValidationError> { - if !self.columns.iter().any(|c| c.identity) { - return Err(TableValidationError::NoIdentityColumns(self.table.clone())); + match self.identity.identity.as_str() { + "f" => Ok(()), + "n" => Err(TableValidationError { + table_name: self.table.to_string(), + kind: TableValidationErrorKind::ReplicaIdentityNothing, + }), + _ => { + if !self.columns.iter().any(|c| c.identity) { + return Err(TableValidationError { + table_name: self.table.to_string(), + kind: TableValidationErrorKind::NoIdentityColumns, + }); + } + Ok(()) + } } - Ok(()) } /// Returns all the columns, enumerated @@ -288,6 +322,87 @@ impl Table { ) } + /// `INSERT … ON CONFLICT DO NOTHING`. For FULL omni tables; deduplicates overlap-window rows. + pub fn upsert_full_identity(&self) -> String { + format!( + "INSERT INTO \"{}\".\"{}\" ({}) VALUES ({}) ON CONFLICT DO NOTHING", + escape_identifier(self.table.destination_schema()), + escape_identifier(self.table.destination_name()), + self.all_columns().names(), + self.all_columns().placeholders(), + ) + } + + /// DELETE for REPLICA IDENTITY FULL tables. + /// Targets exactly one row via a ctid subquery to handle tables with duplicate rows correctly. + pub fn delete_full_identity(&self) -> String { + format!( + "DELETE FROM \"{}\".\"{}\" WHERE (tableoid, ctid) = {}", + escape_identifier(self.table.destination_schema()), + escape_identifier(self.table.destination_name()), + self.ctid_subquery(), + ) + } + + /// UPDATE for REPLICA IDENTITY FULL tables — fast path. + /// + /// All `n` columns appear twice: + /// - WHERE `$1..$n` — ctid subquery using `IS NOT DISTINCT FROM` (all OLD columns) + /// - SET `$n+1..$2n` — assignments (all NEW columns) + /// + /// Targets exactly one row via ctid to handle tables with duplicate rows correctly. + /// Bind with `full_identity_bind_tuple(&old_full, &update.new)` → 2n params. 
+ pub fn update_full_identity(&self) -> String { + let n = self.columns.len(); + format!( + "UPDATE \"{}\".\"{}\" SET {} WHERE (tableoid, ctid) = {}", + escape_identifier(self.table.destination_schema()), + escape_identifier(self.table.destination_name()), + self.all_columns().with_offset(n).assignments(), + self.ctid_subquery(), + ) + } + + /// UPDATE for REPLICA IDENTITY FULL tables — slow path (some NEW columns are toasted). + /// + /// WHERE uses the ctid subquery on all `n` OLD columns; SET uses only the `k` non-toasted NEW columns: + /// - WHERE `$1..$n` — ctid subquery with `IS NOT DISTINCT FROM` (all OLD columns) + /// - SET `$n+1..$n+k` — assignments for the `k` present columns only + /// + /// Targets exactly one row via ctid to handle tables with duplicate rows correctly. + /// Bind with `full_identity_bind_tuple(&old_full, &partial_new)` → n+k params. + pub fn update_full_identity_partial_set(&self, present: &NonIdentityColumnsPresence) -> String { + debug_assert!( + !present.no_non_identity_present(), + "update_full_identity_partial_set called with no present columns — would emit empty SET clause" + ); + + let n = self.columns.len(); + format!( + "UPDATE \"{}\".\"{}\" SET {} WHERE (tableoid, ctid) = {}", + escape_identifier(self.table.destination_schema()), + escape_identifier(self.table.destination_name()), + self.present_columns(present) + .filter_non_identity() + .with_offset(n) + .assignments(), + self.ctid_subquery(), + ) + } + + /// `(SELECT tableoid, ctid FROM "s"."t" WHERE col IS NOT DISTINCT FROM $1 AND … LIMIT 1)` + /// + /// Used by all FULL identity DELETE/UPDATE generators. `tableoid` scopes the ctid to its + /// owning heap so the comparison is unambiguous on partitioned destination tables. 
+ fn ctid_subquery(&self) -> String { + format!( + "(SELECT tableoid, ctid FROM \"{}\".\"{}\" WHERE {} LIMIT 1)", + escape_identifier(self.table.destination_schema()), + escape_identifier(self.table.destination_name()), + self.all_columns().is_not_distinct_from_predicates(), + ) + } + /// Reload table data inside the transaction. pub async fn reload(&mut self, server: &mut Server) -> Result<(), Error> { if !server.in_transaction() { @@ -300,6 +415,11 @@ impl Table { Ok(()) } + /// `true` when the table uses `REPLICA IDENTITY FULL`. + pub fn is_identity_full(&self) -> bool { + self.identity.identity == "f" + } + /// Check if this table is sharded. pub fn is_sharded(&self, tables: &ShardedTables) -> bool { for column in &self.columns { @@ -475,7 +595,10 @@ mod test { let t = make_table(vec![("id", false), ("name", false)]); assert!(matches!( t.valid(), - Err(TableValidationError::NoIdentityColumns(_)) + Err(TableValidationError { + kind: TableValidationErrorKind::NoIdentityColumns, + .. + }) )); } @@ -803,7 +926,10 @@ mod test { .unwrap(); assert!(matches!( load_table(&mut s, "valid_test_nopk").await.valid(), - Err(TableValidationError::NoIdentityColumns(_)) + Err(TableValidationError { + kind: TableValidationErrorKind::NoIdentityColumns, + .. + }) )); s.execute("ROLLBACK").await.unwrap(); } @@ -821,10 +947,7 @@ mod test { .unwrap(); let table = load_table(&mut s, "valid_test_full").await; assert_eq!(table.identity.identity, "f"); - assert!(matches!( - table.valid(), - Err(TableValidationError::NoIdentityColumns(_)) - )); + assert!(table.valid().is_ok()); s.execute("ROLLBACK").await.unwrap(); } @@ -843,7 +966,10 @@ mod test { assert_eq!(table.identity.identity, "n"); assert!(matches!( table.valid(), - Err(TableValidationError::NoIdentityColumns(_)) + Err(TableValidationError { + kind: TableValidationErrorKind::ReplicaIdentityNothing, + .. 
+ }) )); s.execute("ROLLBACK").await.unwrap(); } @@ -867,4 +993,139 @@ mod test { assert!(table.valid().is_ok()); s.execute("ROLLBACK").await.unwrap(); } + + // ------------------------------------------------------------------- + // REPLICA IDENTITY FULL SQL generators + // ------------------------------------------------------------------- + + #[test] + fn upsert_full_identity_is_valid_sql() { + let table = make_table(vec![("a", false), ("b", false), ("c", false)]); + let sql = table.upsert_full_identity(); + assert_eq!( + sql, + r#"INSERT INTO "public"."test_table" ("a", "b", "c") VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"#, + ); + } + + #[test] + fn delete_full_identity_param_numbering() { + // ctid subquery targets exactly one row; param order matches column order. + let table = make_table(vec![("a", false), ("b", false), ("c", false)]); + let sql = table.delete_full_identity(); + let (q, s, t) = ('"', "public", "test_table"); + let pred = format!("{q}a{q} IS NOT DISTINCT FROM $1 AND {q}b{q} IS NOT DISTINCT FROM $2 AND {q}c{q} IS NOT DISTINCT FROM $3"); + let subq = format!("(SELECT tableoid, ctid FROM {q}{s}{q}.{q}{t}{q} WHERE {pred} LIMIT 1)"); + assert_eq!( + sql, + format!("DELETE FROM {q}{s}{q}.{q}{t}{q} WHERE (tableoid, ctid) = {subq}") + ); + assert!(pg_query::parse(&sql).is_ok(), "delete_full_identity: {sql}"); + } + + #[test] + fn delete_full_identity_single_column() { + // Edge case: one-column table — subquery WHERE must still be well-formed. 
+ let table = make_table(vec![("id", false)]); + let sql = table.delete_full_identity(); + let (q, s, t) = ('"', "public", "test_table"); + let pred = format!("{q}id{q} IS NOT DISTINCT FROM $1"); + let subq = format!("(SELECT tableoid, ctid FROM {q}{s}{q}.{q}{t}{q} WHERE {pred} LIMIT 1)"); + assert_eq!( + sql, + format!("DELETE FROM {q}{s}{q}.{q}{t}{q} WHERE (tableoid, ctid) = {subq}") + ); + assert!( + pg_query::parse(&sql).is_ok(), + "delete_full_identity single: {sql}" + ); + } + + #[test] + fn update_full_identity_all_present() { + // All columns present (no TOAST). WHERE in ctid subquery: $1..$3; SET: $4..$6. + let table = make_table(vec![("a", false), ("b", false), ("c", false)]); + let sql = table.update_full_identity(); + let (q, s, t) = ('"', "public", "test_table"); + let pred = format!("{q}a{q} IS NOT DISTINCT FROM $1 AND {q}b{q} IS NOT DISTINCT FROM $2 AND {q}c{q} IS NOT DISTINCT FROM $3"); + let subq = format!("(SELECT tableoid, ctid FROM {q}{s}{q}.{q}{t}{q} WHERE {pred} LIMIT 1)"); + let set = format!("{q}a{q} = $4, {q}b{q} = $5, {q}c{q} = $6 "); + assert_eq!( + sql, + format!("UPDATE {q}{s}{q}.{q}{t}{q} SET {set}WHERE (tableoid, ctid) = {subq}") + ); + assert!( + pg_query::parse(&sql).is_ok(), + "update_full_identity all_present: {sql}" + ); + } + + #[test] + fn update_full_identity_partial_present() { + // b is Toasted. WHERE in ctid subquery: all 3 ($1..$3); SET: a,c ($4..$5). 
+ let table = make_table(vec![("a", false), ("b", false), ("c", false)]); + let tuple = TupleData { + columns: vec![text_col("1"), toasted_col(), text_col("3")], + }; + let present = NonIdentityColumnsPresence::from_tuple(&tuple, &table).unwrap(); + let sql = table.update_full_identity_partial_set(&present); + let (q, s, t) = ('"', "public", "test_table"); + let pred = format!("{q}a{q} IS NOT DISTINCT FROM $1 AND {q}b{q} IS NOT DISTINCT FROM $2 AND {q}c{q} IS NOT DISTINCT FROM $3"); + let subq = format!("(SELECT tableoid, ctid FROM {q}{s}{q}.{q}{t}{q} WHERE {pred} LIMIT 1)"); + let set = format!("{q}a{q} = $4, {q}c{q} = $5 "); + assert_eq!( + sql, + format!("UPDATE {q}{s}{q}.{q}{t}{q} SET {set}WHERE (tableoid, ctid) = {subq}") + ); + assert!( + pg_query::parse(&sql).is_ok(), + "update_full_identity partial: {sql}" + ); + } + + #[test] + fn update_full_identity_first_column_toasted() { + // a is Toasted; b and c present. WHERE in ctid subquery: all 3 ($1..$3); SET: b,c ($4..$5). + let table = make_table(vec![("a", false), ("b", false), ("c", false)]); + let tuple = TupleData { + columns: vec![toasted_col(), text_col("2"), text_col("3")], + }; + let present = NonIdentityColumnsPresence::from_tuple(&tuple, &table).unwrap(); + let sql = table.update_full_identity_partial_set(&present); + let (q, s, t) = ('"', "public", "test_table"); + let pred = format!("{q}a{q} IS NOT DISTINCT FROM $1 AND {q}b{q} IS NOT DISTINCT FROM $2 AND {q}c{q} IS NOT DISTINCT FROM $3"); + let subq = format!("(SELECT tableoid, ctid FROM {q}{s}{q}.{q}{t}{q} WHERE {pred} LIMIT 1)"); + let set = format!("{q}b{q} = $4, {q}c{q} = $5 "); + assert_eq!( + sql, + format!("UPDATE {q}{s}{q}.{q}{t}{q} SET {set}WHERE (tableoid, ctid) = {subq}") + ); + assert!( + pg_query::parse(&sql).is_ok(), + "update_full_identity first_toasted: {sql}" + ); + } + + #[test] + fn update_full_identity_only_one_column_present() { + // Only c is present. WHERE in ctid subquery: all 3 ($1..$3); SET: c ($4). 
+ let table = make_table(vec![("a", false), ("b", false), ("c", false)]); + let tuple = TupleData { + columns: vec![toasted_col(), toasted_col(), text_col("3")], + }; + let present = NonIdentityColumnsPresence::from_tuple(&tuple, &table).unwrap(); + let sql = table.update_full_identity_partial_set(&present); + let (q, s, t) = ('"', "public", "test_table"); + let pred = format!("{q}a{q} IS NOT DISTINCT FROM $1 AND {q}b{q} IS NOT DISTINCT FROM $2 AND {q}c{q} IS NOT DISTINCT FROM $3"); + let subq = format!("(SELECT tableoid, ctid FROM {q}{s}{q}.{q}{t}{q} WHERE {pred} LIMIT 1)"); + let set = format!("{q}c{q} = $4 "); + assert_eq!( + sql, + format!("UPDATE {q}{s}{q}.{q}{t}{q} SET {set}WHERE (tableoid, ctid) = {subq}") + ); + assert!( + pg_query::parse(&sql).is_ok(), + "update_full_identity single col: {sql}" + ); + } } diff --git a/pgdog/src/backend/replication/logical/subscriber/context.rs b/pgdog/src/backend/replication/logical/subscriber/context.rs index c46ce4425..3185a4ff4 100644 --- a/pgdog/src/backend/replication/logical/subscriber/context.rs +++ b/pgdog/src/backend/replication/logical/subscriber/context.rs @@ -11,30 +11,47 @@ use crate::{ net::{replication::TupleData, Bind, Parameters, Parse}, }; +/// Holds the pre-computed `Bind` message and destination `Shard` for a single replication event. +/// Build with `new`; pass to the subscriber's send path to apply the event to the correct shard. #[derive(Debug)] -pub struct StreamContext<'a> { - request: ClientRequest, - cluster: &'a Cluster, +pub struct StreamContext { bind: Bind, - parse: Parse, + shard: Shard, } -impl<'a> StreamContext<'a> { - /// Construct new stream context. - pub fn new(cluster: &'a Cluster, tuple: &TupleData, stmt: &Parse) -> Self { +impl StreamContext { + /// Build a `StreamContext` from a WAL tuple and a prepared-statement parse. + /// + /// Runs the router to resolve the destination shard. Returns an error if + /// routing fails (e.g. unparseable query, wrong command type). 
+ pub fn new(cluster: &Cluster, tuple: &TupleData, stmt: &Parse) -> Result { let bind = tuple.to_bind(stmt.name()); - let parse = stmt.clone(); - let request = ClientRequest::from(vec![parse.clone().into(), bind.clone().into()]); - Self { - request, - cluster, - bind, - parse, - } + let shard = Self::resolve_shard(cluster, &bind, stmt)?; + Ok(Self { bind, shard }) } - pub fn shard(&'a mut self) -> Result { - let router_context = self.router_context()?; + /// Route `stmt` through the query router and return the destination shard. + /// + /// Takes the already-built `bind` so the router sees the real parameter values + /// (required for sharding-key extraction). Separated from `new` so the routing + /// logic can be read, tested, and changed independently of bind construction. + fn resolve_shard(cluster: &Cluster, bind: &Bind, stmt: &Parse) -> Result { + lazy_static! { + static ref PARAMS: Parameters = Parameters::default(); + } + + let parse = stmt.clone(); + let mut request = ClientRequest::from(vec![parse.clone().into(), bind.clone().into()]); + + let ast_context = AstContext::from_cluster(cluster, &PARAMS); + let ast = Cache::get().query( + &BufferedQuery::Prepared(parse), + &ast_context, + &mut PreparedStatements::default(), + )?; + request.ast = Some(ast); + + let router_context = RouterContext::new(&request, cluster, &PARAMS, None, Sticky::new())?; let mut router = Router::new(); let route = router.query(router_context)?; @@ -45,33 +62,14 @@ impl<'a> StreamContext<'a> { } } - /// Get Bind message. + /// The `Bind` message to send to the destination. pub fn bind(&self) -> &Bind { &self.bind } - /// Construct router context. - pub fn router_context(&'a mut self) -> Result, Error> { - lazy_static! 
{ - static ref PARAMS: Parameters = Parameters::default(); - } - - let ast_context = AstContext::from_cluster(self.cluster, &PARAMS); - - let ast = Cache::get().query( - &BufferedQuery::Prepared(self.parse.clone()), - &ast_context, - &mut PreparedStatements::default(), - )?; - self.request.ast = Some(ast); - - Ok(RouterContext::new( - &self.request, - self.cluster, - &PARAMS, - None, - Sticky::new(), - )?) + /// The shard(s) the statement should be routed to. + pub fn shard(&self) -> &Shard { + &self.shard } } @@ -135,10 +133,8 @@ mod test { }; let parse = Parse::new_anonymous("INSERT INTO sharded (customer_id) VALUES ($1)"); - let shard = StreamContext::new(&cluster, &tuple, &parse) - .shard() - .unwrap(); - assert!(matches!(shard, Shard::Direct(_))); + let shard = StreamContext::new(&cluster, &tuple, &parse).unwrap(); + assert!(matches!(shard.shard(), Shard::Direct(_))); } // Verify that $N in the generated SQL matches the bind slot to_bind() places diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index c6a29bb66..4d4052701 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -9,14 +9,18 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; +use futures::future::try_join_all; use once_cell::sync::Lazy; use pgdog_postgres_types::Oid; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; -use super::super::publisher::NonIdentityColumnsPresence; -use super::super::{publisher::Table, Error}; +use super::super::publisher::{tables_missing_unique_index, NonIdentityColumnsPresence}; +use super::super::{ + ensure_validation, publisher::Table, Error, TableValidationError, TableValidationErrorKind, +}; use super::StreamContext; use crate::net::messages::replication::logical::tuple_data::{Identifier, TupleData}; +use crate::net::messages::replication::logical::update::Update as XLogUpdate; use 
crate::{ backend::{Cluster, ConnectReason, Server}, config::Role, @@ -24,7 +28,7 @@ use crate::{ net::{ replication::{ xlog_data::XLogPayload, Commit as XLogCommit, Delete as XLogDelete, - Insert as XLogInsert, Relation, StatusUpdate, Update as XLogUpdate, + Insert as XLogInsert, Relation, StatusUpdate, UpdateIdentity, }, Bind, CommandComplete, CopyData, ErrorResponse, Execute, Flush, FromBytes, Parse, Protocol, Sync, ToBytes, @@ -61,9 +65,11 @@ struct Statements { update: Statement, delete: Statement, omni: bool, - /// Cached UPDATE statements keyed by the observed `NonIdentityColumnsPresence` shape — - /// one entry per distinct set of unchanged-TOAST columns. Populated lazily - /// by [`StreamSubscriber::ensure_update_shape`]. + /// `true` when the source table has `REPLICA IDENTITY FULL`. + /// Controls INSERT/UPDATE/DELETE dispatch to FULL-mode handlers. + full_identity: bool, + /// UPDATE statements keyed by `NonIdentityColumnsPresence` — one per + /// distinct TOAST-column shape. Shared by DEFAULT/INDEX and FULL identity. update_shapes: HashMap, } @@ -84,6 +90,7 @@ impl Statement { }) } } + #[derive(Debug, Default)] pub struct StreamSubscriber { /// Destination cluster. @@ -197,10 +204,24 @@ impl StreamSubscriber { } } + // Validate omni FULL-identity tables have a unique index on every destination shard. + let omni_full: Vec = self + .tables + .values() + .filter(|t| { + t.is_identity_full() && !t.is_sharded(&self.cluster.sharding_schema().tables) + }) + .cloned() + .collect(); + if !omni_full.is_empty() { + self.validate_full_identity_omni_has_unique_index(&omni_full) + .await?; + } + Ok(()) } - // Send a statement to one or more matching shards. + // Dispatch a pre-built bind to the matching shard(s). 
async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> { let mut conns: Vec<_> = self .connections @@ -268,21 +289,13 @@ impl StreamSubscriber { } if let Some(statements) = self.statements.get(&insert.oid) { - // Convert TupleData into a Bind message. We can now insert that tuple - // using a prepared statement. - let mut context = StreamContext::new( - &self.cluster, - &insert.tuple_data, - if statements.omni { - statements.upsert.parse() - } else { - statements.insert.parse() - }, - ); - let bind = context.bind().clone(); - let shard = context.shard()?; - - self.send(&shard, &bind).await?; + let parse = if statements.omni { + statements.upsert.parse() + } else { + statements.insert.parse() + }; + let ctx = StreamContext::new(&self.cluster, &insert.tuple_data, &parse)?; + self.send(ctx.shard(), ctx.bind()).await?; } self.mark_table_changed(insert.oid); @@ -300,39 +313,54 @@ impl StreamSubscriber { return Ok(()); } - self.check_toasted_identity(&update)?; - - // PK changed: delete old row by key, insert new row. - // Toasted column in new tuple means incomplete data — fail. - if let Some(key) = update.key { - if update.new.has_toasted() { - let table = self.get_table(update.oid)?; - - return Err(Error::ToastedRowMigration { - table: table.publication, + // Route by pre-image variant — the WAL byte encodes replica identity: + // Key → DEFAULT/INDEX, identity column(s) changed + // Old → REPLICA IDENTITY FULL (always) + // Nothing → DEFAULT/INDEX, identity column(s) unchanged + match update.identity { + UpdateIdentity::Key(ref key) => { + // PK changed: delete old row by key, insert new row. + // Identity columns must not be toasted — we need them to route the delete. 
+ self.check_toasted_identity(&update)?; + if update.new.has_toasted() { + let table = self.get_table(update.oid)?; + return Err(Error::ToastedRowMigration { + table: table.publication, + oid: update.oid, + }); + } + let delete = XLogDelete { + key: Some(key.clone()), oid: update.oid, - }); + old: None, + }; + let insert = XLogInsert { + xid: None, + oid: update.oid, + tuple_data: update.new, + }; + self.delete(delete).await?; + self.insert(insert).await?; + Ok(()) + } + UpdateIdentity::Old(_) => { + // REPLICA IDENTITY FULL: old row is fully materialised. + // If every NEW column is unchanged-TOAST there is nothing to do. + if update.new.all_toasted() { + self.mark_table_changed(update.oid); + return Ok(()); + } + self.update_full_identity(update.oid, update).await + } + UpdateIdentity::Nothing => { + // Identity columns unchanged; none may be toasted (routing needs them). + self.check_toasted_identity(&update)?; + if !update.new.has_toasted() { + return self.update_full(update.oid, &update.new).await; + } + self.update_with_toasted(update.oid, update).await } - let delete = XLogDelete { - key: Some(key), - oid: update.oid, - old: None, - }; - let insert = XLogInsert { - xid: None, - oid: update.oid, - tuple_data: update.new, - }; - self.delete(delete).await?; - self.insert(insert).await?; - return Ok(()); - } - - if !update.new.has_toasted() { - return self.update_full(update.oid, &update.new).await; } - - self.update_with_toasted(update.oid, update).await } /// Resolve the `Table` for a relation OID. @@ -348,46 +376,42 @@ impl StreamSubscriber { self.tables.get(&key).cloned().ok_or(Error::MissingKey) } - /// Fast-path UPDATE: no unchanged-TOAST columns — bind every column in - /// tuple order and reuse the pre-prepared `update` statement. + /// Fast-path UPDATE (DEFAULT/INDEX): no unchanged-TOAST columns — bind every + /// column in tuple order and reuse the pre-prepared `update` statement. 
async fn update_full(&mut self, oid: Oid, new: &TupleData) -> Result<(), Error> { - let statements = self + let parse = self .statements .get(&oid) - .expect("statements entry checked before dispatch"); - let mut context = StreamContext::new(&self.cluster, new, statements.update.parse()); - let bind = context.bind().clone(); - let shard = context.shard()?; - self.send(&shard, &bind).await?; + .expect("statements entry checked before dispatch") + .update + .parse() + .clone(); + let ctx = StreamContext::new(&self.cluster, new, &parse)?; + self.send(ctx.shard(), ctx.bind()).await?; self.mark_table_changed(oid); Ok(()) } - /// Slow-path UPDATE: at least one unchanged-TOAST column. Build a shape - /// bitmask, look up or prepare the matching partial UPDATE statement, then - /// bind and execute it. + /// Slow-path UPDATE (DEFAULT/INDEX): at least one unchanged-TOAST column. + /// Build a shape bitmask, look up or prepare the matching partial UPDATE + /// statement, then bind and execute it. async fn update_with_toasted(&mut self, oid: Oid, update: XLogUpdate) -> Result<(), Error> { let table = self.get_table(update.oid)?; let present = NonIdentityColumnsPresence::from_tuple(&update.new, &table)?; if present.no_non_identity_present() { - // All non-identity columns are unchanged-TOAST and identity didn't - // change — the destination row already has every value we would - // write. No-op, but still advance the watermark so equal-LSN replay - // after commit is gated correctly. + // All non-identity columns are unchanged-TOAST — destination already + // has every value. No-op; still advance the watermark. self.mark_table_changed(oid); return Ok(()); } - let shape_stmt = self.ensure_update_shape(oid, &table, &present).await?; let partial_new = update.partial_new(); - let shape_parse = shape_stmt.parse(); - - // Route via the shape's AST/bind — param positions match by construction. 
- let mut context = StreamContext::new(&self.cluster, &partial_new, shape_parse); - let bind = context.bind().clone(); // clone before mutable shard() borrow - let shard = context.shard()?; - self.send(&shard, &bind).await?; + let shape_stmt = self + .ensure_update_shape_for(oid, &table, &present, false) + .await?; + let ctx = StreamContext::new(&self.cluster, &partial_new, shape_stmt.parse())?; + self.send(ctx.shard(), ctx.bind()).await?; self.mark_table_changed(oid); Ok(()) } @@ -413,11 +437,6 @@ impl StreamSubscriber { Ok(()) } - /// Prepare the UPDATE statement matching `present` on every shard and - /// cache it under `statements[oid].update_shapes[present]`. - /// - /// Returns the (possibly freshly prepared) `Statement` so the caller can - /// use it directly without a second cache lookup. /// Send a batch of [`Parse`] messages to every server and drain the /// acknowledgment cycle (`ParseComplete` × N, then `ReadyForQuery` when /// not in a transaction). @@ -458,11 +477,32 @@ impl StreamSubscriber { Ok(()) } - async fn ensure_update_shape( + // ── Routing helpers ──────────────────────────────────────────────────────────── + + /// Route a tuple to its shard without constructing a `Bind`. + /// Used when the bind merges multiple tuples (FULL identity UPDATE/DELETE). + fn shard_for(&self, tuple: &TupleData, parse: &Parse) -> Result { + Ok(StreamContext::new(&self.cluster, tuple, parse)? + .shard() + .clone()) + } + + // ── Shape-cache helpers ────────────────────────────────────────────────────── + + /// Look up or prepare the UPDATE statement for `present`, cached under + /// `statements[oid].update_shapes[present]`. + /// + /// `full_identity` selects the SQL generator on a cache miss: + /// - `false` → `Table::update_partial` (DEFAULT/INDEX) + /// - `true` → `Table::update_full_identity_partial_set` (FULL) + /// + /// Both modes share `update_shapes`; no collision since `full_identity` is table-scoped. 
+ async fn ensure_update_shape_for( &mut self, oid: Oid, table: &Table, present: &NonIdentityColumnsPresence, + full_identity: bool, ) -> Result { if let Some(stmt) = self .statements @@ -472,7 +512,11 @@ impl StreamSubscriber { return Ok(stmt.clone()); } - let sql = table.update_partial(present); + let sql = if full_identity { + table.update_full_identity_partial_set(present) + } else { + table.update_partial(present) + }; let stmt = Statement::new(&sql)?; self.prepare_statements(&[stmt.parse().clone()]).await?; @@ -484,24 +528,120 @@ impl StreamSubscriber { Ok(stmt) } + /// FULL identity UPDATE: WHERE on old-row values (`$1..$k`), SET on new-row values (`$k+1..$n`). + /// On shard-key change fans out DELETE+INSERT across shards. + async fn update_full_identity(&mut self, oid: Oid, update: XLogUpdate) -> Result<(), Error> { + let table = self.get_table(oid)?; + + let old_full = match &update.identity { + UpdateIdentity::Old(old) => old, + _ => { + return Err(Error::FullIdentityMissingOld { + table: table.table, + oid, + op: "UPDATE", + }); + } + }; + + let (update_parse, delete_parse, insert_parse) = { + let stmts = self.statements.get(&oid).ok_or(Error::MissingKey)?; + ( + stmts.update.parse().clone(), + stmts.delete.parse().clone(), + stmts.insert.parse().clone(), + ) + }; + + // Fill any 'u' (unchanged-TOAST) columns from old_full before routing. + // FULL identity guarantees old_full is fully materialised; 'u' columns in + // update.new carry the same value as the corresponding column in old_full. + // Routing from a raw 'u' column yields empty bytes → wrong shard. + let complete_new = update.new.fill_toasted_from(&old_full)?; + let new_shard = self.shard_for(&complete_new, &update_parse)?; + let old_shard = self.shard_for(&old_full, &update_parse)?; + + if new_shard != old_shard { + // Shard key changed: DELETE on old shard, INSERT on new shard. 
+ let delete_bind = old_full.to_bind(delete_parse.name()); + self.send(&old_shard, &delete_bind).await?; + + let insert_bind = complete_new.to_bind(insert_parse.name()); + self.send(&new_shard, &insert_bind).await?; + self.mark_table_changed(oid); + return Ok(()); + } + + let (parse, set_tuple, where_tuple) = if !update.new.has_toasted() { + // Fast path: all columns present — use the pre-prepared statement. + (update_parse, update.new, old_full.clone()) + } else { + // Slow path: at least one unchanged-TOAST (`'u'`) column in new. + let present = NonIdentityColumnsPresence::from_tuple(&update.new, &table)?; + if present.no_non_identity_present() { + self.mark_table_changed(oid); + return Ok(()); + } + let partial_new = update.partial_new(); + let stmt = self + .ensure_update_shape_for(oid, &table, &present, true) + .await?; + (stmt.parse().clone(), partial_new, old_full.clone()) + }; + + // Fast path: WHERE $1..$n (where_tuple=old_full), SET $n+1..$2n (set_tuple=update.new). + // Slow path: WHERE $1..$n (where_tuple=old_full), SET $n+1..$n+k (set_tuple=partial_new). + let bind = + XLogUpdate::full_identity_bind_tuple(&where_tuple, &set_tuple).to_bind(parse.name()); + self.send(&new_shard, &bind).await?; + self.mark_table_changed(oid); + Ok(()) + } + async fn delete(&mut self, delete: XLogDelete) -> Result<(), Error> { if self.lsn_applied(&delete.oid) { return Ok(()); } - if let Some(statements) = self.statements.get(&delete.oid) { - if let Some(key) = delete.key_non_null() { - let mut context = - StreamContext::new(&self.cluster, &key, statements.delete.parse()); - let bind = context.bind().clone(); - let shard = context.shard()?; + // Extract statement info upfront to release the shared borrow before + // async calls and the subsequent &mut self borrows in send(). 
+ let Some(stmts) = self.statements.get(&delete.oid) else { + self.mark_table_changed(delete.oid); + return Ok(()); + }; + let full_identity = stmts.full_identity; + let delete_parse = stmts.delete.parse().clone(); + let oid = delete.oid; + + // Resolve the tuple used for both shard routing and the WHERE bind. + // FULL identity matches on the full old row; DEFAULT/INDEX on key columns only. + let tuple = if full_identity { + // Postgres materialises all TOAST values before writing DELETE WAL records, + // so old never contains 'u' markers. + let Some(old) = delete.old else { + let table = self.get_table(oid)?; + return Err(Error::FullIdentityMissingOld { + table: table.table.clone(), + oid, + op: "DELETE", + }); + }; + old + } else { + let Some(key) = delete.key_non_null() else { + // No key columns present — nothing to send, watermark still advances. + self.mark_table_changed(oid); + return Ok(()); + }; + key + }; - self.send(&shard, &bind).await?; - } - } + let shard = self.shard_for(&tuple, &delete_parse)?; + let bind = tuple.to_bind(delete_parse.name()); - self.mark_table_changed(delete.oid); + self.send(&shard, &bind).await?; + self.mark_table_changed(oid); Ok(()) } @@ -591,32 +731,76 @@ impl StreamSubscriber { debug!("queries for table {} already prepared", dest_key); } else { - let insert = Statement::new(&table.insert())?; - let upsert = Statement::new(&table.upsert())?; - let update = Statement::new(&table.update())?; - let delete = Statement::new(&table.delete())?; let omni = !table.is_sharded(&self.cluster.sharding_schema().tables); - self.prepare_statements(&[ - insert.parse().clone(), - upsert.parse().clone(), - update.parse().clone(), - delete.parse().clone(), - ]) - .await?; + let statements = if table.is_identity_full() { + // ── FULL identity path ────────────────────────────────────────────── + let insert = Statement::new(&table.insert())?; + let update = Statement::new(&table.update_full_identity())?; + let delete = 
Statement::new(&table.delete_full_identity())?; + + // Omni FULL: upsert dedup requires a unique constraint on the destination. + // Sharded FULL: each row routes to one shard — no upsert needed. + // Validated at connect() time. + let upsert = if omni { + Statement::new(&table.upsert_full_identity())? + } else { + warn!( + "table {} has REPLICA IDENTITY FULL and no primary key; \ + replication performance will be degraded without an index \ + on the destination table.", + dest_key + ); + // Upsert slot is unused for sharded tables (omni == false). + Statement::default() + }; + + let mut parses = vec![ + insert.parse().clone(), + update.parse().clone(), + delete.parse().clone(), + ]; + if omni { + parses.push(upsert.parse().clone()); + } + self.prepare_statements(&parses).await?; - self.statements.insert( - relation.oid, Statements { insert, upsert, update, delete, omni, + full_identity: true, update_shapes: HashMap::new(), - }, - ); + } + } else { + // ── DEFAULT / INDEX path ──────────────────────────────────────────── + let insert = Statement::new(&table.insert())?; + let upsert = Statement::new(&table.upsert())?; + let update = Statement::new(&table.update())?; + let delete = Statement::new(&table.delete())?; + + self.prepare_statements(&[ + insert.parse().clone(), + upsert.parse().clone(), + update.parse().clone(), + delete.parse().clone(), + ]) + .await?; + Statements { + insert, + upsert, + update, + delete, + omni, + full_identity: false, + update_shapes: HashMap::new(), + } + }; + + self.statements.insert(relation.oid, statements); self.keys.insert(dest_key, relation.oid); } @@ -628,10 +812,30 @@ impl StreamSubscriber { Ok(()) } - /// Handle replication stream message. + /// Handle one replication stream message. /// - /// Return true if stream is done, false otherwise. + /// On error, drops shard connections to roll back the implicit transaction left + /// by Bind/Execute/Flush, and clears per-session state. 
See + /// `docs/REPLICATION.md` → "Error rollback". pub async fn handle(&mut self, data: CopyData) -> Result, Error> { + match self.handle_inner(data).await { + Ok(status) => Ok(status), + Err(err) => { + // Drop sockets → backend FATAL → implicit transaction rolled back. + // `Sync` would commit Rust-side errors. See docs/REPLICATION.md. + self.connections.clear(); + // Per-session state — repopulated from Relation messages on reconnect. + self.relations.clear(); + self.statements.clear(); + self.keys.clear(); + self.changed_tables.clear(); + self.in_transaction = false; + Err(err) + } + } + } + + async fn handle_inner(&mut self, data: CopyData) -> Result, Error> { // Lazily connect to all shards. if self.connections.is_empty() { self.connect().await?; @@ -711,6 +915,35 @@ impl StreamSubscriber { pub(crate) fn missed_rows(&mut self) -> MissedRows { std::mem::take(&mut self.missed_rows) } + + /// Verify every destination shard has a qualifying unique index for all `tables`. + /// FULL-identity omni tables use `ON CONFLICT DO NOTHING` during the + /// copy-replication overlap window, which requires a unique constraint. + /// Queries all shards in parallel (one bulk query per shard) then surfaces + /// the complete set of missing indexes across the cluster in a single error. + async fn validate_full_identity_omni_has_unique_index( + &mut self, + tables: &[Table], + ) -> Result<(), Error> { + // Fan out to all shards concurrently; each gets one IN-list query. + let per_shard: Vec> = + try_join_all(self.connections.iter_mut().map(|dest_server| { + tables_missing_unique_index(tables.iter().map(|t| &t.table), dest_server) + })) + .await?; + + // Flatten; ensure_validation! deduplicates and sorts before reporting. 
+ let errors: Vec = per_shard + .into_iter() + .flatten() + .map(|table_name| TableValidationError { + table_name, + kind: TableValidationErrorKind::FullIdentityOmniNoUniqueIndex, + }) + .collect(); + ensure_validation!(errors); + Ok(()) + } } #[derive(Debug, Default)] diff --git a/pgdog/src/backend/replication/logical/subscriber/tests.rs b/pgdog/src/backend/replication/logical/subscriber/tests.rs index f48bf75fe..a91877776 100644 --- a/pgdog/src/backend/replication/logical/subscriber/tests.rs +++ b/pgdog/src/backend/replication/logical/subscriber/tests.rs @@ -22,7 +22,7 @@ use crate::{ insert::Insert as XLogInsert, relation::{Column as RelColumn, Relation}, tuple_data::{Column as TupleColumn, Identifier, TupleData}, - update::Update as XLogUpdate, + update::{Update as XLogUpdate, UpdateIdentity}, }, XLogData, }, @@ -239,6 +239,14 @@ fn toasted_column() -> TupleColumn { } } +fn null_column() -> TupleColumn { + TupleColumn { + identifier: Identifier::Null, + len: 0, + data: Bytes::new(), + } +} + fn x_update(u: XLogUpdate) -> CopyData { xlog_copy_data(u.to_bytes().unwrap()) } @@ -260,9 +268,9 @@ fn make_subscriber_single_shard() -> StreamSubscriber { StreamSubscriber::new(&cluster, &tables) } -/// Count rows matching the given id using a separate connection. -async fn count_row(server: &mut Server, table: &str, id: &str) -> i64 { - let query = format!("SELECT COUNT(*) FROM {} WHERE id = {}", table, id); +/// Count rows matching the given `WHERE` predicate using a separate connection. +async fn count_where(server: &mut Server, table: &str, predicate: &str) -> i64 { + let query = format!("SELECT COUNT(*) FROM {} WHERE {}", table, predicate); let rows: Vec = server.fetch_all(query).await.unwrap(); rows.first() .and_then(|row: &crate::net::DataRow| row.column(0)) @@ -275,6 +283,22 @@ async fn count_row(server: &mut Server, table: &str, id: &str) -> i64 { .unwrap_or(0) } +/// Count rows matching the given id using a separate connection. 
+async fn count_row(server: &mut Server, table: &str, id: &str) -> i64 { + count_where(server, table, &format!("id = {}", id)).await +} + +/// Read `value` for a single row, or `None` if absent. Useful when a count check would +/// silently pass under SET-clause regressions. +async fn fetch_value(server: &mut Server, table: &str, id: &str) -> Option { + let query = format!("SELECT value FROM {} WHERE id = {}", table, id); + let rows: Vec = server.fetch_all(query).await.unwrap(); + rows.first().and_then(|row: &crate::net::DataRow| { + row.column(0) + .map(|col| std::str::from_utf8(&col[..]).unwrap().to_string()) + }) +} + async fn ensure_table(server: &mut Server, table: &str) { match table { "public.sharded" => { @@ -304,6 +328,44 @@ async fn ensure_table(server: &mut Server, table: &str) { .await .unwrap(); } + // Duplicate-row table: no PK, no unique index. + // Allows inserting identical rows to test ctid-based single-row targeting. + "public.full_dup_rows" => { + server + .execute( + "CREATE TABLE IF NOT EXISTS public.full_dup_rows \ + (id BIGINT, value TEXT)", + ) + .await + .unwrap(); + } + // Omni dedup table for ON CONFLICT DO NOTHING coverage. + // Requires a unique index so relation() accepts the omni FULL table. + "public.full_omni_dedup" => { + server + .execute( + "CREATE TABLE IF NOT EXISTS public.full_omni_dedup \ + (a TEXT NOT NULL, b TEXT NOT NULL)", + ) + .await + .unwrap(); + // Idempotently set NOT NULL: tables_missing_unique_index() requires all key columns to be NOT NULL. + // A stale nullable schema from a prior test run would silently fail the omni dedup test. 
+ for col in ["a", "b"] { + let _ = server + .execute(format!( + "ALTER TABLE public.full_omni_dedup ALTER COLUMN {col} SET NOT NULL" + )) + .await; + } + server + .execute( + "CREATE UNIQUE INDEX IF NOT EXISTS full_omni_dedup_ab_idx \ + ON public.full_omni_dedup (a, b)", + ) + .await + .unwrap(); + } _ => (), } } @@ -891,8 +953,7 @@ fn posts_update_title_copy_data(oid: Oid, id: &str, new_title: &str) -> CopyData xlog_copy_data( XLogUpdate { oid, - key: None, - old: None, + identity: UpdateIdentity::Nothing, new: TupleData { columns: vec![text_column(id), text_column(new_title), toasted_column()], }, @@ -1051,8 +1112,7 @@ async fn toast_update_all_toasted_is_noop() { sub.handle(xlog_copy_data( XLogUpdate { oid, - key: None, - old: None, + identity: UpdateIdentity::Nothing, new: TupleData { columns: vec![text_column(&id), toasted_column(), toasted_column()], }, @@ -1096,10 +1156,9 @@ async fn toast_pk_change_with_u_rejects() { let err = sub .handle(x_update(XLogUpdate { oid, - key: Some(TupleData { + identity: UpdateIdentity::Key(TupleData { columns: vec![text_column("old")], }), - old: None, new: TupleData { columns: vec![text_column("new"), toasted_column()], }, @@ -1128,8 +1187,7 @@ async fn update_rejects_toasted_identity_no_key() { let err = sub .handle(x_update(XLogUpdate { oid, - key: None, - old: None, + identity: UpdateIdentity::Nothing, new: TupleData { columns: vec![ toasted_column(), @@ -1161,10 +1219,9 @@ async fn update_rejects_toasted_identity_with_key() { let err = sub .handle(x_update(XLogUpdate { oid, - key: Some(TupleData { + identity: UpdateIdentity::Key(TupleData { columns: vec![text_column("42")], }), - old: None, new: TupleData { columns: vec![ toasted_column(), @@ -1183,3 +1240,925 @@ async fn update_rejects_toasted_identity_with_key() { "got: {err:?}" ); } + +// ── REPLICA IDENTITY FULL tests ────────────────────────────────────────────── + +/// Build a sharded FULL-identity table that maps to `public.sharded`. 
+/// All columns have `identity = false` (FULL identity has no designated identity cols). +fn make_full_identity_sharded_table() -> Table { + Table { + publication: "test".to_string(), + table: PublicationTable { + schema: "public".to_string(), + name: "sharded".to_string(), + attributes: "".to_string(), + parent_schema: "".to_string(), + parent_name: "".to_string(), + }, + identity: ReplicaIdentity { + oid: Oid(3), + identity: "f".to_string(), + kind: "r".to_string(), + }, + columns: vec![ + PublicationTableColumn { + oid: 3, + name: "id".to_string(), + type_oid: Oid(20), // bigint + identity: false, // FULL: no designated identity columns + }, + PublicationTableColumn { + oid: 3, + name: "value".to_string(), + type_oid: Oid(25), // text + identity: false, + }, + ], + lsn: Lsn::default(), + query_parser_engine: QueryParserEngine::default(), + } +} + +/// Build a NOTHING-identity table — used to verify `relation()` rejects it. +fn make_replica_identity_nothing_table() -> Table { + let mut t = make_full_identity_sharded_table(); + t.identity.identity = "n".to_string(); + t +} + +/// Build an omni FULL-identity table that maps to `public.full_events_omni`. +/// Columns `(a, b)` are not part of the sharding schema → `is_sharded()` returns false. 
+fn make_full_identity_omni_table() -> Table { + Table { + publication: "test".to_string(), + table: PublicationTable { + schema: "public".to_string(), + name: "full_events_omni".to_string(), + attributes: "".to_string(), + parent_schema: "".to_string(), + parent_name: "".to_string(), + }, + identity: ReplicaIdentity { + oid: Oid(5), + identity: "f".to_string(), + kind: "r".to_string(), + }, + columns: vec![ + PublicationTableColumn { + oid: 5, + name: "a".to_string(), + type_oid: Oid(25), + identity: false, + }, + PublicationTableColumn { + oid: 5, + name: "b".to_string(), + type_oid: Oid(25), + identity: false, + }, + ], + lsn: Lsn::default(), + query_parser_engine: QueryParserEngine::default(), + } +} + +fn full_identity_relation(oid: Oid) -> Relation { + Relation { + oid, + namespace: "public".to_string(), + name: "sharded".to_string(), + replica_identity: b'f' as i8, + columns: vec![ + RelColumn { + flag: 0, + name: "id".to_string(), + oid: Oid(20), + type_modifier: -1, + }, + RelColumn { + flag: 0, + name: "value".to_string(), + oid: Oid(25), + type_modifier: -1, + }, + ], + } +} + +fn full_identity_relation_copy_data(oid: Oid) -> CopyData { + xlog_copy_data(full_identity_relation(oid).to_bytes().unwrap()) +} + +/// Helper: build a FULL-identity UPDATE CopyData. +/// Both old and new tuples share the same column positions. +fn full_update_copy_data( + oid: Oid, + old_id: &str, + old_value: &str, + new_id: &str, + new_value: &str, +) -> CopyData { + x_update(XLogUpdate { + oid, + identity: UpdateIdentity::Old(TupleData { + columns: vec![text_column(old_id), text_column(old_value)], + }), + new: TupleData { + columns: vec![text_column(new_id), text_column(new_value)], + }, + }) +} + +/// FULL-identity UPDATE: `value` Toasted in NEW (unchanged), fully present in OLD. +/// Real WAL shape — PG always materialises OLD inline under REPLICA IDENTITY FULL. 
+fn full_update_value_toasted_copy_data( + oid: Oid, + old_id: &str, + old_value: &str, + new_id: &str, +) -> CopyData { + x_update(XLogUpdate { + oid, + identity: UpdateIdentity::Old(TupleData { + columns: vec![text_column(old_id), text_column(old_value)], + }), + new: TupleData { + columns: vec![text_column(new_id), toasted_column()], + }, + }) +} + +/// Helper: build a FULL-identity UPDATE where ALL columns are Toasted in new. +fn full_update_all_toasted_copy_data(oid: Oid) -> CopyData { + x_update(XLogUpdate { + oid, + identity: UpdateIdentity::Old(TupleData { + columns: vec![toasted_column(), toasted_column()], + }), + new: TupleData { + columns: vec![toasted_column(), toasted_column()], + }, + }) +} + +/// Helper: FULL-identity DELETE using the full old-row tuple. +fn full_delete_copy_data(oid: Oid, id: &str, value: &str) -> CopyData { + xlog_copy_data( + XLogDelete { + oid, + key: None, + old: Some(TupleData { + columns: vec![text_column(id), text_column(value)], + }), + } + .to_bytes() + .unwrap(), + ) +} + +// ── Helpers for duplicate-row and omni-dedup tests ───────────────────────────────────────────── + +/// Table without a primary key — allows duplicate rows. +/// In the test sharding config so `is_sharded()` returns `true`, bypassing the omni unique-index check. 
+fn make_full_identity_dup_rows_table() -> Table { + let mut t = make_full_identity_sharded_table(); + t.table.name = "full_dup_rows".to_string(); + t +} + +fn full_dup_rows_relation(oid: Oid) -> Relation { + Relation { + oid, + namespace: "public".to_string(), + name: "full_dup_rows".to_string(), + replica_identity: b'f' as i8, + columns: vec![ + RelColumn { + flag: 0, + name: "id".to_string(), + oid: Oid(20), + type_modifier: -1, + }, + RelColumn { + flag: 0, + name: "value".to_string(), + oid: Oid(25), + type_modifier: -1, + }, + ], + } +} + +fn full_dup_rows_relation_copy_data(oid: Oid) -> CopyData { + xlog_copy_data(full_dup_rows_relation(oid).to_bytes().unwrap()) +} + +/// Omni FULL-identity table with `(a TEXT, b TEXT)` and a unique index on `(a, b)`. +/// A separate table from `full_events_omni` so the no-unique-index rejection test is unaffected. +fn make_full_identity_omni_dedup_table() -> Table { + let mut t = make_full_identity_omni_table(); + t.table.name = "full_omni_dedup".to_string(); + t +} + +fn full_omni_dedup_relation(oid: Oid) -> Relation { + Relation { + oid, + namespace: "public".to_string(), + name: "full_omni_dedup".to_string(), + replica_identity: b'f' as i8, + columns: vec![ + RelColumn { + flag: 0, + name: "a".to_string(), + oid: Oid(25), + type_modifier: -1, + }, + RelColumn { + flag: 0, + name: "b".to_string(), + oid: Oid(25), + type_modifier: -1, + }, + ], + } +} + +fn full_omni_dedup_relation_copy_data(oid: Oid) -> CopyData { + xlog_copy_data(full_omni_dedup_relation(oid).to_bytes().unwrap()) +} + +/// Build an INSERT CopyData for the omni dedup table `(a, b)`. 
+fn omni_insert_copy_data(oid: Oid, a: &str, b: &str) -> CopyData { + xlog_copy_data( + XLogInsert { + xid: None, + oid, + tuple_data: TupleData { + columns: vec![text_column(a), text_column(b)], + }, + } + .to_bytes() + .unwrap(), + ) +} + +// ── NOTHING rejection ─────────────────────────────────────────────────────────────────────────── + +/// REPLICA IDENTITY NOTHING must be rejected at relation() time. +#[tokio::test] +async fn full_identity_nothing_rejected() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_replica_identity_nothing_table()]); + sub.connect().await.unwrap(); + + let oid = Oid(16390); + // Use the same schema+name as the nothing table so relation() finds it. + let mut rel = full_identity_relation(oid); + rel.name = "sharded".to_string(); + let err = sub + .handle(xlog_copy_data(rel.to_bytes().unwrap())) + .await + .expect_err("REPLICA IDENTITY NOTHING must be rejected"); + assert!( + matches!( + err, + crate::backend::replication::logical::Error::TableValidation(_) + ), + "expected TableValidation error, got: {err:?}" + ); + // Match the exact Display rendering so a future copy edit (sort key, tabs, remediation guidance) + // is caught — mirrors the assertion style of `data_sync_rejects_no_pk_table_before_slots_created`. + assert_eq!( + err.to_string(), + "Table validation failed:\n\ttable \"public\".\"sharded\": REPLICA IDENTITY NOTHING, UPDATE/DELETE carry no row identity and cannot be replicated; set it to DEFAULT, INDEX, or FULL", + "NOTHING rejection message drifted; got: {err}" + ); +} + +// ── Omni no-unique-index rejection ──────────────────────────────────────────────────── + +/// FULL identity omni table without a unique index on the destination must be rejected. +/// `full_events_omni` is absent (or has no qualifying index) — enough for `tables_missing_unique_index()` to return it as missing. 
+#[tokio::test] +async fn full_identity_omni_no_unique_index_rejected() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_table()]); + + // Enforce precondition: the table must exist but have no qualifying unique index. + // A stale unique index from a prior run would make tables_missing_unique_index() return empty, + // causing expect_err() to panic. Drop and recreate the table to guarantee a clean state. + { + let mut setup = test_server().await; + let _ = setup + .execute("DROP TABLE IF EXISTS public.full_events_omni") + .await; + setup + .execute("CREATE TABLE IF NOT EXISTS public.full_events_omni (a TEXT, b TEXT)") + .await + .unwrap(); + } + + let err = sub + .connect() + .await + .expect_err("omni FULL table without unique index must be rejected at connect time"); + assert!( + matches!( + err, + crate::backend::replication::logical::Error::TableValidation(_) + ), + "expected TableValidation error, got: {err:?}" + ); + assert!( + err.to_string().contains("REPLICA IDENTITY FULL"), + "error message must mention FULL identity, got: {err}" + ); +} + +// ── FULL identity DML tests ─────────────────────────────────────────────────────────── + +/// FULL identity sharded INSERT lands exactly once on the destination. 
+#[tokio::test] +async fn full_identity_insert_sharded() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut verify = test_server().await; + sub.connect().await.unwrap(); + + let oid = Oid(16384); + let id = random_id(); + cleanup(&mut verify, "public.sharded", &[&id]).await; + + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_identity_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(insert_copy_data(oid, &id, "full_hello")) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + assert_eq!(count_row(&mut verify, "public.sharded", &id).await, 1); + cleanup(&mut verify, "public.sharded", &[&id]).await; +} + +/// FULL identity fast-path UPDATE: no Toasted columns — UPDATE matches the old row +/// via IS NOT DISTINCT FROM and applies the new values. +#[tokio::test] +async fn full_identity_update_fast_path() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut verify = test_server().await; + sub.connect().await.unwrap(); + + let oid = Oid(16384); + let id = random_id(); + let id2 = random_id(); + cleanup(&mut verify, "public.sharded", &[&id, &id2]).await; + + // Insert the initial row. + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_identity_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(insert_copy_data(oid, &id, "before")) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + assert_eq!(count_row(&mut verify, "public.sharded", &id).await, 1); + + // Update the row: change id from `id` to `id2`, value from "before" to "after". 
+ sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(full_update_copy_data(oid, &id, "before", &id2, "after")) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + assert_eq!( + count_row(&mut verify, "public.sharded", &id).await, + 0, + "old row gone" + ); + assert_eq!( + count_row(&mut verify, "public.sharded", &id2).await, + 1, + "new row present" + ); + // Read back `value` so a SET-clause regression (dropped column / wrong $N) + // is observable. count_row alone would silently pass. + assert_eq!( + fetch_value(&mut verify, "public.sharded", &id2) + .await + .as_deref(), + Some("after"), + "SET clause must update value column" + ); + + cleanup(&mut verify, "public.sharded", &[&id, &id2]).await; +} + +/// FULL identity slow-path UPDATE: `value` is Toasted (unchanged), only `id` present. +/// Verifies the shape cache is populated and the partial UPDATE executes without error. +#[tokio::test] +async fn full_identity_update_slow_path() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut verify = test_server().await; + sub.connect().await.unwrap(); + + let oid = Oid(16384); + let id = random_id(); + let id2 = random_id(); + cleanup(&mut verify, "public.sharded", &[&id, &id2]).await; + + // Insert initial row. + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_identity_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(insert_copy_data(oid, &id, "initial")) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // Rename id → id2; value Toasted in NEW (unchanged), inline in OLD. + // Using distinct id2 forces a real row rename so assertions are non-trivial. 
+ sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(full_update_value_toasted_copy_data( + oid, &id, "initial", &id2, + )) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + // Old row must be gone. + assert_eq!( + count_row(&mut verify, "public.sharded", &id).await, + 0, + "original id row must be gone after rename" + ); + // New row must exist. + assert_eq!( + count_row(&mut verify, "public.sharded", &id2).await, + 1, + "renamed id2 row must be present" + ); + // Toasted `value` must survive the rename — a regression that drops or zeroes the + // toasted column would produce NULL or an empty string here. + assert_eq!( + fetch_value(&mut verify, "public.sharded", &id2) + .await + .as_deref(), + Some("initial"), + "unchanged-TOAST column must be preserved across slow-path UPDATE" + ); + + cleanup(&mut verify, "public.sharded", &[&id, &id2]).await; +} + +/// Regression: real PG WAL never has `'u'` markers in OLD under REPLICA IDENTITY FULL +/// (PG calls `toast_flatten_tuple` on OLD before WAL-logging). Only NEW carries `'u'`. +/// Exercises the path the prior buggy `old.without_toasted()` failed on (n+k bind vs 2k SQL). +#[tokio::test] +async fn full_identity_update_slow_path_realistic_old_tuple() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut verify = test_server().await; + sub.connect().await.unwrap(); + + let oid = Oid(16384); + let id = random_id(); + cleanup(&mut verify, "public.sharded", &[&id]).await; + + // Seed the row. + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_identity_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(insert_copy_data(oid, &id, "initial")) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // Realistic UPDATE shape produced by PG: OLD has every column inline, + // NEW marks the unchanged `value` column as 'u'. 
+ let realistic = x_update(XLogUpdate { + oid, + identity: UpdateIdentity::Old(TupleData { + columns: vec![text_column(&id), text_column("initial")], + }), + new: TupleData { + columns: vec![text_column(&id), toasted_column()], + }, + }); + + sub.handle(begin_copy_data(300)).await.unwrap(); + let result = sub.handle(realistic).await; + // Drain the commit so the connection state is clean even on failure. + let _ = sub.handle(commit_copy_data(400)).await; + + result.unwrap(); + + assert_eq!( + count_row(&mut verify, "public.sharded", &id).await, + 1, + "row must still exist after slow-path UPDATE" + ); + assert_eq!( + fetch_value(&mut verify, "public.sharded", &id) + .await + .as_deref(), + Some("initial"), + "unchanged-TOAST `value` must be preserved across slow-path UPDATE" + ); + + cleanup(&mut verify, "public.sharded", &[&id]).await; +} + +/// FULL identity UPDATE where every column is Toasted: nothing to do, skip silently. +#[tokio::test] +async fn full_identity_update_all_toasted_is_noop() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut verify = test_server().await; + sub.connect().await.unwrap(); + + let oid = Oid(16384); + let id = random_id(); + cleanup(&mut verify, "public.sharded", &[&id]).await; + + // Insert initial row. + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_identity_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(insert_copy_data(oid, &id, "stable")) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // All columns Toasted: no-op, must not error, row must be untouched. 
+ sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(full_update_all_toasted_copy_data(oid)) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + assert_eq!(count_row(&mut verify, "public.sharded", &id).await, 1); + // Value column must be untouched — a no-op that silently zeros a column would + // still satisfy the count check but would fail here. + assert_eq!( + fetch_value(&mut verify, "public.sharded", &id) + .await + .as_deref(), + Some("stable"), + "all-toasted no-op must leave value column untouched" + ); + cleanup(&mut verify, "public.sharded", &[&id]).await; +} + +/// FULL identity DELETE: matches old-row tuple via IS NOT DISTINCT FROM on all columns. +#[tokio::test] +async fn full_identity_delete() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut verify = test_server().await; + sub.connect().await.unwrap(); + + let oid = Oid(16384); + let id = random_id(); + cleanup(&mut verify, "public.sharded", &[&id]).await; + + // Insert row. + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_identity_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(insert_copy_data(oid, &id, "to_delete")) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + assert_eq!(count_row(&mut verify, "public.sharded", &id).await, 1); + + // Delete via full old-row match. 
+ sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(full_delete_copy_data(oid, &id, "to_delete")) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + assert_eq!(count_row(&mut verify, "public.sharded", &id).await, 0); + + cleanup(&mut verify, "public.sharded", &[&id]).await; +} + +// ── Omni dedup test ──────────────────────────────────────────────────────────────────────── + +/// FULL identity omni INSERT: verifies `ON CONFLICT DO NOTHING` deduplication during +/// the COPY-to-replication overlap window — same row inserted twice must land once. +#[tokio::test] +async fn full_identity_insert_omni_dedup() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_dedup_table()]); + let mut verify = test_server().await; + + // Ensure destination table exists with unique index before relation() runs. + ensure_table(&mut verify, "public.full_omni_dedup").await; + verify + .execute("DELETE FROM public.full_omni_dedup") + .await + .unwrap(); + + sub.connect().await.unwrap(); + + let oid = Oid(16400); + // Send relation — tables_missing_unique_index() must return empty or relation() rejects. + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_omni_dedup_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // First INSERT: row lands. + sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(omni_insert_copy_data(oid, "hello", "world")) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + let predicate = "a = 'hello' AND b = 'world'"; + let count = count_where(&mut verify, "public.full_omni_dedup", predicate).await; + assert_eq!(count, 1, "first INSERT must land"); + + // Second INSERT: same values — ON CONFLICT DO NOTHING, count stays at 1. 
+ sub.handle(begin_copy_data(500)).await.unwrap(); + sub.handle(omni_insert_copy_data(oid, "hello", "world")) + .await + .unwrap(); + sub.handle(commit_copy_data(600)).await.unwrap(); + + let count = count_where(&mut verify, "public.full_omni_dedup", predicate).await; + assert_eq!( + count, 1, + "second INSERT must be silently skipped by ON CONFLICT DO NOTHING" + ); + + verify + .execute("DELETE FROM public.full_omni_dedup") + .await + .unwrap(); +} + +// ── Duplicate-row handling tests ────────────────────────────────────────────────────────────────────────────────── + +/// FULL identity UPDATE on a table with two identical rows must succeed and affect exactly one row. +/// With REPLICA IDENTITY FULL, Postgres materialises all TOAST values into the WAL record, so the +/// old tuple is always complete. Two rows matching the old tuple are byte-for-byte identical; +/// the ctid-based WHERE targets one of them, which is semantically correct. +#[tokio::test] +async fn full_identity_update_duplicate_rows() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut verify = test_server().await; + + ensure_table(&mut verify, "public.full_dup_rows").await; + + let id = random_id(); + // Clean slate. + verify + .execute(format!("DELETE FROM public.full_dup_rows WHERE id = {id}")) + .await + .unwrap(); + + sub.connect().await.unwrap(); + + let oid = Oid(16401); + + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_dup_rows_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // Seed two identical rows directly. + verify + .execute(format!( + "INSERT INTO public.full_dup_rows VALUES ({id}, 'dup')" + )) + .await + .unwrap(); + verify + .execute(format!( + "INSERT INTO public.full_dup_rows VALUES ({id}, 'dup')" + )) + .await + .unwrap(); + + // FULL UPDATE WAL event: old = (id, 'dup'), new = (id2, 'changed'). 
+ // The ctid subquery must target exactly one of the two identical rows. + let id2 = random_id(); + sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(full_update_copy_data(oid, &id, "dup", &id2, "changed")) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + // Exactly one row was updated: the old (id, 'dup') row remains, the other became (id2, 'changed'). + assert_eq!( + count_row(&mut verify, "public.full_dup_rows", &id).await, + 1, + "exactly one of the two duplicate rows must have been updated" + ); + + // Cleanup. + verify + .execute(format!( + "DELETE FROM public.full_dup_rows WHERE id IN ({id}, {id2})" + )) + .await + .unwrap(); +} + +/// FULL identity DELETE on a table with two identical rows must succeed and remove exactly one row. +/// Same rationale as the UPDATE variant: ctid targets one byte-for-byte identical row. +#[tokio::test] +async fn full_identity_delete_duplicate_rows() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut verify = test_server().await; + + ensure_table(&mut verify, "public.full_dup_rows").await; + + let id = random_id(); + verify + .execute(format!("DELETE FROM public.full_dup_rows WHERE id = {id}")) + .await + .unwrap(); + + sub.connect().await.unwrap(); + + let oid = Oid(16402); + + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_dup_rows_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // Seed two identical rows. + verify + .execute(format!( + "INSERT INTO public.full_dup_rows VALUES ({id}, 'dup')" + )) + .await + .unwrap(); + verify + .execute(format!( + "INSERT INTO public.full_dup_rows VALUES ({id}, 'dup')" + )) + .await + .unwrap(); + + // FULL DELETE: old = (id, 'dup') — ctid must remove exactly one of the two identical rows. 
+ sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(full_delete_copy_data(oid, &id, "dup")) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + // Exactly one row deleted: one (id, 'dup') row must remain. + assert_eq!( + count_row(&mut verify, "public.full_dup_rows", &id).await, + 1, + "exactly one of the two duplicate rows must have been deleted" + ); + + // Cleanup. + verify + .execute(format!("DELETE FROM public.full_dup_rows WHERE id = {id}")) + .await + .unwrap(); +} + +// ── NULL-column FULL identity matching ───────────────────────────────────────────────────────── + +/// FULL identity UPDATE/DELETE matches a row whose `value` column is NULL. +/// +/// `IS NOT DISTINCT FROM` is required for this case — plain `=` on NULL evaluates to NULL +/// (not TRUE), so the WHERE clause would never match a NULL-valued row. A regression that +/// swapped the operator back to `=` would miss every NULL-keyed row and silently drop the +/// event. count_row alone in other FULL tests would not catch this. +#[tokio::test] +async fn full_identity_update_matches_null_column() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut verify = test_server().await; + + // full_dup_rows has no NOT NULL on value — we can seed a NULL row. + ensure_table(&mut verify, "public.full_dup_rows").await; + + let id = random_id(); + verify + .execute(format!("DELETE FROM public.full_dup_rows WHERE id = {id}")) + .await + .unwrap(); + verify + .execute(format!( + "INSERT INTO public.full_dup_rows VALUES ({id}, NULL)" + )) + .await + .unwrap(); + + sub.connect().await.unwrap(); + let oid = Oid(16410); + + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_dup_rows_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // FULL UPDATE: old = (id, NULL), new = (id, "filled"). 
The WHERE clause must use + // IS NOT DISTINCT FROM so NULL participates in the match. + sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(x_update(XLogUpdate { + oid, + identity: UpdateIdentity::Old(TupleData { + columns: vec![text_column(&id), null_column()], + }), + new: TupleData { + columns: vec![text_column(&id), text_column("filled")], + }, + })) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + assert_eq!( + fetch_value(&mut verify, "public.full_dup_rows", &id) + .await + .as_deref(), + Some("filled"), + "FULL identity UPDATE must match NULL via IS NOT DISTINCT FROM" + ); + + verify + .execute(format!("DELETE FROM public.full_dup_rows WHERE id = {id}")) + .await + .unwrap(); +} + +/// FULL identity DELETE removes a row whose value column is NULL. +#[tokio::test] +async fn full_identity_delete_matches_null_column() { + let cluster = Cluster::new_test_single_shard(&config()); + let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut verify = test_server().await; + + ensure_table(&mut verify, "public.full_dup_rows").await; + + let id = random_id(); + verify + .execute(format!("DELETE FROM public.full_dup_rows WHERE id = {id}")) + .await + .unwrap(); + verify + .execute(format!( + "INSERT INTO public.full_dup_rows VALUES ({id}, NULL)" + )) + .await + .unwrap(); + + sub.connect().await.unwrap(); + let oid = Oid(16411); + + sub.handle(begin_copy_data(100)).await.unwrap(); + sub.handle(full_dup_rows_relation_copy_data(oid)) + .await + .unwrap(); + sub.handle(commit_copy_data(200)).await.unwrap(); + + // DELETE with old = (id, NULL). 
+ sub.handle(begin_copy_data(300)).await.unwrap(); + sub.handle(xlog_copy_data( + XLogDelete { + oid, + key: None, + old: Some(TupleData { + columns: vec![text_column(&id), null_column()], + }), + } + .to_bytes() + .unwrap(), + )) + .await + .unwrap(); + sub.handle(commit_copy_data(400)).await.unwrap(); + + assert_eq!( + count_row(&mut verify, "public.full_dup_rows", &id).await, + 0, + "FULL identity DELETE must match NULL via IS NOT DISTINCT FROM" + ); +} diff --git a/pgdog/src/frontend/router/parser/query/test/test_dml.rs b/pgdog/src/frontend/router/parser/query/test/test_dml.rs index cb085b75e..be197167f 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_dml.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_dml.rs @@ -65,3 +65,25 @@ fn test_select_for_update() { assert!(matches!(command.route().shard(), Shard::Direct(_))); assert!(command.route().is_write()); } + +#[test] +fn test_update_is_not_distinct_from_routes_to_shard() { + // IS NOT DISTINCT FROM must route the same as = for shard-key extraction. 
+ let mut test = QueryParserTest::new(); + let command = test.execute(vec![ + Parse::named( + "__test_indf", + "UPDATE sharded SET email = $2 WHERE id IS NOT DISTINCT FROM $1", + ) + .into(), + Bind::new_params( + "__test_indf", + &[Parameter::new(b"1"), Parameter::new(b"test@test.com")], + ) + .into(), + Execute::new().into(), + Sync.into(), + ]); + assert!(matches!(command.route().shard(), Shard::Direct(_))); + assert!(command.route().is_write()); +} diff --git a/pgdog/src/frontend/router/parser/statement.rs b/pgdog/src/frontend/router/parser/statement.rs index cec05c806..dd45d0b9f 100644 --- a/pgdog/src/frontend/router/parser/statement.rs +++ b/pgdog/src/frontend/router/parser/statement.rs @@ -1110,22 +1110,22 @@ impl<'a, 'b, 'c> StatementParser<'a, 'b, 'c> { Some(NodeEnum::AExpr(ref expr)) => { let kind = expr.kind(); - let mut supported = false; - - if matches!( - kind, - AExprKind::AexprOp | AExprKind::AexprIn | AExprKind::AexprOpAny - ) { - supported = expr - .name - .first() - .map(|node| match node.node { - Some(NodeEnum::String(ref string)) => string.sval.as_str(), - _ => "", - }) - .unwrap_or_default() - == "="; - } + let supported = match kind { + // Kind carries the full semantic; no operator name to check. + AExprKind::AexprNotDistinct => true, + // Operator-based kinds: accept equality only. + AExprKind::AexprOp | AExprKind::AexprIn | AExprKind::AexprOpAny => { + expr.name + .first() + .map(|node| match node.node { + Some(NodeEnum::String(ref string)) => string.sval.as_str(), + _ => "", + }) + .unwrap_or_default() + == "=" + } + _ => false, + }; if !supported { return Ok(SearchResult::None); diff --git a/pgdog/src/net/error.rs b/pgdog/src/net/error.rs index 7d6ce0b39..13113ce60 100644 --- a/pgdog/src/net/error.rs +++ b/pgdog/src/net/error.rs @@ -102,6 +102,11 @@ pub enum Error { #[error("{0}")] TypeError(#[from] pgdog_postgres_types::Error), + + /// Internal invariant violated or API misused — not a network error. 
+ /// Carry enough context to identify the violation without a debugger. + #[error("invariant violation: {0}")] + InvariantViolation(String), } impl Error { diff --git a/pgdog/src/net/messages/replication/logical/tuple_data.rs b/pgdog/src/net/messages/replication/logical/tuple_data.rs index 4f8077641..1cdfe2b15 100644 --- a/pgdog/src/net/messages/replication/logical/tuple_data.rs +++ b/pgdog/src/net/messages/replication/logical/tuple_data.rs @@ -116,6 +116,68 @@ impl TupleData { .iter() .any(|c| c.identifier == Identifier::Toasted) } + + /// Is every column in this tuple unchanged-TOAST (`'u'`)? + /// + /// True when nothing changed — used to detect no-op UPDATEs before routing. + pub fn all_toasted(&self) -> bool { + self.columns + .iter() + .all(|c| c.identifier == Identifier::Toasted) + } + + /// Return a copy with unchanged-TOAST (`'u'`) columns removed. + pub fn without_toasted(&self) -> TupleData { + TupleData { + columns: self + .columns + .iter() + .filter(|c| c.identifier != Identifier::Toasted) + .cloned() + .collect(), + } + } + + /// Return a copy of `self` with every `Toasted` (`'u'`) column replaced by the + /// corresponding column from `source`. + /// + /// Used when a REPLICA IDENTITY FULL cross-shard UPDATE needs to reconstruct the + /// complete new row. With FULL identity, `source` (the old tuple) is always fully + /// materialised — Postgres flattens every TOAST reference before writing the WAL + /// record. `'u'` columns in the new tuple are unchanged, so their values are + /// available in the old tuple at the same position. + /// + /// Both tuples must have the same column count; mismatches are a schema-change + /// race that is not recoverable here and should surface as an upstream error. 
+ pub fn fill_toasted_from(&self, source: &TupleData) -> Result { + if self.columns.len() != source.columns.len() { + return Err(Error::InvariantViolation(format!( + "fill_toasted_from: column count mismatch ({} vs {}); schema-change race?", + self.columns.len(), + source.columns.len(), + ))); + } + let columns = self + .columns + .iter() + .zip(source.columns.iter()) + .map(|(new_col, old_col)| { + if new_col.identifier == Identifier::Toasted { + if old_col.identifier == Identifier::Toasted { + return Err(Error::InvariantViolation( + "fill_toasted_from: source column is Toasted; \ + FULL identity guarantees old tuple is always materialised" + .into(), + )); + } + Ok(old_col.clone()) + } else { + Ok(new_col.clone()) + } + }) + .collect::, _>>()?; + Ok(TupleData { columns }) + } } /// Explains what's inside the column. @@ -264,4 +326,71 @@ mod test { let c = toasted_col(); assert_eq!(c.to_sql().unwrap(), ""); } + + #[test] + fn fill_toasted_from_replaces_u_columns() { + // new tuple: col0 present, col1 toasted, col2 present. + let new = TupleData { + columns: vec![text_col("a"), toasted_col(), text_col("c")], + }; + // old tuple: all columns materialised (REPLICA IDENTITY FULL guarantee). + let old = TupleData { + columns: vec![text_col("a_old"), text_col("b_old"), text_col("c_old")], + }; + let filled = new.fill_toasted_from(&old).unwrap(); + // col0: from new (unchanged — was present) + assert_eq!(filled.columns[0].as_str(), Some("a")); + // col1: from old (was toasted in new) + assert_eq!(filled.columns[1].as_str(), Some("b_old")); + // col2: from new + assert_eq!(filled.columns[2].as_str(), Some("c")); + assert!( + !filled.has_toasted(), + "filled tuple must contain no toasted markers" + ); + } + + #[test] + fn fill_toasted_from_all_present_is_identity() { + // When no columns are toasted, the result equals the original. 
+ let new = TupleData { + columns: vec![text_col("x"), text_col("y")], + }; + let old = TupleData { + columns: vec![text_col("x_old"), text_col("y_old")], + }; + let filled = new.fill_toasted_from(&old).unwrap(); + assert_eq!(filled.columns[0].as_str(), Some("x")); + assert_eq!(filled.columns[1].as_str(), Some("y")); + } + + #[test] + fn fill_toasted_from_all_toasted_uses_old() { + // When every column is toasted, the result equals the old tuple. + let new = TupleData { + columns: vec![toasted_col(), toasted_col()], + }; + let old = TupleData { + columns: vec![text_col("p"), text_col("q")], + }; + let filled = new.fill_toasted_from(&old).unwrap(); + assert_eq!(filled.columns[0].as_str(), Some("p")); + assert_eq!(filled.columns[1].as_str(), Some("q")); + assert!(!filled.has_toasted()); + } + + #[test] + fn fill_toasted_from_column_count_mismatch_is_err() { + // self has 2 columns, source has 1 — schema-change race; must not panic. + let new = TupleData { + columns: vec![text_col("a"), toasted_col()], + }; + let old = TupleData { + columns: vec![text_col("x")], + }; + assert!( + new.fill_toasted_from(&old).is_err(), + "column count mismatch must return Err, not panic" + ); + } } diff --git a/pgdog/src/net/messages/replication/logical/update.rs b/pgdog/src/net/messages/replication/logical/update.rs index 41a19dfa6..bdcb6ab25 100644 --- a/pgdog/src/net/messages/replication/logical/update.rs +++ b/pgdog/src/net/messages/replication/logical/update.rs @@ -2,15 +2,25 @@ use pgdog_postgres_types::Oid; use super::super::super::code; use super::super::super::prelude::*; -use super::tuple_data::{Column, Identifier, TupleData}; +use super::tuple_data::{Column, TupleData}; + +/// Pre-image in a WAL UPDATE record — exactly one variant per record: +/// - `Key` — byte `'K'`: identity index changed; old key columns sent. +/// - `Old` — byte `'O'`: `REPLICA IDENTITY FULL`; full old row sent. +/// - `Nothing` — no K/O block precedes the new-tuple marker; the pre-image is absent. 
+#[derive(Debug, Clone)] +pub enum UpdateIdentity { + Key(TupleData), + Old(TupleData), + Nothing, +} /// WAL UPDATE record. Use with [`Table::update`](crate::backend::replication::logical::publisher::Table::update) /// or [`Table::update_partial`](crate::backend::replication::logical::publisher::Table::update_partial). #[derive(Debug, Clone)] pub struct Update { pub oid: Oid, - pub key: Option, - pub old: Option, + pub identity: UpdateIdentity, pub new: TupleData, } @@ -20,15 +30,23 @@ impl Update { self.new.columns.get(index) } - /// Filters unchanged-TOAST columns out of `new` for use with - /// [`Table::update_partial`](crate::backend::replication::logical::publisher::Table::update_partial). + /// Filters unchanged-TOAST (`'u'`) columns out of `new`. + /// + /// In a WAL UPDATE record, columns whose value did not change are sent as `'u'` + /// (Toasted/unchanged) in the new tuple — the value is not included in the record. + /// Stripping them yields only the columns that were actually modified. pub fn partial_new(&self) -> TupleData { + self.new.without_toasted() + } + + /// Concatenate `where_cols` then `set_cols` into one `TupleData` for a FULL-identity UPDATE. + /// WHERE params (`$1..$k`) come from `where_cols`; SET params (`$k+1..$n`) from `set_cols`. 
+ pub fn full_identity_bind_tuple(where_cols: &TupleData, set_cols: &TupleData) -> TupleData { TupleData { - columns: self - .new + columns: where_cols .columns .iter() - .filter(|c| c.identifier != Identifier::Toasted) + .chain(set_cols.columns.iter()) .cloned() .collect(), } @@ -41,28 +59,23 @@ impl FromBytes for Update { let oid = Oid(bytes.get_u32()); let identifier = bytes.get_u8() as char; - let key = if identifier == 'K' { - let key = TupleData::from_buffer(&mut bytes)?; - Some(key) - } else { - None + let identity = match identifier { + 'K' => UpdateIdentity::Key(TupleData::from_buffer(&mut bytes)?), + 'O' => UpdateIdentity::Old(TupleData::from_buffer(&mut bytes)?), + 'N' => UpdateIdentity::Nothing, + other => return Err(Error::UnexpectedMessage('N', other)), }; - let old = if identifier == 'O' { - let old = TupleData::from_buffer(&mut bytes)?; - Some(old) - } else { - None - }; - - let new = if identifier == 'N' { + // 'K' and 'O' are followed by the 'N' marker that introduces the new tuple. + // For Nothing, the identifier byte we already consumed *was* that 'N'. + let new = if matches!(identity, UpdateIdentity::Nothing) { TupleData::from_bytes(bytes)? } else { code!(bytes, 'N'); TupleData::from_bytes(bytes)? 
}; - Ok(Self { oid, key, old, new }) + Ok(Self { oid, identity, new }) } } @@ -72,12 +85,16 @@ impl ToBytes for Update { let mut buf = bytes::BytesMut::new(); buf.put_u8(b'U'); buf.put_u32(self.oid.0); - if let Some(ref key) = self.key { - buf.put_u8(b'K'); - buf.put(key.to_bytes()?); - } else if let Some(ref old) = self.old { - buf.put_u8(b'O'); - buf.put(old.to_bytes()?); + match &self.identity { + UpdateIdentity::Key(key) => { + buf.put_u8(b'K'); + buf.put(key.to_bytes()?); + } + UpdateIdentity::Old(old) => { + buf.put_u8(b'O'); + buf.put(old.to_bytes()?); + } + UpdateIdentity::Nothing => {} } buf.put_u8(b'N'); buf.put(self.new.to_bytes()?); @@ -89,23 +106,32 @@ impl ToBytes for Update { mod test { use super::*; use crate::net::messages::replication::logical::tuple_data::{ - text_col, toasted_col, TupleData, + text_col, toasted_col, Identifier, TupleData, }; use pgdog_postgres_types::Oid; - fn update(columns: Vec) -> Update { + fn make_update(new_cols: Vec) -> Update { Update { oid: Oid(1), - key: None, - old: None, - new: TupleData { columns }, + identity: UpdateIdentity::Nothing, + new: TupleData { columns: new_cols }, } } + fn make_update_with_old(new_cols: Vec, old_cols: Vec) -> Update { + Update { + oid: Oid(1), + identity: UpdateIdentity::Old(TupleData { columns: old_cols }), + new: TupleData { columns: new_cols }, + } + } + + // ── partial_new ─────────────────────────────────────────────────────── + #[test] fn partial_new_all_present() { // No TOAST columns — every column passes through unchanged. - let result = update(vec![text_col("1"), text_col("x"), text_col("y")]).partial_new(); + let result = make_update(vec![text_col("1"), text_col("x"), text_col("y")]).partial_new(); assert_eq!(result.columns.len(), 3); assert!(result .columns @@ -120,7 +146,7 @@ mod test { #[test] fn partial_new_filters_toasted_columns() { // id, a, b (TOAST), c — b is dropped; bind order is id($1), a($2), c($3). 
- let bind = update(vec![ + let bind = make_update(vec![ text_col("42"), text_col("aa"), toasted_col(), @@ -138,7 +164,7 @@ mod test { #[test] fn partial_new_identity_in_middle() { // a, id (middle), b (TOAST) — b is dropped; bind order is a($1), id($2). - let bind = update(vec![text_col("av"), text_col("1"), toasted_col()]) + let bind = make_update(vec![text_col("av"), text_col("1"), toasted_col()]) .partial_new() .to_bind("__pgdog"); assert_eq!(bind.statement(), "__pgdog"); @@ -150,7 +176,7 @@ mod test { #[test] fn partial_new_multiple_identity_columns() { // id1, id2, a (TOAST), b — a is dropped; bind order is id1($1), id2($2), b($3). - let bind = update(vec![ + let bind = make_update(vec![ text_col("1"), text_col("2"), toasted_col(), @@ -168,7 +194,7 @@ mod test { #[test] fn partial_new_two_identity_interleaved_two_toasted() { // a (TOAST), id1, b, c (TOAST), id2 — a and c dropped; bind order is id1($1), b($2), id2($3). - let bind = update(vec![ + let bind = make_update(vec![ toasted_col(), // a — dropped text_col("1"), // id1 text_col("bv"), // b @@ -183,4 +209,81 @@ mod test { assert_eq!(bind.parameter(2).unwrap().unwrap().bigint(), Some(2)); // id2 assert!(matches!(bind.parameter(3), Ok(None) | Err(_))); // no 4th param } + + /// Assert that every column in `got` has the same `Identifier` as the corresponding + /// column in `want`. Catches byte-level regressions in column-marker serialization + /// (e.g. swapping `'t'`/`'b'`/`'n'`/`'u'`) that a length-only check would miss. 
+ fn assert_columns_match(got: &TupleData, want: &TupleData, label: &str) { + assert_eq!( + got.columns.len(), + want.columns.len(), + "{label}: column count mismatch" + ); + for (i, (g, w)) in got.columns.iter().zip(want.columns.iter()).enumerate() { + assert_eq!( + g.identifier, w.identifier, + "{label}[{i}] identifier mismatch" + ); + } + } + + fn assert_round_trip(u: &Update) { + let bytes = u.to_bytes().unwrap(); + let parsed = Update::from_bytes(bytes).unwrap(); + assert_eq!(parsed.oid, u.oid); + assert_columns_match(&parsed.new, &u.new, "new.columns"); + match (&parsed.identity, &u.identity) { + (UpdateIdentity::Key(a), UpdateIdentity::Key(b)) + | (UpdateIdentity::Old(a), UpdateIdentity::Old(b)) => { + assert_columns_match(a, b, "identity.columns"); + } + (UpdateIdentity::Nothing, UpdateIdentity::Nothing) => {} + _ => panic!("identity variant changed across round-trip"), + } + } + + #[test] + fn round_trip_nothing() { + assert_round_trip(&make_update(vec![text_col("v")])); + } + + #[test] + fn round_trip_key() { + let u = Update { + oid: Oid(7), + identity: UpdateIdentity::Key(TupleData { + columns: vec![text_col("42")], + }), + new: TupleData { + columns: vec![text_col("42"), text_col("new")], + }, + }; + assert_round_trip(&u); + } + + #[test] + fn round_trip_old() { + let u = make_update_with_old( + vec![text_col("a"), text_col("b")], + vec![text_col("x"), text_col("y")], + ); + assert_round_trip(&u); + } + + #[test] + fn from_bytes_rejects_unknown_marker() { + // 'U' + oid(4) + 'X' (not K/O/N) + … must error rather than silently fall through. + let mut buf = bytes::BytesMut::new(); + use bytes::BufMut; + buf.put_u8(b'U'); + buf.put_u32(1); + buf.put_u8(b'X'); + // Plausible TupleData header so the test fails because of marker, not framing. 
+ buf.put_i16(0); + let err = Update::from_bytes(buf.freeze()); + assert!( + matches!(err, Err(Error::UnexpectedMessage('N', 'X'))), + "got: {err:?}" + ); + } } diff --git a/pgdog/src/net/messages/replication/mod.rs b/pgdog/src/net/messages/replication/mod.rs index 1f04ead99..28cb37c10 100644 --- a/pgdog/src/net/messages/replication/mod.rs +++ b/pgdog/src/net/messages/replication/mod.rs @@ -14,7 +14,7 @@ pub use logical::relation::Relation; pub use logical::stream_start::StreamStart; pub use logical::truncate::Truncate; pub use logical::tuple_data::TupleData; -pub use logical::update::Update; +pub use logical::update::{Update, UpdateIdentity}; pub use status_update::StatusUpdate; pub use xlog_data::XLogData;