Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python_Versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 3
"revision": 5
}
1 change: 1 addition & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def sickbayTests = [
def createPrismValidatesRunnerTask = { name, environmentType ->
Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
description "PrismRunner Java $environmentType ValidatesRunner suite"
outputs.upToDateWhen { false }
classpath = configurations.validatesRunner

var prismBuildTask = dependsOn(':runners:prism:build')
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
default:
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
go func() {
m.mu.Lock()
defer m.mu.Unlock()
if curr, ok := m.ports[port.URL]; ok && curr == ch {
delete(m.ports, port.URL)
}
}()
}
m.ports[port.URL] = ch
return ch, nil
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,13 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
default:
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
go func() {
m.mu.Lock()
defer m.mu.Unlock()
if curr, ok := m.ports[port.URL]; ok && curr == ch {
delete(m.ports, port.URL)
}
}()
}
m.ports[port.URL] = ch
return ch, nil
Expand Down
24 changes: 17 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ type ElementManager struct {
}

// addPending applies the delta v to both the livePending counter and
// pendingElements, then logs the counter transition.
//
// NOTE(review): this relies on livePending being an atomic.Int64-style
// counter whose Add returns the updated value — confirm against the
// ElementManager struct definition.
func (em *ElementManager) addPending(v int) {
	// Derive both logged values from the single Add so they describe this
	// call's own transition. Separate Load calls before and after the Add
	// can interleave with concurrent callers, producing a prev/current pair
	// that does not differ by delta.
	current := em.livePending.Add(int64(v))
	em.pendingElements.Add(v)
	slog.Info("em.addPending", "delta", v, "prev", current-int64(v), "current", current)
}

// LinkID represents a fully qualified input or output.
Expand Down Expand Up @@ -530,7 +532,7 @@ func (em *ElementManager) DumpStages() string {
stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n",
em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
} else {
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v livePending: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles, em.livePending.Load()))
}
sort.Strings(ids)
for _, id := range ids {
Expand Down Expand Up @@ -1091,18 +1093,25 @@ func (em *ElementManager) FailBundle(rb RunBundle) {
em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil)
}

// ReturnResiduals is called after a successful split, so the remaining work
// can be re-assigned to a new bundle.
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals) {
stage := em.stages[rb.StageID]

slog.Info("ElementManager.ReturnResiduals start", "bundle", rb, "firstRsIndex", firstRsIndex)

stage.mu.Lock()
completed := stage.inprogress[rb.BundleID]
originalRemainingCount := len(completed.es) - firstRsIndex
stage.mu.Unlock()

stage.splitBundle(rb, firstRsIndex, em)
unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb)
if len(unprocessedElements) > 0 {
slog.Debug("ReturnResiduals: unprocessed elements", "bundle", rb, "count", len(unprocessedElements))
count := stage.AddPending(em, unprocessedElements)
if len(unprocessedElements) > originalRemainingCount {
newResiduals := unprocessedElements[originalRemainingCount:]
slog.Info("ReturnResiduals: new residuals added back", "bundle", rb, "count", len(newResiduals))
count := stage.AddPending(em, newResiduals)
em.addPending(count)
}
slog.Info("ElementManager.ReturnResiduals end", "bundle", rb, "unprocessedCount", len(unprocessedElements), "livePending", em.livePending.Load())
em.markStagesAsChanged(singleSet(rb.StageID))
}

Expand Down Expand Up @@ -2187,7 +2196,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa
defer ss.mu.Unlock()

es := ss.inprogress[rb.BundleID]
slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), "res", firstResidual)
slog.Info("splitBundle start", "bundle", rb, "elem count", len(es.es), "firstResidual", firstResidual, "livePending", em.livePending.Load())

prim := es.es[:firstResidual]
res := es.es[firstResidual:]
Expand All @@ -2207,6 +2216,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa
// we don't need to increment pending count in em, since it is already pending
ss.kind.addPending(ss, em, res)
ss.inprogress[rb.BundleID] = es
slog.Info("splitBundle completed", "bundle", rb, "primaryCount", len(prim), "residualCount", len(res), "livePending", em.livePending.Load())
}

// minimumPendingTimestamp returns the minimum pending timestamp from all pending elements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,3 +684,73 @@ func TestElementManager_OnWindowExpiration(t *testing.T) {
validateSideBundles(t, singleSet("\u0004key5")) // still exist..
})
}

