Skip to content

Commit

Permalink
Add Full Object Checksum API (#2026)
Browse files Browse the repository at this point in the history
Add support for full object checksums as described here:

https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html

To enable, use `ChecksumCRC64NVME`, `ChecksumFullObjectCRC32` or `ChecksumFullObjectCRC32C` as checksum type when uploading.

Mint tests updated, but can be disabled with `MINT_NO_FULL_OBJECT=anything` env var.

PR will fail against community MinIO without above env var.
  • Loading branch information
klauspost authored Dec 7, 2024
1 parent 5e9a483 commit fdef7d2
Show file tree
Hide file tree
Showing 11 changed files with 606 additions and 151 deletions.
1 change: 1 addition & 0 deletions .github/workflows/go-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
ENABLE_HTTPS: 1
MINIO_KMS_MASTER_KEY: my-minio-key:6368616e676520746869732070617373776f726420746f206120736563726574
MINIO_CI_CD: true
MINT_NO_FULL_OBJECT: true
run: |
New-Item -ItemType Directory -Path "$env:temp/certs-dir"
Copy-Item -Path testcerts\* -Destination "$env:temp/certs-dir"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
MINIO_KMS_MASTER_KEY: my-minio-key:6368616e676520746869732070617373776f726420746f206120736563726574
SSL_CERT_FILE: /tmp/certs-dir/public.crt
MINIO_CI_CD: true
MINT_NO_FULL_OBJECT: true
run: |
sudo apt update -y
sudo apt install devscripts -y
Expand Down
18 changes: 10 additions & 8 deletions api-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ type UploadInfo struct {
// Verified checksum values, if any.
// Values are base64 (standard) encoded.
// For multipart objects this is a checksum of the checksum of each part.
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC64NVME string
}

// RestoreInfo contains information of the restore operation of an archived object
Expand Down Expand Up @@ -215,10 +216,11 @@ type ObjectInfo struct {
Restore *RestoreInfo

// Checksum values
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC64NVME string

Internal *struct {
K int // Data blocks
Expand Down
48 changes: 22 additions & 26 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256)
if len(hashSums) == 0 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}

// Initiate a new multipart upload.
Expand All @@ -113,7 +110,6 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := opts.AutoChecksum.Hasher()
for partNumber <= totalPartsCount {
Expand Down Expand Up @@ -154,7 +150,6 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, sha256Hex: sha256Hex, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
Expand Down Expand Up @@ -182,18 +177,21 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
allParts := make([]ObjectPart, 0, len(partsInfo))
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
allParts = append(allParts, part)
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ChecksumCRC64NVME: part.ChecksumCRC64NVME,
})
}

Expand All @@ -203,12 +201,8 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{opts.AutoChecksum.Key(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
applyAutoChecksum(&opts, allParts)

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -354,10 +348,11 @@ func (c *Client) uploadPart(ctx context.Context, p uploadPartParams) (ObjectPart
// Once successfully uploaded, return completed part.
h := resp.Header
objPart := ObjectPart{
ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
ChecksumCRC32: h.Get(ChecksumCRC32.Key()),
ChecksumCRC32C: h.Get(ChecksumCRC32C.Key()),
ChecksumSHA1: h.Get(ChecksumSHA1.Key()),
ChecksumSHA256: h.Get(ChecksumSHA256.Key()),
ChecksumCRC64NVME: h.Get(ChecksumCRC64NVME.Key()),
}
objPart.Size = p.size
objPart.PartNumber = p.partNumber
Expand Down Expand Up @@ -457,9 +452,10 @@ func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, object
Expiration: expTime,
ExpirationRuleID: ruleID,

ChecksumSHA256: completeMultipartUploadResult.ChecksumSHA256,
ChecksumSHA1: completeMultipartUploadResult.ChecksumSHA1,
ChecksumCRC32: completeMultipartUploadResult.ChecksumCRC32,
ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C,
ChecksumSHA256: completeMultipartUploadResult.ChecksumSHA256,
ChecksumSHA1: completeMultipartUploadResult.ChecksumSHA1,
ChecksumCRC32: completeMultipartUploadResult.ChecksumCRC32,
ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C,
ChecksumCRC64NVME: completeMultipartUploadResult.ChecksumCRC64NVME,
}, nil
}
99 changes: 39 additions & 60 deletions api-put-object-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
}
withChecksum := c.trailingHeaderSupport
if withChecksum {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}
// Initiate a new multipart upload.
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
Expand Down Expand Up @@ -240,6 +237,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN

// Gather the responses as they occur and update any
// progress bar.
allParts := make([]ObjectPart, 0, totalPartsCount)
for u := 1; u <= totalPartsCount; u++ {
select {
case <-ctx.Done():
Expand All @@ -248,16 +246,17 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
if uploadRes.Error != nil {
return UploadInfo{}, uploadRes.Error
}

allParts = append(allParts, uploadRes.Part)
// Update the totalUploadedSize.
totalUploadedSize += uploadRes.Size
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ChecksumCRC32: uploadRes.Part.ChecksumCRC32,
ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
ChecksumSHA1: uploadRes.Part.ChecksumSHA1,
ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ChecksumCRC32: uploadRes.Part.ChecksumCRC32,
ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
ChecksumSHA1: uploadRes.Part.ChecksumSHA1,
ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
ChecksumCRC64NVME: uploadRes.Part.ChecksumCRC64NVME,
})
}
}
Expand All @@ -275,15 +274,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
AutoChecksum: opts.AutoChecksum,
}
if withChecksum {
// Add hash of hashes.
crc := opts.AutoChecksum.Hasher()
for _, part := range complMultipartUpload.Parts {
cs, err := base64.StdEncoding.DecodeString(part.Checksum(opts.AutoChecksum))
if err == nil {
crc.Write(cs)
}
}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
applyAutoChecksum(&opts, allParts)
}

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
Expand Down Expand Up @@ -312,10 +303,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
}

if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}

// Calculate the optimal parts info for a given size.
Expand All @@ -342,7 +330,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := opts.AutoChecksum.Hasher()
md5Hash := c.md5Hasher()
Expand Down Expand Up @@ -389,7 +376,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set(opts.AutoChecksum.KeyCapitalized(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

// Update progress reader appropriately to the latest offset
Expand Down Expand Up @@ -420,18 +406,21 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
allParts := make([]ObjectPart, 0, len(partsInfo))
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
allParts = append(allParts, part)
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ChecksumCRC64NVME: part.ChecksumCRC64NVME,
})
}

Expand All @@ -442,12 +431,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
applyAutoChecksum(&opts, allParts)
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -475,10 +459,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
opts.AutoChecksum = opts.Checksum
}
if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}

// Cancel all when an error occurs.
Expand Down Expand Up @@ -510,7 +491,6 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
crc := opts.AutoChecksum.Hasher()

// Total data read and written to server. should be equal to 'size' at the end of the call.
Expand Down Expand Up @@ -570,7 +550,6 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

wg.Add(1)
Expand Down Expand Up @@ -630,18 +609,21 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
allParts := make([]ObjectPart, 0, len(partsInfo))
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
allParts = append(allParts, part)
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ChecksumCRC64NVME: part.ChecksumCRC64NVME,
})
}

Expand All @@ -652,12 +634,8 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
applyAutoChecksum(&opts, allParts)

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -823,9 +801,10 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,
ExpirationRuleID: ruleID,

// Checksum values
ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
ChecksumCRC32: h.Get(ChecksumCRC32.Key()),
ChecksumCRC32C: h.Get(ChecksumCRC32C.Key()),
ChecksumSHA1: h.Get(ChecksumSHA1.Key()),
ChecksumSHA256: h.Get(ChecksumSHA256.Key()),
ChecksumCRC64NVME: h.Get(ChecksumCRC64NVME.Key()),
}, nil
}
Loading

0 comments on commit fdef7d2

Please sign in to comment.