Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
Merge branch 'clusterwide-backup'
Browse files Browse the repository at this point in the history
  • Loading branch information
kapouille committed Apr 4, 2019
2 parents 1060c24 + 7e40295 commit a64cb82
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 39 deletions.
19 changes: 16 additions & 3 deletions cmd/backup-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
controller "github.com/coreos/etcd-operator/pkg/controller/backup-operator"
"github.com/coreos/etcd-operator/pkg/util/constants"
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
version "github.com/coreos/etcd-operator/version"
"github.com/coreos/etcd-operator/version"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand All @@ -37,11 +37,14 @@ import (
)

var (
createCRD bool
createCRD bool
namespace string
clusterWide bool
)

func init() {
flag.BoolVar(&createCRD, "create-crd", true, "The backup operator will not create the EtcdBackup CRD when this flag is set to false.")
flag.BoolVar(&clusterWide, "cluster-wide", false, "Enable operator to watch clusters in all namespaces")
flag.Parse()
}

Expand Down Expand Up @@ -106,9 +109,19 @@ func createRecorder(kubecli kubernetes.Interface, name, namespace string) record
func run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c := controller.New(createCRD)
c := controller.New(newControllerConfig())
err := c.Start(ctx)
if err != nil {
logrus.Fatalf("operator stopped with error: %v", err)
}
}

func newControllerConfig() controller.Config {
cfg := controller.Config{
Namespace: namespace,
ClusterWide: clusterWide,
CreateCRD: createCRD,
}

return cfg
}
10 changes: 5 additions & 5 deletions pkg/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config

resp, err := etcdcli.Get(ctx, "/", clientv3.WithSerializable())
if err != nil {
errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s)", endpoint))
errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s): %v", endpoint, err))
continue
}

Expand All @@ -152,10 +152,6 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config
cli.Close()
}

if maxClient == nil {
return nil, 0, fmt.Errorf("could not create an etcd client for the max revision purpose from given endpoints (%v)", endpoints)
}

var err error
if len(errors) > 0 {
errorStr := ""
Expand All @@ -165,5 +161,9 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config
err = fmt.Errorf(errorStr)
}

if maxClient == nil {
return nil, 0, fmt.Errorf("could not create an etcd client for the max maxirevision purpose from given endpoints (%v)", endpoints)
}

return maxClient, maxRev, err
}
35 changes: 20 additions & 15 deletions pkg/backup/writer/abs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
package writer

import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"

"github.com/coreos/etcd-operator/pkg/backup/util"
"io"

"github.com/Azure/azure-sdk-for-go/storage"
"github.com/pborman/uuid"
Expand All @@ -47,12 +45,14 @@ const (
func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int64, error) {
// TODO: support context.
container, key, err := util.ParseBucketAndKey(path)

if err != nil {
return 0, err
}

containerRef := absw.abs.GetContainerReference(container)
containerExists, err := containerRef.Exists()

if err != nil {
return 0, err
}
Expand All @@ -62,30 +62,35 @@ func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int
}

blob := containerRef.GetBlobReference(key)

err = blob.CreateBlockBlob(&storage.PutBlobOptions{})
if err != nil {
return 0, err
}

