diff --git a/clientv3/lease.go b/clientv3/lease.go index a6494ceee45b..55eacf0a7522 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -41,8 +41,12 @@ type LeaseGrantResponse struct { // LeaseKeepAliveResponse is used to convert the protobuf keepalive response. type LeaseKeepAliveResponse struct { *pb.ResponseHeader - ID LeaseID - TTL int64 + ID LeaseID + TTL int64 + // Err is the keepalive error, if any; a failed keepalive posts it (best effort) before the channel is closed + Err error + // Deadline is the wall-clock time by which the lease must be renewed to avoid expiry (response time + TTL) + Deadline time.Time } // LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response. @@ -73,20 +77,8 @@ const ( retryConnWait = 500 * time.Millisecond ) -// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error. -// -// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected. -type ErrKeepAliveHalted struct { - Reason error -} - -func (e ErrKeepAliveHalted) Error() string { - s := "etcdclient: leases keep alive halted" - if e.Reason != nil { - s += ": " + e.Reason.Error() - } - return s -} +// LeaseKeepAliveChan is used to deliver LeaseKeepAliveResponses from KeepAlive +type LeaseKeepAliveChan <-chan LeaseKeepAliveResponse type Lease interface { // Grant creates a new lease. @@ -98,12 +90,14 @@ type Lease interface { // TimeToLive retrieves the lease information of the given lease ID. TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) - // KeepAlive keeps the given lease alive forever. - KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) + // KeepAlive keeps the given lease alive forever. If the keepalive response posted to + // the channel is not consumed immediately, the lease client will continue sending keep alive requests + // to the etcd server at least every second until latest response is consumed. + KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan // KeepAliveOnce renews the lease once. In most of the cases, Keepalive // should be used instead of KeepAliveOnce. 
- KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) + KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse // Close releases all resources Lease keeps for efficient communication // with the etcd server. @@ -113,9 +104,8 @@ type Lease interface { type lessor struct { mu sync.Mutex // guards all fields - // donec is closed and loopErr is set when recvKeepAliveLoop stops - donec chan struct{} - loopErr error + // donec is closed when all goroutines are torn down from Close() + donec chan struct{} remote pb.LeaseClient @@ -137,7 +127,7 @@ type lessor struct { // keepAlive multiplexes a keepalive for a lease over multiple channels type keepAlive struct { - chs []chan<- *LeaseKeepAliveResponse + chs []chan<- LeaseKeepAliveResponse ctxs []context.Context // deadline is the time the keep alive channels close if no response deadline time.Time @@ -219,24 +209,23 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption } } -func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { - ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) +func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan { + ch := make(chan LeaseKeepAliveResponse, leaseResponseChSize) l.mu.Lock() // ensure that recvKeepAliveLoop is still running select { case <-l.donec: - err := l.loopErr l.mu.Unlock() close(ch) - return ch, ErrKeepAliveHalted{Reason: err} + return ch default: } ka, ok := l.keepAlives[id] if !ok { // create fresh keep alive ka = &keepAlive{ - chs: []chan<- *LeaseKeepAliveResponse{ch}, + chs: []chan<- LeaseKeepAliveResponse{ch}, ctxs: []context.Context{ctx}, deadline: time.Now().Add(l.firstKeepAliveTimeout), nextKeepAlive: time.Now(), @@ -252,24 +241,46 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl go l.keepAliveCtxCloser(id, ctx, ka.donec) l.firstKeepAliveOnce.Do(func() { - go l.recvKeepAliveLoop() + 
go func() { + defer func() { + l.mu.Lock() + for _, ka := range l.keepAlives { + ka.Close(nil) + } + close(l.donec) + l.mu.Unlock() + }() + + for l.stopCtx.Err() == nil { + err := l.recvKeepAliveLoop() + if err == context.Canceled { + err = nil + } + l.mu.Lock() + for _, ka := range l.keepAlives { + ka.Close(err) + } + l.keepAlives = make(map[LeaseID]*keepAlive) + l.mu.Unlock() + } + }() go l.deadlineLoop() }) - return ch, nil + return ch } -func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { +func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse { for { - resp, err := l.keepAliveOnce(ctx, id) - if err == nil { + resp := l.keepAliveOnce(ctx, id) + if resp.Err == nil { if resp.TTL <= 0 { - err = rpctypes.ErrLeaseNotFound + resp.Err = rpctypes.ErrLeaseNotFound } - return resp, err + return resp } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + if isHaltErr(ctx, resp.Err) { + return resp } } } @@ -339,7 +349,7 @@ func (l *lessor) closeRequireLeader() { continue } // remove all channels that required a leader from keepalive - newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs) + newChs := make([]chan<- LeaseKeepAliveResponse, len(ka.chs)-reqIdxs) newCtxs := make([]context.Context, len(newChs)) newIdx := 0 for i := range ka.chs { @@ -353,45 +363,34 @@ func (l *lessor) closeRequireLeader() { } } -func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { +func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse { cctx, cancel := context.WithCancel(ctx) defer cancel() stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false)) if err != nil { - return nil, toErr(ctx, err) + return LeaseKeepAliveResponse{Err: toErr(ctx, err)} } err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) if err != nil { - return nil, toErr(ctx, err) + return LeaseKeepAliveResponse{Err: toErr(ctx, err)} } 
resp, rerr := stream.Recv() if rerr != nil { - return nil, toErr(ctx, rerr) + return LeaseKeepAliveResponse{Err: toErr(ctx, rerr)} } - karesp := &LeaseKeepAliveResponse{ + return LeaseKeepAliveResponse{ ResponseHeader: resp.GetHeader(), ID: LeaseID(resp.ID), TTL: resp.TTL, + Deadline: time.Now().Add(time.Duration(resp.TTL) * time.Second), } - return karesp, nil } func (l *lessor) recvKeepAliveLoop() (gerr error) { - defer func() { - l.mu.Lock() - close(l.donec) - l.loopErr = gerr - for _, ka := range l.keepAlives { - ka.Close() - } - l.keepAlives = make(map[LeaseID]*keepAlive) - l.mu.Unlock() - }() - stream, serr := l.resetRecv() for serr == nil { resp, err := stream.Recv() @@ -443,6 +442,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { ResponseHeader: resp.GetHeader(), ID: LeaseID(resp.ID), TTL: resp.TTL, + Deadline: time.Now().Add(time.Duration(resp.TTL) * time.Second), } l.mu.Lock() @@ -456,7 +456,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { if karesp.TTL <= 0 { // lease expired; close all keep alive channels delete(l.keepAlives, karesp.ID) - ka.Close() + ka.Close(nil) return } @@ -465,7 +465,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second) for _, ch := range ka.chs { select { - case ch <- karesp: + case ch <- *karesp: ka.nextKeepAlive = nextKeepAlive default: } @@ -486,7 +486,7 @@ func (l *lessor) deadlineLoop() { for id, ka := range l.keepAlives { if ka.deadline.Before(now) { // waited too long for response; lease may be expired - ka.Close() + ka.Close(nil) delete(l.keepAlives, id) } } @@ -528,9 +528,16 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } } -func (ka *keepAlive) Close() { +func (ka *keepAlive) Close(err error) { close(ka.donec) for _, ch := range ka.chs { + if err != nil { + // try to post error if buffer space available + select { + case ch <- LeaseKeepAliveResponse{Err: err}: + 
default: + } + } close(ch) } }