Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3: version checking #7342

Merged
merged 3 commits into from
Feb 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -35,6 +36,7 @@ import (

var (
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
ErrOldCluster = errors.New("etcdclient: old cluster version")
)

// Client provides and manages an etcd v3 client session.
Expand Down Expand Up @@ -272,7 +274,7 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
tokenMu: &sync.RWMutex{},
}

err := c.getToken(context.TODO())
err := c.getToken(c.ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,7 +309,12 @@ func newClient(cfg *Config) (*Client, error) {
}

// use a temporary skeleton client to bootstrap first connection
ctx, cancel := context.WithCancel(context.TODO())
baseCtx := context.TODO()
if cfg.Context != nil {
baseCtx = cfg.Context
}

ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
Expand Down Expand Up @@ -353,10 +360,57 @@ func newClient(cfg *Config) (*Client, error) {
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)

if cfg.RejectOldCluster {
if err := client.checkVersion(); err != nil {
client.Close()
return nil, err
}
}

go client.autoSync()
return client, nil
}

func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
errc := make(chan error, len(c.cfg.Endpoints))
ctx, cancel := context.WithCancel(c.ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does cancel func preserver even when we overwrite ctx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, canceling the parent context will cancel any descendents

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see it now context.WithTimeout(ctx, not context.WithTimeout(c.ctx.

Makes sense.

if c.cfg.DialTimeout > 0 {
ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout)
}
wg.Add(len(c.cfg.Endpoints))
for _, ep := range c.cfg.Endpoints {
// if cluster is current, any endpoint gives a recent version
go func(e string) {
defer wg.Done()
resp, rerr := c.Status(ctx, e)
if rerr != nil {
errc <- rerr
return
}
vs := strings.Split(resp.Version, ".")
maj, min := 0, 0
if len(vs) >= 2 {
maj, rerr = strconv.Atoi(vs[0])
min, rerr = strconv.Atoi(vs[1])
}
if maj < 3 || (maj == 3 && min < 2) {
rerr = ErrOldCluster
}
errc <- rerr
}(ep)
}
// wait for success
for i := 0; i < len(c.cfg.Endpoints); i++ {
if err = <-errc; err == nil {
break
}
}
cancel()
wg.Wait()
return err
}

// ActiveConnection returns the current in-use connection
func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }

Expand Down
8 changes: 8 additions & 0 deletions clientv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

Expand All @@ -41,6 +42,13 @@ type Config struct {
// Password is a password for authentication.
Password string `json:"password"`

// RejectOldCluster when set will refuse to create a client against an outdated cluster.
RejectOldCluster bool `json:"reject-old-cluster"`

// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
DialOptions []grpc.DialOption

// Context is the default client context; it can be used to cancel grpc dial out and
// other operations that do not have an explicit context.
Context context.Context
}
18 changes: 18 additions & 0 deletions clientv3/integration/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,21 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
}
cancel()
}

func TestRejectOldCluster(t *testing.T) {
defer testutil.AfterTest(t)
// 2 endpoints to test multi-endpoint Status
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)

cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()},
DialTimeout: 5 * time.Second,
RejectOldCluster: true,
}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
cli.Close()
}