Skip to content

Commit

Permalink
fix lease management with flightcontrol
Browse files Browse the repository at this point in the history
When one flightcontrol callback gets canceled
ctx.Value() stops working for aquiring leases for
remaining callbacks. While this behavior should be
also looked at more carefully, returning a lease for
the first callback or for remaining callback would not
be correct as some objects can be tracked by first lease
and that lease could be already deleted by the first
callpath.

This fixes it so that any object tracked by flightcontrol
callback will be copied to the lease of every codepath
after the callback has returned.

Signed-off-by: Tonis Tiigi <[email protected]>
  • Loading branch information
tonistiigi committed Jan 8, 2024
1 parent 5809d41 commit 05abc3f
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 45 deletions.
97 changes: 67 additions & 30 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/converter"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/winlayers"
digest "github.com/opencontainers/go-digest"
imagespecidentity "github.com/opencontainers/image-spec/identity"
Expand All @@ -24,7 +25,7 @@ import (
"golang.org/x/sync/errgroup"
)

var g flightcontrol.Group[struct{}]
var g flightcontrol.Group[*leaseutil.LeaseRef]
var gFileList flightcontrol.Group[[]string]

var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
Expand Down Expand Up @@ -87,14 +88,24 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool

if _, ok := filter[sr.ID()]; ok {
eg.Go(func() error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (struct{}, error) {
l, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) {
if sr.getBlob() != "" {
return struct{}{}, nil
return nil, nil
}
if !createIfNeeded {
return struct{}{}, errors.WithStack(ErrNoBlobs)
return nil, errors.WithStack(ErrNoBlobs)
}

l, ctx, err := leaseutil.NewLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
l.Discard()
}
}()

compressorFunc, finalize := comp.Type.Compress(ctx, comp)
mediaType := comp.Type.MediaType()

Expand All @@ -109,12 +120,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if lowerRef != nil {
m, err := lowerRef.Mount(ctx, true, s)
if err != nil {
return struct{}{}, err
return nil, err
}
var release func() error
lower, release, err = m.Mount()
if err != nil {
return struct{}{}, err
return nil, err
}
if release != nil {
defer release()
Expand All @@ -132,27 +143,26 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if upperRef != nil {
m, err := upperRef.Mount(ctx, true, s)
if err != nil {
return struct{}{}, err
return nil, err
}
var release func() error
upper, release, err = m.Mount()
if err != nil {
return struct{}{}, err
return nil, err
}
if release != nil {
defer release()
}
}

var desc ocispecs.Descriptor
var err error

// Determine differ and error/log handling according to the platform, envvar and the snapshotter.
var enableOverlay, fallback, logWarnOnErr bool
if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" && sr.kind() != Diff {
enableOverlay, err = strconv.ParseBool(forceOvlStr)
if err != nil {
return struct{}{}, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
}
fallback = false // prohibit fallback on debug
} else if !isTypeWindows(sr) {
Expand All @@ -174,10 +184,10 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if !ok || err != nil {
if !fallback {
if !ok {
return struct{}{}, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
}
if err != nil {
return struct{}{}, errors.Wrapf(err, "failed to compute overlay diff")
return nil, errors.Wrapf(err, "failed to compute overlay diff")
}
}
if logWarnOnErr {
Expand Down Expand Up @@ -210,7 +220,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
diff.WithCompressor(compressorFunc),
)
if err != nil {
return struct{}{}, err
return nil, err
}
}

Expand All @@ -220,34 +230,40 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if finalize != nil {
a, err := finalize(ctx, sr.cm.ContentStore)
if err != nil {
return struct{}{}, errors.Wrapf(err, "failed to finalize compression")
return nil, errors.Wrapf(err, "failed to finalize compression")
}
for k, v := range a {
desc.Annotations[k] = v
}
}
info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err != nil {
return struct{}{}, err
return nil, err
}

if diffID, ok := info.Labels[labels.LabelUncompressed]; ok {
desc.Annotations[labels.LabelUncompressed] = diffID
} else if mediaType == ocispecs.MediaTypeImageLayer {
desc.Annotations[labels.LabelUncompressed] = desc.Digest.String()
} else {
return struct{}{}, errors.Errorf("unknown layer compression type")
return nil, errors.Errorf("unknown layer compression type")
}

if err := sr.setBlob(ctx, desc); err != nil {
return struct{}{}, err
return nil, err
}
return struct{}{}, nil
return l, nil
})
if err != nil {
return err
}

if l != nil {
if err := l.Adopt(ctx); err != nil {
return err
}
}

if comp.Force {
if err := ensureCompression(ctx, sr, comp, s); err != nil {
return errors.Wrapf(err, "failed to ensure compression type of %q", comp.Type)
Expand Down Expand Up @@ -416,29 +432,42 @@ func isTypeWindows(sr *immutableRef) bool {

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (struct{}, error) {
l, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
if err != nil {
return struct{}{}, err
return nil, err
}

l, ctx, err := leaseutil.NewLease(ctx, ref.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
l.Discard()
}
}()

// Resolve converters
layerConvertFunc, err := converter.New(ctx, ref.cm.ContentStore, desc, comp)
if err != nil {
return struct{}{}, err
return nil, err
} else if layerConvertFunc == nil {
if isLazy, err := ref.isLazy(ctx); err != nil {
return struct{}{}, err
return nil, err
} else if isLazy {
// This ref can be used as the specified compressionType. Keep it lazy.
return struct{}{}, nil
return l, nil
}
return struct{}{}, ref.linkBlob(ctx, desc)
if err := ref.linkBlob(ctx, desc); err != nil {
return nil, err
}
return l, nil
}

// First, lookup local content store
if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil {
return struct{}{}, nil // found the compression variant. no need to convert.
return l, nil // found the compression variant. no need to convert.
}

// Convert layer compression type
Expand All @@ -448,18 +477,26 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
dh: ref.descHandlers[desc.Digest],
session: s,
}).Unlazy(ctx); err != nil {
return struct{}{}, err
return l, err
}
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
if err != nil {
return struct{}{}, errors.Wrapf(err, "failed to convert")
return nil, errors.Wrapf(err, "failed to convert")
}

// Start to track converted layer
if err := ref.linkBlob(ctx, *newDesc); err != nil {
return struct{}{}, errors.Wrapf(err, "failed to add compression blob")
return nil, errors.Wrapf(err, "failed to add compression blob")
}
return struct{}{}, nil
return l, nil
})
return err
if err != nil {
return err
}
if l != nil {
if err := l.Adopt(ctx); err != nil {
return err
}
}
return nil
}
46 changes: 34 additions & 12 deletions cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,8 +1062,19 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context,
}

