Skip to content

Commit

Permalink
Merge #62954 #62959
Browse files Browse the repository at this point in the history
62954: sql: Re-enable multi_region_backup test r=arulajmani a=ajstorm

With #60835 merged, this test no longer flakes. I've stressed it on my
GCE worker for a while now and it's all good.

Resolves #60773.

Release note: None

62959: sql: lease acquisition of OFFLINE descs may starve bulk operations r=ajwerner a=fqazi

Fixes: #61798

Previously, offline descriptors would never have their leases
cached, and the leases would be released once the reference count hit zero.
This was inadequate because, when attempting to bring these tables back
online, the lease acquisition could be pushed back by other operations,
leading to starvation / livelocks. To address this, this patch
allows the leases of offline descriptors to be cached.

Release note (bug fix): Fixed a bug where lease acquisitions of descriptors
in an offline state could starve out bulk operations (backup / restore).

Co-authored-by: Adam Storm <storm@cockroachlabs.com>
Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
  • Loading branch information
3 people committed Apr 1, 2021
3 parents 07c8080 + 82df3a1 + 4b6518c commit 2a16884
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 20 deletions.
18 changes: 17 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2257,11 +2257,18 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor
}
return nil
})

if err != nil {
return err
}

// Wait for the table to be public before completing.
for _, tbl := range details.Tables {
_, err := lm.WaitForOneVersion(ctx, tbl.Desc.ID, retry.Options{})
if err != nil {
return errors.Wrap(err, "publishing tables waiting for one version")
}
}

// Initiate a run of CREATE STATISTICS. We don't know the actual number of
// rows affected per table, so we use a large number because we want to make
// sure that stats always get created/refreshed here.
Expand Down Expand Up @@ -2309,6 +2316,15 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{})
}); err != nil {
return err
}
// Wait for the tables to become public before completing.
if details.PrepareComplete {
for _, tableDesc := range details.Tables {
_, err := cfg.LeaseManager.WaitForOneVersion(ctx, tableDesc.Desc.ID, retry.Options{})
if err != nil {
return errors.Wrap(err, "rolling back tables waiting for them to be public")
}
}
}

// Run any jobs which might have been queued when dropping the schemas.
// This would be a job to drop all the schemas, and a job to update the parent
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/multi_region_backup
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# LogicTest: multiregion-9node-3region-3azs

skip flaky # see #60773

query TTTT
SHOW REGIONS
----
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName(
// Read the descriptor from the store in the face of some specific errors
// because of a known limitation of AcquireByName. See the known
// limitations of AcquireByName for details.
if catalog.HasInactiveDescriptorError(err) ||
if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, true, nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -68,7 +69,9 @@ go_test(
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltestutils",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
Expand All @@ -85,6 +88,7 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_cockroach_go//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
52 changes: 37 additions & 15 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s storage) acquire(
return err
}
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{}, // filter all non-public state
desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
); err != nil {
return err
}
Expand Down Expand Up @@ -985,7 +985,7 @@ func purgeOldVersions(
ctx context.Context,
db *kv.DB,
id descpb.ID,
takenOffline bool,
dropped bool,
minVersion descpb.DescriptorVersion,
m *Manager,
) error {
Expand All @@ -999,24 +999,24 @@ func purgeOldVersions(
}
empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
t.mu.Unlock()
if empty && !takenOffline {
if empty && !dropped {
// We don't currently have a version on this descriptor, so no need to refresh
// anything.
return nil
}

removeInactives := func(takenOffline bool) {
removeInactives := func(dropped bool) {
t.mu.Lock()
t.mu.takenOffline = takenOffline
t.mu.takenOffline = dropped
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
}
}

if takenOffline {
removeInactives(true /* takenOffline */)
if dropped {
removeInactives(true /* dropped */)
return nil
}

Expand All @@ -1032,7 +1032,7 @@ func purgeOldVersions(
return errRenewLease
}
newest.incRefcount()
removeInactives(false /* takenOffline */)
removeInactives(false /* dropped */)
s, err := t.release(newest.Descriptor, m.removeOnceDereferenced())
if err != nil {
return err
Expand Down Expand Up @@ -1402,6 +1402,28 @@ func (m *Manager) AcquireByName(
parentSchemaID descpb.ID,
name string,
) (catalog.Descriptor, hlc.Timestamp, error) {
// When offline descriptor leases were not allowed to be cached,
// attempts to acquire a lease on them would generate a descriptor
// offline error. Recent changes allow offline descriptor leases
// to be cached, but callers still need the offline error generated.
// This logic releases the lease (the lease manager will still
// cache it) and generates the offline descriptor error.
validateDescriptorForReturn := func(desc catalog.Descriptor,
expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) {
if desc.Offline() {
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{},
); err != nil {
releaseErr := m.Release(desc)
if releaseErr != nil {
log.Warningf(ctx, "error releasing lease: %s", releaseErr)
}
return nil, hlc.Timestamp{}, err
}
}
return desc, expiration, nil
}

// Check if we have cached an ID for this name.
descVersion := m.names.get(parentID, parentSchemaID, name, timestamp)
if descVersion != nil {
Expand All @@ -1416,7 +1438,7 @@ func (m *Manager) AcquireByName(
}
}
}
return descVersion.Descriptor, descVersion.expiration, nil
return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration)
}
if err := m.Release(descVersion); err != nil {
return nil, hlc.Timestamp{}, err
Expand All @@ -1426,7 +1448,7 @@ func (m *Manager) AcquireByName(
if err != nil {
return nil, hlc.Timestamp{}, err
}
return desc, expiration, nil
return validateDescriptorForReturn(desc, expiration)
}

// We failed to find something in the cache, or what we found is not
Expand Down Expand Up @@ -1495,7 +1517,7 @@ func (m *Manager) AcquireByName(
return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
}
}
return desc, expiration, nil
return validateDescriptorForReturn(desc, expiration)
}

// resolveName resolves a descriptor name to a descriptor ID at a particular
Expand Down Expand Up @@ -1698,11 +1720,11 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB)
}

id, version, name, state := descpb.GetDescriptorMetadata(desc)
goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
dropped := state == descpb.DescriptorState_DROP
// Try to refresh the lease to one >= this version.
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
id, version, goingOffline)
if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil {
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
id, version, dropped)
if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
id, name, err)
}
Expand Down
Loading

0 comments on commit 2a16884

Please sign in to comment.