feat(utilities): propagate per-partition event-time watermarks from upstream Hudi sources#18802

Open
shangxinli wants to merge 2 commits into apache:master from shangxinli:feat/freshness-phase2-upstream-propagation

@shangxinli
Contributor

Describe the issue this Pull Request addresses

Part of #17512 (Phase 2 of the reconcile plan). Builds on #18778 (Phase 1), which exposed a per-partition event-time rollup on HoodieCommitMetadata and decoupled watermark tracking from EVENT_TIME_ORDERING.

Phase 2 closes the remaining ask from the issue discussion: when a derived table is built from a Hudi source via HoodieIncrSource, propagate the upstream commit(s)' per-partition (min, max) event time into the downstream commit's per-partition write stats. Derived tables inherit upstream freshness without retaining the event-time column in their schema.

Summary and Changelog

The propagation is gated on a new opt-in config hoodie.write.track.event.time.propagate.from.upstream (default false, advanced, since 1.2.0). When enabled, HoodieIncrSource reads the per-partition rollup from each upstream commit in the read range (via the Phase 1 HoodieCommitMetadata.getMinAndMaxEventTimePerPartition() API), folds across commits, and exposes the result through a new Source hook. StreamSync reads the propagated map from the InputBatch and folds it into the downstream commit's per-partition write stats via the existing extraPreCommitFunc BiConsumer hook on BaseHoodieWriteClient.commit(...) — which was previously passed Option.empty() and now carries the propagation closure when there are watermarks to fold.

Fold semantics are inherited from Phase 1: HoodieWriteStat.setMinEventTime / setMaxEventTime already do null-aware Math.min / Math.max folds. So propagation only fills in fields the per-record path left unset, and per-record extreme values are preserved when more extreme than the upstream value. Back-fills cannot regress max watermarks at the per-commit level. Upstream partitions with no matching downstream stat are dropped silently (the common projection / aggregation case).
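To make the fold semantics concrete, here is a minimal sketch of a null-aware min/max setter pair. WriteStatSketch is a hypothetical stand-in, not the real HoodieWriteStat; it only models the behaviour described above, and the assumption that a null argument is ignored is mine.

```java
// Hypothetical stand-in for HoodieWriteStat; models only the fold described above.
final class WriteStatSketch {
    private Long minEventTime; // null means the per-record path left it unset
    private Long maxEventTime;

    // Null-aware fold: a null candidate is ignored (assumption), an unset field is filled in,
    // otherwise the smaller value wins.
    void setMinEventTime(Long candidate) {
        if (candidate == null) {
            return;
        }
        minEventTime = (minEventTime == null) ? candidate : Math.min(minEventTime, candidate);
    }

    // Same shape for the max: an existing larger value is never replaced by a smaller one.
    void setMaxEventTime(Long candidate) {
        if (candidate == null) {
            return;
        }
        maxEventTime = (maxEventTime == null) ? candidate : Math.max(maxEventTime, candidate);
    }

    Long getMinEventTime() {
        return minEventTime;
    }

    Long getMaxEventTime() {
        return maxEventTime;
    }
}
```

With this shape, a per-record max of 200 followed by a propagated upstream max of 100 leaves the field at 200, while an unset min is filled in by the upstream value.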

Changes:

  • HoodieWriteConfig.TRACK_EVENT_TIME_PROPAGATE_FROM_UPSTREAM: new opt-in config.
  • ConfigUtils.isPropagatingEventTimeFromUpstream: accessor mirroring the existing isTrackingEventTimeWatermark.
  • UpstreamEventTimeWatermarkExtractor: helper that folds per-partition min/max across a set of upstream HoodieInstants.
  • Source.getUpstreamEventTimeWatermarks(): new EVOLVING hook, default returns empty map. Concrete sources override when they have batch-scoped propagated watermarks.
  • InputBatch: optional Map<String, Pair<Long, Long>> upstreamEventTimeWatermarks field, plumbed through RowSource. Backward-compatible constructor overloads keep existing callers intact.
  • HoodieIncrSource: when the config is on, computes folded watermarks for both the completion-time path (Hudi 1.x source tables) and the request-time path (legacy/Hudi 0.x source tables) and exposes via the new Source hook. Best-effort — read failures are logged and skipped rather than failing the source.
  • StreamSync.buildUpstreamWatermarkPreCommitFunc: BiConsumer pre-commit hook that folds the propagated values into per-partition write stats. Returns Option.empty() when there is nothing to propagate (most common case), keeping the existing commit path unchanged.
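As a rough illustration of the extractor step in the list above, the following sketch folds per-partition (min, max) pairs across several commits. UpstreamFoldSketch and MinMax are hypothetical names; the real UpstreamEventTimeWatermarkExtractor works on HoodieInstants and commit metadata, which are replaced here by plain maps.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each "commit" is reduced to a map of partition -> (min, max).
final class UpstreamFoldSketch {

    // Either bound may be null when a commit recorded only one of them (partial min/max).
    static final class MinMax {
        final Long min;
        final Long max;

        MinMax(Long min, Long max) {
            this.min = min;
            this.max = max;
        }
    }

    static Long foldMin(Long a, Long b) {
        if (a == null) {
            return b;
        }
        return (b == null) ? a : Long.valueOf(Math.min(a, b));
    }

    static Long foldMax(Long a, Long b) {
        if (a == null) {
            return b;
        }
        return (b == null) ? a : Long.valueOf(Math.max(a, b));
    }

    // Folds the per-partition bounds of every commit in the read range into one map.
    static Map<String, MinMax> foldAcrossCommits(List<Map<String, MinMax>> perCommit) {
        Map<String, MinMax> folded = new HashMap<>();
        for (Map<String, MinMax> commit : perCommit) {
            for (Map.Entry<String, MinMax> e : commit.entrySet()) {
                folded.merge(e.getKey(), e.getValue(),
                    (a, b) -> new MinMax(foldMin(a.min, b.min), foldMax(a.max, b.max)));
            }
        }
        return folded;
    }
}
```

A commit that carries no watermark simply contributes an empty map, so it leaves the folded result unchanged.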

Tests:

  • TestUpstreamEventTimeWatermarkExtractor (8 tests): single-commit rollup, multi-commit fold, partial min/max, commit without watermark, read-failure tolerance, empty cases.
  • TestStreamSyncUpstreamWatermarkPropagation (7 tests): BiConsumer fold semantics including the never-regress invariant, unmatched partitions in both directions, multiple stats per partition.
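The cases covered by the StreamSync tests can be pictured with a simplified pre-commit hook. This is a sketch under stated assumptions: PreCommitFoldSketch, Stat, and MinMax are hypothetical, java.util.Optional stands in for Hudi's Option, and the hook's first argument is a placeholder string rather than whatever the real extraPreCommitFunc receives.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

// Hypothetical sketch of a pre-commit hook that folds upstream bounds into write stats.
final class PreCommitFoldSketch {

    static final class MinMax {
        final Long min;
        final Long max;

        MinMax(Long min, Long max) {
            this.min = min;
            this.max = max;
        }
    }

    // Simplified write stat: one partition path plus nullable event-time bounds.
    static final class Stat {
        final String partition;
        Long minEventTime;
        Long maxEventTime;

        Stat(String partition, Long minEventTime, Long maxEventTime) {
            this.partition = partition;
            this.minEventTime = minEventTime;
            this.maxEventTime = maxEventTime;
        }

        void foldMin(Long v) {
            if (v != null) {
                minEventTime = (minEventTime == null) ? v : Math.min(minEventTime, v);
            }
        }

        void foldMax(Long v) {
            if (v != null) {
                maxEventTime = (maxEventTime == null) ? v : Math.max(maxEventTime, v);
            }
        }
    }

    // Returns an empty Optional when there is nothing to propagate, so the caller's
    // commit path stays exactly as it was.
    static Optional<BiConsumer<String, List<Stat>>> buildHook(Map<String, MinMax> upstream) {
        if (upstream == null || upstream.isEmpty()) {
            return Optional.empty();
        }
        BiConsumer<String, List<Stat>> hook = (tableName, stats) -> {
            for (Stat stat : stats) {
                MinMax bounds = upstream.get(stat.partition);
                if (bounds == null) {
                    continue; // downstream partition with no upstream watermark: untouched
                }
                stat.foldMin(bounds.min);
                stat.foldMax(bounds.max);
            }
            // Upstream partitions that match no stat are never visited, so they drop out.
        };
        return Optional.of(hook);
    }
}
```

Because the loop is driven by the downstream stats, every stat in a partition receives the same upstream bounds, which corresponds to the "multiple stats per partition" case.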

Regression: TestHoodieCommitMetadata (18), TestHoodieWriteHandle (11), TestHoodieIncrSource (21), TestStreamSync (32) all pass locally.

Impact

Public API additions:

  • Source.getUpstreamEventTimeWatermarks() (EVOLVING) — external Source subclasses outside the repo are unaffected (default returns empty map; no-op).
  • New constructor overload on InputBatch carrying the propagated map. All existing constructors are preserved.

Behavior change for opted-in pipelines: HoodieIncrSource users that set both hoodie.write.track.event.time.propagate.from.upstream=true and read from an upstream Hudi table that itself was written with hoodie.write.track.event.time.watermark=true will see per-partition minEventTime / maxEventTime populated on the downstream commit's write stats, inherited from upstream. Tables that have not set the new flag see no change.
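For illustration, the two flags named above could be collected as follows. The property keys are the ones from this description; the PropagationConfigSketch class and the idea of building them as java.util.Properties are assumptions, and how the properties reach the upstream writer and the downstream streamer is left out.

```java
import java.util.Properties;

// Hypothetical helper: only the two property keys come from the PR description.
final class PropagationConfigSketch {

    // Upstream (raw) table writer: must record event-time watermarks in the first place.
    static Properties upstreamWriterProps() {
        Properties props = new Properties();
        props.setProperty("hoodie.write.track.event.time.watermark", "true");
        return props;
    }

    // Downstream (derived) pipeline reading via HoodieIncrSource: opts in to propagation.
    static Properties downstreamProps() {
        Properties props = new Properties();
        props.setProperty("hoodie.write.track.event.time.propagate.from.upstream", "true");
        return props;
    }
}
```

If either side is left at its default, the downstream commit's write stats are not expected to change.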

No performance impact is expected: the extractor walks at most N upstream commits per batch (commits the source already reads) and reads each commit's metadata once; the BiConsumer fold is a small in-memory loop over the downstream commit's stats. No new RDD transformations, no broadcast variables.

Risk Level

low

The new config is false by default. The new Source hook returns an empty map by default; the StreamSync hook returns Option.empty() when the map is empty, which produces an extraPreCommitFunc that is byte-identical to the previous Option.empty() argument at the same call site. InputBatch additions are constructor overloads. Verified by running the full local hudi-common and hudi-utilities test suites for the relevant test classes with no regressions; the existing TestHoodieIncrSource (21 tests) and TestStreamSync (32 tests) both pass against the modified files.

Documentation Update

  • The new hoodie.write.track.event.time.propagate.from.upstream config is documented inline (via its ConfigProperty.withDocumentation(...)), so it will appear on the Hudi configurations page the next time the website's auto-generation pass runs.
  • A user-facing website page covering the end-to-end propagation story (raw → derived freshness) can land alongside the planned follow-up PRs that wire Spark SQL Hudi source + Flink source.

Follow-ups (called out in the Phase 2 plan in #17512, deliberately out of scope here):

  • Spark SQL Hudi source propagation (Spark Datasource v2 path).
  • Flink source propagation.
  • Two-hop end-to-end test fixture (write raw table with event-time field, run streamer raw → derived with propagation on, assert derived commit inherits per-partition min/max). The propagation surface is small and well-tested at the unit level; an E2E fixture is most useful once the Spark SQL / Flink paths land so it can exercise all three engines.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

…pstream Hudi sources

Part of apache#17512 (Phase 2 of the reconcile plan). Builds on PR apache#18778 (Phase 1) which
exposed per-partition event-time rollup on HoodieCommitMetadata and decoupled watermark
tracking from EVENT_TIME_ORDERING.

When a derived table is built from a Hudi source via HoodieIncrSource and the user opts
into hoodie.write.track.event.time.propagate.from.upstream, fold each upstream commit's
per-partition (min, max) event time into the downstream commit's per-partition write
stats. Derived tables inherit upstream freshness without retaining the event-time column.

Changes:
- HoodieWriteConfig.TRACK_EVENT_TIME_PROPAGATE_FROM_UPSTREAM: new opt-in config, default
  false, advanced, since 1.2.0.
- ConfigUtils.isPropagatingEventTimeFromUpstream: accessor mirroring the existing
  isTrackingEventTimeWatermark.
- UpstreamEventTimeWatermarkExtractor: folds per-partition min/max across a set of
  upstream HoodieInstants using the Phase 1 getMinAndMaxEventTimePerPartition API.
- Source.getUpstreamEventTimeWatermarks: new hook on the abstract source, default empty.
- InputBatch: optional Map<String, Pair<Long, Long>> field for batch-scoped propagated
  watermarks, plumbed through RowSource.
- HoodieIncrSource: when the config is on, computes folded watermarks for both the
  completion-time and request-time fetch paths and exposes via the new Source hook.
- StreamSync.buildUpstreamWatermarkPreCommitFunc: BiConsumer pre-commit hook that folds
  the propagated values into per-partition write stats via
  HoodieWriteStat.setMinEventTime / setMaxEventTime (which already do null-aware
  Math.min / Math.max folds, so per-record values are never regressed).

Fold semantics: setMinEventTime / setMaxEventTime are null-aware and use Math.min /
Math.max. Propagation only fills in fields the per-record path left unset, and per-record
extreme values are preserved if more extreme than the upstream value. Back-fills cannot
regress max watermarks at the per-commit level. Upstream partitions with no matching
downstream stat are dropped silently.

Tests:
- TestUpstreamEventTimeWatermarkExtractor (8): single-commit rollup, multi-commit fold,
  partial min/max, commit without watermark, read failure tolerance, empty cases.
- TestStreamSyncUpstreamWatermarkPropagation (7): BiConsumer fold semantics including
  the never-regress invariant, unmatched partitions in both directions, multiple stats
  per partition.

Regression: TestHoodieCommitMetadata (18), TestHoodieWriteHandle (11),
TestHoodieIncrSource (21), TestStreamSync (32) all pass.

Out of scope for this PR (follow-ups): Spark SQL Hudi source propagation, Flink source
propagation, two-hop end-to-end test fixture, website docs for the new config.
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR wires per-partition event-time watermarks through HoodieIncrSource → InputBatch → StreamSync's pre-commit hook so derived tables inherit upstream freshness. The opt-in config, default-empty Source hook, and reliance on Phase 1's null-aware Math.min/Math.max setters keep the change well-contained, and the extractor has good per-instant error containment. A couple of edge cases on the two maybeExtractUpstreamWatermarks* paths are worth double-checking; see the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One small logging nit: the two maybeExtract* methods emit identical info messages, making it impossible to distinguish code paths in production logs.

HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder()
    .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration()))
    .setBasePath(srcPath)
    .setLoadActiveTimelineOnLoad(true)

🤖 getCommitsTimeline() only includes COMMIT + DELTA_COMMIT and excludes REPLACE_COMMIT (clustering / insert_overwrite). If the upstream table had clustering instants in this range that carried event-time stats, the request-time path will silently skip them — while the completion-time path above can include them via queryContext.getInstants(). Should this use getCommitsAndCompactionTimeline() (or getCommitAndReplaceTimeline()) to keep the two paths symmetric?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
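The asymmetry described in this comment can be pictured with a toy filter. This is only a sketch: TimelineFilterSketch, InstantSketch, and the Action enum are hypothetical and do not use Hudi's timeline classes; the claim about which actions getCommitsTimeline() includes is taken from the comment, not verified here.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of filtering a timeline by action type.
final class TimelineFilterSketch {

    enum Action { COMMIT, DELTA_COMMIT, REPLACE_COMMIT }

    static final class InstantSketch {
        final String time;
        final Action action;

        InstantSketch(String time, Action action) {
            this.time = time;
            this.action = action;
        }
    }

    // Filter as described for the request-time path: replace commits are excluded.
    static final Set<Action> COMMITS_ONLY = EnumSet.of(Action.COMMIT, Action.DELTA_COMMIT);

    // Filter that would keep the two paths symmetric.
    static final Set<Action> COMMITS_AND_REPLACE = EnumSet.allOf(Action.class);

    // Returns the instant times whose action is in the allowed set, in input order.
    static List<String> select(List<InstantSketch> instants, Set<Action> allowed) {
        List<String> selected = new ArrayList<>();
        for (InstantSketch instant : instants) {
            if (allowed.contains(instant.action)) {
                selected.add(instant.time);
            }
        }
        return selected;
    }
}
```

With the narrower filter, a clustering or insert_overwrite instant in the read range contributes nothing to the folded watermarks, even if it carried event-time stats.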

return;
}
if (StringUtils.isNullOrEmpty(endCompletionTime)) {
return;

🤖 Is there a reason the completion-time path doesn't wrap the extraction in a try/catch the way the request-time path below does? Today any unexpected exception here (e.g. from compareTimestamps or getActiveTimeline()) will fail the source, while the same failure on the request-time path would only log a warning. Worth keeping the best-effort posture symmetric across the two paths.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
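One possible shape for a symmetric best-effort posture is a shared wrapper that both paths call. This is a sketch, not the PR's code: BestEffortExtractionSketch and extractOrEmpty are hypothetical names, and java.util.logging stands in for whatever logger HoodieIncrSource uses. The path label in the message also gives each path a distinguishable log line.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper: runs an extraction and degrades to an empty map on failure.
final class BestEffortExtractionSketch {

    private static final Logger LOG =
        Logger.getLogger(BestEffortExtractionSketch.class.getName());

    static <K, V> Map<K, V> extractOrEmpty(String pathLabel, Supplier<Map<K, V>> extraction) {
        try {
            Map<K, V> result = extraction.get();
            return (result == null) ? Collections.<K, V>emptyMap() : result;
        } catch (RuntimeException e) {
            // Log and skip propagation instead of failing the source.
            LOG.log(Level.WARNING,
                "Failed to extract upstream event-time watermarks (" + pathLabel
                    + "); skipping propagation", e);
            return Collections.emptyMap();
        }
    }
}
```

Both the completion-time and request-time paths could then pass their own label, for example "completion-time path" and "request-time path".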

this.upstreamEventTimeWatermarks = UpstreamEventTimeWatermarkExtractor
    .extractPerPartitionWatermarks(queryContext.getActiveTimeline(), instants);
if (!this.upstreamEventTimeWatermarks.isEmpty()) {
  log.info("Propagating upstream event-time watermarks for {} partition(s) from {} commit(s)",

🤖 nit: both maybeExtractUpstreamWatermarksCompletionTime and maybeExtractUpstreamWatermarksRequestTime emit the identical info message here and at line 525, so in production logs there's no way to tell which code path fired. The warn at line 530 already sets a good example with "(request-time path)" — could you add a similar discriminator to both info logs, e.g. "...from {} commit(s) (completion-time path)"?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@github-actions github-actions Bot added the size:L PR with lines of changes in (300, 1000] label May 21, 2026
@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the cleanup! The only change since the last review is removing unused List and assertNull imports from TestUpstreamEventTimeWatermarkExtractor.java to satisfy checkstyle — verified to be a safe, mechanical edit (all 8 tests are still present and the removed imports were genuinely unused). The three prior comments on HoodieIncrSource.java (log discriminator between the two paths, getCommitsTimeline() missing REPLACE_COMMIT, and missing try/catch on the completion-time path) are not addressed by this commit and remain open from the previous pass, but no new issues are introduced here. Please take a look at the still-open inline comments from the prior review, and this should be ready for a Hudi committer or PMC member to take it from here.

@codecov-commenter

Codecov Report

❌ Patch coverage is 61.40351% with 44 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.24%. Comparing base (9565926) to head (7cef443).
⚠️ Report is 17 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...pache/hudi/utilities/sources/HoodieIncrSource.java | 15.55% | 36 Missing and 2 partials ⚠️ |
| ...s/helpers/UpstreamEventTimeWatermarkExtractor.java | 89.18% | 2 Missing and 2 partials ⚠️ |
| ...org/apache/hudi/utilities/streamer/StreamSync.java | 89.47% | 0 Missing and 2 partials ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18802      +/-   ##
============================================
- Coverage     68.25%   68.24%   -0.01%     
- Complexity    29337    29352      +15     
============================================
  Files          2527     2528       +1     
  Lines        141858   141968     +110     
  Branches      17627    17646      +19     
============================================
+ Hits          96827    96892      +65     
- Misses        37068    37108      +40     
- Partials       7963     7968       +5     
| Flag | Coverage Δ |
|---|---|
| common-and-other-modules | 44.42% <55.26%> (+<0.01%) ⬆️ |
| hadoop-mr-java-client | 44.92% <85.71%> (+<0.01%) ⬆️ |
| spark-client-hadoop-common | 48.23% <85.71%> (-0.01%) ⬇️ |
| spark-java-tests | 48.83% <5.26%> (-0.05%) ⬇️ |
| spark-scala-tests | 44.89% <5.26%> (-0.04%) ⬇️ |
| utilities | 37.43% <20.17%> (-0.04%) ⬇️ |

| Files with missing lines | Coverage Δ |
|---|---|
| ...java/org/apache/hudi/config/HoodieWriteConfig.java | 90.01% <100.00%> (+0.03%) ⬆️ |
| .../java/org/apache/hudi/common/util/ConfigUtils.java | 90.97% <100.00%> (+0.03%) ⬆️ |
| .../org/apache/hudi/utilities/sources/InputBatch.java | 100.00% <100.00%> (ø) |
| ...a/org/apache/hudi/utilities/sources/RowSource.java | 92.85% <100.00%> (ø) |
| ...java/org/apache/hudi/utilities/sources/Source.java | 68.57% <100.00%> (+0.45%) ⬆️ |
| ...org/apache/hudi/utilities/streamer/StreamSync.java | 76.58% <89.47%> (+0.33%) ⬆️ |
| ...s/helpers/UpstreamEventTimeWatermarkExtractor.java | 89.18% <89.18%> (ø) |
| ...pache/hudi/utilities/sources/HoodieIncrSource.java | 68.46% <15.55%> (-12.15%) ⬇️ |

... and 9 files with indirect coverage changes
