Refactor shared cache and debug deadline exceeded#38612
Draft
shunping wants to merge 13 commits into
Draft
Conversation
…ilure (apache#38572)" This reverts commit 930b94c.
When a SubprocessServer fails to start (e.g., due to a process exit or startup error), the server process could leak if standard purging is blocked by other active owners sharing the cached subprocess. To fix this: - Implement `_SharedCache.force_remove()` to immediately remove a key from the cache and run its destructor regardless of active owners. - Add `SubprocessServer.stop_force()` which calls `force_remove()` to completely terminate the server's process. - Call `stop_force()` in the `except` block of `SubprocessServer.start()`
This ensures we can download pre-built wheels for environment staging rather than relying on tarball building, which is sometimes slow.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #38612 +/- ##
=========================================
Coverage 58.07% 58.07%
Complexity 12941 12941
=========================================
Files 2511 2511
Lines 262223 262285 +62
Branches 10652 10651 -1
=========================================
+ Hits 152282 152323 +41
- Misses 104243 104259 +16
- Partials 5698 5703 +5
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Fixes a bug in the Go Prism runner's ElementManager where returning residuals from a split bundle (such as during BigQuery export reads) would double-count the residual elements in the pipeline's pending elements waitgroup. Specifically: - `stage.splitBundle` already places split residuals back into the stage's pending queue without incrementing the global counter (since they are already counted as pending in the active bundle). - `ReturnResiduals` was incorrectly decoding the same residuals and calling `stage.AddPending` and `em.addPending` a second time, causing a global count mismatch. - When the actual elements finished executing, the waitgroup stayed at a non-zero value (e.g., 1), causing the runner to hang and deadlock indefinitely until a deadline timeout.
82d24d7 to
6d901a6
Compare
Fixes a bug where splittable DoFN (SDF) splits and checkpoints would completely lose their deferred residual restrictions, causing the pipeline to terminate prematurely and downstream assertions to fail due to missing elements (e.g., test_register_finalizations). Root Cause: - The previous patch in v2 fixed a double-counting bug of livePending by removing the decoding and rescheduling of split residuals (stage.AddPending and em.addPending) from ReturnResiduals(), assuming they were already placed back by stage.splitBundle(). - This holds true for normal non-SDF channel splits, where stage.splitBundle already puts the unprocessed original elements back. - However, when a splittable DoFn (SDF) checkpoints itself, the active element splits on its restriction rather than simple unprocessed channel elements. In this case, the original remaining elements (res) in splitBundle() has a length of 0, but the SDK worker returns a new restriction in the residuals.Data (e.g. unprocessedElements length 1). - Because the previous patch completely removed rescheduling from ReturnResiduals, this new residual restriction was completely lost and never added back to the pending queue. Solution: - Inside ReturnResiduals(), we dynamically calculate the original remaining elements in the bundle: originalRemainingCount := len(completed.es) - firstRsIndex. - We compare the total returned residuals (unprocessedElements) against originalRemainingCount. - If len(unprocessedElements) > originalRemainingCount, the difference represents the new SDF residual restrictions. We selectively add ONLY these new residuals back to the stage pending heap and safely increment em.livePending by this difference. - This elegantly preserves the fix for normal channel splits (preventing double-counting), while ensuring SDF checkpoint residuals are correctly scheduled. - Also includes detailed slog.Info logging during execution to track livePending state changes accurately in addPending, ReturnResiduals, and splitBundle.
6d901a6 to
d5e0add
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.