Skip to content

Commit

Permalink
lessor: fix le.itemMap leak after lease is revoked
Browse files Browse the repository at this point in the history
Signed-off-by: qsyqian <qsyqian@gmail.com>
Co-authored-by: Thomas Jungblut <tjungblu@redhat.com>
  • Loading branch information
2 people authored and qianshuangyang.qsy committed Nov 28, 2023
1 parent 03d5512 commit 563662f
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 0 deletions.
6 changes: 6 additions & 0 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ func (le *lessor) Revoke(id LeaseID) error {

txn.End()

for _, key := range keys {
le.mu.Lock()
delete(le.itemMap, LeaseItem{Key: key})
le.mu.Unlock()
}

leaseRevoked.Inc()
return nil
}
Expand Down
37 changes: 37 additions & 0 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,43 @@ func TestLessorRevoke(t *testing.T) {
}
}

func TestLessorGrantAttachRevokeDetach(t *testing.T) {
lg := zap.NewNop()
_, be := NewTestBackend(t)
defer be.Close()

le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l, err := le.Grant(1, 100)
if err != nil {
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
}

items := []LeaseItem{
{"foo"},
{"bar"},
}

if err := le.Attach(l.ID, items); err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}

if err := le.Revoke(l.ID); err != nil {
t.Fatalf("failed to revoke lease: %v", err)
}

if len(le.leaseMap) != 0 {
t.Errorf("le.leaseMap's length is %d, expected: 0", len(le.leaseMap))
}
if len(le.itemMap) != 0 {
t.Errorf("le.itemMap's length is %d, expected: 0", len(le.leaseMap))
}
}

func renew(t *testing.T, le *lessor, id LeaseID) int64 {
ch := make(chan int64, 1)
errch := make(chan error, 1)
Expand Down
104 changes: 104 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,110 @@ func TestStorePut(t *testing.T) {
}
}

func TestStorePutDeleteWithLease(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestRevBytes(Revision{2, 0})
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}

s := newFakeStore(lg)
s.le = lease.NewLessor(lg, s.b, nil, lease.LessorConfig{MinLeaseTTL: 5})
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)

l, err := s.le.Grant(1, 10)
if err != nil {
t.Errorf("failed to grant lease with ttl(10s), err: %v", err)
}

fi.indexGetRespc <- indexGetResp{Revision{}, Revision{}, 0, ErrRevisionNotFound}
s.Put([]byte("foo"), []byte("bar"), l.ID)

if !reflect.DeepEqual(s.le.Lookup(l.ID).Keys(), []string{"foo"}) {
t.Errorf("lease %d has keys: %v, expected: %v", l.ID, s.le.Lookup(l.ID).Keys(), []string{"foo"})
}
if l.ID != s.le.GetLease(lease.LeaseItem{Key: "foo"}) {
t.Errorf("key 'foo' has leaseId: %d, expected: %d", s.le.GetLease(lease.LeaseItem{Key: "foo"}), l.ID)
}

fi.indexRangeRespc <- indexRangeResp{[][]byte{[]byte("foo")}, []Revision{{2, 0}}}
b.tx.rangeRespc <- rangeResp{[][]byte{key}, [][]byte{kvb}}
s.DeleteRange([]byte("foo"), nil)

if len(s.le.Lookup(l.ID).Keys()) != 0 {
t.Errorf("lease %d should has keys %v, expect empty", l.ID, s.le.Lookup(l.ID).Keys())
}
if s.le.GetLease(lease.LeaseItem{Key: "foo"}) != lease.NoLease {
t.Errorf("key 'foo' should has leaseId: %d, expected: %d", s.le.GetLease(lease.LeaseItem{Key: "foo"}), lease.NoLease)
}

s.Close()
}

func TestStorePutWithLeaseRevoke(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestRevBytes(Revision{2, 0})
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}

s := newFakeStore(lg)
s.le = lease.NewLessor(lg, s.b, nil, lease.LessorConfig{MinLeaseTTL: 5})
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)

l, err := s.le.Grant(1, 10)
if err != nil {
t.Errorf("failed to grant lease with ttl(10s), err: %v", err)
}

fi.indexGetRespc <- indexGetResp{Revision{}, Revision{}, 0, ErrRevisionNotFound}
s.Put([]byte("foo"), []byte("bar"), l.ID)

if !reflect.DeepEqual(s.le.Lookup(l.ID).Keys(), []string{"foo"}) {
t.Errorf("lease %d has keys: %v, expected: %v", l.ID, s.le.Lookup(l.ID).Keys(), []string{"foo"})
}
if l.ID != s.le.GetLease(lease.LeaseItem{Key: "foo"}) {
t.Errorf("key 'foo' has leaseId: %d, expected: %d", s.le.GetLease(lease.LeaseItem{Key: "foo"}), l.ID)
}

fi.indexRangeRespc <- indexRangeResp{[][]byte{[]byte("foo")}, []Revision{{2, 0}}}
b.tx.rangeRespc <- rangeResp{[][]byte{key}, [][]byte{kvb}}
if err := s.le.Revoke(l.ID); err != nil {
t.Errorf("failed to revoke lease ID: %d, err: %v", l.ID, err)
}

if s.le.Lookup(l.ID) != nil {
t.Errorf("lease %d should not exist in lessor", l.ID)
}
if s.le.GetLease(lease.LeaseItem{Key: "foo"}) != lease.NoLease {
t.Errorf("key 'foo' should has leaseId: %d, expected: %d", s.le.GetLease(lease.LeaseItem{Key: "foo"}), lease.NoLease)
}

s.Close()

}

func TestStoreRange(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestRevBytes(Revision{Main: 2})
Expand Down

0 comments on commit 563662f

Please sign in to comment.