Skip to content

Commit

Permalink
catalog/lease: detect if synchronous lease releases are successful
Browse files Browse the repository at this point in the history
Previously, for unit testing, we added support for synchronously
releasing leases. If the context was cancelled when releasing a lease
synchronously, it was possible for the lease to be erased from memory
and not from storage. As a result, reacquisition could hit an error when
session-based leasing is enabled. To address this, this patch re-orders
operations so that we clear storage first for synchronous lease release,
followed by the in-memory copy.

Fixes: #118522, #118523, #118521

Release note: None
  • Loading branch information
fqazi committed Jan 31, 2024
1 parent 07adffb commit 49592b7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
11 changes: 9 additions & 2 deletions pkg/sql/catalog/lease/descriptor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,18 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
t.mu.Lock()
defer t.mu.Unlock()
if l := maybeMarkRemoveStoredLease(s); l != nil {
t.mu.active.remove(s)
leaseReleased := true
// For testing, we will synchronously release leases, but that
// exposes us to the danger of the context getting cancelled. To
// eliminate this risk, we are going first remove the lease from
// storage and then delete if from mqemory.
if t.m.storage.testingKnobs.RemoveOnceDereferenced {
releaseLease(ctx, l, t.m)
leaseReleased = releaseLease(ctx, l, t.m)
l = nil
}
if leaseReleased {
t.mu.active.remove(s)
}
return l
}
return nil
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func acquireNodeLease(
}

// releaseLease deletes an entry from system.lease.
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) (released bool) {
// Force the release to happen synchronously, if we are draining or, when we
// force removals for unit tests. This didn't matter with expiration based leases
// since each renewal would have a different expiry (but the same version in
Expand All @@ -647,8 +647,9 @@ func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
// because we release only if a new version exists.
if m.IsDraining() || m.removeOnceDereferenced() {
// Release synchronously to guarantee release before exiting.
m.storage.release(ctx, m.stopper, lease)
return
// It's possible for the context to get cancelled, so return if
// it was released.
return m.storage.release(ctx, m.stopper, lease)
}

// Release to the store asynchronously, without the descriptorState lock.
Expand All @@ -663,6 +664,8 @@ func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
}); err != nil {
log.Warningf(ctx, "error: %s, not releasing lease: %q", err, lease)
}
// Asynchronous job is releasing it.
return true
}

// purgeOldVersions removes old unused descriptor versions older than
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/catalog/lease/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (s storage) acquire(
// written a value to the database, which we'd leak if we did not delete it.
// Note that the expiration is part of the primary key in the table, so we
// would not overwrite the old entry if we just were to do another insert.
//repeatIteration = desc != nil
if (!expiration.IsEmpty() || sessionID != nil) && desc != nil {
prevExpirationTS := storedLeaseExpiration(expiration)
if err := s.writer.deleteLease(ctx, txn, leaseFields{
Expand Down Expand Up @@ -191,7 +192,6 @@ func (s storage) acquire(
log.VEventf(ctx, 2, "storage attempting to acquire lease %v@%v", desc, expiration)

ts := storedLeaseExpiration(expiration)

var isLeaseRenewal bool
var lastLeaseWasWrittenWithSessionID bool
// If there was a previous lease then determine if this a renewal and
Expand Down Expand Up @@ -254,7 +254,9 @@ func (s storage) acquire(
// read a descriptor because it can be called while modifying a
// descriptor through a schema change before the schema change has committed
// that can result in a deadlock.
func (s storage) release(ctx context.Context, stopper *stop.Stopper, lease *storedLease) {
func (s storage) release(
ctx context.Context, stopper *stop.Stopper, lease *storedLease,
) (released bool) {
ctx = multitenant.WithTenantCostControlExemption(ctx)
retryOptions := base.DefaultRetryOptions()
retryOptions.Closer = stopper.ShouldQuiesce()
Expand Down Expand Up @@ -283,13 +285,15 @@ func (s storage) release(ctx context.Context, stopper *stop.Stopper, lease *stor
}
continue
}
released = true
s.outstandingLeases.Dec(1)
if s.testingKnobs.LeaseReleasedEvent != nil {
s.testingKnobs.LeaseReleasedEvent(
lease.id, descpb.DescriptorVersion(lease.version), err)
}
break
}
return released
}

// Get the descriptor valid for the expiration time from the store.
Expand Down

0 comments on commit 49592b7

Please sign in to comment.