buf := new(bytes.Buffer)
buf.ReadFrom(r)
len := len(buf.Bytes())
chunckCount := len/AzureBlobBlockChunkLimitInBytes + 1
blocks := make([]storage.Block, 0, chunckCount)
for i := 0; i < chunckCount; i++ {
blocks := make([]storage.Block, 0)
block := make([]byte, AzureBlobBlockChunkLimitInBytes)
for {

nbRead, maybeEof := io.ReadFull(r, block)

if maybeEof == io.EOF {
break
}

blockID := base64.StdEncoding.EncodeToString([]byte(uuid.New()))
blocks = append(blocks, storage.Block{ID: blockID, Status: storage.BlockStatusLatest})
start := i * AzureBlobBlockChunkLimitInBytes
end := (i + 1) * AzureBlobBlockChunkLimitInBytes
if len < end {
end = len
}

chunk := buf.Bytes()[start:end]
chunk := block[0:nbRead]
err = blob.PutBlock(blockID, chunk, &storage.PutBlockOptions{})

if err != nil {
return 0, err
}

if maybeEof == io.ErrUnexpectedEOF {
break
}
}

err = blob.PutBlockList(blocks, &storage.PutBlockListOptions{})
Expand Down
22 changes: 17 additions & 5 deletions pkg/controller/backup-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ package controller
import (
"context"
"fmt"
"os"
"sync"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/client"
"github.com/coreos/etcd-operator/pkg/generated/clientset/versioned"
"github.com/coreos/etcd-operator/pkg/util/constants"
"github.com/coreos/etcd-operator/pkg/util/k8sutil"

"github.com/sirupsen/logrus"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -51,20 +50,33 @@ type Backup struct {
createCRD bool
}

type Config struct {
Namespace string
ClusterWide bool
CreateCRD bool
}

type BackupRunner struct {
spec api.BackupSpec
cancelFunc context.CancelFunc
}

// New creates a backup operator.
func New(createCRD bool) *Backup {
func New(config Config) *Backup {
var ns string
if config.ClusterWide {
ns = metav1.NamespaceAll
} else {
ns = config.Namespace
}

return &Backup{
logger: logrus.WithField("pkg", "controller"),
namespace: os.Getenv(constants.EnvOperatorPodNamespace),
namespace: ns,
kubecli: k8sutil.MustNewKubeClient(),
backupCRCli: client.MustNewInCluster(),
kubeExtCli: k8sutil.MustNewKubeExtClient(),
createCRD: createCRD,
createCRD: config.CreateCRD,
}
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/controller/backup-operator/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (b *Backup) processItem(key string) error {

} else if !isPeriodic {
// Perform backup
bs, err := b.handleBackup(nil, &eb.Spec, false)
bs, err := b.handleBackup(nil, &eb.Spec, false, eb.Namespace)
// Report backup status
b.reportBackupStatus(bs, err, eb)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (b *Backup) addFinalizerOfPeriodicBackupIfNeed(eb *api.EtcdBackup) (*api.Et
}
if !containsString(metadata.GetFinalizers(), "backup-operator-periodic") {
metadata.SetFinalizers(append(metadata.GetFinalizers(), "backup-operator-periodic"))
_, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(ebNew.(*api.EtcdBackup))
_, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.ObjectMeta.Namespace).Update(ebNew.(*api.EtcdBackup))
if err != nil {
return eb, err
}
Expand All @@ -163,7 +163,7 @@ func (b *Backup) removeFinalizerOfPeriodicBackup(eb *api.EtcdBackup) error {
finalizers = append(finalizers, finalizer)
}
metadata.SetFinalizers(finalizers)
_, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(ebNew.(*api.EtcdBackup))
_, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.Namespace).Update(ebNew.(*api.EtcdBackup))
return err
}

Expand All @@ -179,7 +179,7 @@ func (b *Backup) periodicRunnerFunc(ctx context.Context, t *time.Ticker, eb *api
var err error
retryLimit := 5
for i := 1; i < retryLimit+1; i++ {
latestEb, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Get(eb.Name, metav1.GetOptions{})
latestEb, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.Namespace).Get(eb.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
b.logger.Infof("Could not find EtcdBackup. Stopping periodic backup for EtcdBackup CR %v",
Expand All @@ -195,7 +195,7 @@ func (b *Backup) periodicRunnerFunc(ctx context.Context, t *time.Ticker, eb *api
}
if err == nil {
// Perform backup
bs, err = b.handleBackup(&ctx, &latestEb.Spec, true)
bs, err = b.handleBackup(&ctx, &latestEb.Spec, true, latestEb.Namespace)
}
// Report backup status
b.reportBackupStatus(bs, err, latestEb)
Expand All @@ -214,7 +214,7 @@ func (b *Backup) reportBackupStatus(bs *api.BackupStatus, berr error, eb *api.Et
eb.Status.EtcdVersion = bs.EtcdVersion
eb.Status.LastSuccessDate = bs.LastSuccessDate
}
_, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(eb)
_, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.Namespace).Update(eb)
if err != nil {
b.logger.Warningf("failed to update status of backup CR %v : (%v)", eb.Name, err)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func (b *Backup) handleErr(err error, key interface{}) {
b.logger.Infof("Dropping etcd backup (%v) out of the queue: %v", key, err)
}

func (b *Backup) handleBackup(parentContext *context.Context, spec *api.BackupSpec, isPeriodic bool) (*api.BackupStatus, error) {
func (b *Backup) handleBackup(parentContext *context.Context, spec *api.BackupSpec, isPeriodic bool, namespace string) (*api.BackupStatus, error) {
err := validate(spec)
if err != nil {
return nil, err
Expand All @@ -269,28 +269,28 @@ func (b *Backup) handleBackup(parentContext *context.Context, spec *api.BackupSp
switch spec.StorageType {
case api.BackupStorageTypeS3:
bs, err := handleS3(ctx, b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret,
b.namespace, isPeriodic, backupMaxCount)
namespace, isPeriodic, backupMaxCount)
if err != nil {
return nil, err
}
return bs, nil
case api.BackupStorageTypeABS:
bs, err := handleABS(ctx, b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret,
b.namespace, isPeriodic, backupMaxCount)
namespace, isPeriodic, backupMaxCount)
if err != nil {
return nil, err
}
return bs, nil
case api.BackupStorageTypeGCS:
bs, err := handleGCS(ctx, b.kubecli, spec.GCS, spec.EtcdEndpoints, spec.ClientTLSSecret,
b.namespace, isPeriodic, backupMaxCount)
namespace, isPeriodic, backupMaxCount)
if err != nil {
return nil, err
}
return bs, nil
case api.BackupStorageTypeOSS:
bs, err := handleOSS(ctx, b.kubecli, spec.OSS, spec.EtcdEndpoints, spec.ClientTLSSecret,
b.namespace, isPeriodic, backupMaxCount)
namespace, isPeriodic, backupMaxCount)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a64cb82

Please sign in to comment.