Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

*: implement configurable backup timeout #1908

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/apis/etcd/v1beta2/backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type BackupSource struct {

// BackupPolicy defines backup policy.
type BackupPolicy struct {
// timeout is the maximal time of retriving plus saving an etcd backup.
Timeout int64 `json:"timeout,omitempty"`
// TimeoutInSecond is the maximal allowed time in second of the entire backup process.
TimeoutInSecond int64 `json:"timeoutInSecond,omitempty"`
}

// BackupStatus represents the status of the EtcdBackup Custom Resource.
Expand Down
17 changes: 6 additions & 11 deletions pkg/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,18 @@ func NewBackupManagerFromWriter(kubecli kubernetes.Interface, bw writer.Writer,

// SaveSnap uses backup writer to save etcd snapshot to a specified S3 path
// and returns backup etcd server's kv store revision and its version.
func (bm *BackupManager) SaveSnap(s3Path string) (int64, string, error) {
etcdcli, rev, err := bm.etcdClientWithMaxRevision()
func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, string, error) {
etcdcli, rev, err := bm.etcdClientWithMaxRevision(ctx)
if err != nil {
return 0, "", fmt.Errorf("create etcd client failed: %v", err)
}
defer etcdcli.Close()

ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.Status(ctx, etcdcli.Endpoints()[0])
cancel()
if err != nil {
return 0, "", fmt.Errorf("failed to retrieve etcd version from the status call: %v", err)
}

ctx, cancel = context.WithTimeout(context.Background(), constants.DefaultSnapshotTimeout)
defer cancel() // Can't cancel() after Snapshot() because that will close the reader.
rc, err := etcdcli.Snapshot(ctx)
if err != nil {
return 0, "", fmt.Errorf("failed to receive snapshot (%v)", err)
Expand All @@ -82,20 +78,21 @@ func (bm *BackupManager) SaveSnap(s3Path string) (int64, string, error) {

// etcdClientWithMaxRevision gets the etcd endpoint with the maximum kv store revision
// and returns the etcd client of that member.
func (bm *BackupManager) etcdClientWithMaxRevision() (*clientv3.Client, int64, error) {
etcdcli, rev, err := getClientWithMaxRev(bm.endpoints, bm.etcdTLSConfig)
func (bm *BackupManager) etcdClientWithMaxRevision(ctx context.Context) (*clientv3.Client, int64, error) {
etcdcli, rev, err := getClientWithMaxRev(ctx, bm.endpoints, bm.etcdTLSConfig)
if err != nil {
return nil, 0, fmt.Errorf("failed to get etcd client with maximum kv store revision: %v", err)
}
return etcdcli, rev, nil
}

func getClientWithMaxRev(endpoints []string, tc *tls.Config) (*clientv3.Client, int64, error) {
func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config) (*clientv3.Client, int64, error) {
mapEps := make(map[string]*clientv3.Client)
var maxClient *clientv3.Client
maxRev := int64(0)
errors := make([]string, 0)
for _, endpoint := range endpoints {
// TODO: update clientv3 to 3.2.x and then use ctx as in clientv3.Config.
cfg := clientv3.Config{
Endpoints: []string{endpoint},
DialTimeout: constants.DefaultDialTimeout,
Expand All @@ -108,9 +105,7 @@ func getClientWithMaxRev(endpoints []string, tc *tls.Config) (*clientv3.Client,
}
mapEps[endpoint] = etcdcli

ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.Get(ctx, "/", clientv3.WithSerializable())
cancel()
if err != nil {
errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s)", endpoint))
continue
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/backup-operator/abs_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"context"
"crypto/tls"
"fmt"

Expand All @@ -27,7 +28,8 @@ import (
)

// handleABS saves etcd cluster's backup to specificed ABS path.
func handleABS(kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
// TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx.
cli, err := absfactory.NewClientFromSecret(kubecli, namespace, s.ABSSecret)
if err != nil {
return nil, err
Expand All @@ -39,7 +41,8 @@ func handleABS(kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints [
}

bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewABSWriter(cli.ABS), tlsConfig, endpoints, namespace)
rev, etcdVersion, err := bm.SaveSnap(s.Path)

rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/backup-operator/s3_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"context"
"crypto/tls"
"fmt"

Expand All @@ -28,7 +29,8 @@ import (

// TODO: replace this with generic backend interface for other options (PV, Azure)
// handleS3 saves etcd cluster's backup to specificed S3 path.
func handleS3(kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
func handleS3(ctx context.Context, kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
// TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx.
cli, err := s3factory.NewClientFromSecret(kubecli, namespace, s.Endpoint, s.AWSSecret)
if err != nil {
return nil, err
Expand All @@ -41,7 +43,8 @@ func handleS3(kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []s
}

bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewS3Writer(cli.S3), tlsConfig, endpoints, namespace)
rev, etcdVersion, err := bm.SaveSnap(s.Path)

rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/backup-operator/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package controller

import (
"context"
"errors"
"time"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/util/constants"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -119,15 +122,23 @@ func (b *Backup) handleBackup(spec *api.BackupSpec) (*api.BackupStatus, error) {
return nil, err
}

// When BackupPolicy.Timeout <= 0, use default DefaultBackupTimeout.
backupTimeout := time.Duration(constants.DefaultBackupTimeout)
if spec.BackupPolicy != nil && spec.BackupPolicy.TimeoutInSecond > 0 {
backupTimeout = time.Duration(spec.BackupPolicy.TimeoutInSecond) * time.Second
}

ctx, cancel := context.WithTimeout(context.Background(), backupTimeout)
defer cancel()
switch spec.StorageType {
case api.BackupStorageTypeS3:
bs, err := handleS3(b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
bs, err := handleS3(ctx, b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
if err != nil {
return nil, err
}
return bs, nil
case api.BackupStorageTypeABS:
bs, err := handleABS(b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
bs, err := handleABS(ctx, b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/util/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package constants
import "time"

const (
DefaultDialTimeout = 5 * time.Second
DefaultRequestTimeout = 5 * time.Second
DefaultSnapshotTimeout = 1 * time.Minute
DefaultDialTimeout = 5 * time.Second
DefaultRequestTimeout = 5 * time.Second
// DefaultBackupTimeout is the default maximal allowed time of the entire backup process.
DefaultBackupTimeout = 1 * time.Minute
DefaultSnapshotInterval = 1800 * time.Second

DefaultBackupPodHTTPPort = 19999
Expand Down