diff --git a/cmd/backup-operator/main.go b/cmd/backup-operator/main.go index fe2ebed0c..4e0d45e29 100644 --- a/cmd/backup-operator/main.go +++ b/cmd/backup-operator/main.go @@ -24,7 +24,7 @@ import ( controller "github.com/coreos/etcd-operator/pkg/controller/backup-operator" "github.com/coreos/etcd-operator/pkg/util/constants" "github.com/coreos/etcd-operator/pkg/util/k8sutil" - version "github.com/coreos/etcd-operator/version" + "github.com/coreos/etcd-operator/version" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" @@ -37,11 +37,14 @@ import ( ) var ( - createCRD bool + createCRD bool + namespace string + clusterWide bool ) func init() { flag.BoolVar(&createCRD, "create-crd", true, "The backup operator will not create the EtcdBackup CRD when this flag is set to false.") + flag.BoolVar(&clusterWide, "cluster-wide", false, "Enable operator to watch clusters in all namespaces") flag.Parse() } @@ -106,9 +109,19 @@ func createRecorder(kubecli kubernetes.Interface, name, namespace string) record func run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() - c := controller.New(createCRD) + c := controller.New(newControllerConfig()) err := c.Start(ctx) if err != nil { logrus.Fatalf("operator stopped with error: %v", err) } } + +func newControllerConfig() controller.Config { + cfg := controller.Config{ + Namespace: namespace, + ClusterWide: clusterWide, + CreateCRD: createCRD, + } + + return cfg +} diff --git a/pkg/backup/backup_manager.go b/pkg/backup/backup_manager.go index a82ae32eb..82b26b524 100644 --- a/pkg/backup/backup_manager.go +++ b/pkg/backup/backup_manager.go @@ -133,7 +133,7 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config resp, err := etcdcli.Get(ctx, "/", clientv3.WithSerializable()) if err != nil { - errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s)", endpoint)) + errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s): %v", endpoint, err)) continue } @@ -152,10 +152,6 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config cli.Close() } - if maxClient == nil { - return nil, 0, fmt.Errorf("could not create an etcd client for the max revision purpose from given endpoints (%v)", endpoints) - } - var err error if len(errors) > 0 { errorStr := "" @@ -165,5 +161,9 @@ func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config err = fmt.Errorf(errorStr) } + if maxClient == nil { + return nil, 0, fmt.Errorf("could not create an etcd client for the max maxirevision purpose from given endpoints (%v)", endpoints) + } + return maxClient, maxRev, err } diff --git a/pkg/backup/writer/abs_writer.go b/pkg/backup/writer/abs_writer.go index eac27e7ed..2ebe88c44 100644 --- a/pkg/backup/writer/abs_writer.go +++ b/pkg/backup/writer/abs_writer.go @@ -15,13 +15,11 @@ package writer import ( - "bytes" "context" "encoding/base64" "fmt" - "io" - "github.com/coreos/etcd-operator/pkg/backup/util" + "io" "github.com/Azure/azure-sdk-for-go/storage" "github.com/pborman/uuid" @@ -47,12 +45,14 @@ const ( func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int64, error) { // TODO: support context. container, key, err := util.ParseBucketAndKey(path) + if err != nil { return 0, err } containerRef := absw.abs.GetContainerReference(container) containerExists, err := containerRef.Exists() + if err != nil { return 0, err } @@ -62,30 +62,35 @@ func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int } blob := containerRef.GetBlobReference(key) + err = blob.CreateBlockBlob(&storage.PutBlobOptions{}) if err != nil { return 0, err } - buf := new(bytes.Buffer) - buf.ReadFrom(r) - len := len(buf.Bytes()) - chunckCount := len/AzureBlobBlockChunkLimitInBytes + 1 - blocks := make([]storage.Block, 0, chunckCount) - for i := 0; i < chunckCount; i++ { + blocks := make([]storage.Block, 0) + block := make([]byte, AzureBlobBlockChunkLimitInBytes) + for { + + nbRead, maybeEof := io.ReadFull(r, block) + + if maybeEof == io.EOF { + break + } + blockID := base64.StdEncoding.EncodeToString([]byte(uuid.New())) blocks = append(blocks, storage.Block{ID: blockID, Status: storage.BlockStatusLatest}) - start := i * AzureBlobBlockChunkLimitInBytes - end := (i + 1) * AzureBlobBlockChunkLimitInBytes - if len < end { - end = len - } - chunk := buf.Bytes()[start:end] + chunk := block[0:nbRead] err = blob.PutBlock(blockID, chunk, &storage.PutBlockOptions{}) + if err != nil { return 0, err } + + if maybeEof == io.ErrUnexpectedEOF { + break + } } err = blob.PutBlockList(blocks, &storage.PutBlockListOptions{}) diff --git a/pkg/controller/backup-operator/operator.go b/pkg/controller/backup-operator/operator.go index 6c34c25b7..0115a5ccf 100644 --- a/pkg/controller/backup-operator/operator.go +++ b/pkg/controller/backup-operator/operator.go @@ -17,17 +17,16 @@ package controller import ( "context" "fmt" - "os" "sync" api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" "github.com/coreos/etcd-operator/pkg/client" "github.com/coreos/etcd-operator/pkg/generated/clientset/versioned" - "github.com/coreos/etcd-operator/pkg/util/constants" "github.com/coreos/etcd-operator/pkg/util/k8sutil" "github.com/sirupsen/logrus" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -51,20 +50,33 @@ type Backup struct { createCRD bool } +type Config struct { + Namespace string + ClusterWide bool + CreateCRD bool +} + type BackupRunner struct { spec api.BackupSpec cancelFunc context.CancelFunc } // New creates a backup operator. -func New(createCRD bool) *Backup { +func New(config Config) *Backup { + var ns string + if config.ClusterWide { + ns = metav1.NamespaceAll + } else { + ns = config.Namespace + } + return &Backup{ logger: logrus.WithField("pkg", "controller"), - namespace: os.Getenv(constants.EnvOperatorPodNamespace), + namespace: ns, kubecli: k8sutil.MustNewKubeClient(), backupCRCli: client.MustNewInCluster(), kubeExtCli: k8sutil.MustNewKubeExtClient(), - createCRD: createCRD, + createCRD: config.CreateCRD, } } diff --git a/pkg/controller/backup-operator/sync.go b/pkg/controller/backup-operator/sync.go index e5ae56567..e569a970a 100644 --- a/pkg/controller/backup-operator/sync.go +++ b/pkg/controller/backup-operator/sync.go @@ -107,7 +107,7 @@ func (b *Backup) processItem(key string) error { } else if !isPeriodic { // Perform backup - bs, err := b.handleBackup(nil, &eb.Spec, false) + bs, err := b.handleBackup(nil, &eb.Spec, false, eb.Namespace) // Report backup status b.reportBackupStatus(bs, err, eb) } @@ -140,7 +140,7 @@ func (b *Backup) addFinalizerOfPeriodicBackupIfNeed(eb *api.EtcdBackup) (*api.Et } if !containsString(metadata.GetFinalizers(), "backup-operator-periodic") { metadata.SetFinalizers(append(metadata.GetFinalizers(), "backup-operator-periodic")) - _, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(ebNew.(*api.EtcdBackup)) + _, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.ObjectMeta.Namespace).Update(ebNew.(*api.EtcdBackup)) if err != nil { return eb, err } @@ -163,7 +163,7 @@ func (b *Backup) removeFinalizerOfPeriodicBackup(eb *api.EtcdBackup) error { finalizers = append(finalizers, finalizer) } metadata.SetFinalizers(finalizers) - _, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(ebNew.(*api.EtcdBackup)) + _, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.Namespace).Update(ebNew.(*api.EtcdBackup)) return err } @@ -179,7 +179,7 @@ func (b *Backup) periodicRunnerFunc(ctx context.Context, t *time.Ticker, eb *api var err error retryLimit := 5 for i := 1; i < retryLimit+1; i++ { - latestEb, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Get(eb.Name, metav1.GetOptions{}) + latestEb, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.Namespace).Get(eb.Name, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { b.logger.Infof("Could not find EtcdBackup. Stopping periodic backup for EtcdBackup CR %v", @@ -195,7 +195,7 @@ func (b *Backup) periodicRunnerFunc(ctx context.Context, t *time.Ticker, eb *api } if err == nil { // Perform backup - bs, err = b.handleBackup(&ctx, &latestEb.Spec, true) + bs, err = b.handleBackup(&ctx, &latestEb.Spec, true, latestEb.Namespace) } // Report backup status b.reportBackupStatus(bs, err, latestEb) @@ -214,7 +214,7 @@ func (b *Backup) reportBackupStatus(bs *api.BackupStatus, berr error, eb *api.Et eb.Status.EtcdVersion = bs.EtcdVersion eb.Status.LastSuccessDate = bs.LastSuccessDate } - _, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(eb) + _, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(eb.Namespace).Update(eb) if err != nil { b.logger.Warningf("failed to update status of backup CR %v : (%v)", eb.Name, err) } @@ -244,7 +244,7 @@ func (b *Backup) handleErr(err error, key interface{}) { b.logger.Infof("Dropping etcd backup (%v) out of the queue: %v", key, err) } -func (b *Backup) handleBackup(parentContext *context.Context, spec *api.BackupSpec, isPeriodic bool) (*api.BackupStatus, error) { +func (b *Backup) handleBackup(parentContext *context.Context, spec *api.BackupSpec, isPeriodic bool, namespace string) (*api.BackupStatus, error) { err := validate(spec) if err != nil { return nil, err @@ -269,28 +269,28 @@ func (b *Backup) handleBackup(parentContext *context.Context, spec *api.BackupSp switch spec.StorageType { case api.BackupStorageTypeS3: bs, err := handleS3(ctx, b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret, - b.namespace, isPeriodic, backupMaxCount) + namespace, isPeriodic, backupMaxCount) if err != nil { return nil, err } return bs, nil case api.BackupStorageTypeABS: bs, err := handleABS(ctx, b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret, - b.namespace, isPeriodic, backupMaxCount) + namespace, isPeriodic, backupMaxCount) if err != nil { return nil, err } return bs, nil case api.BackupStorageTypeGCS: bs, err := handleGCS(ctx, b.kubecli, spec.GCS, spec.EtcdEndpoints, spec.ClientTLSSecret, - b.namespace, isPeriodic, backupMaxCount) + namespace, isPeriodic, backupMaxCount) if err != nil { return nil, err } return bs, nil case api.BackupStorageTypeOSS: bs, err := handleOSS(ctx, b.kubecli, spec.OSS, spec.EtcdEndpoints, spec.ClientTLSSecret, - b.namespace, isPeriodic, backupMaxCount) + namespace, isPeriodic, backupMaxCount) if err != nil { return nil, err }