Skip to content

Implement Asynchronous wrapper for DoFn in Java SDK#38609

Open
tejasiyer-dev wants to merge 2 commits into
apache:masterfrom
tejasiyer-dev:add-async-dofn-wrapper
Open

Implement Asynchronous wrapper for DoFn in Java SDK#38609
tejasiyer-dev wants to merge 2 commits into
apache:masterfrom
tejasiyer-dev:add-async-dofn-wrapper

Conversation

@tejasiyer-dev
Copy link
Copy Markdown

fixes #38529

R: @AMOOOMA

This PR introduces AsyncDoFn and AsyncDoFnTest to the Apache Beam Java SDK.

AsyncDoFn acts as an execution wrapper around a standard synchronous DoFn, offloading element processing to a background thread pool. Decoupling the runner's event loop (main thread) from high-latency, I/O-heavy element processing (background threads) prevents synchronous blocking, implements backpressure, and significantly increases pipeline throughput.

1. Ingestion & Local Deduplication (Main Thread)

  • JVM Isolation: Every AsyncDoFn instance generates a unique UUID upon instantiation to keep static JVM registries completely isolated.
  • Deduplication Boundary: Incoming elements pass through an idFn to extract an elementId. If the elementId is already present in the local activeElements Map in JVM memory, scheduling is skipped to enforce exactly-once execution grouping.

2. Backpressure & Capacity Check (Main Thread)

  • Capacity Management: The main thread checks if the background pool's active task count is below maxItemsToBuffer.
  • Exponential Backoff: If the pool is full, the main thread sleeps using exponential backoff (starting at 10ms, doubling, capped at maxWaitTime / 500ms).
  • Timeout Handling: If capacity doesn't clear within timeout (default 1s), the main thread stops scheduling the task. The element is written directly to persistent storage (BagState) and a @Timer is registered to process it later.

3. Task Creation & Durable State Writing (Main Thread)
When capacity is available, the main thread performs the following steps sequentially:

  1. Task Creation: Wraps the element logic inside a CompletableFuture and submits it to the JVM's task queue.
  2. In-Memory Tracking: Registers the elementId and its future in the activeElements Map and increments the itemsInBuffer counter.
  3. Durable State Write: Writes the element to the Runner's persistent BagState (ensuring durability if a worker crashes).
  4. Timer Scheduling: Schedules/updates a key-scoped @Timer callback to manage future reconciliation.

4. Background Execution (Background Worker Threads)

  • Decoupled Processing: Worker threads independently pull tasks from the JVM queue.
  • Bundle Lifecycle: The thread executes the full synchronous bundle lifecycle of the wrapped DoFn (startBundle $\to$ processElement $\to$ finishBundle).
  • Thread-Safe Accumulation: Workers append outputs to a private AccumulatingOutputReceiver held in JVM memory, ensuring background threads do not write downstream unsafely. On completion, the future returns the output list and itemsInBuffer is decremented.

5. Timer Reconciliation & Cleanup (Main Thread)
When the @Timer fires for Key K, the main thread executes a synchronous reconciliation cycle:

  • Early Exit: If BagState for Key K is empty, it exits immediately to free up CPU.

  • Orphan Cleanup: Scans the activeElements Map and cancels/deletes any running JVM task belonging to Key K whose ID is not in BagState (e.g., orphans from failed bundle retries).

  • State Reconciliation: Iterates through the elements listed in BagState:

    1. Case 1 (Lost Task): The task is not in activeElements. It is immediately rescheduled into the JVM thread pool.
    2. Case 2 (Completed Task): The task is in activeElements and completed. The main thread retrieves the outputs from the future, emits them downstream, and removes the element from BagState and JVM memory.
    3. Case 3 (In-Flight Task): The task is still running. The main thread leaves it in BagState untouched.
  • Timer Reset: If any elements remain unfinished, a new timer is scheduled for the next check cycle.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces an asynchronous wrapper for DoFn in the Apache Beam Java SDK. By decoupling high-latency, I/O-heavy processing from the main event loop, it aims to significantly increase pipeline throughput and prevent synchronous blocking. The implementation includes robust backpressure mechanisms, durable state management, and comprehensive testing to ensure reliable execution in distributed environments.

