diff --git a/internal/app/azldev/cmds/component/render.go b/internal/app/azldev/cmds/component/render.go index 7b27d293..2f2cc54f 100644 --- a/internal/app/azldev/cmds/component/render.go +++ b/internal/app/azldev/cmds/component/render.go @@ -4,6 +4,7 @@ package component import ( + "context" "errors" "fmt" "log/slog" @@ -12,7 +13,6 @@ import ( "slices" "sort" "strings" - "sync" "github.com/microsoft/azure-linux-dev-tools/internal/app/azldev" "github.com/microsoft/azure-linux-dev-tools/internal/app/azldev/core/components" @@ -22,6 +22,7 @@ import ( "github.com/microsoft/azure-linux-dev-tools/internal/utils/dirdiff" "github.com/microsoft/azure-linux-dev-tools/internal/utils/fileperms" "github.com/microsoft/azure-linux-dev-tools/internal/utils/fileutils" + "github.com/microsoft/azure-linux-dev-tools/internal/utils/parmap" "github.com/spf13/afero" "github.com/spf13/cobra" ) @@ -371,7 +372,6 @@ type preparedComponent struct { // prepResult pairs a prepared component (on success) or a render result (on error). type prepResult struct { - index int prepared *preparedComponent result *RenderResult // non-nil on error } @@ -381,9 +381,10 @@ type prepResult struct { // ────────────────────────────────────────────────────────────────────────────── // parallelPrepare prepares sources for all components concurrently, bounded by -// [concurrentRenderLimit]. Each component's sources are written to a subdirectory -// of stagingDir. Failed components get their result written directly; successful -// ones are returned in the prepared slice. +// [azldev.Env.IOBoundConcurrency]. Each component's sources are written to a +// subdirectory of stagingDir. Failed and cancelled components get their +// result written directly into results; successful ones are returned in the +// prepared slice for phase 2 / phase 3. func parallelPrepare( env *azldev.Env, comps []components.Component, @@ -397,49 +398,59 @@ func parallelPrepare( workerEnv, cancel := env.WithCancel() defer cancel() - resultsChan := make(chan prepResult, len(comps)) - semaphore := make(chan struct{}, env.IOBoundConcurrency()) - - var waitGroup sync.WaitGroup - - for compIdx, comp := range comps { - waitGroup.Add(1) - - go func(idx int, comp components.Component) { - defer waitGroup.Done() - - resultsChan <- prepWithSemaphore(workerEnv, semaphore, idx, comp, stagingDir, outputDir) - }(compIdx, comp) - } - - go func() { waitGroup.Wait(); close(resultsChan) }() + total := int64(len(comps)) - var prepared []*preparedComponent + parmapResults := parmap.Map( + workerEnv, + env.IOBoundConcurrency(), + comps, + func(done, _ int) { progressEvent.SetProgress(int64(done), total) }, + func(_ context.Context, comp components.Component) prepResult { + // workerEnv (captured) is the effective context for this call chain; + // the parmap-supplied ctx is identical and unused here. + return prepareOneComponent(workerEnv, comp, stagingDir, outputDir) //nolint:contextcheck // env carries the ctx + }, + ) - total := int64(len(comps)) + prepared := make([]*preparedComponent, 0, len(comps)) - var completed int64 + for idx, result := range parmapResults { + switch { + case result.Cancelled: + // Worker never started — ctx ended before parmap reached it. + compName := comps[idx].GetName() - for prepRes := range resultsChan { - completed++ - progressEvent.SetProgress(completed, total) + compOutputDir, nameErr := components.RenderedSpecDir(outputDir, compName) + if nameErr != nil { + compOutputDir = "(invalid)" + } - if prepRes.result != nil { - results[prepRes.index] = prepRes.result - } else { - prepared = append(prepared, prepRes.prepared) + results[idx] = &RenderResult{ + Component: compName, + OutputDir: compOutputDir, + Status: renderStatusCancelled, + Error: "context cancelled", + } + case result.Value.result != nil: + results[idx] = result.Value.result + default: + result.Value.prepared.index = idx + prepared = append(prepared, result.Value.prepared) } } return prepared } -// prepWithSemaphore acquires the semaphore (respecting context cancellation), -// prepares a single component's sources, and returns a prep result. -func prepWithSemaphore( +// prepareOneComponent validates the output path for a single component and +// prepares its sources. Returns a [prepResult] carrying either a successful +// preparedComponent or a [RenderResult] describing the error. +// +// Called from a [parmap.Map] worker; semaphore acquisition and ctx-aware +// cancellation are handled by parmap. Errors from [prepareComponentSources] +// (including ctx cancellation mid-flight) surface as [renderStatusError] here. +func prepareOneComponent( env *azldev.Env, - semaphore chan struct{}, - index int, comp components.Component, stagingDir string, outputDir string, @@ -449,7 +460,7 @@ func prepWithSemaphore( // Validate component name and compute output directory. compOutputDir, nameErr := components.RenderedSpecDir(outputDir, componentName) if nameErr != nil { - return prepResult{index: index, result: &RenderResult{ + return prepResult{result: &RenderResult{ Component: componentName, OutputDir: "(invalid)", Status: renderStatusError, @@ -457,25 +468,12 @@ func prepWithSemaphore( }} } - // Context-aware semaphore acquisition. - select { - case semaphore <- struct{}{}: - defer func() { <-semaphore }() - case <-env.Done(): - return prepResult{index: index, result: &RenderResult{ - Component: componentName, - OutputDir: compOutputDir, - Status: renderStatusCancelled, - Error: "context cancelled", - }} - } - prep, err := prepareComponentSources(env, comp, stagingDir) if err != nil { slog.Error("Failed to prepare component sources", "component", componentName, "error", err) - return prepResult{index: index, result: &RenderResult{ + return prepResult{result: &RenderResult{ Component: componentName, OutputDir: compOutputDir, Status: renderStatusError, @@ -483,10 +481,9 @@ func prepWithSemaphore( }} } - prep.index = index prep.compOutputDir = compOutputDir - return prepResult{index: index, prepared: prep} + return prepResult{prepared: prep} } // prepareComponentSources resolves the distro, creates a source manager, and @@ -627,51 +624,44 @@ func parallelFinish( workerEnv, cancel := env.WithCancel() defer cancel() - type finishResult struct { - index int - result *RenderResult - } - - resultsChan := make(chan finishResult, len(prepared)) - semaphore := make(chan struct{}, env.IOBoundConcurrency()) - - var waitGroup sync.WaitGroup - - for _, prep := range prepared { - waitGroup.Add(1) - - go func(prep *preparedComponent) { - defer waitGroup.Done() - - result := finishOneComponent( - workerEnv, env, prep, mockResultMap, semaphore, stagingDir, allowOverwrite, checkOnly, - ) - resultsChan <- finishResult{index: prep.index, result: result} - }(prep) - } - - go func() { waitGroup.Wait(); close(resultsChan) }() - total := int64(len(prepared)) - var completed int64 + parmapResults := parmap.Map( + workerEnv, + env.IOBoundConcurrency(), + prepared, + func(done, _ int) { progressEvent.SetProgress(int64(done), total) }, + func(_ context.Context, prep *preparedComponent) *RenderResult { + return finishOneComponent(workerEnv, env, prep, mockResultMap, stagingDir, allowOverwrite, checkOnly) + }, + ) - for fr := range resultsChan { - completed++ - progressEvent.SetProgress(completed, total) + for i, result := range parmapResults { + prep := prepared[i] - results[fr.index] = fr.result + switch { + case result.Cancelled: + // Worker never started — ctx ended before parmap reached it. + results[prep.index] = &RenderResult{ + Component: prep.comp.GetName(), + OutputDir: prep.compOutputDir, + Status: renderStatusCancelled, + Error: "context cancelled", + } + default: + results[prep.index] = result.Value + } } } -// finishOneComponent handles the semaphore, context cancellation, and error -// wrapping for finishing a single component's render. +// finishOneComponent wraps [finishComponentRender] with the per-component +// result bookkeeping (status, error message). Called from a [parmap.Map] +// worker; semaphore acquisition is handled by parmap. func finishOneComponent( workerEnv *azldev.Env, env *azldev.Env, prep *preparedComponent, mockResultMap map[string]*sources.ComponentMockResult, - semaphore chan struct{}, stagingDir string, allowOverwrite bool, checkOnly bool, @@ -679,11 +669,9 @@ func finishOneComponent( componentName := prep.comp.GetName() compOutputDir := prep.compOutputDir - // Context-aware semaphore acquisition. - select { - case semaphore <- struct{}{}: - defer func() { <-semaphore }() - case <-workerEnv.Done(): + // Bail out early if ctx is already done so we don't write to disk after + // a Ctrl-C while the worker pool is draining. + if workerEnv.Err() != nil { return &RenderResult{ Component: componentName, OutputDir: compOutputDir,