Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 79 additions & 91 deletions internal/app/azldev/cmds/component/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package component

import (
"context"
"errors"
"fmt"
"log/slog"
Expand All @@ -12,7 +13,6 @@ import (
"slices"
"sort"
"strings"
"sync"

"github.com/microsoft/azure-linux-dev-tools/internal/app/azldev"
"github.com/microsoft/azure-linux-dev-tools/internal/app/azldev/core/components"
Expand All @@ -22,6 +22,7 @@ import (
"github.com/microsoft/azure-linux-dev-tools/internal/utils/dirdiff"
"github.com/microsoft/azure-linux-dev-tools/internal/utils/fileperms"
"github.com/microsoft/azure-linux-dev-tools/internal/utils/fileutils"
"github.com/microsoft/azure-linux-dev-tools/internal/utils/parmap"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -371,7 +372,6 @@ type preparedComponent struct {

// prepResult pairs a prepared component (on success) or a render result (on error).
type prepResult struct {
index int
prepared *preparedComponent
result *RenderResult // non-nil on error
}
Expand All @@ -381,9 +381,10 @@ type prepResult struct {
// ──────────────────────────────────────────────────────────────────────────────

// parallelPrepare prepares sources for all components concurrently, bounded by
// [concurrentRenderLimit]. Each component's sources are written to a subdirectory
// of stagingDir. Failed components get their result written directly; successful
// ones are returned in the prepared slice.
// [azldev.Env.IOBoundConcurrency]. Each component's sources are written to a
// subdirectory of stagingDir. Failed and cancelled components get their
// result written directly into results; successful ones are returned in the
// prepared slice for phase 2 / phase 3.
func parallelPrepare(
env *azldev.Env,
comps []components.Component,
Expand All @@ -397,49 +398,59 @@ func parallelPrepare(
workerEnv, cancel := env.WithCancel()
defer cancel()

resultsChan := make(chan prepResult, len(comps))
semaphore := make(chan struct{}, env.IOBoundConcurrency())

var waitGroup sync.WaitGroup

for compIdx, comp := range comps {
waitGroup.Add(1)

go func(idx int, comp components.Component) {
defer waitGroup.Done()

resultsChan <- prepWithSemaphore(workerEnv, semaphore, idx, comp, stagingDir, outputDir)
}(compIdx, comp)
}

go func() { waitGroup.Wait(); close(resultsChan) }()
total := int64(len(comps))

var prepared []*preparedComponent
parmapResults := parmap.Map(
workerEnv,
env.IOBoundConcurrency(),
comps,
func(done, _ int) { progressEvent.SetProgress(int64(done), total) },
func(_ context.Context, comp components.Component) prepResult {
Comment thread
dmcilvaney marked this conversation as resolved.
// workerEnv (captured) is the effective context for this call chain;
// the parmap-supplied ctx is identical and unused here.
return prepareOneComponent(workerEnv, comp, stagingDir, outputDir) //nolint:contextcheck // env carries the ctx
},
)

total := int64(len(comps))
prepared := make([]*preparedComponent, 0, len(comps))

var completed int64
for idx, result := range parmapResults {
switch {
case result.Cancelled:
// Worker never started — ctx ended before parmap reached it.
compName := comps[idx].GetName()

for prepRes := range resultsChan {
completed++
progressEvent.SetProgress(completed, total)
compOutputDir, nameErr := components.RenderedSpecDir(outputDir, compName)
if nameErr != nil {
compOutputDir = "(invalid)"
}

if prepRes.result != nil {
results[prepRes.index] = prepRes.result
} else {
prepared = append(prepared, prepRes.prepared)
results[idx] = &RenderResult{
Component: compName,
OutputDir: compOutputDir,
Status: renderStatusCancelled,
Error: "context cancelled",
}
Comment thread
dmcilvaney marked this conversation as resolved.
case result.Value.result != nil:
results[idx] = result.Value.result
default:
result.Value.prepared.index = idx
prepared = append(prepared, result.Value.prepared)
}
}

return prepared
}

// prepWithSemaphore acquires the semaphore (respecting context cancellation),
// prepares a single component's sources, and returns a prep result.
func prepWithSemaphore(
// prepareOneComponent validates the output path for a single component and
// prepares its sources. Returns a [prepResult] carrying either a successful
// preparedComponent or a [RenderResult] describing the error.
//
// Called from a [parmap.Map] worker; semaphore acquisition and ctx-aware
// cancellation are handled by parmap. Errors from [prepareComponentSources]
// (including ctx cancellation mid-flight) surface as [renderStatusError] here.
func prepareOneComponent(
env *azldev.Env,
semaphore chan struct{},
index int,
comp components.Component,
stagingDir string,
outputDir string,
Expand All @@ -449,44 +460,30 @@ func prepWithSemaphore(
// Validate component name and compute output directory.
compOutputDir, nameErr := components.RenderedSpecDir(outputDir, componentName)
if nameErr != nil {
return prepResult{index: index, result: &RenderResult{
return prepResult{result: &RenderResult{
Component: componentName,
OutputDir: "(invalid)",
Status: renderStatusError,
Error: nameErr.Error(),
}}
}

// Context-aware semaphore acquisition.
select {
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
case <-env.Done():
return prepResult{index: index, result: &RenderResult{
Component: componentName,
OutputDir: compOutputDir,
Status: renderStatusCancelled,
Error: "context cancelled",
}}
}

prep, err := prepareComponentSources(env, comp, stagingDir)
if err != nil {
slog.Error("Failed to prepare component sources",
"component", componentName, "error", err)

return prepResult{index: index, result: &RenderResult{
return prepResult{result: &RenderResult{
Component: componentName,
OutputDir: compOutputDir,
Status: renderStatusError,
Error: err.Error(),
}}
}

prep.index = index
prep.compOutputDir = compOutputDir

return prepResult{index: index, prepared: prep}
return prepResult{prepared: prep}
}

// prepareComponentSources resolves the distro, creates a source manager, and
Expand Down Expand Up @@ -627,63 +624,54 @@ func parallelFinish(
workerEnv, cancel := env.WithCancel()
defer cancel()

type finishResult struct {
index int
result *RenderResult
}

resultsChan := make(chan finishResult, len(prepared))
semaphore := make(chan struct{}, env.IOBoundConcurrency())

var waitGroup sync.WaitGroup

for _, prep := range prepared {
waitGroup.Add(1)

go func(prep *preparedComponent) {
defer waitGroup.Done()

result := finishOneComponent(
workerEnv, env, prep, mockResultMap, semaphore, stagingDir, allowOverwrite, checkOnly,
)
resultsChan <- finishResult{index: prep.index, result: result}
}(prep)
}

go func() { waitGroup.Wait(); close(resultsChan) }()

total := int64(len(prepared))

var completed int64
parmapResults := parmap.Map(
workerEnv,
env.IOBoundConcurrency(),
prepared,
func(done, _ int) { progressEvent.SetProgress(int64(done), total) },
func(_ context.Context, prep *preparedComponent) *RenderResult {
Comment thread
dmcilvaney marked this conversation as resolved.
return finishOneComponent(workerEnv, env, prep, mockResultMap, stagingDir, allowOverwrite, checkOnly)
},
)

for fr := range resultsChan {
completed++
progressEvent.SetProgress(completed, total)
for i, result := range parmapResults {
prep := prepared[i]

results[fr.index] = fr.result
switch {
case result.Cancelled:
// Worker never started — ctx ended before parmap reached it.
results[prep.index] = &RenderResult{
Component: prep.comp.GetName(),
OutputDir: prep.compOutputDir,
Status: renderStatusCancelled,
Error: "context cancelled",
}
default:
results[prep.index] = result.Value
}
}
}

// finishOneComponent handles the semaphore, context cancellation, and error
// wrapping for finishing a single component's render.
// finishOneComponent wraps [finishComponentRender] with the per-component
// result bookkeeping (status, error message). Called from a [parmap.Map]
// worker; semaphore acquisition is handled by parmap.
func finishOneComponent(
workerEnv *azldev.Env,
env *azldev.Env,
prep *preparedComponent,
mockResultMap map[string]*sources.ComponentMockResult,
semaphore chan struct{},
stagingDir string,
allowOverwrite bool,
checkOnly bool,
) *RenderResult {
componentName := prep.comp.GetName()
compOutputDir := prep.compOutputDir

// Context-aware semaphore acquisition.
select {
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
case <-workerEnv.Done():
// Bail out early if ctx is already done so we don't write to disk after
// a Ctrl-C while the worker pool is draining.
if workerEnv.Err() != nil {
return &RenderResult{
Component: componentName,
OutputDir: compOutputDir,
Expand Down
Loading