[fix](streamingjob) fix postgres DML silently dropped on task restart#61481
[fix](streamingjob) fix postgres DML silently dropped on task restart#61481JNSimba wants to merge 2 commits intoapache:masterfrom
Conversation
|
run buildall |
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
TPC-H: Total hot run time: 27048 ms |
TPC-DS: Total hot run time: 168708 ms |
FE UT Coverage ReportIncrement line coverage |
|
run external |
|
/review |
There was a problem hiding this comment.
Pull request overview
Fixes an intermittent Postgres CDC failure where the first DML after a task restart could be silently dropped due to Debezium’s WalPositionLocator behavior in pgoutput non-streaming mode.
Changes:
- Override
extractBinlogStateOffset()inPostgresSourceReaderto removelsn_proc/lsn_commitkeys before Debezium consumes the offset. - Clarify a log message in
StreamingMultiTblTaskto accurately reflect “timeout reason”.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java | Adjusts Postgres offset passed to Debezium to avoid WalPositionLocator incorrectly filtering DML on restart. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java | Fixes misleading error log text for timeout-reason retrieval failures. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
...nt/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
Show resolved
Hide resolved
...nt/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
Show resolved
Hide resolved
...re/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
Show resolved
Hide resolved
Code Review SummaryPR: fix fix postgres DML silently dropped on task restart OverviewThis PR fixes a bug where the first DML of a new Postgres streaming transaction is intermittently dropped (10-20% failure rate) on task restart, with no error logged. The fix strips Critical Checkpoint Conclusions
VerdictNo blocking issues. The fix is correct, well-documented, and safe for production. The only suggestion is to consider adding a unit test for the |
|
run external |
What problem does this PR solve?
Problem
When a streaming job restarts a task, the first DML of the new transaction
is occasionally silently dropped (10-20% failure rate). The affected record
never appears in the Doris target table, with no error logged — only
"identified as already processed" in cdc-client.log.
Root Cause
debezium 1.9.x hardcodes
proto_version=1(non-streaming pgoutput) for allPG versions. In non-streaming mode, the walsender batches all changes of a
transaction and sends them after COMMIT, and all messages (BEGIN + DML) share
the same
XLogData.data_start= the transaction'sbegin_lsn.When this
begin_lsnequals the previous transaction'scommit_lsn(i.e.the two transactions are adjacent in WAL with no other writes between them),
WalPositionLocatorbehaves incorrectly:Find phase:
COMMIT(T1)atlsn=YsetsstoreLsnAfterLastEventStoredLsn=true.BEGIN(T2)andINSERT(T2)both havelsn=Y, so they keep returningOptional.empty(). OnlyCOMMIT(T2)atlsn=ZsetsstartStreamingLsn=Z, withlsnSeen={Y, Z}.Actual streaming:
INSERT(T2)arrives withlastReceiveLsn=Y.skipMessage(Y):Y ∈ lsnSeenandY ≠ startStreamingLsn(Z)→ filtered.The bug is intermittent because it only triggers when no other WAL activity
(autovacuum, other connections) occurs between the two transactions.
Fix
Override
extractBinlogStateOffset()inPostgresSourceReaderto striplsn_procandlsn_commitfrom the offset before it is passed to debezium.This constructs
WalPositionLocator(lastCommitStoredLsn=null, lsn=Y), whichcauses the find phase to exit immediately at the first received message
(
startStreamingLsn=Y). In actual streaming,COMMIT(T1)triggersswitch-off (
lastReceiveLsn=Y = startStreamingLsn), and all subsequentmessages including
INSERT(T2)pass through.See https://issues.apache.org/jira/browse/FLINK-39265.
Test
Run
test_streaming_postgres_jobmultiple times. Before this fix the'Apache' assertion fails ~10-20% of the time; after this fix it passes
consistently.
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)