From f5a0aa3c7e1b286ce610a9df593e96e9f0eee261 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Jasionowski?=
Date: Fri, 19 Nov 2021 14:52:30 +0000
Subject: [PATCH 1/2] lease,integration: add checkpoint scheduling after
 leader change

The current checkpointing mechanism is buggy: new checkpoints for any
lease are scheduled only until the first leader change. Fix the
scheduling and add a test that verifies checkpointing continues across
leader changes.
---
 server/lease/lessor.go             |   1 +
 tests/integration/v3_lease_test.go | 128 +++++++++++++++++++----------
 2 files changed, 85 insertions(+), 44 deletions(-)

diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index 5dba54db02e..abe5ec1b420 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -447,6 +447,7 @@ func (le *lessor) Promote(extend time.Duration) {
 		l.refresh(extend)
 		item := &LeaseWithTime{id: l.ID, time: l.expiry}
 		le.leaseExpiredNotifier.RegisterOrUpdate(item)
+		le.scheduleCheckpointIfNeeded(l)
 	}
 
 	if len(le.leaseMap) < leaseRevokeRate {
diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go
index 41f33ef6266..4beb463765b 100644
--- a/tests/integration/v3_lease_test.go
+++ b/tests/integration/v3_lease_test.go
@@ -229,56 +229,96 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
 // across leader elections.
 func TestV3LeaseCheckpoint(t *testing.T) {
-	BeforeTest(t)
-
-	var ttl int64 = 300
-	leaseInterval := 2 * time.Second
-	BeforeTest(t)
-	clus := NewClusterV3(t, &ClusterConfig{
-		Size:                    3,
-		EnableLeaseCheckpoint:   true,
-		LeaseCheckpointInterval: leaseInterval,
-	})
-	defer clus.Terminate(t)
-
-	// create lease
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	c := toGRPC(clus.RandClient())
-	lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
-	if err != nil {
-		t.Fatal(err)
-	}
+	tcs := []struct {
+		name                  string
+		checkpointingEnabled  bool
+		ttl                   time.Duration
+		checkpointingInterval time.Duration
+		leaderChanges         int
+		expectTTLIsGT         time.Duration
+		expectTTLIsLT         time.Duration
+	}{
+		{
+			name:          "Checkpointing disabled, lease TTL is reset",
+			ttl:           300 * time.Second,
+			leaderChanges: 1,
+			expectTTLIsGT: 298 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after leader change",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			// Checking if checkpointing continues after the first leader change.
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         2,
+			expectTTLIsLT:         280 * time.Second,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			BeforeTest(t)
+			config := &ClusterConfig{
+				Size:                    3,
+				EnableLeaseCheckpoint:   tc.checkpointingEnabled,
+				LeaseCheckpointInterval: tc.checkpointingInterval,
+			}
+			clus := NewClusterV3(t, config)
+			defer clus.Terminate(t)
+
+			// create lease
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			c := toGRPC(clus.RandClient())
+			lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
+			if err != nil {
+				t.Fatal(err)
+			}
 
-	// wait for a checkpoint to occur
-	time.Sleep(leaseInterval + 1*time.Second)
+			for i := 0; i < tc.leaderChanges; i++ {
+				// wait for a checkpoint to occur
+				time.Sleep(tc.checkpointingInterval + 1*time.Second)
 
-	// Force a leader election
-	leaderId := clus.WaitLeader(t)
-	leader := clus.Members[leaderId]
-	leader.Stop(t)
-	time.Sleep(time.Duration(3*electionTicks) * tickDuration)
-	leader.Restart(t)
-	newLeaderId := clus.WaitLeader(t)
-	c2 := toGRPC(clus.Client(newLeaderId))
+				// Force a leader election
+				leaderId := clus.WaitLeader(t)
+				leader := clus.Members[leaderId]
+				leader.Stop(t)
+				time.Sleep(time.Duration(3*electionTicks) * tickDuration)
+				leader.Restart(t)
+			}
 
-	time.Sleep(250 * time.Millisecond)
+			newLeaderId := clus.WaitLeader(t)
+			c2 := toGRPC(clus.Client(newLeaderId))
+
+			time.Sleep(250 * time.Millisecond)
+
+			// Check the TTL of the new leader
+			var ttlresp *pb.LeaseTimeToLiveResponse
+			for i := 0; i < 10; i++ {
+				if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
+					if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+						time.Sleep(time.Millisecond * 250)
+					} else {
+						t.Fatal(err)
+					}
+				}
+			}
 
-	// Check the TTL of the new leader
-	var ttlresp *pb.LeaseTimeToLiveResponse
-	for i := 0; i < 10; i++ {
-		if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
-			if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
-				time.Sleep(time.Millisecond * 250)
-			} else {
-				t.Fatal(err)
+			if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
+				t.Errorf("Expected lease TTL (%v) to be greater than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
 			}
-		}
-	}
-	expectedTTL := ttl - int64(leaseInterval.Seconds())
-	if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
-		t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
+			if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
+				t.Errorf("Expected lease TTL (%v) to be less than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
			}
+		})
 	}
 }
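A minimal, self-contained Go sketch of the failure mode the patch above fixes. Everything here is a hypothetical simplification (the real logic lives in server/lease/lessor.go and uses a priority queue, not a slice); it only illustrates why Promote on a newly elected leader must re-arm the checkpoint schedule.

package main

import (
	"fmt"
	"time"
)

// Lease and Lessor are hypothetical stand-ins for the real lessor types.
type Lease struct {
	ID           int64
	RemainingTTL time.Duration
}

type Lessor struct {
	checkpointInterval time.Duration
	leaseMap           map[int64]*Lease
	scheduled          []int64 // stand-in for the real checkpoint priority queue
}

// scheduleCheckpointIfNeeded queues the next checkpoint for a lease when
// checkpointing is enabled. Before the fix, this was reached only from
// Grant and from the checkpoint loop itself, never from Promote.
func (le *Lessor) scheduleCheckpointIfNeeded(l *Lease) {
	if le.checkpointInterval <= 0 {
		return
	}
	le.scheduled = append(le.scheduled, l.ID)
}

// Promote runs on the member that has just become leader. The one-line fix
// is the scheduleCheckpointIfNeeded call: without it, the new leader
// refreshes lease expiries but never persists remaining TTLs again, so the
// next leader change observes fully reset TTLs.
func (le *Lessor) Promote() {
	for _, l := range le.leaseMap {
		// expiry refresh and expired-lease bookkeeping elided
		le.scheduleCheckpointIfNeeded(l)
	}
}

func main() {
	le := &Lessor{
		checkpointInterval: 10 * time.Second,
		leaseMap:           map[int64]*Lease{1: {ID: 1, RemainingTTL: 290 * time.Second}},
	}
	le.Promote()
	fmt.Printf("checkpoints scheduled after promotion: %d\n", len(le.scheduled))
}

With the call in place, every promotion re-queues a checkpoint per lease, which is exactly what the two-leader-changes test case exercises.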
From 4b38a68457006fec4cc4dce5914e017f5bd12896 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Jasionowski?=
Date: Fri, 19 Nov 2021 15:02:25 +0000
Subject: [PATCH 2/2] etcdserver,integration: apply old LeaseCheckpoint reqs
 to EtcdServer

Extend the lease checkpointing mechanism to the case where the whole
etcd cluster is restarted: if the etcd server has to restore its state
from the raft log, all LeaseCheckpoint requests are applied to the
server, regardless of their index value.
This sets the remaining TTLs to their values from before the restart;
otherwise, the remaining TTLs would be reset to the initial TTLs after
each cluster restart.

Added an integration test to cover this case.
---
 server/etcdserver/server.go        |  5 +++++
 tests/integration/v3_lease_test.go | 15 ++++++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 791740df97a..4e5837c7251 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -2163,6 +2163,11 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
 		return
 	}
+	if !shouldApplyV3 && raftReq.LeaseCheckpoint != nil {
+		shouldApplyV3 = membership.ApplyBoth
+		s.lg.Debug("flipping should-applyV3 to reapply old lease checkpoint",
+			zap.Bool("should-applyV3", bool(shouldApplyV3)))
+	}
 	s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
 
 	if raftReq.V2 != nil {
diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go
index 4beb463765b..aa9b8a74f67 100644
--- a/tests/integration/v3_lease_test.go
+++ b/tests/integration/v3_lease_test.go
@@ -235,6 +235,7 @@ func TestV3LeaseCheckpoint(t *testing.T) {
 		ttl                   time.Duration
 		checkpointingInterval time.Duration
 		leaderChanges         int
+		clusterSize           int
 		expectTTLIsGT         time.Duration
 		expectTTLIsLT         time.Duration
 	}{
@@ -242,6 +243,7 @@
 			name:          "Checkpointing disabled, lease TTL is reset",
 			ttl:           300 * time.Second,
 			leaderChanges: 1,
+			clusterSize:   3,
 			expectTTLIsGT: 298 * time.Second,
 		},
 		{
@@ -250,6 +252,16 @@
 			checkpointingEnabled:  true,
 			checkpointingInterval: 10 * time.Second,
 			leaderChanges:         1,
+			clusterSize:           3,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after cluster restart",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			clusterSize:           1,
 			expectTTLIsLT:         290 * time.Second,
 		},
 		{
@@ -259,6 +271,7 @@
 			checkpointingEnabled:  true,
 			checkpointingInterval: 10 * time.Second,
 			leaderChanges:         2,
+			clusterSize:           3,
 			expectTTLIsLT:         280 * time.Second,
 		},
 	}
@@ -266,7 +279,7 @@
 		t.Run(tc.name, func(t *testing.T) {
 			BeforeTest(t)
 			config := &ClusterConfig{
-				Size:                    3,
+				Size:                    tc.clusterSize,
 				EnableLeaseCheckpoint:   tc.checkpointingEnabled,
 				LeaseCheckpointInterval: tc.checkpointingInterval,
 			}
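To complement the server.go hunk above, a minimal sketch of the replay exception, again with hypothetical simplified types rather than the real EtcdServer: entries at or below the consistent index are normally skipped on replay because their effects are already persisted in the backend, but checkpointed remaining TTLs live only in memory, so LeaseCheckpoint entries have to be re-applied unconditionally.

package main

import "fmt"

// LeaseCheckpoint, Entry, and Server are hypothetical stand-ins; the real
// exception is the shouldApplyV3 flip in EtcdServer.applyEntryNormal.
type LeaseCheckpoint struct {
	LeaseID      int64
	RemainingTTL int64 // seconds
}

type Entry struct {
	Index      uint64
	Checkpoint *LeaseCheckpoint // nil unless the entry carries a checkpoint
}

type Server struct {
	consistentIndex uint64          // highest index already persisted in the backend
	remainingTTLs   map[int64]int64 // in-memory only: lease ID -> remaining TTL
}

// applyEntry skips already-persisted entries on replay, except for lease
// checkpoints: without the exception, every full cluster restart would
// silently reset all leases to their initial TTLs.
func (s *Server) applyEntry(e Entry) {
	shouldApplyV3 := e.Index > s.consistentIndex
	if e.Checkpoint != nil {
		shouldApplyV3 = true // the exception introduced by this patch
	}
	if !shouldApplyV3 {
		return
	}
	if cp := e.Checkpoint; cp != nil {
		s.remainingTTLs[cp.LeaseID] = cp.RemainingTTL
	}
	// handling of all other request types elided
}

func main() {
	s := &Server{consistentIndex: 100, remainingTTLs: map[int64]int64{}}
	// A checkpoint logged before the restart (index 42 <= 100) is still
	// applied, restoring lease 7's remaining TTL instead of resetting it.
	s.applyEntry(Entry{Index: 42, Checkpoint: &LeaseCheckpoint{LeaseID: 7, RemainingTTL: 123}})
	fmt.Println(s.remainingTTLs[7]) // prints 123
}

Re-applying a checkpoint is idempotent (it only overwrites the remaining TTL), which is what makes applying old entries unconditionally safe.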