func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s session.Group) error {
_, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ struct{}, rerr error) {
l, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) {
dhs := sr.descHandlers

l, ctx, err := leaseutil.NewLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer func() {
if rerr != nil {
l.Discard()
}
}()

for _, r := range sr.layerChain() {
r := r
snapshotID := r.getSnapshotID()
Expand All @@ -1074,7 +1085,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
dh := dhs[digest.Digest(r.getBlob())]
if dh == nil {
// We cannot prepare remote snapshots without descHandler.
return struct{}{}, nil
return l, nil
}

// tmpLabels contains dh.SnapshotLabels + session IDs. All keys contain
Expand Down Expand Up @@ -1135,9 +1146,17 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
break
}

return struct{}{}, nil
return l, nil
})
return err
if err != nil {
return err
}
if l != nil {
if err := l.Adopt(ctx); err != nil {
return err
}
}
return nil
}

func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields []string, res map[string]string) {
Expand All @@ -1158,28 +1177,31 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields
}

func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool, ensureContentStore bool) error {
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ struct{}, rerr error) {
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) {
if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil {
if !ensureContentStore {
return struct{}{}, nil
return nil, nil
}
if blob := sr.getBlob(); blob == "" {
return struct{}{}, nil
return nil, nil
}
if _, err := sr.cm.ContentStore.Info(ctx, sr.getBlob()); err == nil {
return struct{}{}, nil
return nil, nil
}
}

switch sr.kind() {
case Merge, Diff:
return struct{}{}, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore)
return nil, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore)
case Layer, BaseLayer:
return struct{}{}, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore)
return nil, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore)
}
return struct{}{}, nil
return nil, nil
})
return err
if err != nil {
return err
}
return nil
}

// should be called within sizeG.Do call for this ref's ID
Expand Down
Loading

0 comments on commit 05abc3f

Please sign in to comment.