From 9a726b424db125c94264efc6e0cd471bd2f427c8 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 6 Sep 2017 15:58:07 -0700 Subject: [PATCH] *: fix leaky context creation with cancel Signed-off-by: Gyu-Ho Lee --- clientv3/balancer_test.go | 6 ++++-- clientv3/client.go | 8 +++++--- clientv3/concurrency/election.go | 2 ++ clientv3/concurrency/session.go | 1 + etcdserver/server_test.go | 3 ++- integration/cluster.go | 5 +++-- tools/functional-tester/etcd-tester/lease_stresser.go | 1 + 7 files changed, 18 insertions(+), 8 deletions(-) diff --git a/clientv3/balancer_test.go b/clientv3/balancer_test.go index cffa5d4178f..4485a474be6 100644 --- a/clientv3/balancer_test.go +++ b/clientv3/balancer_test.go @@ -84,8 +84,9 @@ func TestBalancerGetBlocking(t *testing.T) { } blockingOpts := grpc.BalancerGetOptions{BlockingWait: true} - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) _, _, err := sb.Get(ctx, blockingOpts) + cancel() if err != context.DeadlineExceeded { t.Errorf("Get() with no up endpoints should timeout, got %v", err) } @@ -124,8 +125,9 @@ func TestBalancerGetBlocking(t *testing.T) { t.Errorf("closing the only connection should triggered balancer to send the all endpoints via Notify chan so that we can establish a connection") } down2(errors.New("error")) - ctx, _ = context.WithTimeout(context.Background(), time.Millisecond*100) + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100) _, _, err = sb.Get(ctx, blockingOpts) + cancel() if err != context.DeadlineExceeded { t.Errorf("Get() with no up endpoints should timeout, got %v", err) } diff --git a/clientv3/client.go b/clientv3/client.go index 2f56d511ea8..e5bc4cc0049 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -143,8 +143,10 @@ func (c *Client) autoSync() { case <-c.ctx.Done(): return case <-time.After(c.cfg.AutoSyncInterval): - ctx, _ := context.WithTimeout(c.ctx, 5*time.Second) - if err := c.Sync(ctx); err != nil && err != c.ctx.Err() { + ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second) + err := c.Sync(ctx) + cancel() + if err != nil && err != c.ctx.Err() { logger.Println("Auto sync endpoints failed:", err) } } @@ -429,7 +431,7 @@ func (c *Client) checkVersion() (err error) { errc := make(chan error, len(c.cfg.Endpoints)) ctx, cancel := context.WithCancel(c.ctx) if c.cfg.DialTimeout > 0 { - ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout) + ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout) } wg.Add(len(c.cfg.Endpoints)) for _, ep := range c.cfg.Endpoints { diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index 1a6bbfc90fb..af5f72e425f 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -213,6 +213,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { for !keyDeleted { wr, ok := <-wch if !ok { + cancel() return } for _, ev := range wr.Events { @@ -225,6 +226,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { select { case ch <- *resp: case <-cctx.Done(): + cancel() return } } diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go index 7e2ff89b4e4..c399d64a61d 100644 --- a/clientv3/concurrency/session.go +++ b/clientv3/concurrency/session.go @@ -53,6 +53,7 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { ctx, cancel := context.WithCancel(ops.ctx) keepAlive, err := client.KeepAlive(ctx, id) if err != nil || keepAlive == nil { + cancel() return nil, err } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index bafacad6cd3..e3ea0f9250c 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -741,8 +741,9 @@ func TestDoProposalTimeout(t *testing.T) { } srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} - ctx, _ := context.WithTimeout(context.Background(), 0) + ctx, cancel := context.WithTimeout(context.Background(), 0) _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) + cancel() if err != ErrTimeout { t.Fatalf("err = %v, want %v", err, ErrTimeout) } diff --git a/integration/cluster.go b/integration/cluster.go index c85eccccd05..ec9a9f49357 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -277,10 +277,11 @@ func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := ma.Add(ctx, peerURL); err != nil { + _, err := ma.Add(ctx, peerURL) + cancel() + if err != nil { return err } - cancel() // wait for the add node entry applied in the cluster members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) diff --git a/tools/functional-tester/etcd-tester/lease_stresser.go b/tools/functional-tester/etcd-tester/lease_stresser.go index 1ca788410c4..ea15fd7a7ef 100644 --- a/tools/functional-tester/etcd-tester/lease_stresser.go +++ b/tools/functional-tester/etcd-tester/lease_stresser.go @@ -290,6 +290,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { cancel() ctx, cancel = context.WithCancel(ls.ctx) stream, err = ls.lc.LeaseKeepAlive(ctx) + cancel() continue } err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})