Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
pkg/backup: use context to controll SaveSnap()
Browse files Browse the repository at this point in the history
  • Loading branch information
fanminshi committed Feb 6, 2018
1 parent 9a64882 commit a641328
Showing 1 changed file with 8 additions and 15 deletions.
23 changes: 8 additions & 15 deletions pkg/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

"github.com/coreos/etcd-operator/pkg/backup/writer"
"github.com/coreos/etcd-operator/pkg/util/constants"

"github.com/coreos/etcd/clientv3"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -51,22 +50,18 @@ func NewBackupManagerFromWriter(kubecli kubernetes.Interface, bw writer.Writer,

// SaveSnap uses backup writer to save etcd snapshot to a specified S3 path
// and returns backup etcd server's kv store revision and its version.
func (bm *BackupManager) SaveSnap(s3Path string) (int64, string, error) {
etcdcli, rev, err := bm.etcdClientWithMaxRevision()
func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, string, error) {
etcdcli, rev, err := bm.etcdClientWithMaxRevision(ctx)
if err != nil {
return 0, "", fmt.Errorf("create etcd client failed: %v", err)
}
defer etcdcli.Close()

ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.Status(ctx, etcdcli.Endpoints()[0])
cancel()
if err != nil {
return 0, "", fmt.Errorf("failed to retrieve etcd version from the status call: %v", err)
}

ctx, cancel = context.WithTimeout(context.Background(), constants.DefaultSnapshotTimeout)
defer cancel() // Can't cancel() after Snapshot() because that will close the reader.
rc, err := etcdcli.Snapshot(ctx)
if err != nil {
return 0, "", fmt.Errorf("failed to receive snapshot (%v)", err)
Expand All @@ -82,24 +77,24 @@ func (bm *BackupManager) SaveSnap(s3Path string) (int64, string, error) {

// etcdClientWithMaxRevision gets the etcd endpoint with the maximum kv store revision
// and returns the etcd client of that member.
func (bm *BackupManager) etcdClientWithMaxRevision() (*clientv3.Client, int64, error) {
etcdcli, rev, err := getClientWithMaxRev(bm.endpoints, bm.etcdTLSConfig)
func (bm *BackupManager) etcdClientWithMaxRevision(ctx context.Context) (*clientv3.Client, int64, error) {
etcdcli, rev, err := getClientWithMaxRev(ctx, bm.endpoints, bm.etcdTLSConfig)
if err != nil {
return nil, 0, fmt.Errorf("failed to get etcd client with maximum kv store revision: %v", err)
}
return etcdcli, rev, nil
}

func getClientWithMaxRev(endpoints []string, tc *tls.Config) (*clientv3.Client, int64, error) {
func getClientWithMaxRev(ctx context.Context, endpoints []string, tc *tls.Config) (*clientv3.Client, int64, error) {
mapEps := make(map[string]*clientv3.Client)
var maxClient *clientv3.Client
maxRev := int64(0)
errors := make([]string, 0)
for _, endpoint := range endpoints {
cfg := clientv3.Config{
Endpoints: []string{endpoint},
DialTimeout: constants.DefaultDialTimeout,
TLS: tc,
Endpoints: []string{endpoint},
Context: ctx,
TLS: tc,
}
etcdcli, err := clientv3.New(cfg)
if err != nil {
Expand All @@ -108,9 +103,7 @@ func getClientWithMaxRev(endpoints []string, tc *tls.Config) (*clientv3.Client,
}
mapEps[endpoint] = etcdcli

ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.Get(ctx, "/", clientv3.WithSerializable())
cancel()
if err != nil {
errors = append(errors, fmt.Sprintf("failed to get revision from endpoint (%s)", endpoint))
continue
Expand Down

0 comments on commit a641328

Please sign in to comment.