Refactor shared cache and debug deadline exceeded #38612

Draft
shunping wants to merge 13 commits into apache:master from shunping:refactor-shared-cache

@shunping
Collaborator

No description provided.

shunping added 8 commits May 22, 2026 19:24
When a SubprocessServer fails to start (e.g., due to a process exit or
startup error), the server process could leak if standard purging
is blocked by other active owners sharing the cached subprocess.

To fix this:
- Implement `_SharedCache.force_remove()` to immediately remove a key
  from the cache and run its destructor regardless of active owners.
- Add `SubprocessServer.stop_force()` which calls `force_remove()` to
  completely terminate the server's process.
- Call `stop_force()` in the `except` block of `SubprocessServer.start()`.

This ensures we can download pre-built wheels for environment staging
rather than relying on tarball building, which is sometimes slow.
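
The `force_remove()` behavior described in the first commit message can be illustrated with a minimal sketch. This is not Beam's `_SharedCache`: the class name `SharedCacheSketch`, the `get(key, owner)` and `purge(key, owner)` signatures, and the idea that `force_remove` takes a key are all assumptions made for illustration. It only shows the contrast between a normal release, which waits for the last owner, and a forced removal, which runs the destructor immediately.

```python
import threading


class SharedCacheSketch:
    """Toy owner-tracked cache (hypothetical, not Beam's _SharedCache)."""

    def __init__(self, constructor, destructor):
        self._constructor = constructor
        self._destructor = destructor
        self._lock = threading.Lock()
        self._entries = {}  # key -> (object, set of owners)

    def get(self, key, owner):
        """Return the shared object for key, creating it on first use."""
        with self._lock:
            if key not in self._entries:
                self._entries[key] = (self._constructor(key), set())
            obj, owners = self._entries[key]
            owners.add(owner)
            return obj

    def purge(self, key, owner):
        """Normal release: destroy only when the last owner lets go."""
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                return
            obj, owners = entry
            owners.discard(owner)
            if owners:
                return  # other owners still active, keep the object
            del self._entries[key]
        self._destructor(obj)

    def force_remove(self, key):
        """Forced release: destroy now, regardless of active owners."""
        with self._lock:
            entry = self._entries.pop(key, None)
        if entry is not None:
            self._destructor(entry[0])


destroyed = []
cache = SharedCacheSketch(lambda key: "proc-" + key, destroyed.append)
cache.get("svc", owner="a")
cache.get("svc", owner="b")
cache.purge("svc", owner="a")  # "b" still holds it, nothing is destroyed
cache.force_remove("svc")      # destroyed even though "b" never released it
```

In this sketch the destructor runs outside the lock, so a slow teardown (such as killing a subprocess) does not block other cache users. Whether the PR makes the same choice is not stated in the commit message.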
@codecov

codecov Bot commented May 23, 2026

Codecov Report

❌ Patch coverage is 56.52174% with 50 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.07%. Comparing base (0c9b272) to head (d5e0add).
⚠️ Report is 18 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...o/pkg/beam/runners/prism/internal/worker/worker.go | 15.78% | 16 Missing ⚠️ |
| sdks/go/pkg/beam/runners/prism/internal/execute.go | 0.00% | 9 Missing ⚠️ |
| sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 0.00% | 6 Missing ⚠️ |
| sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 0.00% | 6 Missing ⚠️ |
| ...go/pkg/beam/runners/prism/internal/environments.go | 25.00% | 2 Missing and 1 partial ⚠️ |
| sdks/go/pkg/beam/runners/prism/internal/stage.go | 57.14% | 2 Missing and 1 partial ⚠️ |
| ...beam/runners/portability/expansion_service_main.py | 0.00% | 3 Missing ⚠️ |
| sdks/python/apache_beam/utils/subprocess_server.py | 94.64% | 3 Missing ⚠️ |
| ...s/python/apache_beam/runners/portability/stager.py | 75.00% | 1 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff            @@
##             master   #38612   +/-   ##
=========================================
  Coverage     58.07%   58.07%           
  Complexity    12941    12941           
=========================================
  Files          2511     2511           
  Lines        262223   262285   +62     
  Branches      10652    10651    -1     
=========================================
+ Hits         152282   152323   +41     
- Misses       104243   104259   +16     
- Partials       5698     5703    +5     

| Flag | Coverage Δ |
|---|---|
| python | 79.87% <88.88%> (+0.01%) ⬆️ |


shunping added 3 commits May 23, 2026 08:41
Fixes a bug in the Go Prism runner's ElementManager where returning
residuals from a split bundle (such as during BigQuery export reads)
would double-count the residual elements in the pipeline's pending
elements waitgroup.

Specifically:
- `stage.splitBundle` already places split residuals back into the
  stage's pending queue without incrementing the global counter (since
  they are already counted as pending in the active bundle).
- `ReturnResiduals` was incorrectly decoding the same residuals and
  calling `stage.AddPending` and `em.addPending` a second time, causing
  a global count mismatch.
- When the actual elements finished executing, the waitgroup stayed at
  a non-zero value (e.g., 1), so the runner hung until the deadline
  was exceeded.
@shunping shunping force-pushed the refactor-shared-cache branch from 82d24d7 to 6d901a6 Compare May 24, 2026 00:32
Fixes a bug where splittable DoFn (SDF) splits and checkpoints would lose
their deferred residual restrictions, causing the pipeline to terminate prematurely
and downstream assertions to fail due to missing elements (e.g., test_register_finalizations).

Root Cause:
- The previous patch in v2 fixed a double-counting bug of livePending by removing the
  decoding and rescheduling of split residuals (stage.AddPending and em.addPending)
  from ReturnResiduals(), assuming they were already placed back by stage.splitBundle().
- This holds true for normal non-SDF channel splits, where stage.splitBundle already
  puts the unprocessed original elements back.
- However, when a splittable DoFn (SDF) checkpoints itself, the active element splits on
  its restriction rather than on unprocessed channel elements. In this case, the original
  remaining elements (res) in splitBundle() have a length of 0, but the SDK worker
  returns a new restriction in residuals.Data (e.g. unprocessedElements has length 1).
- Because the previous patch removed rescheduling from ReturnResiduals entirely, this new
  residual restriction was lost and never added back to the pending queue.

Solution:
- Inside ReturnResiduals(), we dynamically calculate the original remaining elements in
  the bundle: originalRemainingCount := len(completed.es) - firstRsIndex.
- We compare the total returned residuals (unprocessedElements) against originalRemainingCount.
- If len(unprocessedElements) > originalRemainingCount, the difference represents the new SDF
  residual restrictions. We add ONLY these new residuals back to the stage pending
  heap and increment em.livePending by this difference.
- This preserves the fix for normal channel splits (preventing double-counting) while
  ensuring SDF checkpoint residuals are correctly scheduled.
- Also includes slog.Info logging in addPending, ReturnResiduals, and splitBundle to
  track livePending state changes during execution.
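
The comparison in the solution above can be worked through with the two cases the commit message names. The actual change is in Go; this Python function is a hypothetical model of the arithmetic only, with `extra_residual_count` and its parameter names invented for illustration. It returns just the number of residuals that would need to be newly counted and does not model which elements those are.

```python
def extra_residual_count(bundle_size, first_rs_index, num_unprocessed):
    """Number of returned residuals not already counted as pending.

    bundle_size models len(completed.es), first_rs_index models
    firstRsIndex, and num_unprocessed models len(unprocessedElements).
    """
    original_remaining_count = bundle_size - first_rs_index
    # Only a surplus over the original remaining elements is new work.
    return max(0, num_unprocessed - original_remaining_count)


# Channel split: 5 elements, split at index 3, both remaining elements
# come back. They were already counted, so nothing is added.
channel_split = extra_residual_count(5, 3, 2)

# SDF checkpoint: no original elements remain, but the SDK worker
# returns one new restriction, so one element must be counted.
sdf_checkpoint = extra_residual_count(1, 1, 1)
```

Here `channel_split` evaluates to 0 and `sdf_checkpoint` to 1, so the pending count would be incremented only in the SDF case.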
@shunping shunping force-pushed the refactor-shared-cache branch from 6d901a6 to d5e0add Compare May 24, 2026 00:37