This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

backup operator: support large multiblock files on azure blob storage.
Very large files were previously loaded in memory (causing fatal crashes).
kapouille committed Apr 4, 2019
1 parent 47f2552 commit 6f84f8a
Showing 2 changed files with 27 additions and 21 deletions.
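
To frame the diff below: instead of buffering the whole snapshot in memory before uploading, the writer now reads the stream in fixed-size blocks and uploads each block as it goes. The following is a minimal, self-contained sketch of that streaming pattern, not the operator's code; the chunkLimit constant and the upload callback are illustrative stand-ins for the real block-size constant and the Azure PutBlock call.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// chunkLimit is an illustrative block size; the real writer uses its own
// AzureBlobBlockChunkLimitInBytes constant.
const chunkLimit = 8

// streamInChunks reads r one fixed-size block at a time and hands each
// block to upload, so the full payload never has to sit in memory.
func streamInChunks(r io.Reader, upload func(chunk []byte) error) error {
	block := make([]byte, chunkLimit)
	for {
		n, err := io.ReadFull(r, block)
		if err == io.EOF {
			// Nothing left to read: the previous block was the last one.
			break
		}
		if err != nil && err != io.ErrUnexpectedEOF {
			return err
		}
		if uploadErr := upload(block[:n]); uploadErr != nil {
			return uploadErr
		}
		if err == io.ErrUnexpectedEOF {
			// Short read: this was the final, partial block.
			break
		}
	}
	return nil
}

func main() {
	src := strings.NewReader("an example payload larger than one block")
	err := streamInChunks(src, func(chunk []byte) error {
		fmt.Printf("uploading block of %d bytes\n", len(chunk))
		return nil
	})
	if err != nil {
		fmt.Println("upload failed:", err)
	}
}
```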
pkg/backup/backup_manager.go: 13 changes (7 additions, 6 deletions)
@@ -133,7 +133,7 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config

 		resp, err := etcdcli.Get(ctx, "/", clientv3.WithSerializable())
 		if err != nil {
-			errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s)", endpoint))
+			errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s): %v", endpoint, err))
 			continue
 		}

@@ -152,10 +152,6 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config
 		cli.Close()
 	}
 
-	if maxClient == nil {
-		return nil, 0, fmt.Errorf("could not create an etcd client for the max revision purpose from given endpoints (%v)", endpoints)
-	}
-
 	var err error
 	if len(errors) > 0 {
 		errorStr := ""
@@ -165,5 +161,10 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config
 		err = fmt.Errorf(errorStr)
 	}
 
+	if maxClient == nil {
+		return nil, 0, fmt.Errorf("could not create an etcd client for the max revision purpose from given endpoints (%v)", endpoints)
+	}
+
+
 	return maxClient, maxRev, err
-}
+}
pkg/backup/writer/abs_writer.go: 35 changes (20 additions, 15 deletions)
@@ -15,13 +15,11 @@
 package writer
 
 import (
-	"bytes"
 	"context"
 	"encoding/base64"
 	"fmt"
-	"io"
-
 	"github.com/coreos/etcd-operator/pkg/backup/util"
+	"io"
 
 	"github.com/Azure/azure-sdk-for-go/storage"
 	"github.com/pborman/uuid"
@@ -47,12 +45,14 @@ const (
 func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int64, error) {
 	// TODO: support context.
 	container, key, err := util.ParseBucketAndKey(path)
+
 	if err != nil {
 		return 0, err
 	}
 
 	containerRef := absw.abs.GetContainerReference(container)
 	containerExists, err := containerRef.Exists()
+
 	if err != nil {
 		return 0, err
 	}
@@ -62,30 +62,35 @@ func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int
 	}
 
 	blob := containerRef.GetBlobReference(key)
+
 	err = blob.CreateBlockBlob(&storage.PutBlobOptions{})
 	if err != nil {
 		return 0, err
 	}
 
-	buf := new(bytes.Buffer)
-	buf.ReadFrom(r)
-	len := len(buf.Bytes())
-	chunckCount := len/AzureBlobBlockChunkLimitInBytes + 1
-	blocks := make([]storage.Block, 0, chunckCount)
-	for i := 0; i < chunckCount; i++ {
+	blocks := make([]storage.Block, 0)
+	block := make([]byte, AzureBlobBlockChunkLimitInBytes)
+	for {
+
+		nbRead, maybeEof := io.ReadFull(r, block)
+
+		if maybeEof == io.EOF {
+			break
+		}
+
 		blockID := base64.StdEncoding.EncodeToString([]byte(uuid.New()))
 		blocks = append(blocks, storage.Block{ID: blockID, Status: storage.BlockStatusLatest})
-		start := i * AzureBlobBlockChunkLimitInBytes
-		end := (i + 1) * AzureBlobBlockChunkLimitInBytes
-		if len < end {
-			end = len
-		}
 
-		chunk := buf.Bytes()[start:end]
+		chunk := block[0:nbRead]
 		err = blob.PutBlock(blockID, chunk, &storage.PutBlockOptions{})
+
 		if err != nil {
 			return 0, err
 		}
+
+		if maybeEof == io.ErrUnexpectedEOF {
+			break
+		}
 	}
 
 	err = blob.PutBlockList(blocks, &storage.PutBlockListOptions{})
