Skip to content

Commit afc4b01

Browse files
committed
Handled safe channel closure
1 parent 8babf76 commit afc4b01

2 files changed

Lines changed: 15 additions & 4 deletions

File tree

pkg/distribution/oci/remote/remote.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -743,8 +743,8 @@ func Write(ref reference.Reference, img oci.Image, w io.Writer, opts ...Option)
743743
safeWriter = &syncWriter{w: w}
744744
}
745745

746-
var completed int64
747746
for _, layer := range layers {
747+
var completed int64
748748
digest, err := layer.Digest()
749749
if err != nil {
750750
return fmt.Errorf("getting layer digest: %w", err)
@@ -790,6 +790,8 @@ func Write(ref reference.Reference, img oci.Image, w io.Writer, opts ...Option)
790790
Complete: completed,
791791
Total: size,
792792
}
793+
closeProgress(progressChan)
794+
closeReporter(pr)
793795
}
794796
continue
795797
}
@@ -808,6 +810,7 @@ func Write(ref reference.Reference, img oci.Image, w io.Writer, opts ...Option)
808810
cw.Close()
809811
rc.Close()
810812
closeProgress(progressChan)
813+
closeReporter(pr)
811814
return fmt.Errorf("writing layer: %w", err)
812815
}
813816

@@ -816,6 +819,7 @@ func Write(ref reference.Reference, img oci.Image, w io.Writer, opts ...Option)
816819
rc.Close()
817820
if !errdefs.IsAlreadyExists(err) && !strings.Contains(err.Error(), "already exists") {
818821
closeProgress(progressChan)
822+
closeReporter(pr)
819823
return fmt.Errorf("committing layer: %w", err)
820824
}
821825
// If it already exists, we still want to update progress
@@ -837,6 +841,7 @@ func Write(ref reference.Reference, img oci.Image, w io.Writer, opts ...Option)
837841
}
838842
}
839843
closeProgress(progressChan)
844+
closeReporter(pr)
840845
cw.Close()
841846
rc.Close()
842847
}
@@ -933,6 +938,15 @@ func closeProgress(ch chan<- oci.Update) {
933938
}
934939
}
935940

941+
// closeReporter safely closes the progress reporter if not nil
942+
func closeReporter(pr *progress.Reporter) {
943+
if pr != nil {
944+
if waitErr := pr.Wait(); waitErr != nil {
945+
fmt.Printf("reporter finished with non-fatal error: %v\n", waitErr)
946+
}
947+
}
948+
}
949+
936950
// Ensure remoteImage is cleaned up properly
937951
func (i *remoteImage) Close() error {
938952
// The local content store doesn't expose its root path, so cleanup

pkg/distribution/registry/client.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,12 @@ func (t *Target) Write(ctx context.Context, model types.ModelArtifact, progressW
263263
}
264264
imageSize += size
265265
}
266-
//pr := progress.NewProgressReporter(progressWriter, progress.PushMsg, imageSize, nil)
267-
//defer pr.Wait()
268266

269267
// Set up authentication options
270268
authOpts := []remote.Option{
271269
remote.WithContext(ctx),
272270
remote.WithTransport(t.transport),
273271
remote.WithUserAgent(t.userAgent),
274-
//remote.WithProgress(pr.Updates()),
275272
remote.WithPlainHTTP(t.plainHTTP),
276273
}
277274

0 commit comments

Comments
 (0)