-
Notifications
You must be signed in to change notification settings - Fork 379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support layer deltas #902
base: main
Are you sure you want to change the base?
Support layer deltas #902
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ import ( | |
"io/ioutil" | ||
"os" | ||
"reflect" | ||
"sort" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
@@ -23,6 +24,7 @@ import ( | |
"github.com/containers/image/v5/types" | ||
"github.com/containers/ocicrypt" | ||
encconfig "github.com/containers/ocicrypt/config" | ||
"github.com/containers/tar-diff/pkg/tar-patch" | ||
digest "github.com/opencontainers/go-digest" | ||
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" | ||
"github.com/pkg/errors" | ||
|
@@ -790,6 +792,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { | |
srcInfosUpdated = true | ||
} | ||
|
||
deltaLayers, err := types.ImageDeltaLayers(ic.src, ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
type copyLayerData struct { | ||
destInfo types.BlobInfo | ||
diffID digest.Digest | ||
|
@@ -809,7 +816,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { | |
} | ||
|
||
data := make([]copyLayerData, numLayers) | ||
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress) { | ||
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) { | ||
defer copySemaphore.Release(1) | ||
defer copyGroup.Done() | ||
cld := copyLayerData{} | ||
|
@@ -824,7 +831,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { | |
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) | ||
} | ||
} else { | ||
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool) | ||
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, index, srcLayer, toEncrypt, pool, deltaLayers) | ||
} | ||
data[index] = cld | ||
} | ||
|
@@ -861,7 +868,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { | |
return errors.Wrapf(err, "Can't acquire semaphore") | ||
} | ||
copyGroup.Add(1) | ||
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool) | ||
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool, deltaLayers) | ||
} | ||
|
||
// A call to copyGroup.Wait() is done at this point by the defer above. | ||
|
@@ -1043,9 +1050,83 @@ type diffIDResult struct { | |
err error | ||
} | ||
|
||
// Get all the deltas that apply to this layer | ||
func (ic *imageCopier) getMatchingDeltaLayers(ctx context.Context, srcIndex int, deltaLayers []types.BlobInfo) (digest.Digest, []*types.BlobInfo) { | ||
if deltaLayers == nil { | ||
return "", nil | ||
} | ||
config, _ := ic.src.OCIConfig(ctx) | ||
if config == nil || config.RootFS.DiffIDs == nil || len(config.RootFS.DiffIDs) <= srcIndex { | ||
return "", nil | ||
} | ||
|
||
layerDiffId := config.RootFS.DiffIDs[srcIndex] | ||
|
||
var matchingLayers []*types.BlobInfo | ||
for i := range deltaLayers { | ||
deltaLayer := &deltaLayers[i] | ||
to := deltaLayer.Annotations["io.github.containers.delta.to"] | ||
if to == layerDiffId.String() { | ||
matchingLayers = append(matchingLayers, deltaLayer) | ||
} | ||
} | ||
|
||
return layerDiffId, matchingLayers | ||
} | ||
|
||
// Looks at which of the matching delta froms have locally available data and picks the best one | ||
func (ic *imageCopier) resolveDeltaLayer(ctx context.Context, matchingDeltas []*types.BlobInfo) (io.ReadCloser, tar_patch.DataSource, types.BlobInfo, error) { | ||
// Sort smallest deltas so we favour the smallest useable one | ||
sort.Slice(matchingDeltas, func(i, j int) bool { | ||
return matchingDeltas[i].Size < matchingDeltas[j].Size | ||
}) | ||
|
||
for i := range matchingDeltas { | ||
matchingDelta := matchingDeltas[i] | ||
from := matchingDelta.Annotations["io.github.containers.delta.from"] | ||
fromDigest, err := digest.Parse(from) | ||
if err != nil { | ||
continue // Silently ignore if server specified a werid format | ||
} | ||
|
||
dataSource, err := types.ImageDestinationGetLayerDeltaData(ic.c.dest, ctx, fromDigest) | ||
if err != nil { | ||
return nil, nil, types.BlobInfo{}, err // Internal error | ||
} | ||
if dataSource == nil { | ||
continue // from layer doesn't exist | ||
} | ||
|
||
logrus.Debugf("Using delta %v for DiffID %v", matchingDelta.Digest, fromDigest) | ||
|
||
deltaStream, _, err := ic.c.rawSource.GetBlob(ctx, *matchingDelta, ic.c.blobInfoCache) | ||
if err != nil { | ||
return nil, nil, types.BlobInfo{}, errors.Wrapf(err, "Error reading delta blob %s", matchingDelta.Digest) | ||
} | ||
return deltaStream, dataSource, *matchingDelta, nil | ||
} | ||
return nil, nil, types.BlobInfo{}, nil | ||
} | ||
|
||
func (ic *imageCopier) canUseDeltas(srcInfo types.BlobInfo) (bool, string) { | ||
// Deltas rewrite the manifest to refer to the uncompressed digest, so we must be able to substiture blobs | ||
if !ic.canSubstituteBlobs { | ||
return false, "" | ||
} | ||
|
||
switch srcInfo.MediaType { | ||
case manifest.DockerV2Schema2LayerMediaType, manifest.DockerV2SchemaLayerMediaTypeUncompressed: | ||
return true, manifest.DockerV2SchemaLayerMediaTypeUncompressed | ||
case imgspecv1.MediaTypeImageLayer, imgspecv1.MediaTypeImageLayerGzip, imgspecv1.MediaTypeImageLayerZstd: | ||
return true, imgspecv1.MediaTypeImageLayer | ||
} | ||
|
||
return false, "" | ||
} | ||
|
||
// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps compressing it if canCompress, | ||
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded | ||
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { | ||
func (ic *imageCopier) copyLayer(ctx context.Context, srcIndex int, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) (types.BlobInfo, digest.Digest, error) { | ||
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" | ||
// Diffs are needed if we are encrypting an image or trying to decrypt an image | ||
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" || toEncrypt || (isOciEncrypted(srcInfo.MediaType) && ic.ociDecryptConfig != nil) | ||
|
@@ -1064,6 +1145,52 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to | |
} | ||
} | ||
|
||
// First look for a delta that matches this layer and substitute the result of that | ||
if ok, deltaResultMediaType := ic.canUseDeltas(srcInfo); ok { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interaction for encrypting path with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what you mean here, it seem alright to me. Applying deltas naturally create the uncompressed layers, so we get the diffid etc. That said, I'm not well versed in how crypto works here, so I might be missing something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... so the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. copy never generates deltas, it just applies them to regenerate the tar. Creating deltas is currently done by a separate patch to skopeo. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case of encrypted images, skopeo |
||
// Get deltas going TO this layer | ||
deltaDiffID, matchingDeltas := ic.getMatchingDeltaLayers(ctx, srcIndex, deltaLayers) | ||
// Get best possible FROM delta | ||
deltaStream, deltaDataSource, matchingDelta, err := ic.resolveDeltaLayer(ctx, matchingDeltas) | ||
if err != nil { | ||
return types.BlobInfo{}, "", err | ||
} | ||
if deltaStream != nil { | ||
bar := ic.c.createProgressBar(pool, matchingDelta, "delta", "done") | ||
|
||
wrappedDeltaStream := bar.ProxyReader(deltaStream) | ||
|
||
// Convert deltaStream to uncompressed tar layer stream | ||
pr, pw := io.Pipe() | ||
go func() { | ||
if err := tar_patch.Apply(wrappedDeltaStream, deltaDataSource, pw); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are the deltas always uncompressed/unencrypted? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deltas are inherently compressed. I.e. they are not wrapped in a compression layer but instead compressed on the parts inside that need it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That said, they are always unencrypted, so layering wise we might want to wrap some encryption around them. |
||
// We will notice this error when failing to verify the digest, so leave it be | ||
logrus.Infof("Failed to apply layer delta: %v", err) | ||
} | ||
deltaDataSource.Close() | ||
deltaStream.Close() | ||
wrappedDeltaStream.Close() | ||
pw.Close() | ||
}() | ||
defer pr.Close() | ||
|
||
// Copy uncompressed tar layer to destination, verifying the diffID | ||
blobInfo, err := ic.c.copyBlobFromStream(ctx, pr, types.BlobInfo{Digest: deltaDiffID, Size: -1, MediaType: deltaResultMediaType, Annotations: srcInfo.Annotations}, nil, ic.canModifyManifest, false, toEncrypt, nil) | ||
if err != nil { | ||
return types.BlobInfo{}, "", err | ||
} | ||
|
||
bar.SetTotal(matchingDelta.Size, true) | ||
|
||
// We verified this when streaming the applied delta above | ||
diffID := deltaDiffID | ||
|
||
// Record the fact that this blob is uncompressed | ||
ic.c.blobInfoCache.RecordDigestUncompressedPair(diffID, diffID) | ||
|
||
return blobInfo, diffID, err | ||
} | ||
} | ||
|
||
// Fallback: copy the layer, computing the diffID if we need to do so | ||
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache) | ||
if err != nil { | ||
|
@@ -1213,7 +1340,9 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr | |
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest) | ||
} | ||
isCompressed := decompressor != nil | ||
destStream = bar.ProxyReader(destStream) | ||
if bar != nil { | ||
destStream = bar.ProxyReader(destStream) | ||
} | ||
|
||
// === Send a copy of the original, uncompressed, stream, to a separate path if necessary. | ||
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so. | ||
|
@@ -1232,6 +1361,11 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr | |
logrus.Debugf("Using original blob without modification for encrypted blob") | ||
compressionOperation = types.PreserveOriginal | ||
inputInfo = srcInfo | ||
} else if canModifyBlob && manifest.IsNoCompressType(srcInfo.MediaType) { | ||
// This is a blob we should not repack, such as a delta | ||
logrus.Debugf("Using original blob without modification for no-compress type") | ||
compressionOperation = types.PreserveOriginal | ||
inputInfo = srcInfo | ||
} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && !isCompressed { | ||
logrus.Debugf("Compressing blob on the fly") | ||
compressionOperation = types.Compress | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interaction for encrypting path with decryption routine
Encryption mediatypes https://github.com/containers/ocicrypt/blob/master/spec/spec.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
canUseDeltas() currently returns false (due to the above media checks) for encrypted manifests, because for now we don't support encrypted deltas. However, once we do these checks need to be widened, yes.