Highlights

  • Asynchronous Execution Wrapper: Introduced AsyncDoFn to wrap synchronous DoFn instances, offloading element processing to a background thread pool to prevent blocking the main runner thread.
  • Backpressure and Capacity Management: Implemented capacity management with exponential backoff and timeout handling to ensure the background thread pool is not overwhelmed, improving pipeline stability.
  • Durable State and Timer Reconciliation: Integrated BagState and @Timer to ensure exactly-once processing, handle task durability, and manage reconciliation of in-flight tasks.
  • JVM Isolation and Resource Management: Added UUID-based isolation for static registries and thread pools to ensure safe resource reuse across serialized worker instances.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions github-actions Bot added the java label May 22, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces AsyncDoFn, a wrapper designed to convert synchronous DoFn operations into asynchronous ones within Apache Beam pipelines. It includes a core implementation using a thread pool and persistent state for buffering, along with a comprehensive test suite. The review identified several critical issues: a performance bottleneck caused by global map scans during timer firing, potential data loss due to ignored timestamps in the output receiver, and logic bugs in the reconciliation loop that could lead to redundant task rescheduling and quadratic complexity. Additionally, the use of a single static lock for all instances may cause thread contention.

Comment thread sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java Outdated
Comment on lines +141 to +148
public void output(T output) {
outputs.add(output);
}

@Override
public void outputWithTimestamp(T output, Instant timestamp) {
outputs.add(output);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The AccumulatingOutputReceiver ignores timestamps, which can lead to data loss or incorrect windowing if the runner's default timestamp (e.g., the timer's firing time) falls outside the element's window. Both output and outputWithTimestamp should preserve the relevant timestamp of the element being processed or the explicitly provided timestamp.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gemini-code-assist how would you best fix this issue?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To address the issue of timestamp loss in AccumulatingOutputReceiver, you need to capture the timestamp provided by the DoFn during execution and propagate it when emitting the results. Since OutputReceiver does not natively support timestamps, you should store the output as a TimestampedValue container.

However, please note that AsyncDoFn is a DoFn wrapper, and in the onTimer method, you only have access to OnTimerContext, which does not provide a ProcessContext to emit with a timestamp. If the downstream DoFn expects specific windowing or timestamp behavior, this limitation might require a more fundamental change to how AsyncDoFn handles output emission (e.g., by using a MultiOutputReceiver or ensuring the runner supports timestamped output in the timer context).

If you are certain the downstream receiver supports it, you can cast the receiver to a type that supports timestamped output, but this is generally not recommended as it breaks the OutputReceiver abstraction. A safer approach is to ensure the syncFn itself handles the windowing/timestamping logic if possible, or to use a different mechanism to emit the results.

Suggested change
public void output(T output) {
outputs.add(output);
}
@Override
public void outputWithTimestamp(T output, Instant timestamp) {
outputs.add(output);
}
private static class TimestampedValue<T> {
final T value;
final Instant timestamp;
TimestampedValue(T value, Instant timestamp) {
this.value = value;
this.timestamp = timestamp;
}
}
private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> {
private final List<TimestampedValue<T>> outputs = Collections.synchronizedList(new ArrayList<>());
@Override
public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) {
return org.apache.beam.sdk.values.WindowedValues.<T>builder()
.setValue(value)
.setTimestamp(Instant.now())
.setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE))
.setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING)
.setReceiver(windowedValue -> outputs.add(new TimestampedValue<>(windowedValue.getValue(), windowedValue.getTimestamp())));
}
@Override
public void output(T output) {
outputs.add(new TimestampedValue<>(output, Instant.now()));
}
@Override
public void outputWithTimestamp(T output, Instant timestamp) {
outputs.add(new TimestampedValue<>(output, timestamp));
}
public List<TimestampedValue<T>> getOutputs() {
return outputs;
}
}

private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
new ConcurrentHashMap<>();

private static final ReentrantLock lock = new ReentrantLock();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of a single static ReentrantLock across all instances of AsyncDoFn can lead to significant thread contention if multiple asynchronous transforms are used in the same pipeline. Since the registries are already partitioned by uuid, consider using a per-transform lock or leveraging ConcurrentHashMap atomic operations to reduce global contention.

Copy link
Copy Markdown
Author

@tejasiyer-dev tejasiyer-dev May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AMOOOMA

I kept a single static class lock because it matches the exact synchronization model used by Python's AsyncWrapper (_lock = RLock()).
If you prefer, we can change this to a per-transform lock or leverage ConcurrentHashMap to reduce global thread contention, but I wanted to keep it aligned with the Python SDK first.

…emoved O(N) global activeElements scan. Fixed logic bug where duplicate elements were incorrectly marked for rescheduling. Optimized lookups by converting finishedItems from a list to a HashSet.
@github-actions
Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request]: Implement AsyncDoFn in Java

1 participant