Implement etcd-backup-restore on OSS of Alicloud
Minchao Wang committed Feb 15, 2019
1 parent 4e33761 commit 74d9d08
Showing 10 changed files with 491 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.md
@@ -23,6 +23,6 @@ A clear and concise description of what you expected to happen.
**Environment (please complete the following information):**
- Etcd version/commit ID :
- Etcd-backup-restore version/commit ID:
- - Cloud Provider [All/AWS/GCS/ABS/Swift]:
+ - Cloud Provider [All/AWS/GCS/ABS/Swift/OSS]:

**Anything else we need to know?**:
10 changes: 9 additions & 1 deletion Gopkg.toml
@@ -82,6 +82,14 @@ required = ["github.com/coreos/bbolt"]
name = "github.com/prometheus/client_golang"
version = "0.9.2"

[[constraint]]
name = "github.com/aliyun/aliyun-oss-go-sdk"
version = "1.9.4"

[[override]]
name = "gopkg.in/fsnotify.v1"
source = "https://github.com/fsnotify/fsnotify.git"

[[override]]
name = "github.com/ugorji/go"
revision = "b4c50a2b199d93b13dc15e78929cfb23bfdf21ab"
4 changes: 4 additions & 0 deletions LICENSE.md
@@ -280,6 +280,10 @@ https://github.com/gophercloud/gophercloud.
Copyright 2012-2013 Rackspace, Inc.
Apache 2 license (https://github.com/gophercloud/gophercloud/blob/master/LICENSE)

Alibaba Cloud OSS SDK for Go.
https://github.com/aliyun/aliyun-oss-go-sdk.
Copyright 2019 The Alibaba Cloud Authors
Apache 2 license (https://github.com/aliyun/aliyun-oss-go-sdk/blob/master/README.md)

Prometheus instrumentation library for Go applications.
https://github.com/prometheus/client_golang.
4 changes: 3 additions & 1 deletion README.md
@@ -115,7 +115,7 @@ Please find the design doc [here](doc/design.md).

## Usage

- You can follow the `help` flag on the `etcdbrctl` command and its sub-commands to learn the usage details. Some common use cases are mentioned below. Although the examples below use AWS S3 as the storage provider, we have added support for AWS, GCS, Azure and Openstack Swift object stores. Local disk is also supported as a storage provider.
+ You can follow the `help` flag on the `etcdbrctl` command and its sub-commands to learn the usage details. Some common use cases are mentioned below. Although the examples below use AWS S3 as the storage provider, we have added support for AWS, GCS, Azure, Openstack Swift and Alicloud OSS object stores. Local disk is also supported as a storage provider.
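
For example (the `snapshot` sub-command is shown only as an illustration; the top-level help lists all sub-commands):

```sh
etcdbrctl --help            # top-level usage and available sub-commands
etcdbrctl snapshot --help   # flags for the snapshot sub-command
```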

### Cloud Provider Credentials

@@ -129,6 +129,8 @@ For `Azure Blob storage`, `STORAGE_ACCOUNT` and `STORAGE_KEY` should be made available as environment variables.

For `Openstack Swift`, `OS_USERNAME`, `OS_PASSWORD`, `OS_AUTH_URL`, `OS_TENANT_ID` and `OS_DOMAIN_ID` should be made available as environment variables.

For `Alicloud OSS`, `ALICLOUD_ENDPOINT`, `ALICLOUD_ACCESS_KEY_ID` and `ALICLOUD_ACCESS_KEY_SECRET` should be made available as environment variables.
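
For example (the endpoint shown is a placeholder; use the OSS endpoint of your bucket's region):

```sh
export ALICLOUD_ENDPOINT="oss-cn-hangzhou.aliyuncs.com"   # placeholder endpoint
export ALICLOUD_ACCESS_KEY_ID="<access-key-id>"
export ALICLOUD_ACCESS_KEY_SECRET="<access-key-secret>"
```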

### Taking scheduled snapshot


22 changes: 21 additions & 1 deletion chart/values.yaml
@@ -23,7 +23,7 @@ backup:
# mentions the policy for garbage collecting old backups. Allowed values are Exponential (default) and LimitBased.
garbageCollectionPolicy: Exponential

- storageProvider: "Local" # Abs,Gcs,S3,Swift; empty means no backup
+ storageProvider: "Local" # Abs,Gcs,S3,Swift,Oss; empty means no backup

backupSecret: etcd-backup

@@ -58,6 +58,26 @@ tls:
# podAnnotations will be placed to the resulting etcd pod
podAnnotations: {}

# Alicloud OSS storage configuration
# Note: No volumeMounts needed
# storageProvider: "OSS"
# env:
# - name: "ALICLOUD_ENDPOINT"
#   valueFrom:
#     secretKeyRef:
#       name: etcd-backup
#       key: "storageEndpoint"
# - name: "ALICLOUD_ACCESS_KEY_ID"
#   valueFrom:
#     secretKeyRef:
#       name: etcd-backup
#       key: "accessKeyID"
# - name: "ALICLOUD_ACCESS_KEY_SECRET"
#   valueFrom:
#     secretKeyRef:
#       name: etcd-backup
#       key: "accessKeySecret"
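#
# A secret with the referenced keys must exist (its name matching backupSecret
# above). A minimal sketch, with placeholder values:
#   apiVersion: v1
#   kind: Secret
#   metadata:
#     name: etcd-backup
#   type: Opaque
#   stringData:
#     storageEndpoint: "oss-cn-hangzhou.aliyuncs.com"
#     accessKeyID: "<access-key-id>"
#     accessKeySecret: "<access-key-secret>"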

# Aws S3 storage configuration
# Note: No volumeMounts variable needed
# storageProvider: "S3"
276 changes: 276 additions & 0 deletions pkg/snapstore/oss_snapstore.go
@@ -0,0 +1,276 @@
// Copyright (c) 2019 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapstore

import (
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"os"
	"path"
	"sort"
	"sync"

	"github.com/aliyun/aliyun-oss-go-sdk/oss"
	"github.com/sirupsen/logrus"
)

// OSSBucket is an interface for oss.Bucket used in snapstore. It mirrors the
// subset of *oss.Bucket methods the snapstore needs, so tests can substitute
// a fake bucket.
type OSSBucket interface {
	GetObject(objectKey string, options ...oss.Option) (io.ReadCloser, error)
	InitiateMultipartUpload(objectKey string, options ...oss.Option) (oss.InitiateMultipartUploadResult, error)
	CompleteMultipartUpload(imur oss.InitiateMultipartUploadResult, parts []oss.UploadPart, options ...oss.Option) (oss.CompleteMultipartUploadResult, error)
	ListObjects(options ...oss.Option) (oss.ListObjectsResult, error)
	DeleteObject(objectKey string) error
	UploadPart(imur oss.InitiateMultipartUploadResult, reader io.Reader, partSize int64, partNumber int, options ...oss.Option) (oss.UploadPart, error)
	AbortMultipartUpload(imur oss.InitiateMultipartUploadResult, options ...oss.Option) error
}

const (
	ossNoOfChunk    int64 = 10000
	ossEndPoint           = "ALICLOUD_ENDPOINT"
	accessKeyID           = "ALICLOUD_ACCESS_KEY_ID"
	accessKeySecret       = "ALICLOUD_ACCESS_KEY_SECRET"
)

type authOptions struct {
	endpoint  string
	accessID  string
	accessKey string
}

// OSSSnapStore is a snapstore with Alicloud OSS object store as backend.
type OSSSnapStore struct {
	prefix                  string
	bucket                  OSSBucket
	multiPart               sync.Mutex
	maxParallelChunkUploads int
	tempDir                 string
}

// NewOSSSnapStore creates a new OSSSnapStore from shared configuration with
// the specified bucket, reading credentials from the environment.
func NewOSSSnapStore(bucket, prefix, tempDir string, maxParallelChunkUploads int) (*OSSSnapStore, error) {
	ao, err := authOptionsFromEnv()
	if err != nil {
		return nil, err
	}
	return newOSSFromAuthOpt(bucket, prefix, tempDir, maxParallelChunkUploads, ao)
}

func newOSSFromAuthOpt(bucket, prefix, tempDir string, maxParallelChunkUploads int, ao authOptions) (*OSSSnapStore, error) {
	client, err := oss.New(ao.endpoint, ao.accessID, ao.accessKey)
	if err != nil {
		return nil, err
	}

	bucketOSS, err := client.Bucket(bucket)
	if err != nil {
		return nil, err
	}

	return NewOSSFromBucket(prefix, tempDir, maxParallelChunkUploads, bucketOSS), nil
}

// NewOSSFromBucket creates a new OSS snapstore object from an OSS bucket.
func NewOSSFromBucket(prefix, tempDir string, maxParallelChunkUploads int, bucket OSSBucket) *OSSSnapStore {
	return &OSSSnapStore{
		prefix:                  prefix,
		bucket:                  bucket,
		maxParallelChunkUploads: maxParallelChunkUploads,
		tempDir:                 tempDir,
	}
}

// Fetch opens a reader for the snapshot file from the store.
func (s *OSSSnapStore) Fetch(snap Snapshot) (io.ReadCloser, error) {
	body, err := s.bucket.GetObject(path.Join(s.prefix, snap.SnapDir, snap.SnapName))
	if err != nil {
		return nil, err
	}
	return body, nil
}

// Save writes the snapshot to the store via a multipart upload.
func (s *OSSSnapStore) Save(snap Snapshot, rc io.ReadCloser) error {
	tmpfile, err := ioutil.TempFile(s.tempDir, tmpBackupFilePrefix)
	if err != nil {
		rc.Close()
		return fmt.Errorf("failed to create snapshot tempfile: %v", err)
	}
	defer func() {
		tmpfile.Close()
		os.Remove(tmpfile.Name())
	}()

	// Spool the snapshot to a local tempfile first, so that its size is known
	// and it can be split into parts for the multipart upload.
	size, err := io.Copy(tmpfile, rc)
	rc.Close()
	if err != nil {
		return fmt.Errorf("failed to save snapshot to tmpfile: %v", err)
	}
	if _, err := tmpfile.Seek(0, io.SeekStart); err != nil {
		return err
	}

	var (
		chunkSize  = int64(math.Max(float64(minChunkSize), float64(size/ossNoOfChunk)))
		noOfChunks = size / chunkSize
	)
	if size%chunkSize != 0 {
		noOfChunks++
	}
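	// For example, assuming minChunkSize is 5 MiB (as for the other providers),
	// a 100 MiB snapshot gives chunkSize = 5 MiB and noOfChunks = 20; only
	// snapshots larger than 5 MiB * 10000 (~48.8 GiB) produce bigger chunks.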

	ossChunks, err := oss.SplitFileByPartNum(tmpfile.Name(), int(noOfChunks))
	if err != nil {
		return err
	}

	imur, err := s.bucket.InitiateMultipartUpload(path.Join(s.prefix, snap.SnapDir, snap.SnapName))
	if err != nil {
		return err
	}

	var (
		completedParts = make([]oss.UploadPart, noOfChunks)
		chunkUploadCh  = make(chan chunk, noOfChunks)
		resCh          = make(chan chunkUploadResult, noOfChunks)
		cancelCh       = make(chan struct{})
		wg             sync.WaitGroup
	)
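	// Start a fixed-size pool of uploader goroutines; chunks are dispatched on
	// chunkUploadCh and per-chunk results are drained from resCh by
	// collectChunkUploadError below.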

	for i := 0; i < s.maxParallelChunkUploads; i++ {
		wg.Add(1)
		go s.partUploader(&wg, imur, tmpfile, completedParts, chunkUploadCh, cancelCh, resCh)
	}

	for _, ossChunk := range ossChunks {
		chunk := chunk{
			offset: ossChunk.Offset,
			size:   ossChunk.Size,
			id:     ossChunk.Number,
		}
		logrus.Debugf("Triggering chunk upload for offset: %d", chunk.offset)
		chunkUploadCh <- chunk
	}

	logrus.Infof("Triggered chunk upload for all chunks, total: %d", noOfChunks)
	snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks)
	wg.Wait()

	if snapshotErr == nil {
		if _, err := s.bucket.CompleteMultipartUpload(imur, completedParts); err != nil {
			return err
		}
		logrus.Infof("Finished the multipart upload with upload ID: %s", imur.UploadID)
		return nil
	}

	// A chunk failed to upload: abort the multipart upload and surface the
	// chunk error to the caller.
	logrus.Infof("Aborting the multipart upload with upload ID: %s", imur.UploadID)
	if err := s.bucket.AbortMultipartUpload(imur); err != nil {
		logrus.Errorf("Failed to abort the multipart upload with upload ID %s: %v", imur.UploadID, err)
	}
	return snapshotErr.err
}

func (s *OSSSnapStore) partUploader(wg *sync.WaitGroup, imur oss.InitiateMultipartUploadResult, file *os.File, completedParts []oss.UploadPart, chunkUploadCh <-chan chunk, stopCh <-chan struct{}, errCh chan<- chunkUploadResult) {
	defer wg.Done()
	for {
		select {
		case <-stopCh:
			return
		case chunk, ok := <-chunkUploadCh:
			if !ok {
				return
			}
			logrus.Infof("Uploading chunk with id: %d, offset: %d, size: %d", chunk.id, chunk.offset, chunk.size)
			err := s.uploadPart(imur, file, completedParts, chunk.offset, chunk.size, chunk.id)
			errCh <- chunkUploadResult{
				err:   err,
				chunk: &chunk,
			}
		}
	}
}

func (s *OSSSnapStore) uploadPart(imur oss.InitiateMultipartUploadResult, file *os.File, completedParts []oss.UploadPart, offset, chunkSize int64, number int) error {
	// io.NewSectionReader reads via ReadAt, so concurrent uploaders can safely
	// read disjoint sections of the same file.
	fd := io.NewSectionReader(file, offset, chunkSize)
	part, err := s.bucket.UploadPart(imur, fd, chunkSize, number)
	if err == nil {
		// OSS part numbers are 1-based, hence the number-1 index.
		completedParts[number-1] = part
	}
	return err
}

// List lists the snapshots in the store.
func (s *OSSSnapStore) List() (SnapList, error) {
	var snapList SnapList

	// Page through the bucket listing using the continuation marker.
	marker := ""
	for {
		lsRes, err := s.bucket.ListObjects(oss.Marker(marker))
		if err != nil {
			return nil, err
		}
		for _, object := range lsRes.Objects {
			snap, err := ParseSnapshot(object.Key[len(s.prefix)+1:])
			if err != nil {
				logrus.Warnf("Invalid snapshot found. Ignoring it: %s", object.Key)
			} else {
				snapList = append(snapList, snap)
			}
		}
		if lsRes.IsTruncated {
			marker = lsRes.NextMarker
		} else {
			break
		}
	}
	sort.Sort(snapList)

	return snapList, nil
}

// Delete deletes the snapshot file from the store.
func (s *OSSSnapStore) Delete(snap Snapshot) error {
	return s.bucket.DeleteObject(path.Join(s.prefix, snap.SnapDir, snap.SnapName))
}

func authOptionsFromEnv() (authOptions, error) {
	endpoint, err := GetEnvVarOrError(ossEndPoint)
	if err != nil {
		return authOptions{}, err
	}
	accessID, err := GetEnvVarOrError(accessKeyID)
	if err != nil {
		return authOptions{}, err
	}
	accessKey, err := GetEnvVarOrError(accessKeySecret)
	if err != nil {
		return authOptions{}, err
	}

	return authOptions{
		endpoint:  endpoint,
		accessID:  accessID,
		accessKey: accessKey,
	}, nil
}
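
A minimal sketch of how the new snapstore might be exercised. The import path, bucket name, prefix, and upload parallelism below are assumptions for illustration; the constructor and method signatures match the code above, and the three Alicloud environment variables must be set as described in the README:

```go
package main

import (
	"fmt"
	"log"

	// Assumed import path for this repository's snapstore package.
	"github.com/gardener/etcd-backup-restore/pkg/snapstore"
)

func main() {
	// NewOSSSnapStore reads ALICLOUD_ENDPOINT, ALICLOUD_ACCESS_KEY_ID and
	// ALICLOUD_ACCESS_KEY_SECRET from the environment. Bucket name "my-etcd-backups",
	// prefix "v1", temp dir "/tmp" and parallelism 5 are placeholder values.
	store, err := snapstore.NewOSSSnapStore("my-etcd-backups", "v1", "/tmp", 5)
	if err != nil {
		log.Fatalf("failed to create OSS snapstore: %v", err)
	}

	// List the snapshots currently in the bucket.
	snaps, err := store.List()
	if err != nil {
		log.Fatalf("failed to list snapshots: %v", err)
	}
	for _, s := range snaps {
		fmt.Println(s.SnapName)
	}
}
```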