diff --git a/DEPS.bzl b/DEPS.bzl index c4724697cb0f..e43d3dd299f1 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -9304,10 +9304,10 @@ def go_deps(): name = "org_golang_google_grpc", build_file_proto_mode = "disable_global", importpath = "google.golang.org/grpc", - sha256 = "30dde2a858d77414c283994f1ace3dd52f2fa75e43a9b13057d458525808917f", - strip_prefix = "google.golang.org/grpc@v1.47.0", + sha256 = "bd07e8767218f67342fe6e673754db022d85a62167638791da477fad3f4a868f", + strip_prefix = "google.golang.org/grpc@v1.49.0", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/grpc/org_golang_google_grpc-v1.47.0.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/grpc/org_golang_google_grpc-v1.49.0.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 1c98a3d75dad..298b6578c0c7 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -895,7 +895,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/genproto/org_golang_google_genproto-v0.0.0-20220505152158-f39f71e6c8f3.zip": "8aa446ba7fe5a28d398eceefdad569b604bd4f9c0f86662db0d35fbf61d316cf", "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/grpc/cmd/protoc-gen-go-grpc/org_golang_google_grpc_cmd_protoc_gen_go_grpc-v1.1.0.zip": "13877d86cbfa30bde4d62fef2bc58dd56377dcb502c16cf78197f6934193009a", "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/grpc/examples/org_golang_google_grpc_examples-v0.0.0-20210324172016-702608ffae4d.zip": "f5cad7b05a93557c91864a02890a35c6bc5c394897222978cff2b880a78f7a11", - "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/grpc/org_golang_google_grpc-v1.47.0.zip": "30dde2a858d77414c283994f1ace3dd52f2fa75e43a9b13057d458525808917f", + "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/grpc/org_golang_google_grpc-v1.49.0.zip": "bd07e8767218f67342fe6e673754db022d85a62167638791da477fad3f4a868f", "https://storage.googleapis.com/cockroach-godeps/gomod/google.golang.org/protobuf/org_golang_google_protobuf-v1.28.1.zip": "bf386bcd36987f898e70c8330c6f7ada03e5112909f0a92b7510961403bf61da", "https://storage.googleapis.com/cockroach-godeps/gomod/gopkg.in/DataDog/dd-trace-go.v1/in_gopkg_datadog_dd_trace_go_v1-v1.17.0.zip": "2ebcc818df0b2d560a61037da4492ae7effbaed67de94339a1d3a72728d2cb09", "https://storage.googleapis.com/cockroach-godeps/gomod/gopkg.in/airbrake/gobrake.v2/in_gopkg_airbrake_gobrake_v2-v2.0.9.zip": "2db903664908e5a9afafefba94821b9579bbf271e2929c1f0b7b1fdd23f7bbcf", diff --git a/go.mod b/go.mod index 9e23d56c2e28..2b5506ac902f 100644 --- a/go.mod +++ b/go.mod @@ -170,7 +170,7 @@ require ( golang.org/x/tools v0.1.11 google.golang.org/api v0.80.0 google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3 - google.golang.org/grpc v1.47.0 + google.golang.org/grpc v1.49.0 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index a37e5c53c276..021adc20fc69 100644 --- a/go.sum +++ b/go.sum @@ -3084,8 +3084,8 @@ google.golang.org/grpc v1.41.0-dev.0.20210907181116-2f3355d2244e/go.mod h1:U3l9u google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.47.0 
h1:9n77onPX5F3qfFCqjy9dhn8PbNQsIKeVU04J9G7umt8= -google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= +google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/examples v0.0.0-20210324172016-702608ffae4d h1:CJP00gEaaYNJuaTXEg456rWNC1oUOfiAiUjuwyAhEmM= google.golang.org/grpc/examples v0.0.0-20210324172016-702608ffae4d/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE= diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 9b6a98d57840..2e526ba8f6fd 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -157,6 +157,12 @@ type TestServerArgs struct { // or if some of the functionality being tested is not accessible from // within tenants. DisableDefaultTestTenant bool + + // ShareMostTestingKnobsWithTenant should be set by tests that want their + // testing knobs shared with any DefaultTestTenant that the server starts. + // See (*TestServer).TestingKnobsForTenant for details on which knobs aren't + // shared. + ShareMostTestingKnobsWithTenant bool } // TestClusterArgs contains the parameters one can set when creating a test diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index fcfe0ec3f206..9452ecfd905d 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -670,6 +670,8 @@ func backupShowerDefault( var rows []tree.Datums for layer, manifest := range info.manifests { + ctx, sp := tracing.ChildSpan(ctx, "backupccl.backupShowerDefault.fn.layer") + // Map database ID to descriptor name. dbIDToName := make(map[descpb.ID]string) schemaIDToName := make(map[descpb.ID]string) @@ -692,11 +694,12 @@ func backupShowerDefault( } } } + var fileSizes []int64 if len(info.fileSizes) > 0 { fileSizes = info.fileSizes[layer] } - tableSizes, err := getTableSizes(manifest.Files, fileSizes) + tableSizes, err := getTableSizes(ctx, manifest.Files, fileSizes) if err != nil { return nil, err } @@ -772,18 +775,23 @@ func backupShowerDefault( rowCountDatum = tree.NewDInt(tree.DInt(tableSize.rowCount.Rows)) fileSizeDatum = tree.NewDInt(tree.DInt(tableSize.fileSize)) - displayOptions := sql.ShowCreateDisplayOptions{ - FKDisplayMode: sql.OmitMissingFKClausesFromCreate, - IgnoreComments: true, - } - createStmt, err := p.ShowCreate(ctx, dbName, manifest.Descriptors, - tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable(), displayOptions) - if err != nil { - // We expect that we might get an error here due to X-DB - // references, which were possible on 20.2 betas and rcs. - log.Errorf(ctx, "error while generating create statement: %+v", err) + // Only resolve the table schemas if running `SHOW BACKUP SCHEMAS`. + // In all other cases we discard these results and so it is wasteful + // to construct the SQL representation of the table's schema. + if showSchemas { + displayOptions := sql.ShowCreateDisplayOptions{ + FKDisplayMode: sql.OmitMissingFKClausesFromCreate, + IgnoreComments: true, + } + createStmt, err := p.ShowCreate(ctx, dbName, manifest.Descriptors, + tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable(), displayOptions) + if err != nil { + // We expect that we might get an error here due to X-DB + // references, which were possible on 20.2 betas and rcs. 
+ log.Errorf(ctx, "error while generating create statement: %+v", err) + } + createStmtDatum = nullIfEmpty(createStmt) } - createStmtDatum = nullIfEmpty(createStmt) default: descriptorType = "unknown" } @@ -805,7 +813,7 @@ func backupShowerDefault( row = append(row, createStmtDatum) } if _, shouldShowPrivileges := opts[backupOptWithPrivileges]; shouldShowPrivileges { - row = append(row, tree.NewDString(showPrivileges(descriptor))) + row = append(row, tree.NewDString(showPrivileges(ctx, descriptor))) owner := desc.GetPrivileges().Owner().SQLIdentifier() row = append(row, tree.NewDString(owner)) } @@ -867,6 +875,7 @@ func backupShowerDefault( } rows = append(rows, row) } + sp.Finish() } return rows, nil }, @@ -881,7 +890,11 @@ type descriptorSize struct { // getLogicalSSTSize gets the total logical bytes stored in each SST. Note that a // BackupManifest_File identifies a span in an SST and there can be multiple // spans stored in an SST. -func getLogicalSSTSize(files []backuppb.BackupManifest_File) map[string]int64 { +func getLogicalSSTSize(ctx context.Context, files []backuppb.BackupManifest_File) map[string]int64 { + ctx, span := tracing.ChildSpan(ctx, "backupccl.getLogicalSSTSize") + defer span.Finish() + _ = ctx + sstDataSize := make(map[string]int64) for _, file := range files { sstDataSize[file.Path] += file.EntryCounts.DataSize @@ -898,8 +911,11 @@ func approximateSpanPhysicalSize( // getTableSizes gathers row and size count for each table in the manifest func getTableSizes( - files []backuppb.BackupManifest_File, fileSizes []int64, + ctx context.Context, files []backuppb.BackupManifest_File, fileSizes []int64, ) (map[descpb.ID]descriptorSize, error) { + ctx, span := tracing.ChildSpan(ctx, "backupccl.getTableSizes") + defer span.Finish() + tableSizes := make(map[descpb.ID]descriptorSize) if len(files) == 0 { return tableSizes, nil @@ -910,7 +926,7 @@ func getTableSizes( } showCodec := keys.MakeSQLCodec(tenantID) - logicalSSTSize := getLogicalSSTSize(files) + logicalSSTSize := getLogicalSSTSize(ctx, files) for i, file := range files { // TODO(dan): This assumes each file in the backup only @@ -985,7 +1001,11 @@ func showRegions(typeDesc catalog.TypeDescriptor, dbname string) (string, error) return regionsStringBuilder.String(), nil } -func showPrivileges(descriptor *descpb.Descriptor) string { +func showPrivileges(ctx context.Context, descriptor *descpb.Descriptor) string { + ctx, span := tracing.ChildSpan(ctx, "backupccl.showPrivileges") + defer span.Finish() + _ = ctx // ctx is currently unused, but this new ctx should be used below in the future. 
+ var privStringBuilder strings.Builder b := descbuilder.NewBuilder(descriptor) @@ -1173,7 +1193,7 @@ func backupShowerFileSetup(inCol tree.StringOrPlaceholderOptList) backupShower { backupType = "incremental" } - logicalSSTSize := getLogicalSSTSize(manifest.Files) + logicalSSTSize := getLogicalSSTSize(ctx, manifest.Files) for j, file := range manifest.Files { filePath := file.Path if inCol != nil { diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index 94b5bf1f3f78..9460945cb07a 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -322,7 +322,7 @@ func (ts *testState) request( return "" } -func (ts *testState) externalIngress(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) externalIngress(t *testing.T, _ *datadriven.TestData, args cmdArgs) string { usage := multitenant.ExternalIOUsage{IngressBytes: args.bytes} if err := ts.controller.OnExternalIOWait(context.Background(), usage); err != nil { t.Errorf("OnExternalIOWait error: %s", err) @@ -341,12 +341,12 @@ func (ts *testState) externalEgress(t *testing.T, d *datadriven.TestData, args c return "" } -func (ts *testState) enableRUAccounting(t *testing.T, _ *datadriven.TestData, _ cmdArgs) string { +func (ts *testState) enableRUAccounting(_ *testing.T, _ *datadriven.TestData, _ cmdArgs) string { tenantcostclient.ExternalIORUAccountingMode.Override(context.Background(), &ts.settings.SV, "on") return "" } -func (ts *testState) disableRUAccounting(t *testing.T, _ *datadriven.TestData, _ cmdArgs) string { +func (ts *testState) disableRUAccounting(_ *testing.T, _ *datadriven.TestData, _ cmdArgs) string { tenantcostclient.ExternalIORUAccountingMode.Override(context.Background(), &ts.settings.SV, "off") return "" } @@ -424,7 +424,7 @@ func (ts *testState) advance(t *testing.T, d *datadriven.TestData, args cmdArgs) // waitForEvent waits until the tenant controller reports the given event // type(s), at the current time. -func (ts *testState) waitForEvent(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) waitForEvent(t *testing.T, d *datadriven.TestData, _ cmdArgs) string { typs := make(map[string]tenantcostclient.TestEventType) for ev, evStr := range eventTypeStr { typs[evStr] = ev @@ -444,7 +444,7 @@ func (ts *testState) waitForEvent(t *testing.T, d *datadriven.TestData, args cmd // unblockRequest resumes a token bucket request that was blocked by the // "blockRequest" configuration option. -func (ts *testState) unblockRequest(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) unblockRequest(t *testing.T, _ *datadriven.TestData, _ cmdArgs) string { ts.provider.unblockRequest(t) return "" } @@ -461,7 +461,7 @@ func (ts *testState) unblockRequest(t *testing.T, d *datadriven.TestData, args c // ---- // 00:00:01.000 // 00:00:02.000 -func (ts *testState) timers(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) timers(t *testing.T, d *datadriven.TestData, _ cmdArgs) string { // If we are rewriting the test, just sleep a bit before returning the // timers. if d.Rewrite { @@ -491,7 +491,7 @@ func timesToString(times []time.Time) string { } // configure the test provider. 
-func (ts *testState) configure(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) configure(t *testing.T, d *datadriven.TestData, _ cmdArgs) string { var cfg testProviderConfig if err := yaml.UnmarshalStrict([]byte(d.Input), &cfg); err != nil { d.Fatalf(t, "failed to parse request yaml: %v", err) @@ -501,13 +501,13 @@ func (ts *testState) configure(t *testing.T, d *datadriven.TestData, args cmdArg } // tokenBucket dumps the current state of the tenant's token bucket. -func (ts *testState) tokenBucket(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) tokenBucket(*testing.T, *datadriven.TestData, cmdArgs) string { return tenantcostclient.TestingTokenBucketString(ts.controller) } // cpu adds CPU usage which will be observed by the controller on the next main // loop tick. -func (ts *testState) cpu(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) cpu(t *testing.T, d *datadriven.TestData, _ cmdArgs) string { duration, err := time.ParseDuration(d.Input) if err != nil { d.Fatalf(t, "error parsing cpu duration: %v", err) @@ -518,7 +518,7 @@ func (ts *testState) cpu(t *testing.T, d *datadriven.TestData, args cmdArgs) str // pgwire adds PGWire egress usage which will be observed by the controller on the next // main loop tick. -func (ts *testState) pgwireEgress(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) pgwireEgress(t *testing.T, d *datadriven.TestData, _ cmdArgs) string { bytes, err := strconv.Atoi(d.Input) if err != nil { d.Fatalf(t, "error parsing pgwire bytes value: %v", err) @@ -529,7 +529,7 @@ func (ts *testState) pgwireEgress(t *testing.T, d *datadriven.TestData, args cmd // usage prints out the latest consumption. Callers are responsible for // triggering calls to the token bucket provider and waiting for responses. -func (ts *testState) usage(t *testing.T, d *datadriven.TestData, args cmdArgs) string { +func (ts *testState) usage(*testing.T, *datadriven.TestData, cmdArgs) string { c := ts.provider.consumption() return fmt.Sprintf(""+ "RU: %.2f\n"+ @@ -695,7 +695,7 @@ func (tp *testProvider) unblockRequest(t *testing.T) { // TokenBucket implements the kvtenant.TokenBucketProvider interface. func (tp *testProvider) TokenBucket( - ctx context.Context, in *roachpb.TokenBucketRequest, + _ context.Context, in *roachpb.TokenBucketRequest, ) (*roachpb.TokenBucketResponse, error) { tp.mu.Lock() defer tp.mu.Unlock() @@ -930,7 +930,7 @@ func TestSQLLivenessExemption(t *testing.T) { // Make the tenant heartbeat like crazy. ctx := context.Background() //slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 50*time.Millisecond) _, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{ TenantID: tenantID, @@ -960,7 +960,6 @@ func TestSQLLivenessExemption(t *testing.T) { // Verify that heartbeats can go through and update the expiration time. val := livenessValue() - time.Sleep(2 * time.Millisecond) testutils.SucceedsSoon( t, func() error { diff --git a/pkg/cli/declarative_corpus.go b/pkg/cli/declarative_corpus.go index faacaad74673..43ea4086d0e9 100644 --- a/pkg/cli/declarative_corpus.go +++ b/pkg/cli/declarative_corpus.go @@ -50,7 +50,7 @@ a given corpus file. 
return jobID }, } - _, err := scplan.MakePlan(*state, params) + _, err := scplan.MakePlan(cmd.Context(), *state, params) if err != nil { fmt.Printf("failed to validate %s with error %v\n", name, err) } else { diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 38e60ad50e3d..406def4debe2 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -496,6 +496,17 @@ func (ts *TestServer) TestingKnobs() *base.TestingKnobs { return nil } +// TestingKnobsForTenant returns a TestingKnobs that is used when starting Test +// Tenants. These knobs are a copy of the Server's TestingKnobs with any knobs +// that have been found to be problematic to share by default removed. +func (ts *TestServer) TestingKnobsForTenant() base.TestingKnobs { + knobs := *ts.TestingKnobs() + // TODO(ssd): We don't share the Server knobs because the testcluster setup + // code installs an RPC listener that is inappropriate for the tenant. + knobs.Server = nil + return knobs +} + // TenantStatusServer returns the TenantStatusServer used by the TestServer. func (ts *TestServer) TenantStatusServer() interface{} { return ts.status @@ -546,10 +557,14 @@ func (ts *TestServer) maybeStartDefaultTestTenant(ctx context.Context) error { UseDatabase: ts.params.UseDatabase, SSLCertsDir: ts.params.SSLCertsDir, AllowSettingClusterSettings: true, + } + if ts.params.ShareMostTestingKnobsWithTenant { + params.TestingKnobs = ts.TestingKnobsForTenant() + } else { // These settings are inherited from the SQL server creation in // logicTest.newCluster, and are required to run the logic test suite // successfully. - TestingKnobs: base.TestingKnobs{ + params.TestingKnobs = base.TestingKnobs{ SQLExecutor: &sql.ExecutorTestingKnobs{ DeterministicExplain: true, UseTransactionalDescIDGenerator: useTransactionalDescIDGenerator, @@ -558,7 +573,7 @@ func (ts *TestServer) maybeStartDefaultTestTenant(ctx context.Context) error { AOSTClause: "AS OF SYSTEM TIME '-1us'", }, RangeFeed: ts.TestingKnobs().RangeFeed, - }, + } } tenant, err := ts.StartTenant(ctx, params) diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 90e467143770..52d4a1e9d4ec 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -66,6 +66,7 @@ go_library( "//pkg/util/log", "//pkg/util/mon", "//pkg/util/retry", + "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@com_github_lib_pq//oid", diff --git a/pkg/sql/catalog/descs/hydrate.go b/pkg/sql/catalog/descs/hydrate.go index fbb43b8df048..fe2c91be07f8 100644 --- a/pkg/sql/catalog/descs/hydrate.go +++ b/pkg/sql/catalog/descs/hydrate.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/tracing" ) // hydrateDescriptors installs user defined type metadata in all types.T present @@ -151,6 +152,9 @@ func makeImmutableTypeLookupFunc( // HydrateCatalog installs type metadata in the type.T objects present for all // objects referencing them in the catalog. 
func HydrateCatalog(ctx context.Context, c nstree.MutableCatalog) error { + ctx, sp := tracing.ChildSpan(ctx, "descs.HydrateCatalog") + defer sp.Finish() + fakeLookupFunc := func(_ context.Context, id descpb.ID) (catalog.Descriptor, error) { return nil, catalog.WrapDescRefErr(id, catalog.ErrDescriptorNotFound) } diff --git a/pkg/sql/explain_ddl.go b/pkg/sql/explain_ddl.go index 58031d7145a2..01bd212606d0 100644 --- a/pkg/sql/explain_ddl.go +++ b/pkg/sql/explain_ddl.go @@ -67,15 +67,17 @@ func (n *explainDDLNode) startExec(params runParams) error { return explainNotPossibleError } } - return n.setExplainValues(scNode.plannedState) + return n.setExplainValues(params.ctx, scNode.plannedState) } -func (n *explainDDLNode) setExplainValues(scState scpb.CurrentState) (err error) { +func (n *explainDDLNode) setExplainValues( + ctx context.Context, scState scpb.CurrentState, +) (err error) { defer func() { err = errors.WithAssertionFailure(err) }() var p scplan.Plan - p, err = scplan.MakePlan(scState, scplan.Params{ + p, err = scplan.MakePlan(ctx, scState, scplan.Params{ ExecutionPhase: scop.StatementPhase, SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 }, }) diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go index bc2d5472c2c5..c6196a496c45 100644 --- a/pkg/sql/pg_catalog.go +++ b/pkg/sql/pg_catalog.go @@ -1145,7 +1145,9 @@ func makeAllRelationsVirtualTableWithDescriptorIDIndex( h := makeOidHasher() scResolver := oneAtATimeSchemaResolver{p: p, ctx: ctx} sc, err := p.Descriptors().GetImmutableSchemaByID( - ctx, p.txn, table.GetParentSchemaID(), tree.SchemaLookupFlags{}) + ctx, p.txn, table.GetParentSchemaID(), tree.SchemaLookupFlags{ + Required: true, + }) if err != nil { return false, err } diff --git a/pkg/sql/resolver.go b/pkg/sql/resolver.go index 1da116738289..fbb0f018815e 100644 --- a/pkg/sql/resolver.go +++ b/pkg/sql/resolver.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/lib/pq/oid" ) @@ -989,6 +990,9 @@ type tableLookupFn = *internalLookupCtx func newInternalLookupCtxFromDescriptorProtos( ctx context.Context, rawDescs []descpb.Descriptor, ) (*internalLookupCtx, error) { + ctx, sp := tracing.ChildSpan(ctx, "sql.newInternalLookupCtxFromDescriptorProtos") + defer sp.Finish() + var c nstree.MutableCatalog for i := range rawDescs { desc := descbuilder.NewBuilder(&rawDescs[i]).BuildImmutable() diff --git a/pkg/sql/schemachanger/corpus/corpus_test.go b/pkg/sql/schemachanger/corpus/corpus_test.go index e825b371e7a3..52bb87de4e8c 100644 --- a/pkg/sql/schemachanger/corpus/corpus_test.go +++ b/pkg/sql/schemachanger/corpus/corpus_test.go @@ -11,6 +11,7 @@ package corpus_test import ( + "context" "flag" "testing" @@ -40,7 +41,7 @@ func TestValidateCorpuses(t *testing.T) { jobID := jobspb.InvalidJobID name, state := reader.GetCorpus(corpusIdx) t.Run(name, func(t *testing.T) { - _, err := scplan.MakePlan(*state, scplan.Params{ + _, err := scplan.MakePlan(context.Background(), *state, scplan.Params{ ExecutionPhase: scop.LatestPhase, InRollback: state.InRollback, SchemaChangerJobIDSupplier: func() jobspb.JobID { diff --git a/pkg/sql/schemachanger/scdeps/sctestutils/sctestutils.go b/pkg/sql/schemachanger/scdeps/sctestutils/sctestutils.go index ba70e010dab4..ec3a14e4cde8 100644 --- a/pkg/sql/schemachanger/scdeps/sctestutils/sctestutils.go +++ 
b/pkg/sql/schemachanger/scdeps/sctestutils/sctestutils.go @@ -168,7 +168,7 @@ func ProtoDiff(a, b protoutil.Message, args DiffArgs, rewrites func(interface{}) // MakePlan is a convenient alternative to calling scplan.MakePlan in tests. func MakePlan(t *testing.T, state scpb.CurrentState, phase scop.Phase) scplan.Plan { - plan, err := scplan.MakePlan(state, scplan.Params{ + plan, err := scplan.MakePlan(context.Background(), state, scplan.Params{ ExecutionPhase: phase, SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 }, }) diff --git a/pkg/sql/schemachanger/scplan/internal/opgen/op_gen.go b/pkg/sql/schemachanger/scplan/internal/opgen/op_gen.go index fc6663e016ed..e64906446e67 100644 --- a/pkg/sql/schemachanger/scplan/internal/opgen/op_gen.go +++ b/pkg/sql/schemachanger/scplan/internal/opgen/op_gen.go @@ -81,17 +81,19 @@ func IterateTransitions( // BuildGraph constructs a graph with operation edges populated from an initial // state. -func BuildGraph(cs scpb.CurrentState) (*scgraph.Graph, error) { - return opRegistry.buildGraph(cs) +func BuildGraph(ctx context.Context, cs scpb.CurrentState) (*scgraph.Graph, error) { + return opRegistry.buildGraph(ctx, cs) } -func (r *registry) buildGraph(cs scpb.CurrentState) (_ *scgraph.Graph, err error) { +func (r *registry) buildGraph( + ctx context.Context, cs scpb.CurrentState, +) (_ *scgraph.Graph, err error) { start := timeutil.Now() defer func() { - if err != nil || !log.V(2) { + if err != nil || !log.ExpensiveLogEnabled(ctx, 2) { return } - log.Infof(context.TODO(), "operation graph generation took %v", timeutil.Since(start)) + log.Infof(ctx, "operation graph generation took %v", timeutil.Since(start)) }() g, err := scgraph.New(cs) if err != nil { diff --git a/pkg/sql/schemachanger/scplan/internal/rules/registry.go b/pkg/sql/schemachanger/scplan/internal/rules/registry.go index ee77102b54ec..8ecae0757246 100644 --- a/pkg/sql/schemachanger/scplan/internal/rules/registry.go +++ b/pkg/sql/schemachanger/scplan/internal/rules/registry.go @@ -27,7 +27,7 @@ import ( // ApplyDepRules adds dependency edges to the graph according to the // registered dependency rules. -func ApplyDepRules(g *scgraph.Graph) error { +func ApplyDepRules(ctx context.Context, g *scgraph.Graph) error { for _, dr := range registry.depRules { start := timeutil.Now() var added int @@ -41,9 +41,15 @@ func ApplyDepRules(g *scgraph.Graph) error { }); err != nil { return errors.Wrapf(err, "applying dep rule %s", dr.name) } - if log.V(2) { + // Applying the dep rules can be slow in some cases. Check for + // cancellation when applying the rules to ensure we don't spin for + // too long while the user is waiting for the task to exit cleanly. + if ctx.Err() != nil { + return ctx.Err() + } + if log.ExpensiveLogEnabled(ctx, 2) { log.Infof( - context.TODO(), "applying dep rule %s %d took %v", + ctx, "applying dep rule %s %d took %v", dr.name, added, timeutil.Since(start), ) } @@ -53,7 +59,7 @@ func ApplyDepRules(g *scgraph.Graph) error { // ApplyOpRules marks op edges as no-op in a shallow copy of the graph according // to the registered rules. 
-func ApplyOpRules(g *scgraph.Graph) (*scgraph.Graph, error) { +func ApplyOpRules(ctx context.Context, g *scgraph.Graph) (*scgraph.Graph, error) { db := g.Database() m := make(map[*screl.Node][]scgraph.RuleName) for _, rule := range registry.opRules { @@ -68,9 +74,9 @@ func ApplyOpRules(g *scgraph.Graph) (*scgraph.Graph, error) { if err != nil { return nil, errors.Wrapf(err, "applying op rule %s", rule.name) } - if log.V(2) { + if log.ExpensiveLogEnabled(ctx, 2) { log.Infof( - context.TODO(), "applying op rule %s %d took %v", + ctx, "applying op rule %s %d took %v", rule.name, added, timeutil.Since(start), ) } diff --git a/pkg/sql/schemachanger/scplan/internal/scstage/build.go b/pkg/sql/schemachanger/scplan/internal/scstage/build.go index 6c1912504bf6..8c1594799be6 100644 --- a/pkg/sql/schemachanger/scplan/internal/scstage/build.go +++ b/pkg/sql/schemachanger/scplan/internal/scstage/build.go @@ -11,6 +11,7 @@ package scstage import ( + "context" "fmt" "sort" "strings" @@ -31,7 +32,11 @@ import ( // Note that the scJobIDSupplier function is idempotent, and must return the // same value for all calls. func BuildStages( - init scpb.CurrentState, phase scop.Phase, g *scgraph.Graph, scJobIDSupplier func() jobspb.JobID, + ctx context.Context, + init scpb.CurrentState, + phase scop.Phase, + g *scgraph.Graph, + scJobIDSupplier func() jobspb.JobID, ) []Stage { c := buildContext{ rollback: init.InRollback, diff --git a/pkg/sql/schemachanger/scplan/plan.go b/pkg/sql/schemachanger/scplan/plan.go index 8d65e4e72ceb..5818abe00927 100644 --- a/pkg/sql/schemachanger/scplan/plan.go +++ b/pkg/sql/schemachanger/scplan/plan.go @@ -74,19 +74,19 @@ func (p Plan) StagesForCurrentPhase() []scstage.Stage { // MakePlan generates a Plan for a particular phase of a schema change, given // the initial state for a set of targets. Returns an error when planning fails. 
-func MakePlan(initial scpb.CurrentState, params Params) (p Plan, err error) { +func MakePlan(ctx context.Context, initial scpb.CurrentState, params Params) (p Plan, err error) { p = Plan{ CurrentState: initial, Params: params, } - err = makePlan(&p) - if err != nil { + err = makePlan(ctx, &p) + if err != nil && ctx.Err() == nil { err = p.DecorateErrorWithPlanDetails(err) } return p, err } -func makePlan(p *Plan) (err error) { +func makePlan(ctx context.Context, p *Plan) (err error) { defer func() { if r := recover(); r != nil { rAsErr, ok := r.(error) @@ -99,18 +99,18 @@ func makePlan(p *Plan) (err error) { }() { start := timeutil.Now() - p.Graph = buildGraph(p.CurrentState) - if log.V(2) { - log.Infof(context.TODO(), "graph generation took %v", timeutil.Since(start)) + p.Graph = buildGraph(ctx, p.CurrentState) + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "graph generation took %v", timeutil.Since(start)) } } { start := timeutil.Now() p.Stages = scstage.BuildStages( - p.CurrentState, p.Params.ExecutionPhase, p.Graph, p.Params.SchemaChangerJobIDSupplier, + ctx, p.CurrentState, p.Params.ExecutionPhase, p.Graph, p.Params.SchemaChangerJobIDSupplier, ) - if log.V(2) { - log.Infof(context.TODO(), "stage generation took %v", timeutil.Since(start)) + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "stage generation took %v", timeutil.Since(start)) } } if n := len(p.Stages); n > 0 && p.Stages[n-1].Phase > scop.PreCommitPhase { @@ -123,12 +123,12 @@ func makePlan(p *Plan) (err error) { return nil } -func buildGraph(cs scpb.CurrentState) *scgraph.Graph { - g, err := opgen.BuildGraph(cs) +func buildGraph(ctx context.Context, cs scpb.CurrentState) *scgraph.Graph { + g, err := opgen.BuildGraph(ctx, cs) if err != nil { panic(errors.Wrapf(err, "build graph op edges")) } - err = rules.ApplyDepRules(g) + err = rules.ApplyDepRules(ctx, g) if err != nil { panic(errors.Wrapf(err, "build graph dep edges")) } @@ -136,7 +136,7 @@ func buildGraph(cs scpb.CurrentState) *scgraph.Graph { if err != nil { panic(errors.Wrapf(err, "validate graph")) } - g, err = rules.ApplyOpRules(g) + g, err = rules.ApplyOpRules(ctx, g) if err != nil { panic(errors.Wrapf(err, "mark op edges as no-op")) } diff --git a/pkg/sql/schemachanger/scrun/scrun.go b/pkg/sql/schemachanger/scrun/scrun.go index 98557d0ddd70..a4d30d3cdcec 100644 --- a/pkg/sql/schemachanger/scrun/scrun.go +++ b/pkg/sql/schemachanger/scrun/scrun.go @@ -65,7 +65,7 @@ func runTransactionPhase( if len(state.Current) == 0 { return scpb.CurrentState{}, jobspb.InvalidJobID, nil } - sc, err := scplan.MakePlan(state, scplan.Params{ + sc, err := scplan.MakePlan(ctx, state, scplan.Params{ ExecutionPhase: phase, SchemaChangerJobIDSupplier: deps.TransactionalJobRegistry().SchemaChangerJobID, }) @@ -112,7 +112,7 @@ func RunSchemaChangesInJob( } return errors.Wrapf(err, "failed to construct state for job %d", jobID) } - sc, err := scplan.MakePlan(state, scplan.Params{ + sc, err := scplan.MakePlan(ctx, state, scplan.Params{ ExecutionPhase: scop.PostCommitPhase, SchemaChangerJobIDSupplier: func() jobspb.JobID { return jobID }, }) diff --git a/pkg/sql/schemachanger/sctest/end_to_end.go b/pkg/sql/schemachanger/sctest/end_to_end.go index 34960dd4c08a..3d473dc14da0 100644 --- a/pkg/sql/schemachanger/sctest/end_to_end.go +++ b/pkg/sql/schemachanger/sctest/end_to_end.go @@ -243,7 +243,7 @@ func checkExplainDiagrams( params.InRollback = true params.ExecutionPhase = scop.PostCommitNonRevertiblePhase } - pl, err := scplan.MakePlan(state, params) + pl, err := 
scplan.MakePlan(context.Background(), state, params) require.NoErrorf(t, err, "%s: %s", fileNameSuffix, explainedStmt) action(explainDir, "ddl", pl.ExplainCompact) action(explainVerboseDir, "ddl, verbose", pl.ExplainVerbose) diff --git a/pkg/sql/show_create.go b/pkg/sql/show_create.go index bdf44a1b9c19..47de95eb1028 100644 --- a/pkg/sql/show_create.go +++ b/pkg/sql/show_create.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/tracing" ) type shouldOmitFKClausesFromCreate int @@ -72,6 +73,9 @@ func ShowCreateTable( lCtx simpleSchemaResolver, displayOptions ShowCreateDisplayOptions, ) (string, error) { + ctx, sp := tracing.ChildSpan(ctx, "sql.ShowCreateTable") + defer sp.Finish() + a := &tree.DatumAlloc{} f := p.ExtendedEvalContext().FmtCtx(tree.FmtSimple) @@ -221,6 +225,9 @@ func (p *planner) ShowCreate( desc catalog.TableDescriptor, displayOptions ShowCreateDisplayOptions, ) (string, error) { + ctx, sp := tracing.ChildSpan(ctx, "sql.ShowCreate") + defer sp.Finish() + var stmt string var err error tn := tree.MakeUnqualifiedTableName(tree.Name(desc.GetName())) diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 522c3e91a3ab..70bec476289e 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/sqlliveness", + "//pkg/util/contextutil", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/log", @@ -18,6 +19,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", ], ) @@ -35,6 +37,7 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slstorage", + "//pkg/testutils", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index d76e8ca5a1c1..105e3549278d 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) var ( @@ -151,6 +153,14 @@ func (l *Instance) setSession(s *session) { } func (l *Instance) clearSession(ctx context.Context) { + l.checkExpiry(ctx) + l.mu.Lock() + defer l.mu.Unlock() + l.mu.s = nil + l.mu.blockCh = make(chan struct{}) +} + +func (l *Instance) checkExpiry(ctx context.Context) { l.mu.Lock() defer l.mu.Unlock() if expiration := l.mu.s.Expiration(); expiration.Less(l.clock.Now()) { @@ -158,8 +168,6 @@ func (l *Instance) clearSession(ctx context.Context) { // associated with the session. 
l.mu.s.invokeSessionExpiryCallbacks(ctx) } - l.mu.s = nil - l.mu.blockCh = make(chan struct{}) } // createSession tries until it can create a new session and returns an error @@ -253,8 +261,13 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { t.Read = true s, _ := l.getSessionOrBlockCh() if s == nil { - newSession, err := l.createSession(ctx) - if err != nil { + var newSession *session + if err := contextutil.RunWithTimeout(ctx, "sqlliveness create session", l.hb(), func(ctx context.Context) error { + var err error + newSession, err = l.createSession(ctx) + return err + }); err != nil { + log.Errorf(ctx, "sqlliveness failed to create new session: %v", err) func() { l.mu.Lock() defer l.mu.Unlock() @@ -270,21 +283,37 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { t.Reset(l.hb()) continue } - found, err := l.extendSession(ctx, s) - if err != nil { + var found bool + err := contextutil.RunWithTimeout(ctx, "sqlliveness extend session", l.hb(), func(ctx context.Context) error { + var err error + found, err = l.extendSession(ctx, s) + return err + }) + switch { + case errors.HasType(err, (*contextutil.TimeoutError)(nil)): + // Retry without clearing the session because we don't know the current status. + l.checkExpiry(ctx) + t.Reset(0) + continue + case err != nil && ctx.Err() == nil: + log.Errorf(ctx, "sqlliveness failed to extend session: %v", err) + fallthrough + case err != nil: + // TODO(ajwerner): Decide whether we actually should exit the heartbeat loop here if the context is not + // canceled. Consider the case of an ambiguous result error: shouldn't we try again? l.clearSession(ctx) return - } - if !found { + case !found: + // No existing session found, immediately create one. l.clearSession(ctx) // Start next loop iteration immediately to insert a new session. 
t.Reset(0) - continue - } - if log.V(2) { - log.Infof(ctx, "extended SQL liveness session %s", s.ID()) + default: + if log.V(2) { + log.Infof(ctx, "extended SQL liveness session %s", s.ID()) + } + t.Reset(l.hb()) } - t.Reset(l.hb()) } } } diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index 75b069bab914..c0283c41ea38 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -12,6 +12,7 @@ package slinstance_test import ( "context" + "sync/atomic" "testing" "time" @@ -20,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -40,8 +42,8 @@ func TestSQLInstance(t *testing.T) { clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion, true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) @@ -91,3 +93,113 @@ func TestSQLInstance(t *testing.T) { _, err = sqlInstance.Session(ctx) require.Error(t, err) } + +// TestSQLInstanceDeadlines tests that we have proper deadlines set on the +// create and extend session operations. This is done by blocking the fake +// storage layer and ensuring that no sessions get created because the +// timeouts are constantly triggered. +func TestSQLInstanceDeadlines(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + // block the fake storage + fakeStorage.SetBlockCh() + cleanUpFunc := func() { + fakeStorage.CloseBlockCh() + } + defer cleanUpFunc() + + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) + sqlInstance.Start(ctx) + + // verify that we do not create a session + require.Never( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + return err == nil + }, + 100*time.Millisecond, 10*time.Millisecond, + ) +} + +// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the +// create and extend session operations. This tests the case where the session is +// successfully created first and then blocks indefinitely. 
+func TestSQLInstanceDeadlinesExtend(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + mt := timeutil.NewManualTime(timeutil.Unix(0, 42)) + clock := hlc.NewClock(mt, time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + // Must be shorter than the storage sleep amount below + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) + sqlInstance.Start(ctx) + + // verify that eventually session is created successfully + testutils.SucceedsSoon( + t, + func() error { + _, err := sqlInstance.Session(ctx) + return err + }, + ) + + // verify that session is also extended successfully a few times + require.Never( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + return err != nil + }, + 100*time.Millisecond, 10*time.Millisecond, + ) + + // register a callback for verification that this session expired + var sessionExpired atomic.Bool + s, _ := sqlInstance.Session(ctx) + s.RegisterCallbackForSessionExpiry(func(ctx context.Context) { + sessionExpired.Store(true) + }) + + // block the fake storage + fakeStorage.SetBlockCh() + cleanUpFunc := func() { + fakeStorage.CloseBlockCh() + } + defer cleanUpFunc() + // advance manual clock so that session expires + mt.Advance(20 * time.Millisecond) + + // expect session to expire + require.Eventually( + t, + func() bool { + return sessionExpired.Load() + }, + testutils.DefaultSucceedsSoonDuration, 10*time.Millisecond, + ) +} diff --git a/pkg/sql/sqlliveness/slstorage/test_helpers.go b/pkg/sql/sqlliveness/slstorage/test_helpers.go index 23eb1115f217..91952ff538d1 100644 --- a/pkg/sql/sqlliveness/slstorage/test_helpers.go +++ b/pkg/sql/sqlliveness/slstorage/test_helpers.go @@ -24,6 +24,7 @@ type FakeStorage struct { mu struct { syncutil.Mutex sessions map[sqlliveness.SessionID]hlc.Timestamp + blockCh chan struct{} } } @@ -46,8 +47,16 @@ func (s *FakeStorage) IsAlive( // Insert implements the sqlliveness.Storage interface. func (s *FakeStorage) Insert( - _ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, + ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, ) error { + if ch := s.getBlockCh(); ch != nil { + select { + case <-ch: + break + case <-ctx.Done(): + return ctx.Err() + } + } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.mu.sessions[sid]; ok { @@ -59,8 +68,16 @@ func (s *FakeStorage) Insert( // Update implements the sqlliveness.Storage interface. 
func (s *FakeStorage) Update( - _ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, + ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, ) (bool, error) { + if ch := s.getBlockCh(); ch != nil { + select { + case <-ch: + break + case <-ctx.Done(): + return false, ctx.Err() + } + } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.mu.sessions[sid]; !ok { @@ -77,3 +94,23 @@ func (s *FakeStorage) Delete(_ context.Context, sid sqlliveness.SessionID) error delete(s.mu.sessions, sid) return nil } + +// SetBlockCh is used to block the storage for testing purposes +func (s *FakeStorage) SetBlockCh() { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.blockCh = make(chan struct{}) +} + +// CloseBlockCh is used to unblock the storage for testing purposes +func (s *FakeStorage) CloseBlockCh() { + s.mu.Lock() + defer s.mu.Unlock() + close(s.mu.blockCh) +} + +func (s *FakeStorage) getBlockCh() chan struct{} { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.blockCh +} diff --git a/vendor b/vendor index f8334dab69a2..ade26f46dbe0 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit f8334dab69a264f4e29197812c0ae55e37455d01 +Subproject commit ade26f46dbe04df5767405bf0fa2f7c64e8352e4
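
A few sketches of the recurring patterns in this patch follow; none of the code below is part of the diff itself. First, the tracing additions in pkg/ccl/backupccl/show.go, pkg/sql/resolver.go, and pkg/sql/catalog/descs/hydrate.go all follow one idiom: reassign ctx to the span-carrying context returned by tracing.ChildSpan and defer the span's Finish. The stdlib-only sketch below uses a hypothetical childSpan stand-in (not the util/tracing API) to show why the returned context must be the one threaded into callees.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// spanKey and childSpan are toy stand-ins for util/tracing's ChildSpan:
// derive a context that carries the operation name and return a finisher,
// mirroring `ctx, sp := tracing.ChildSpan(ctx, ...); defer sp.Finish()`.
type spanKey struct{}

func childSpan(ctx context.Context, op string) (context.Context, func()) {
	start := time.Now()
	return context.WithValue(ctx, spanKey{}, op), func() {
		fmt.Printf("%s took %v\n", op, time.Since(start))
	}
}

// getTableSizes mimics the instrumented shape: ctx is reassigned so all
// work below this point is attributed to the child span.
func getTableSizes(ctx context.Context) {
	ctx, finish := childSpan(ctx, "backupccl.getTableSizes")
	defer finish()
	_ = ctx // pass this ctx, not the caller's, into per-file work
}

func main() {
	getTableSizes(context.Background())
}
```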
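The scplan changes swap log.V(2) with context.TODO() for log.ExpensiveLogEnabled(ctx, 2) with the real ctx, and ApplyDepRules now checks ctx.Err() per rule so a cancelled statement stops planning promptly instead of spinning through the remaining rules. A rough stdlib sketch of that loop shape (applyRules and the verbose flag are illustrative assumptions, not the scplan API):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// applyRules sketches the shape of rules.ApplyDepRules after this change:
// each iteration checks the context so cancellation exits cleanly, and
// per-rule timing is only logged when verbose logging is enabled.
func applyRules(ctx context.Context, rules []string, verbose bool) error {
	for _, name := range rules {
		start := time.Now()
		// ... apply the rule and add edges to the graph ...
		if err := ctx.Err(); err != nil {
			return err // user cancelled; stop applying rules
		}
		if verbose {
			log.Printf("applying dep rule %s took %v", name, time.Since(start))
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the statement being cancelled mid-planning
	fmt.Println(applyRules(ctx, []string{"a", "b"}, false)) // context canceled
}
```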
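The slinstance.go change bounds each heartbeat attempt with contextutil.RunWithTimeout, using the heartbeat interval as the deadline, and treats a timeout differently from a hard failure: re-check expiry and retry immediately rather than clearing the session, since the outcome of the stalled write is unknown. Below is a minimal stdlib equivalent of that control flow; extend is a hypothetical stand-in for the storage call, and the real code detects *contextutil.TimeoutError via errors.HasType rather than errors.Is on context.DeadlineExceeded.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// extend stands in for the session-extension storage write, which may hang
// indefinitely if the storage layer is wedged.
func extend(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulate a stalled RPC
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// heartbeatOnce bounds one attempt by the heartbeat interval, mirroring the
// RunWithTimeout wrappers added around createSession and extendSession.
func heartbeatOnce(ctx context.Context, hb time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, hb)
	defer cancel()
	return extend(ctx)
}

func main() {
	err := heartbeatOnce(context.Background(), 10*time.Millisecond)
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		// Timeout: session state unknown; re-check expiry and retry
		// instead of tearing the session down.
		fmt.Println("attempt timed out; retrying")
	case err != nil:
		fmt.Println("hard failure; clearing session:", err)
	default:
		fmt.Println("extended")
	}
}
```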
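Finally, the FakeStorage helpers in test_helpers.go rely on a mutex-guarded channel: SetBlockCh parks subsequent Insert/Update calls, CloseBlockCh releases them, and each blocked call still honors its context so the new deadline tests observe timeouts rather than hangs. A condensed, self-contained version of that pattern with usage (method names mirror the diff; the storage body is elided):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// blockable mimics the FakeStorage block-channel helpers.
type blockable struct {
	mu      sync.Mutex
	blockCh chan struct{}
}

// SetBlockCh installs a channel that parks every subsequent operation.
func (b *blockable) SetBlockCh() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.blockCh = make(chan struct{})
}

// CloseBlockCh releases all parked operations.
func (b *blockable) CloseBlockCh() {
	b.mu.Lock()
	defer b.mu.Unlock()
	close(b.blockCh)
}

func (b *blockable) getBlockCh() chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.blockCh
}

// Insert blocks while the channel is set but still honors ctx cancellation.
func (b *blockable) Insert(ctx context.Context) error {
	if ch := b.getBlockCh(); ch != nil {
		select {
		case <-ch:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil // real storage work would go here
}

func main() {
	b := &blockable{}
	b.SetBlockCh()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(b.Insert(ctx)) // context deadline exceeded
	b.CloseBlockCh()
	fmt.Println(b.Insert(context.Background())) // <nil>
}
```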