From c9452c6ad4ea4a7a5706b09e18694162a1ad4016 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 16 Feb 2017 13:16:10 -0800 Subject: [PATCH 1/3] clientv3: let user provide a client context through Config --- clientv3/client.go | 9 +++++++-- clientv3/config.go | 5 +++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 3d36c3ab52d..ecda6533365 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -272,7 +272,7 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo tokenMu: &sync.RWMutex{}, } - err := c.getToken(context.TODO()) + err := c.getToken(c.ctx) if err != nil { return nil, err } @@ -307,7 +307,12 @@ func newClient(cfg *Config) (*Client, error) { } // use a temporary skeleton client to bootstrap first connection - ctx, cancel := context.WithCancel(context.TODO()) + baseCtx := context.TODO() + if cfg.Context != nil { + baseCtx = cfg.Context + } + + ctx, cancel := context.WithCancel(baseCtx) client := &Client{ conn: nil, cfg: *cfg, diff --git a/clientv3/config.go b/clientv3/config.go index 2082f7b91ad..69bfe5d80ad 100644 --- a/clientv3/config.go +++ b/clientv3/config.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "time" + "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -43,4 +44,8 @@ type Config struct { // DialOptions is a list of dial options for the grpc client (e.g., for interceptors). DialOptions []grpc.DialOption + + // Context is the default client context; it can be used to cancel grpc dial out and + // other operations that do not have an explicit context. + Context context.Context } From 4d2aa80ecf0f276e1323ee55abd48d5d64fe017e Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 16 Feb 2017 13:18:59 -0800 Subject: [PATCH 2/3] clientv3: add cluster version checking --- clientv3/client.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++ clientv3/config.go | 3 +++ 2 files changed, 52 insertions(+) diff --git a/clientv3/client.go b/clientv3/client.go index ecda6533365..7cfb0b0eb4a 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "net/url" + "strconv" "strings" "sync" "time" @@ -35,6 +36,7 @@ import ( var ( ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") + ErrOldCluster = errors.New("etcdclient: old cluster version") ) // Client provides and manages an etcd v3 client session. @@ -358,10 +360,57 @@ func newClient(cfg *Config) (*Client, error) { client.Auth = NewAuth(client) client.Maintenance = NewMaintenance(client) + if cfg.RejectOldCluster { + if err := client.checkVersion(); err != nil { + client.Close() + return nil, err + } + } + go client.autoSync() return client, nil } +func (c *Client) checkVersion() (err error) { + var wg sync.WaitGroup + errc := make(chan error, len(c.cfg.Endpoints)) + ctx, cancel := context.WithCancel(c.ctx) + if c.cfg.DialTimeout > 0 { + ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout) + } + wg.Add(len(c.cfg.Endpoints)) + for _, ep := range c.cfg.Endpoints { + // if cluster is current, any endpoint gives a recent version + go func(e string) { + defer wg.Done() + resp, rerr := c.Status(ctx, e) + if rerr != nil { + errc <- rerr + return + } + vs := strings.Split(resp.Version, ".") + maj, min := 0, 0 + if len(vs) >= 2 { + maj, rerr = strconv.Atoi(vs[0]) + min, rerr = strconv.Atoi(vs[1]) + } + if maj < 3 || (maj == 3 && min < 2) { + rerr = ErrOldCluster + } + errc <- rerr + }(ep) + } + // wait for success + for i := 0; i < len(c.cfg.Endpoints); i++ { + if err = <-errc; err == nil { + break + } + } + cancel() + wg.Wait() + return err +} + // ActiveConnection returns the current in-use connection func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn } diff --git a/clientv3/config.go b/clientv3/config.go index 69bfe5d80ad..dda72a748e6 100644 --- a/clientv3/config.go +++ b/clientv3/config.go @@ -42,6 +42,9 @@ type Config struct { // Password is a password for authentication. Password string `json:"password"` + // RejectOldCluster when set will refuse to create a client against an outdated cluster. + RejectOldCluster bool `json:"reject-old-cluster"` + // DialOptions is a list of dial options for the grpc client (e.g., for interceptors). DialOptions []grpc.DialOption From 51435df1794d73ee498ad252a221c0514bb0e80f Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 16 Feb 2017 14:48:57 -0800 Subject: [PATCH 3/3] integration: test RejectOldCluster --- clientv3/integration/dial_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/clientv3/integration/dial_test.go b/clientv3/integration/dial_test.go index 01e49675cbe..23aa502a413 100644 --- a/clientv3/integration/dial_test.go +++ b/clientv3/integration/dial_test.go @@ -70,3 +70,21 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) { } cancel() } + +func TestRejectOldCluster(t *testing.T) { + defer testutil.AfterTest(t) + // 2 endpoints to test multi-endpoint Status + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + defer clus.Terminate(t) + + cfg := clientv3.Config{ + Endpoints: []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}, + DialTimeout: 5 * time.Second, + RejectOldCluster: true, + } + cli, err := clientv3.New(cfg) + if err != nil { + t.Fatal(err) + } + cli.Close() +}