diff --git a/client/v3/concurrency/session.go b/client/v3/concurrency/session.go
index 2275e96c972..4d19f378997 100644
--- a/client/v3/concurrency/session.go
+++ b/client/v3/concurrency/session.go
@@ -154,3 +154,8 @@ func WithContext(ctx context.Context) SessionOption {
 		so.ctx = ctx
 	}
 }
+
+// Expired returns true iff the session is expired.
+func (s *Session) Expired() bool {
+	return s.client.Expired(s.id)
+}
diff --git a/client/v3/lease.go b/client/v3/lease.go
index 11b58348286..543fa7c50a0 100644
--- a/client/v3/lease.go
+++ b/client/v3/lease.go
@@ -136,6 +136,10 @@ type Lease interface {
	// (see https://github.com/etcd-io/etcd/pull/7866)
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
 
+	// Expired returns true iff the lease is expired (more precisely: iff the
+	// lease was expired during the execution of the Expired() call).
+	Expired(id LeaseID) bool
+
	// KeepAliveOnce renews the lease once. The response corresponds to the
	// first message from calling KeepAlive. If the response has a recoverable
	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
@@ -624,3 +628,13 @@ func (ka *keepAlive) close() {
 		close(ch)
 	}
 }
+
+func (l *lessor) Expired(id LeaseID) bool {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	ka, ok := l.keepAlives[id]
+	if !ok {
+		return true
+	}
+	return ka.deadline.Before(time.Now())
+}
diff --git a/client/v3/leasing/kv.go b/client/v3/leasing/kv.go
index c14af78d629..5fe899ab2c8 100644
--- a/client/v3/leasing/kv.go
+++ b/client/v3/leasing/kv.go
@@ -466,12 +466,7 @@ func (lkv *leasingKV) readySession() bool {
 	if lkv.session == nil {
 		return false
 	}
-	select {
-	case <-lkv.session.Done():
-	default:
-		return true
-	}
-	return false
+	return !lkv.session.Expired()
 }
 
 func (lkv *leasingKV) leaseID() v3.LeaseID {
diff --git a/tests/integration/clientv3/lease/leasing_test.go b/tests/integration/clientv3/lease/leasing_test.go
index 22c98f9ed05..8bcbfbbe07e 100644
--- a/tests/integration/clientv3/lease/leasing_test.go
+++ b/tests/integration/clientv3/lease/leasing_test.go
@@ -416,6 +416,80 @@ func TestLeasingConcurrentPut(t *testing.T) {
 	}
 }
 
+func TestLeasingGetChecksForExpiration(t *testing.T) {
+	integration2.BeforeTest(t)
+
+	ttl := 6
+
+	// There's no way to partition a client from the server, so use multiple
+	// servers and shut down a single server to partition the client from the
+	// system.
+	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3, UseBridge: true})
+	defer clus.Terminate(t)
+
+	// Try to make sure server 0 is not the leader.
+	if clus.Members[0].Server.Leader() == clus.Members[0].Server.MemberID() {
+		err := clus.Members[0].Server.MoveLeader(context.TODO(),
+			clus.Members[0].Server.Lead(),
+			uint64(clus.Members[1].Server.MemberID()))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	time.Sleep(2 * time.Second)
+
+	leaderID := clus.Members[0].Server.Leader()
+	if leaderID == clus.Members[0].Server.MemberID() {
+		t.Fatal("test wants member 0 to not be the leader")
+	}
+
+	// This client will get partitioned away from the system (by killing the
+	// node it's connected to).
+	lkv0, closeLKV0, err := leasing.NewKV(clus.Client(0), "pfx/", concurrency.WithTTL(ttl))
+	require.NoError(t, err)
+	defer closeLKV0()
+
+	lkv1, closeLKV1, err := leasing.NewKV(clus.Client(1), "pfx/")
+	require.NoError(t, err)
+	defer closeLKV1()
+
+	if _, err = lkv0.Put(context.TODO(), "k", "abc"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err = lkv0.Get(context.TODO(), "k"); err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(6500 * time.Millisecond)
+	clus.Members[0].Stop(t)
+
+	if _, err = lkv1.Put(context.TODO(), "k", "def"); err != nil {
+		t.Fatal(err)
+	}
+
+	resp, err := lkv1.Get(context.TODO(), "k")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(resp.Kvs) != 1 || string(resp.Kvs[0].Value) != "def" {
+		t.Fatalf(`expected "k"->"def" from lkv1, got response %+v`, resp)
+	}
+
+	go func() {
+		// Eventually bring back the server so the disconnected client can
+		// finish its final Get() below.
+		time.Sleep(1 * time.Second)
+		clus.Members[0].Restart(t)
+	}()
+	resp0, err := lkv0.Get(context.TODO(), "k")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(resp0.Kvs) != 1 || string(resp0.Kvs[0].Value) != "def" {
+		t.Fatalf(`expected "k"->"def" from lkv0 after expiration, got response %+v`, resp0)
+	}
+}
+
 func TestLeasingDisconnectedGet(t *testing.T) {
 	integration2.BeforeTest(t)
 	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1, UseBridge: true})
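
Note for reviewers: below is a minimal sketch of how a client-side cache might consume the new expiry check, mirroring the readySession() change above. The readThrough helper, its map-backed cache, and the package name are hypothetical illustrations, not part of this patch.

package cachesketch

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// readThrough is a hypothetical read-through lookup. It trusts the local
// cache only while the session lease is known to be live; once
// Session.Expired() reports true, another client may have overwritten the
// key after the lease lapsed, so it falls back to an authoritative read
// against the cluster.
func readThrough(ctx context.Context, s *concurrency.Session, kv clientv3.KV,
	cache map[string]string, key string) (string, error) {
	if v, ok := cache[key]; ok && !s.Expired() {
		return v, nil
	}
	resp, err := kv.Get(ctx, key)
	if err != nil {
		return "", err
	}
	if len(resp.Kvs) == 0 {
		return "", nil // key was deleted (or never written)
	}
	return string(resp.Kvs[0].Value), nil
}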