Skip to content

Commit

Permalink
Merge pull request #7342 from heyitsanthony/client-version
Browse files Browse the repository at this point in the history
clientv3: version checking
  • Loading branch information
heyitsanthony committed Feb 21, 2017
2 parents 3c20bdd + 51435df commit 0c0fbbd
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 2 deletions.
58 changes: 56 additions & 2 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -35,6 +36,7 @@ import (

var (
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
ErrOldCluster = errors.New("etcdclient: old cluster version")
)

// Client provides and manages an etcd v3 client session.
Expand Down Expand Up @@ -272,7 +274,7 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
tokenMu: &sync.RWMutex{},
}

err := c.getToken(context.TODO())
err := c.getToken(c.ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,7 +309,12 @@ func newClient(cfg *Config) (*Client, error) {
}

// use a temporary skeleton client to bootstrap first connection
ctx, cancel := context.WithCancel(context.TODO())
baseCtx := context.TODO()
if cfg.Context != nil {
baseCtx = cfg.Context
}

ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
Expand Down Expand Up @@ -353,10 +360,57 @@ func newClient(cfg *Config) (*Client, error) {
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)

if cfg.RejectOldCluster {
if err := client.checkVersion(); err != nil {
client.Close()
return nil, err
}
}

go client.autoSync()
return client, nil
}

func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
errc := make(chan error, len(c.cfg.Endpoints))
ctx, cancel := context.WithCancel(c.ctx)
if c.cfg.DialTimeout > 0 {
ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout)
}
wg.Add(len(c.cfg.Endpoints))
for _, ep := range c.cfg.Endpoints {
// if cluster is current, any endpoint gives a recent version
go func(e string) {
defer wg.Done()
resp, rerr := c.Status(ctx, e)
if rerr != nil {
errc <- rerr
return
}
vs := strings.Split(resp.Version, ".")
maj, min := 0, 0
if len(vs) >= 2 {
maj, rerr = strconv.Atoi(vs[0])
min, rerr = strconv.Atoi(vs[1])
}
if maj < 3 || (maj == 3 && min < 2) {
rerr = ErrOldCluster
}
errc <- rerr
}(ep)
}
// wait for success
for i := 0; i < len(c.cfg.Endpoints); i++ {
if err = <-errc; err == nil {
break
}
}
cancel()
wg.Wait()
return err
}

// ActiveConnection returns the current in-use connection
func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }

Expand Down
8 changes: 8 additions & 0 deletions clientv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

Expand All @@ -41,6 +42,13 @@ type Config struct {
// Password is a password for authentication.
Password string `json:"password"`

// RejectOldCluster when set will refuse to create a client against an outdated cluster.
RejectOldCluster bool `json:"reject-old-cluster"`

// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
DialOptions []grpc.DialOption

// Context is the default client context; it can be used to cancel grpc dial out and
// other operations that do not have an explicit context.
Context context.Context
}
18 changes: 18 additions & 0 deletions clientv3/integration/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,21 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
}
cancel()
}

func TestRejectOldCluster(t *testing.T) {
defer testutil.AfterTest(t)
// 2 endpoints to test multi-endpoint Status
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)

cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()},
DialTimeout: 5 * time.Second,
RejectOldCluster: true,
}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
cli.Close()
}

0 comments on commit 0c0fbbd

Please sign in to comment.