// TestElementManager_ReturnResidualsPendingCount verifies the value of
// em.livePending after ReturnResiduals is invoked with different
// firstRsIndex values on a bundle started from a single impulse element.
func TestElementManager_ReturnResidualsPendingCount(t *testing.T) {
	type scenario struct {
		name             string
		firstRsIndex     int
		wantFinalPending int64
	}
	scenarios := []scenario{
		{name: "ChannelSplit", firstRsIndex: 0, wantFinalPending: 1},
		// Incremented by 1 because the active portion (index 0) is still in
		// progress and will be completed/decremented in PersistBundle.
		{name: "SDFCheckpoint", firstRsIndex: 1, wantFinalPending: 2},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			em := NewElementManager(Config{})
			em.AddStage("impulse", nil, []string{"input"}, nil)
			em.AddStage("dofn", []string{"input"}, nil, nil)
			em.Impulse("impulse")

			dofn := em.stages["dofn"]
			readAll := func(r io.Reader) []byte {
				data, _ := io.ReadAll(r)
				return data
			}
			info := PColInfo{
				GlobalID: "generic_info",
				WDec:     exec.MakeWindowDecoder(coder.NewGlobalWindow()),
				WEnc:     exec.MakeWindowEncoder(coder.NewGlobalWindow()),
				EDec:     readAll,
			}

			// The impulse should have left exactly one element pending.
			if got, want := em.livePending.Load(), int64(1); got != want {
				t.Fatalf("initial livePending = %v, want %v", got, want)
			}

			// Begin a bundle on the dofn stage.
			genInstID := func() string { return "inst0" }
			bundID, started, _, _ := dofn.startEventTimeBundle(mtime.MaxTimestamp, genInstID)
			if !started {
				t.Fatalf("failed to start bundle")
			}

			// Starting a bundle must leave livePending untouched, since the
			// element is still pending.
			if got, want := em.livePending.Load(), int64(1); got != want {
				t.Fatalf("livePending after startEventTimeBundle = %v, want %v", got, want)
			}

			// Build the residual payload: windowed value header + ABC.
			encoded := []byte{127, 223, 59, 100, 90, 28, 172, 9, 0, 0, 0, 1, 15, 3, 65, 66, 67}
			residuals := Residuals{
				Data: []Residual{{Element: encoded}},
			}
			rb := RunBundle{StageID: "dofn", BundleID: bundID}

			// Hand the residuals back, simulating a split.
			em.ReturnResiduals(rb, sc.firstRsIndex, info, residuals)

			if got, want := em.livePending.Load(), sc.wantFinalPending; got != want {
				t.Errorf("livePending after ReturnResiduals = %v, want %v", got, want)
			}
		})
	}
}
5 changes: 4 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,12 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo

// Previous context cancelled so we need a new one
// for this request.
pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
_, err = pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
WorkerId: wk.ID,
})
if err != nil {
slog.Warn("StopWorker failed", "worker", wk, "error", err)
}
wk.Stop()
}

Expand Down
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,16 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
// Log a heartbeat every 60 seconds
case <-ticker.C:
j.Logger.Info("pipeline is running", slog.String("job", j.String()))
j.Logger.Info("pipeline stages state", slog.String("stages", em.DumpStages()))
for envID, wk := range wks {
if wk != nil && wk.Connected() && !wk.Stopped() {
j.Logger.Info("worker status",
slog.String("workerID", wk.ID),
slog.String("envID", envID),
slog.Duration("uptime", wk.Uptime()),
slog.Any("active_bundles", wk.ActiveBundles()))
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *runner) handleReshuffle(tid string, t *pipepb.PTransform, comps *pipepb
}

// And all the sub transforms.
toRemove = append(toRemove, t.GetSubtransforms()...)
toRemove = append(toRemove, removeSubTransforms(comps, t.GetSubtransforms())...)

// Return the new components which is the transforms consumer
return prepareResult{
Expand Down
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
panic(err)
}

bundleStart := time.Now()

// Progress + split loop.
previousIndex := int64(-2)
previousTotalCount := int64(-2) // Total count of all pcollection elements.
Expand Down Expand Up @@ -232,7 +234,11 @@ progress:
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
runningFor := time.Since(bundleStart)
slog.Debug("progress report", "bundle", rb, "runningFor", runningFor, "index", index, "prevIndex", previousIndex)
if runningFor > 5*time.Minute {
slog.Warn("Bundle has been running for a long time", "bundle", rb, "runningFor", runningFor, "worker", wk.ID)
}

// Check if there has been any measurable progress by the input, or all output pcollections since last report.
slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
Expand Down
27 changes: 27 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type W struct {
// These are the ID sources
inst uint64
connected, stopped atomic.Bool
StartTime time.Time
StoppedChan chan struct{} // Channel to Broadcast stopped state.

InstReqs chan *fnpb.InstructionRequest
Expand Down Expand Up @@ -292,11 +293,37 @@ func (wk *W) Stopped() bool {
return wk.stopped.Load()
}

// Uptime reports the time elapsed since the worker's StartTime was recorded.
// It returns zero when StartTime has not been set.
func (wk *W) Uptime() time.Duration {
	// Copy the start time under the lock, then compute the duration
	// from the snapshot.
	wk.mu.Lock()
	started := wk.StartTime
	wk.mu.Unlock()

	if !started.IsZero() {
		return time.Since(started)
	}
	return 0
}

// ActiveBundles describes every active instruction on this worker whose
// responder is a *B, formatting each as "<instruction id> (<PBDID>)".
// Other responder types are skipped. The result is nil when there are no
// such bundles, and its order follows map iteration, so it is unspecified.
func (wk *W) ActiveBundles() []string {
	wk.mu.Lock()
	defer wk.mu.Unlock()

	var active []string
	for instID, r := range wk.activeInstructions {
		bundle, isBundle := r.(*B)
		if !isBundle {
			continue
		}
		active = append(active, fmt.Sprintf("%s (%s)", instID, bundle.PBDID))
	}
	return active
}

// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.connected.Store(true)
wk.mu.Lock()
wk.StartTime = time.Now()
wk.mu.Unlock()
done := make(chan error, 1)
go func() {
for {
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ def _populate_requirements_cache(
platform_tag
])
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)
processes.check_call(cmd_args)

# Get list of downloaded packages and copy them to the cache
downloaded_packages = set()
Expand Down
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,13 +985,13 @@ def test_populate_requirements_cache_uses_find_links(self):

captured_cmd_args = []

def mock_check_output(cmd_args, **kwargs):
def mock_check_call(cmd_args, **kwargs):
captured_cmd_args.extend(cmd_args)
return b''
return 0

with mock.patch(
'apache_beam.runners.portability.stager.processes.check_output',
side_effect=mock_check_output):
'apache_beam.runners.portability.stager.processes.check_call',
side_effect=mock_check_call):
stager.Stager._populate_requirements_cache(
requirements_file, requirements_cache_dir)

Expand Down
Loading
Loading