feat(utilities): propagate per-partition event-time watermarks from upstream Hudi sources #18802
…pstream Hudi sources
Part of apache#17512 (Phase 2 of the reconcile plan). Builds on PR apache#18778 (Phase 1), which exposed per-partition event-time rollup on HoodieCommitMetadata and decoupled watermark tracking from EVENT_TIME_ORDERING.
When a derived table is built from a Hudi source via HoodieIncrSource and the user opts into hoodie.write.track.event.time.propagate.from.upstream, fold each upstream commit's per-partition (min, max) event time into the downstream commit's per-partition write stats. Derived tables inherit upstream freshness without retaining the event-time column.
Changes:
- HoodieWriteConfig.TRACK_EVENT_TIME_PROPAGATE_FROM_UPSTREAM: new opt-in config, default false, advanced, since 1.2.0.
- ConfigUtils.isPropagatingEventTimeFromUpstream: accessor mirroring the existing isTrackingEventTimeWatermark.
- UpstreamEventTimeWatermarkExtractor: folds per-partition min/max across a set of upstream HoodieInstants using the Phase 1 getMinAndMaxEventTimePerPartition API.
- Source.getUpstreamEventTimeWatermarks: new hook on the abstract source, default empty.
- InputBatch: optional Map<String, Pair<Long, Long>> field for batch-scoped propagated watermarks, plumbed through RowSource.
- HoodieIncrSource: when the config is on, computes folded watermarks for both the completion-time and request-time fetch paths and exposes them via the new Source hook.
- StreamSync.buildUpstreamWatermarkPreCommitFunc: BiConsumer pre-commit hook that folds the propagated values into per-partition write stats via HoodieWriteStat.setMinEventTime / setMaxEventTime (which already do null-aware Math.min / Math.max folds, so per-record values are never regressed).
Fold semantics: setMinEventTime / setMaxEventTime are null-aware and use Math.min / Math.max. Propagation only fills in fields the per-record path left unset, and per-record extreme values are preserved if more extreme than the upstream value. Back-fills cannot regress max watermarks at the per-commit level. Upstream partitions with no matching downstream stat are dropped silently.
Tests:
- TestUpstreamEventTimeWatermarkExtractor (8): single-commit rollup, multi-commit fold, partial min/max, commit without watermark, read failure tolerance, empty cases.
- TestStreamSyncUpstreamWatermarkPropagation (7): BiConsumer fold semantics including the never-regress invariant, unmatched partitions in both directions, multiple stats per partition.
Regression: TestHoodieCommitMetadata (18), TestHoodieWriteHandle (11), TestHoodieIncrSource (21), TestStreamSync (32) all pass.
Out of scope for this PR (follow-ups): Spark SQL Hudi source propagation, Flink source propagation, two-hop end-to-end test fixture, website docs for the new config.
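The fold semantics described in the commit message can be illustrated with a minimal sketch. It assumes a simplified stand-in for the write stat (SketchWriteStat) and a plain Long[] {min, max} pair per partition; both are hypothetical names for illustration and are not Hudi's HoodieWriteStat or Pair types. In this sketch an unset field takes the upstream value, a set field keeps the more extreme of the two values, and upstream partitions with no matching stat are ignored.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a per-file write stat; NOT Hudi's HoodieWriteStat. */
final class SketchWriteStat {
  final String partitionPath;
  Long minEventTime; // null means the per-record path left it unset
  Long maxEventTime;

  SketchWriteStat(String partitionPath, Long minEventTime, Long maxEventTime) {
    this.partitionPath = partitionPath;
    this.minEventTime = minEventTime;
    this.maxEventTime = maxEventTime;
  }

  // Null-aware fold: ignore a null candidate, otherwise keep the smaller value.
  void setMinEventTime(Long candidate) {
    if (candidate == null) {
      return;
    }
    if (minEventTime == null) {
      minEventTime = candidate;
    } else {
      minEventTime = Math.min(minEventTime, candidate);
    }
  }

  // Null-aware fold: ignore a null candidate, otherwise keep the larger value.
  void setMaxEventTime(Long candidate) {
    if (candidate == null) {
      return;
    }
    if (maxEventTime == null) {
      maxEventTime = candidate;
    } else {
      maxEventTime = Math.max(maxEventTime, candidate);
    }
  }
}

/** Hypothetical pre-commit fold over the downstream stats (illustration only). */
final class UpstreamWatermarkFoldSketch {
  // upstream maps partition path -> {min, max}; either element may be null.
  static void fold(List<SketchWriteStat> stats, Map<String, Long[]> upstream) {
    for (SketchWriteStat stat : stats) {
      Long[] minMax = upstream.get(stat.partitionPath);
      if (minMax == null) {
        continue; // no upstream watermark for this downstream partition
      }
      stat.setMinEventTime(minMax[0]);
      stat.setMaxEventTime(minMax[1]);
    }
    // Upstream partitions that match no stat are never visited, so they are dropped silently.
  }
}
```

Because the loop is driven by the downstream stats, a partition with several stats receives the same upstream pair on each of them, which is one way to read the "multiple stats per partition" test case.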
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR wires per-partition event-time watermarks through HoodieIncrSource → InputBatch → StreamSync's pre-commit hook so derived tables inherit upstream freshness. The opt-in config, default-empty Source hook, and reliance on Phase 1's null-aware Math.min/Math.max setters keep the change well-contained, and the extractor has good per-instant error containment. A couple of edge cases on the two maybeExtractUpstreamWatermarks* paths are worth double-checking; see the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One small logging nit: the two maybeExtract* methods emit identical info messages, making it impossible to distinguish code paths in production logs.
    HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder()
        .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration()))
        .setBasePath(srcPath)
        .setLoadActiveTimelineOnLoad(true)
🤖 getCommitsTimeline() only includes COMMIT + DELTA_COMMIT and excludes REPLACE_COMMIT (clustering / insert_overwrite). If the upstream table had clustering instants in this range that carried event-time stats, the request-time path will silently skip them — while the completion-time path above can include them via queryContext.getInstants(). Should this use getCommitsAndCompactionTimeline() (or getCommitAndReplaceTimeline()) to keep the two paths symmetric?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
        return;
    }
    if (StringUtils.isNullOrEmpty(endCompletionTime)) {
        return;
🤖 Is there a reason the completion-time path doesn't wrap the extraction in a try/catch the way the request-time path below does? Today any unexpected exception here (e.g. from compareTimestamps or getActiveTimeline()) will fail the source, while the same failure on the request-time path would only log a warning. Worth keeping the best-effort posture symmetric across the two paths.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
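One way to keep the best-effort posture symmetric, sketched under assumptions: route both paths through a single wrapper that catches unexpected runtime failures, logs a path-specific warning, and returns an empty map. The class and method names below (BestEffortExtractionSketch, extractOrEmpty) and the long[] {min, max} value type are hypothetical and are not part of the PR; System.err stands in for the real logger.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper (illustration only): shared best-effort wrapper for both fetch paths. */
final class BestEffortExtractionSketch {
  // extraction: the path-specific work; pathLabel: e.g. "completion-time path".
  static Map<String, long[]> extractOrEmpty(Supplier<Map<String, long[]>> extraction, String pathLabel) {
    try {
      Map<String, long[]> result = extraction.get();
      if (result == null) {
        return Collections.<String, long[]>emptyMap();
      }
      return result;
    } catch (RuntimeException e) {
      // Stand-in for a logger warning; the label tells the two paths apart in logs.
      System.err.println("Failed to extract upstream event-time watermarks ("
          + pathLabel + "); skipping propagation: " + e);
      return Collections.<String, long[]>emptyMap();
    }
  }
}
```

With a wrapper like this, a failure on either path degrades to "no watermarks propagated for this batch" instead of failing the source, and the label doubles as the log discriminator requested in the logging nit.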
    this.upstreamEventTimeWatermarks = UpstreamEventTimeWatermarkExtractor
        .extractPerPartitionWatermarks(queryContext.getActiveTimeline(), instants);
    if (!this.upstreamEventTimeWatermarks.isEmpty()) {
        log.info("Propagating upstream event-time watermarks for {} partition(s) from {} commit(s)",
🤖 nit: both maybeExtractUpstreamWatermarksCompletionTime and maybeExtractUpstreamWatermarksRequestTime emit the identical info message here and at line 525, so in production logs there's no way to tell which code path fired. The warn at line 530 already sets a good example with "(request-time path)" — could you add a similar discriminator to both info logs, e.g. "...from {} commit(s) (completion-time path)"?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the cleanup! The only change since the last review is removing unused List and assertNull imports from TestUpstreamEventTimeWatermarkExtractor.java to satisfy checkstyle — verified to be a safe, mechanical edit (all 8 tests are still present and the removed imports were genuinely unused). The three prior comments on HoodieIncrSource.java (log discriminator between the two paths, getCommitsTimeline() missing REPLACE_COMMIT, and missing try/catch on the completion-time path) are not addressed by this commit and remain open from the previous pass, but no new issues are introduced here. Please take a look at the still-open inline comments from the prior review, and this should be ready for a Hudi committer or PMC member to take it from here.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #18802     +/-   ##
============================================
- Coverage       68.25%    68.24%    -0.01%
- Complexity      29337     29352      +15
============================================
  Files            2527      2528       +1
  Lines          141858    141968     +110
  Branches        17627     17646      +19
============================================
+ Hits            96827     96892      +65
- Misses          37068     37108      +40
- Partials         7963      7968       +5
Describe the issue this Pull Request addresses
Part of #17512 (Phase 2 of the reconcile plan). Builds on #18778 (Phase 1), which exposed per-partition event-time rollup on HoodieCommitMetadata and decoupled watermark tracking from EVENT_TIME_ORDERING.
Phase 2 closes the remaining ask from the issue discussion: when a derived table is built from a Hudi source via HoodieIncrSource, propagate the upstream commit(s)' per-partition (min, max) event time into the downstream commit's per-partition write stats. Derived tables inherit upstream freshness without retaining the event-time column in their schema.
Summary and Changelog
The propagation is gated on a new opt-in config hoodie.write.track.event.time.propagate.from.upstream (default false, advanced, since 1.2.0). When enabled, HoodieIncrSource reads the per-partition rollup from each upstream commit in the read range (via the Phase 1 HoodieCommitMetadata.getMinAndMaxEventTimePerPartition() API), folds across commits, and exposes the result through a new Source hook. StreamSync reads the propagated map from the InputBatch and folds it into the downstream commit's per-partition write stats via the existing extraPreCommitFunc BiConsumer hook on BaseHoodieWriteClient.commit(...), which was previously passed Option.empty() and now carries the propagation closure when there are watermarks to fold.
Fold semantics are inherited from Phase 1: HoodieWriteStat.setMinEventTime / setMaxEventTime already do null-aware Math.min / Math.max folds. So propagation only fills in fields the per-record path left unset, and per-record extreme values are preserved when more extreme than the upstream value. Back-fills cannot regress max watermarks at the per-commit level. Upstream partitions with no matching downstream stat are dropped silently (the common projection / aggregation case).
Changes:
- HoodieWriteConfig.TRACK_EVENT_TIME_PROPAGATE_FROM_UPSTREAM: new opt-in config.
- ConfigUtils.isPropagatingEventTimeFromUpstream: accessor mirroring the existing isTrackingEventTimeWatermark.
- UpstreamEventTimeWatermarkExtractor: helper that folds per-partition min/max across a set of upstream HoodieInstants.
- Source.getUpstreamEventTimeWatermarks(): new EVOLVING hook, default returns empty map. Concrete sources override when they have batch-scoped propagated watermarks.
- InputBatch: optional Map<String, Pair<Long, Long>> upstreamEventTimeWatermarks field, plumbed through RowSource. Backward-compatible constructor overloads keep existing callers intact.
- HoodieIncrSource: when the config is on, computes folded watermarks for both the completion-time path (Hudi 1.x source tables) and the request-time path (legacy/Hudi 0.x source tables) and exposes them via the new Source hook. Best-effort: read failures are logged and skipped rather than failing the source.
- StreamSync.buildUpstreamWatermarkPreCommitFunc: BiConsumer pre-commit hook that folds the propagated values into per-partition write stats. Returns Option.empty() when there is nothing to propagate (most common case), keeping the existing commit path unchanged.
Tests:
- TestUpstreamEventTimeWatermarkExtractor (8 tests): single-commit rollup, multi-commit fold, partial min/max, commit without watermark, read-failure tolerance, empty cases.
- TestStreamSyncUpstreamWatermarkPropagation (7 tests): BiConsumer fold semantics including the never-regress invariant, unmatched partitions in both directions, multiple stats per partition.
Regression: TestHoodieCommitMetadata (18), TestHoodieWriteHandle (11), TestHoodieIncrSource (21), TestStreamSync (32) all pass locally.
Impact
Public API additions:
- Source.getUpstreamEventTimeWatermarks() (EVOLVING): external Source subclasses outside the repo are unaffected (default returns empty map; no-op).
- InputBatch carrying the propagated map. All existing constructors are preserved.
Behavior change for opted-in pipelines: HoodieIncrSource users that both set hoodie.write.track.event.time.propagate.from.upstream=true and read from an upstream Hudi table that was itself written with hoodie.write.track.event.time.watermark=true will see per-partition minEventTime / maxEventTime populated on the downstream commit's write stats, inherited from upstream. Tables that have not set the new flag see no change.
No performance impact: the extractor walks at most N upstream commits per batch (commits the source already reads) and reads each commit's metadata once; the BiConsumer fold is a small in-memory loop over the downstream commit's stats. No new RDD transformations, no broadcast variables.
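The multi-commit fold performed on the source side can be sketched as follows. This is a minimal illustration under assumptions, not the PR's UpstreamEventTimeWatermarkExtractor: the class name is hypothetical, each upstream commit is modeled as a plain map from partition path to a Long[] {min, max} pair (either element may be null), and a null map stands for a commit that carries no watermark.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of folding per-partition (min, max) across upstream commits. */
final class MultiCommitWatermarkFoldSketch {
  static Map<String, Long[]> foldAcrossCommits(List<Map<String, Long[]>> perCommit) {
    Map<String, Long[]> folded = new HashMap<>();
    for (Map<String, Long[]> commit : perCommit) {
      if (commit == null) {
        continue; // commit without watermark: contributes nothing
      }
      for (Map.Entry<String, Long[]> entry : commit.entrySet()) {
        Long[] incoming = entry.getValue();
        Long[] current = folded.computeIfAbsent(entry.getKey(), k -> new Long[] {null, null});
        current[0] = nullAwareMin(current[0], incoming[0]);
        current[1] = nullAwareMax(current[1], incoming[1]);
      }
    }
    return folded;
  }

  // Returns the smaller non-null value, or null when both inputs are null.
  static Long nullAwareMin(Long a, Long b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    return Math.min(a, b);
  }

  // Returns the larger non-null value, or null when both inputs are null.
  static Long nullAwareMax(Long a, Long b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    return Math.max(a, b);
  }
}
```

The cost profile matches the claim above: one pass over the commits in the read range and one small map keyed by partition, with no distributed computation involved.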
Risk Level
low
The new config is false by default. The new Source hook returns an empty map by default; the StreamSync hook returns Option.empty() when the map is empty, which produces an extraPreCommitFunc that is byte-identical to the previous Option.empty() argument at the same call site. InputBatch additions are constructor overloads. Verified by running the full local hudi-common and hudi-utilities test suites for the relevant test classes with no regressions; the existing TestHoodieIncrSource (21 tests) and TestStreamSync (32 tests) both pass against the modified files.
Documentation Update
The hoodie.write.track.event.time.propagate.from.upstream config description is documented inline (its ConfigProperty.withDocumentation(...)) so it shows up on the Hudi configurations page on the website auto-generation pass.
Follow-ups (called out in the Phase 2 plan in #17512, deliberately out of scope here): Spark SQL Hudi source propagation, Flink source propagation, a two-hop end-to-end test fixture, and website docs for the new config.
Contributor's checklist