Skip to content

Commit

Permalink
*: fix leaky context creation with cancel
Browse files Browse the repository at this point in the history
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed Sep 7, 2017
1 parent 9d12ba2 commit 9a726b4
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 8 deletions.
6 changes: 4 additions & 2 deletions clientv3/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ func TestBalancerGetBlocking(t *testing.T) {
}
blockingOpts := grpc.BalancerGetOptions{BlockingWait: true}

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
_, _, err := sb.Get(ctx, blockingOpts)
cancel()
if err != context.DeadlineExceeded {
t.Errorf("Get() with no up endpoints should timeout, got %v", err)
}
Expand Down Expand Up @@ -124,8 +125,9 @@ func TestBalancerGetBlocking(t *testing.T) {
t.Errorf("closing the only connection should triggered balancer to send the all endpoints via Notify chan so that we can establish a connection")
}
down2(errors.New("error"))
ctx, _ = context.WithTimeout(context.Background(), time.Millisecond*100)
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
_, _, err = sb.Get(ctx, blockingOpts)
cancel()
if err != context.DeadlineExceeded {
t.Errorf("Get() with no up endpoints should timeout, got %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ func (c *Client) autoSync() {
case <-c.ctx.Done():
return
case <-time.After(c.cfg.AutoSyncInterval):
ctx, _ := context.WithTimeout(c.ctx, 5*time.Second)
if err := c.Sync(ctx); err != nil && err != c.ctx.Err() {
ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
err := c.Sync(ctx)
cancel()
if err != nil && err != c.ctx.Err() {
logger.Println("Auto sync endpoints failed:", err)
}
}
Expand Down Expand Up @@ -429,7 +431,7 @@ func (c *Client) checkVersion() (err error) {
errc := make(chan error, len(c.cfg.Endpoints))
ctx, cancel := context.WithCancel(c.ctx)
if c.cfg.DialTimeout > 0 {
ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout)
ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
}
wg.Add(len(c.cfg.Endpoints))
for _, ep := range c.cfg.Endpoints {
Expand Down
2 changes: 2 additions & 0 deletions clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
for !keyDeleted {
wr, ok := <-wch
if !ok {
cancel()
return
}
for _, ev := range wr.Events {
Expand All @@ -225,6 +226,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
select {
case ch <- *resp:
case <-cctx.Done():
cancel()
return
}
}
Expand Down
1 change: 1 addition & 0 deletions clientv3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
cancel()
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,9 @@ func TestDoProposalTimeout(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}

ctx, _ := context.WithTimeout(context.Background(), 0)
ctx, cancel := context.WithTimeout(context.Background(), 0)
_, err := srv.Do(ctx, pb.Request{Method: "PUT"})
cancel()
if err != ErrTimeout {
t.Fatalf("err = %v, want %v", err, ErrTimeout)
}
Expand Down
5 changes: 3 additions & 2 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,11 @@ func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error
cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := ma.Add(ctx, peerURL); err != nil {
_, err := ma.Add(ctx, peerURL)
cancel()
if err != nil {
return err
}
cancel()

// wait for the add node entry applied in the cluster
members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
Expand Down
1 change: 1 addition & 0 deletions tools/functional-tester/etcd-tester/lease_stresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
cancel()
ctx, cancel = context.WithCancel(ls.ctx)
stream, err = ls.lc.LeaseKeepAlive(ctx)
cancel()
continue
}
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
Expand Down

0 comments on commit 9a726b4

Please sign in to comment.