Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
*: implement configurable backup timeout (#1908)
Browse files Browse the repository at this point in the history
* apis: update Timeout comment

* pkg/backup: use context to control SaveSnap()

* *: support backup timeout in backup operator
  • Loading branch information
fanminshi authored and hongchaodeng committed Feb 6, 2018
1 parent 0792635 commit ceac222
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 22 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/etcd/v1beta2/backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type BackupSource struct {

// BackupPolicy defines backup policy.
type BackupPolicy struct {
// timeout is the maximal time of retrieving plus saving an etcd backup.
Timeout int64 `json:"timeout,omitempty"`
// TimeoutInSecond is the maximum allowed time, in seconds, for the entire backup process.
TimeoutInSecond int64 `json:"timeoutInSecond,omitempty"`
}

// BackupStatus represents the status of the EtcdBackup Custom Resource.
Expand Down
17 changes: 6 additions & 11 deletions pkg/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,18 @@ func NewBackupManagerFromWriter(kubecli kubernetes.Interface, bw writer.Writer,

// SaveSnap uses backup writer to save etcd snapshot to a specified S3 path
// and returns backup etcd server's kv store revision and its version.
func (bm *BackupManager) SaveSnap(s3Path string) (int64, string, error) {
etcdcli, rev, err := bm.etcdClientWithMaxRevision()
func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, string, error) {
etcdcli, rev, err := bm.etcdClientWithMaxRevision(ctx)
if err != nil {
return 0, "", fmt.Errorf("create etcd client failed: %v", err)
}
defer etcdcli.Close()

ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.Status(ctx, etcdcli.Endpoints()[0])
cancel()
if err != nil {
return 0, "", fmt.Errorf("failed to retrieve etcd version from the status call: %v", err)
}

ctx, cancel = context.WithTimeout(context.Background(), constants.DefaultSnapshotTimeout)
defer cancel() // Can't cancel() after Snapshot() because that will close the reader.
rc, err := etcdcli.Snapshot(ctx)
if err != nil {
return 0, "", fmt.Errorf("failed to receive snapshot (%v)", err)
Expand All @@ -82,20 +78,21 @@ func (bm *BackupManager) SaveSnap(s3Path string) (int64, string, error) {

// etcdClientWithMaxRevision gets the etcd endpoint with the maximum kv store revision
// and returns the etcd client of that member.
func (bm *BackupManager) etcdClientWithMaxRevision() (*clientv3.Client, int64, error) {
etcdcli, rev, err := getClientWithMaxRev(bm.endpoints, bm.etcdTLSConfig)
func (bm *BackupManager) etcdClientWithMaxRevision(ctx context.Context) (*clientv3.Client, int64, error) {
etcdcli, rev, err := getClientWithMaxRev(ctx, bm.endpoints, bm.etcdTLSConfig)
if err != nil {
return nil, 0, fmt.Errorf("failed to get etcd client with maximum kv store revision: %v", err)
}
return etcdcli, rev, nil
}

func getClientWithMaxRev(endpoints []string, tc *tls.Config) (*clientv3.Client, int64, error) {
func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config) (*clientv3.Client, int64, error) {
mapEps := make(map[string]*clientv3.Client)
var maxClient *clientv3.Client
maxRev := int64(0)
errors := make([]string, 0)
for _, endpoint := range endpoints {
// TODO: update clientv3 to 3.2.x and then use ctx as in clientv3.Config.
cfg := clientv3.Config{
Endpoints: []string{endpoint},
DialTimeout: constants.DefaultDialTimeout,
Expand All @@ -108,9 +105,7 @@ func getClientWithMaxRev(endpoints []string, tc *tls.Config) (*clientv3.Client,
}
mapEps[endpoint] = etcdcli

ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.Get(ctx, "/", clientv3.WithSerializable())
cancel()
if err != nil {
errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s)", endpoint))
continue
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/backup-operator/abs_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"context"
"crypto/tls"
"fmt"

Expand All @@ -27,7 +28,8 @@ import (
)

// handleABS saves the etcd cluster's backup to the specified ABS path.
func handleABS(kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
// TODO: control NewClientFromSecret with ctx. This depends on upstream Kubernetes supporting API calls with ctx.
cli, err := absfactory.NewClientFromSecret(kubecli, namespace, s.ABSSecret)
if err != nil {
return nil, err
Expand All @@ -39,7 +41,8 @@ func handleABS(kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints [
}

bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewABSWriter(cli.ABS), tlsConfig, endpoints, namespace)
rev, etcdVersion, err := bm.SaveSnap(s.Path)

rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/backup-operator/s3_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"context"
"crypto/tls"
"fmt"

Expand All @@ -28,7 +29,8 @@ import (

// TODO: replace this with generic backend interface for other options (PV, Azure)
// handleS3 saves the etcd cluster's backup to the specified S3 path.
func handleS3(kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
func handleS3(ctx context.Context, kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
// TODO: control NewClientFromSecret with ctx. This depends on upstream Kubernetes supporting API calls with ctx.
cli, err := s3factory.NewClientFromSecret(kubecli, namespace, s.Endpoint, s.AWSSecret)
if err != nil {
return nil, err
Expand All @@ -41,7 +43,8 @@ func handleS3(kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []s
}

bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewS3Writer(cli.S3), tlsConfig, endpoints, namespace)
rev, etcdVersion, err := bm.SaveSnap(s.Path)

rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/backup-operator/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package controller

import (
"context"
"errors"
"time"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/util/constants"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -119,15 +122,23 @@ func (b *Backup) handleBackup(spec *api.BackupSpec) (*api.BackupStatus, error) {
return nil, err
}

// When BackupPolicy is nil or BackupPolicy.TimeoutInSecond <= 0, fall back to DefaultBackupTimeout.
backupTimeout := time.Duration(constants.DefaultBackupTimeout)
if spec.BackupPolicy != nil && spec.BackupPolicy.TimeoutInSecond > 0 {
backupTimeout = time.Duration(spec.BackupPolicy.TimeoutInSecond) * time.Second
}

ctx, cancel := context.WithTimeout(context.Background(), backupTimeout)
defer cancel()
switch spec.StorageType {
case api.BackupStorageTypeS3:
bs, err := handleS3(b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
bs, err := handleS3(ctx, b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
if err != nil {
return nil, err
}
return bs, nil
case api.BackupStorageTypeABS:
bs, err := handleABS(b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
bs, err := handleABS(ctx, b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/util/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package constants
import "time"

const (
DefaultDialTimeout = 5 * time.Second
DefaultRequestTimeout = 5 * time.Second
DefaultSnapshotTimeout = 1 * time.Minute
DefaultDialTimeout = 5 * time.Second
DefaultRequestTimeout = 5 * time.Second
// DefaultBackupTimeout is the default maximal allowed time of the entire backup process.
DefaultBackupTimeout = 1 * time.Minute
DefaultSnapshotInterval = 1800 * time.Second

DefaultBackupPodHTTPPort = 19999
Expand Down

0 comments on commit ceac222

Please sign in to comment.