From 24a8e5aa10f386ae41a698147ceae4b3d8bc1aab Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 31 Oct 2022 21:48:53 -0400 Subject: [PATCH 1/7] streamccl,sql: moving StreamIngestionManager and ReplicationStreamManager to eval This commit: 1. moves the definition of StreamIngestionManager and ReplicationStreamManager to eval; 2. has planner implements functions in StreamIngestionManager and ReplicationStreamManager, so that they won't take eval.Context and evalCtx.Txn as parameters. Release note: None --- docs/generated/redact_safe.md | 2 +- .../streamclient/partitioned_stream_client.go | 2 +- .../streamingest/stream_ingest_manager.go | 23 +++-- pkg/ccl/streamingccl/streampb/BUILD.bazel | 5 +- pkg/ccl/streamingccl/streampb/streamid.go | 18 ++++ .../streamproducer/event_stream.go | 5 +- .../streamproducer/replication_manager.go | 42 ++++---- .../replication_manager_test.go | 5 +- .../streamproducer/stream_lifetime.go | 23 +++-- pkg/sql/BUILD.bazel | 1 + pkg/sql/apply_join.go | 1 + pkg/sql/colexec/colexecbase/cast_test.go | 1 + pkg/sql/colexec/colexecbase/main_test.go | 1 + pkg/sql/conn_executor.go | 1 + pkg/sql/distsql/server.go | 1 + pkg/sql/faketreeeval/evalctx.go | 18 ++++ pkg/sql/opt/memo/memo_test.go | 1 + pkg/sql/planner.go | 14 +++ pkg/sql/schema_changer.go | 33 ++++--- .../scbuild/tree_context_builder.go | 23 ++--- pkg/sql/sem/builtins/BUILD.bazel | 2 +- pkg/sql/sem/builtins/replication_builtins.go | 37 ++++--- pkg/sql/sem/eval/BUILD.bazel | 2 + pkg/sql/sem/eval/cast_map_test.go | 1 + pkg/sql/sem/eval/context.go | 75 ++++++++++++++ pkg/streaming/BUILD.bazel | 2 - pkg/streaming/api.go | 98 ++----------------- 27 files changed, 243 insertions(+), 194 deletions(-) create mode 100644 pkg/ccl/streamingccl/streampb/streamid.go diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index 6f0aa1e545a1..d65990bd80ef 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -7,6 +7,7 @@ pkg/base/node_id.go | `*SQLIDContainer` pkg/base/node_id.go | `*StoreIDContainer` pkg/ccl/backupccl/backuppb/backup.go | `sz` pkg/ccl/backupccl/backuppb/backup.go | `timing` +pkg/ccl/streamingccl/streampb/streamid.go | `StreamID` pkg/cli/exit/exit.go | `Code` pkg/jobs/jobspb/wrap.go | `Type` pkg/kv/bulk/bulk_metrics.go | `sz` @@ -51,7 +52,6 @@ pkg/storage/enginepb/mvcc.go | `TxnEpoch` pkg/storage/enginepb/mvcc.go | `TxnSeq` pkg/storage/enginepb/mvcc3.go | `*MVCCStats` pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta` -pkg/streaming/api.go | `StreamID` pkg/util/hlc/timestamp.go | `ClockTimestamp` pkg/util/hlc/timestamp.go | `LegacyTimestamp` pkg/util/hlc/timestamp.go | `Timestamp` diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 7337b8943c4e..44cddf7e9ddc 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -75,7 +75,7 @@ func (p *partitionedStreamClient) Create( row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64()) err := row.Scan(&streamID) if err != nil { - return streaming.InvalidStreamID, + return streampb.InvalidStreamID, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantID.String()) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 9ed03a234d4d..db7bdcc457c0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -23,29 +23,28 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -type streamIngestManagerImpl struct{} +type streamIngestManagerImpl struct { + evalCtx *eval.Context + txn *kv.Txn +} // CompleteStreamIngestion implements streaming.StreamIngestManager interface. func (r *streamIngestManagerImpl) CompleteStreamIngestion( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - cutoverTimestamp hlc.Timestamp, + ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp, ) error { - return completeStreamIngestion(ctx, evalCtx, txn, ingestionJobID, cutoverTimestamp) + return completeStreamIngestion(ctx, r.evalCtx, r.txn, ingestionJobID, cutoverTimestamp) } // GetStreamIngestionStats implements streaming.StreamIngestManager interface. func (r *streamIngestManagerImpl) GetStreamIngestionStats( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID, + ctx context.Context, ingestionJobID jobspb.JobID, ) (*streampb.StreamIngestionStats, error) { - return getStreamIngestionStats(ctx, evalCtx, txn, ingestionJobID) + return getStreamIngestionStats(ctx, r.evalCtx, r.txn, ingestionJobID) } func newStreamIngestManagerWithPrivilegesCheck( - ctx context.Context, evalCtx *eval.Context, -) (streaming.StreamIngestManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.StreamIngestManager, error) { isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { return nil, err @@ -64,7 +63,7 @@ func newStreamIngestManagerWithPrivilegesCheck( pgcode.InsufficientPrivilege, "replication requires enterprise license") } - return &streamIngestManagerImpl{}, nil + return &streamIngestManagerImpl{evalCtx: evalCtx, txn: txn}, nil } func init() { diff --git a/pkg/ccl/streamingccl/streampb/BUILD.bazel b/pkg/ccl/streamingccl/streampb/BUILD.bazel index fcf080739dc9..7ed7afceeada 100644 --- a/pkg/ccl/streamingccl/streampb/BUILD.bazel +++ b/pkg/ccl/streamingccl/streampb/BUILD.bazel @@ -35,7 +35,10 @@ go_proto_library( go_library( name = "streampb", - srcs = ["empty.go"], + srcs = [ + "empty.go", + "streamid.go", + ], embed = [":streampb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb", visibility = ["//visibility:public"], diff --git a/pkg/ccl/streamingccl/streampb/streamid.go b/pkg/ccl/streamingccl/streampb/streamid.go new file mode 100644 index 000000000000..4521d2b7400d --- /dev/null +++ b/pkg/ccl/streamingccl/streampb/streamid.go @@ -0,0 +1,18 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streampb + +// StreamID is the ID of a replication stream. +type StreamID int64 + +// SafeValue implements the redact.SafeValue interface. +func (j StreamID) SafeValue() {} + +// InvalidStreamID is the zero value for StreamID corresponding to no stream. +const InvalidStreamID StreamID = 0 diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index a6aaf392b069..6d0086cfe73a 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -38,7 +37,7 @@ import ( ) type eventStream struct { - streamID streaming.StreamID + streamID streampb.StreamID execCfg *sql.ExecutorConfig spec streampb.StreamPartitionSpec subscribedSpans roachpb.SpanGroup @@ -515,7 +514,7 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) { } func streamPartition( - evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte, + evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte, ) (eval.ValueGenerator, error) { if !evalCtx.SessionData().AvoidBuffering { return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option") diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index d7f156eb39b7..be8f04a7cfb8 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -22,54 +22,49 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -type replicationStreamManagerImpl struct{} +type replicationStreamManagerImpl struct { + evalCtx *eval.Context + txn *kv.Txn +} // StartReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StartReplicationStream( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64, -) (streaming.StreamID, error) { - return startReplicationStreamJob(ctx, evalCtx, txn, tenantID) + ctx context.Context, tenantID uint64, +) (streampb.StreamID, error) { + return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID) } // HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) HeartbeatReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - streamID streaming.StreamID, - frontier hlc.Timestamp, - txn *kv.Txn, + ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { - return heartbeatReplicationStream(ctx, evalCtx, streamID, frontier, txn) + return heartbeatReplicationStream(ctx, r.evalCtx, r.txn, streamID, frontier) } // StreamPartition implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StreamPartition( - evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte, + streamID streampb.StreamID, opaqueSpec []byte, ) (eval.ValueGenerator, error) { - return streamPartition(evalCtx, streamID, opaqueSpec) + return streamPartition(r.evalCtx, streamID, opaqueSpec) } // GetReplicationStreamSpec implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) GetReplicationStreamSpec( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, + ctx context.Context, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { - return getReplicationStreamSpec(ctx, evalCtx, txn, streamID) + return getReplicationStreamSpec(ctx, r.evalCtx, streamID) } // CompleteReplicationStream implements ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) CompleteReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID streaming.StreamID, - successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { - return completeReplicationStream(ctx, evalCtx, txn, streamID, successfulIngestion) + return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion) } func newReplicationStreamManagerWithPrivilegesCheck( - ctx context.Context, evalCtx *eval.Context, -) (streaming.ReplicationStreamManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.ReplicationStreamManager, error) { isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { return nil, err @@ -81,6 +76,7 @@ func newReplicationStreamManagerWithPrivilegesCheck( } execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) + enterpriseCheckErr := utilccl.CheckEnterpriseEnabled( execCfg.Settings, execCfg.NodeInfo.LogicalClusterID(), execCfg.Organization(), "REPLICATION") if enterpriseCheckErr != nil { @@ -88,7 +84,7 @@ func newReplicationStreamManagerWithPrivilegesCheck( pgcode.InsufficientPrivilege, "replication requires enterprise license") } - return &replicationStreamManagerImpl{}, nil + return &replicationStreamManagerImpl{evalCtx: evalCtx, txn: txn}, nil } func init() { diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go b/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go index d893ed2e0dde..2090ae5cdc1c 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -44,7 +43,7 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) { require.NoError(t, protoutil.Unmarshal(sessionSerialized, &sessionData)) } - getManagerForUser := func(u string) (streaming.ReplicationStreamManager, error) { + getManagerForUser := func(u string) (eval.ReplicationStreamManager, error) { sqlUser, err := username.MakeSQLUsernameFromUserInput(u, username.PurposeValidation) require.NoError(t, err) execCfg := s.ExecutorConfig().(sql.ExecutorConfig) @@ -52,7 +51,7 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) { p, cleanup := sql.NewInternalPlanner("test", txn, sqlUser, &sql.MemoryMetrics{}, &execCfg, sessionData) defer cleanup() ec := p.(interface{ EvalContext() *eval.Context }).EvalContext() - return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec) + return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec, txn) } for _, tc := range []struct { diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 357f9b9253cb..652de91ec09f 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -36,16 +35,16 @@ import ( // 2. TODO(casper): Updates the protected timestamp for spans being replicated func startReplicationStreamJob( ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64, -) (streaming.StreamID, error) { +) (streampb.StreamID, error) { execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } if !hasAdminRole { - return streaming.InvalidStreamID, errors.New("admin role required to start stream replication jobs") + return streampb.InvalidStreamID, errors.New("admin role required to start stream replication jobs") } registry := execConfig.JobRegistry @@ -53,7 +52,7 @@ func startReplicationStreamJob( ptsID := uuid.MakeV4() jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID) if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } ptp := execConfig.ProtectedTimestampProvider @@ -68,9 +67,9 @@ func startReplicationStreamJob( deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect) if err := ptp.Protect(ctx, txn, pts); err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } - return streaming.StreamID(jr.JobID), nil + return streampb.StreamID(jr.JobID), nil } // Convert the producer job's status into corresponding replication @@ -99,7 +98,7 @@ func updateReplicationStreamProgress( expiration time.Time, ptsProvider protectedts.Provider, registry *jobs.Registry, - streamID streaming.StreamID, + streamID streampb.StreamID, consumedTime hlc.Timestamp, txn *kv.Txn, ) (status streampb.StreamReplicationStatus, err error) { @@ -157,9 +156,9 @@ func updateReplicationStreamProgress( func heartbeatReplicationStream( ctx context.Context, evalCtx *eval.Context, - streamID streaming.StreamID, - frontier hlc.Timestamp, txn *kv.Txn, + streamID streampb.StreamID, + frontier hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV) @@ -198,7 +197,7 @@ func heartbeatReplicationStream( // getReplicationStreamSpec gets a replication stream specification for the specified stream. func getReplicationStreamSpec( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, + ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext) // Returns error if the replication stream is not active @@ -257,7 +256,7 @@ func completeReplicationStream( ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, - streamID streaming.StreamID, + streamID streampb.StreamID, successfulIngestion bool, ) error { registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 3dc104bc4cc0..4f4e883f9f6b 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -457,6 +457,7 @@ go_library( "//pkg/sql/vtable", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/streaming", "//pkg/testutils/serverutils", "//pkg/upgrade", "//pkg/util", diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go index ebedc697f156..4d648255142b 100644 --- a/pkg/sql/apply_join.go +++ b/pkg/sql/apply_join.go @@ -328,6 +328,7 @@ func runPlanInsidePlan( ctx, evalCtx, &plannerCopy, params.p.txn, distributeType) planCtx.planner.curPlan.planComponents = *plan planCtx.ExtendedEvalCtx.Planner = &plannerCopy + planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy planCtx.stmtType = recv.stmtType params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun( diff --git a/pkg/sql/colexec/colexecbase/cast_test.go b/pkg/sql/colexec/colexecbase/cast_test.go index 85f1448ad660..deda61f49bd6 100644 --- a/pkg/sql/colexec/colexecbase/cast_test.go +++ b/pkg/sql/colexec/colexecbase/cast_test.go @@ -41,6 +41,7 @@ func TestRandomizedCast(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(st) defer evalCtx.Stop(ctx) evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} rng, _ := randutil.NewTestRand() getValidSupportedCast := func() (from, to *types.T) { diff --git a/pkg/sql/colexec/colexecbase/main_test.go b/pkg/sql/colexec/colexecbase/main_test.go index 61be63cc423c..e44b9d36ffe8 100644 --- a/pkg/sql/colexec/colexecbase/main_test.go +++ b/pkg/sql/colexec/colexecbase/main_test.go @@ -58,6 +58,7 @@ func TestMain(m *testing.M) { testMemAcc = &memAcc evalCtx := eval.MakeTestingEvalContext(st) evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} testColumnFactory = coldataext.NewExtendedColumnFactory(&evalCtx) testAllocator = colmem.NewAllocator(ctx, testMemAcc, testColumnFactory) defer testMemAcc.Close(ctx) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 93daf681086a..f35f09d7731c 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2763,6 +2763,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo *evalCtx = extendedEvalContext{ Context: eval.Context{ Planner: p, + StreamManagerFactory: p, PrivilegedAccessor: p, SessionAccessor: p, JobExecContext: p, diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 817a88703e88..0e0d49ded9c1 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -356,6 +356,7 @@ func (ds *ServerImpl) setupFlow( Locality: ds.ServerConfig.Locality, Tracer: ds.ServerConfig.Tracer, Planner: &faketreeeval.DummyEvalPlanner{Monitor: monitor}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, SessionAccessor: &faketreeeval.DummySessionAccessor{}, ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index d98a45bb6f83..aa84ef39ceb6 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -156,6 +156,24 @@ func (so *DummyRegionOperator) ResetMultiRegionZoneConfigsForDatabase( return errors.WithStack(errRegionOperator) } +// DummyStreamManagerFactory implements the eval.StreamManagerFactory interface by +// returning errors. +type DummyStreamManagerFactory struct{} + +// GetReplicationStreamManager implements the eval.StreamManagerFactory interface. +func (smf *DummyStreamManagerFactory) GetReplicationStreamManager( + ctx context.Context, +) (eval.ReplicationStreamManager, error) { + return nil, errors.WithStack(errors.New("Stream manager factory not implemented")) +} + +// GetStreamIngestManager implements the eval.StreamManagerFactory interface. +func (smf *DummyStreamManagerFactory) GetStreamIngestManager( + ctx context.Context, +) (eval.StreamIngestManager, error) { + return nil, errors.WithStack(errors.New("Stream manager factory not implemented")) +} + // DummyEvalPlanner implements the eval.Planner interface by returning // errors. type DummyEvalPlanner struct { diff --git a/pkg/sql/opt/memo/memo_test.go b/pkg/sql/opt/memo/memo_test.go index 2d577d74c012..3b5cb8bf5b42 100644 --- a/pkg/sql/opt/memo/memo_test.go +++ b/pkg/sql/opt/memo/memo_test.go @@ -152,6 +152,7 @@ func TestMemoIsStale(t *testing.T) { // which can handle a case of nil planner but cannot a case when the // planner's GetMultiregionConfig is nil, so we nil out the planner. evalCtx.Planner = nil + evalCtx.StreamManagerFactory = nil var o xform.Optimizer opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT a, b+1 FROM abcview WHERE c='foo'") diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 3e8d8964ad0f..89a1632ece9e 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -380,6 +381,7 @@ func newInternalPlanner( p.extendedEvalCtx = internalExtendedEvalCtx(ctx, sds, params.collection, txn, ts, ts, execCfg) p.extendedEvalCtx.Planner = p + p.extendedEvalCtx.StreamManagerFactory = p p.extendedEvalCtx.PrivilegedAccessor = p p.extendedEvalCtx.SessionAccessor = p p.extendedEvalCtx.ClientNoticeSender = p @@ -828,3 +830,15 @@ func (p *planner) resetPlanner( p.skipDescriptorCache = false p.typeResolutionDbID = descpb.InvalidID } + +// GetReplicationStreamManager returns a ReplicationStreamManager. +func (p *planner) GetReplicationStreamManager( + ctx context.Context, +) (eval.ReplicationStreamManager, error) { + return streaming.GetReplicationStreamManager(ctx, p.EvalContext(), p.Txn()) +} + +// GetStreamIngestManager returns a StreamIngestManager. +func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngestManager, error) { + return streaming.GetStreamIngestManager(ctx, p.EvalContext(), p.Txn()) +} diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 51c8531be7e0..5863cc4b82cb 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2501,22 +2501,23 @@ func createSchemaChangeEvalCtx( ExecCfg: execCfg, Descs: descriptors, Context: eval.Context{ - SessionDataStack: sessiondata.NewStack(sd), - Planner: &faketreeeval.DummyEvalPlanner{}, - PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, - SessionAccessor: &faketreeeval.DummySessionAccessor{}, - ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, - Sequence: &faketreeeval.DummySequenceOperators{}, - Tenant: &faketreeeval.DummyTenantOperator{}, - Regions: &faketreeeval.DummyRegionOperator{}, - Settings: execCfg.Settings, - TestingKnobs: execCfg.EvalContextTestingKnobs, - ClusterID: execCfg.NodeInfo.LogicalClusterID(), - ClusterName: execCfg.RPCContext.ClusterName(), - NodeID: execCfg.NodeInfo.NodeID, - Codec: execCfg.Codec, - Locality: execCfg.Locality, - Tracer: execCfg.AmbientCtx.Tracer, + SessionDataStack: sessiondata.NewStack(sd), + Planner: &faketreeeval.DummyEvalPlanner{}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, + PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, + SessionAccessor: &faketreeeval.DummySessionAccessor{}, + ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, + Sequence: &faketreeeval.DummySequenceOperators{}, + Tenant: &faketreeeval.DummyTenantOperator{}, + Regions: &faketreeeval.DummyRegionOperator{}, + Settings: execCfg.Settings, + TestingKnobs: execCfg.EvalContextTestingKnobs, + ClusterID: execCfg.NodeInfo.LogicalClusterID(), + ClusterName: execCfg.RPCContext.ClusterName(), + NodeID: execCfg.NodeInfo.NodeID, + Codec: execCfg.Codec, + Locality: execCfg.Locality, + Tracer: execCfg.AmbientCtx.Tracer, }, } // TODO(andrei): This is wrong (just like on the main code path on diff --git a/pkg/sql/schemachanger/scbuild/tree_context_builder.go b/pkg/sql/schemachanger/scbuild/tree_context_builder.go index 769110d4f742..8cf9cdafd886 100644 --- a/pkg/sql/schemachanger/scbuild/tree_context_builder.go +++ b/pkg/sql/schemachanger/scbuild/tree_context_builder.go @@ -46,17 +46,18 @@ func (b buildCtx) EvalCtx() *eval.Context { func newEvalCtx(ctx context.Context, d Dependencies) *eval.Context { evalCtx := &eval.Context{ - ClusterID: d.ClusterID(), - SessionDataStack: sessiondata.NewStack(d.SessionData()), - Planner: &faketreeeval.DummyEvalPlanner{}, - PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, - SessionAccessor: &faketreeeval.DummySessionAccessor{}, - ClientNoticeSender: d.ClientNoticeSender(), - Sequence: &faketreeeval.DummySequenceOperators{}, - Tenant: &faketreeeval.DummyTenantOperator{}, - Regions: &faketreeeval.DummyRegionOperator{}, - Settings: d.ClusterSettings(), - Codec: d.Codec(), + ClusterID: d.ClusterID(), + SessionDataStack: sessiondata.NewStack(d.SessionData()), + Planner: &faketreeeval.DummyEvalPlanner{}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, + PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, + SessionAccessor: &faketreeeval.DummySessionAccessor{}, + ClientNoticeSender: d.ClientNoticeSender(), + Sequence: &faketreeeval.DummySequenceOperators{}, + Tenant: &faketreeeval.DummyTenantOperator{}, + Regions: &faketreeeval.DummyRegionOperator{}, + Settings: d.ClusterSettings(), + Codec: d.Codec(), } evalCtx.SetDeprecatedContext(ctx) return evalCtx diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index 2c19204b3464..e864ee6822ee 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -30,6 +30,7 @@ go_library( deps = [ "//pkg/base", "//pkg/build", + "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/config/zonepb", "//pkg/geo", @@ -86,7 +87,6 @@ go_library( "//pkg/sql/storageparam", "//pkg/sql/storageparam/indexstorageparam", "//pkg/sql/types", - "//pkg/streaming", "//pkg/util", "//pkg/util/arith", "//pkg/util/bitarray", diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go index f01a2a99b42f..346d2b0596d8 100644 --- a/pkg/sql/sem/builtins/replication_builtins.go +++ b/pkg/sql/sem/builtins/replication_builtins.go @@ -14,13 +14,13 @@ import ( "context" gojson "encoding/json" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" @@ -50,7 +50,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } @@ -58,7 +58,7 @@ var replicationBuiltins = map[string]builtinDefinition{ ingestionJobID := jobspb.JobID(*args[0].(*tree.DInt)) cutoverTime := args[1].(*tree.DTimestampTZ).Time cutoverTimestamp := hlc.Timestamp{WallTime: cutoverTime.UnixNano()} - err = mgr.CompleteStreamIngestion(ctx, evalCtx, evalCtx.Txn, ingestionJobID, cutoverTimestamp) + err = mgr.CompleteStreamIngestion(ctx, ingestionJobID, cutoverTimestamp) if err != nil { return nil, err } @@ -91,12 +91,12 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull { return tree.DNull, errors.New("job_id cannot be specified with null argument") } - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } ingestionJobID := int64(tree.MustBeDInt(args[0])) - stats, err := mgr.GetStreamIngestionStats(ctx, evalCtx, evalCtx.Txn, jobspb.JobID(ingestionJobID)) + stats, err := mgr.GetStreamIngestionStats(ctx, jobspb.JobID(ingestionJobID)) if err != nil { return nil, err } @@ -131,12 +131,12 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull { return tree.DNull, errors.New("job_id cannot be specified with null argument") } - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } ingestionJobID := int64(tree.MustBeDInt(args[0])) - stats, err := mgr.GetStreamIngestionStats(ctx, evalCtx, evalCtx.Txn, jobspb.JobID(ingestionJobID)) + stats, err := mgr.GetStreamIngestionStats(ctx, jobspb.JobID(ingestionJobID)) if err != nil { return nil, err } @@ -164,7 +164,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -172,7 +172,7 @@ var replicationBuiltins = map[string]builtinDefinition{ if err != nil { return nil, err } - jobID, err := mgr.StartReplicationStream(ctx, evalCtx, evalCtx.Txn, uint64(tenantID)) + jobID, err := mgr.StartReplicationStream(ctx, uint64(tenantID)) if err != nil { return nil, err } @@ -201,7 +201,7 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull || args[1] == tree.DNull { return tree.DNull, errors.New("stream_id or frontier_ts cannot be specified with null argument") } - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -209,8 +209,8 @@ var replicationBuiltins = map[string]builtinDefinition{ if err != nil { return nil, err } - streamID := streaming.StreamID(int(tree.MustBeDInt(args[0]))) - sps, err := mgr.HeartbeatReplicationStream(ctx, evalCtx, streamID, frontier, evalCtx.Txn) + streamID := streampb.StreamID(int(tree.MustBeDInt(args[0]))) + sps, err := mgr.HeartbeatReplicationStream(ctx, streamID, frontier) if err != nil { return nil, err } @@ -243,13 +243,12 @@ var replicationBuiltins = map[string]builtinDefinition{ []string{"stream_event"}, ), func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (eval.ValueGenerator, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } return mgr.StreamPartition( - evalCtx, - streaming.StreamID(tree.MustBeDInt(args[0])), + streampb.StreamID(tree.MustBeDInt(args[0])), []byte(tree.MustBeDBytes(args[1])), ) }, @@ -269,13 +268,13 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Bytes), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } streamID := int64(tree.MustBeDInt(args[0])) - spec, err := mgr.GetReplicationStreamSpec(ctx, evalCtx, evalCtx.Txn, streaming.StreamID(streamID)) + spec, err := mgr.GetReplicationStreamSpec(ctx, streampb.StreamID(streamID)) if err != nil { return nil, err } @@ -304,7 +303,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -312,7 +311,7 @@ var replicationBuiltins = map[string]builtinDefinition{ streamID := int64(tree.MustBeDInt(args[0])) successfulIngestion := bool(tree.MustBeDBool(args[1])) if err := mgr.CompleteReplicationStream( - ctx, evalCtx, evalCtx.Txn, streaming.StreamID(streamID), successfulIngestion, + ctx, streampb.StreamID(streamID), successfulIngestion, ); err != nil { return nil, err } diff --git a/pkg/sql/sem/eval/BUILD.bazel b/pkg/sql/sem/eval/BUILD.bazel index baf6d6676a1e..02e5413c29de 100644 --- a/pkg/sql/sem/eval/BUILD.bazel +++ b/pkg/sql/sem/eval/BUILD.bazel @@ -37,9 +37,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/geo", "//pkg/geo/geopb", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/sql/sem/eval/cast_map_test.go b/pkg/sql/sem/eval/cast_map_test.go index ebf1f92c8569..f456070a1fb0 100644 --- a/pkg/sql/sem/eval/cast_map_test.go +++ b/pkg/sql/sem/eval/cast_map_test.go @@ -39,6 +39,7 @@ func TestCastMap(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) rng, _ := randutil.NewTestRand() evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} cast.ForEachCast(func(src, tgt oid.Oid, _ cast.Context, _ cast.ContextOrigin, _ volatility.V) { srcType := types.OidToType[src] diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index e45486b733ea..2439dc0ff821 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -17,6 +17,8 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -138,6 +140,8 @@ type Context struct { Planner Planner + StreamManagerFactory StreamManagerFactory + // Not using sql.JobExecContext type to avoid cycle dependency with sql package JobExecContext interface{} @@ -345,6 +349,7 @@ func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonit monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) ctx.TestingMon = monitor ctx.Planner = &fakePlannerWithMonitor{monitor: monitor} + ctx.StreamManagerFactory = &fakeStreamManagerFactory{} ctx.deprecatedContext = context.TODO() now := timeutil.Now() ctx.SetTxnTimestamp(now) @@ -362,6 +367,10 @@ func (p *fakePlannerWithMonitor) Mon() *mon.BytesMonitor { return p.monitor } +type fakeStreamManagerFactory struct { + StreamManagerFactory +} + // SessionData returns the SessionData the current EvalCtx should use to eval. func (ec *Context) SessionData() *sessiondata.SessionData { if ec.SessionDataStack == nil { @@ -682,3 +691,69 @@ func UnwrapDatum(ctx context.Context, evalCtx *Context, d tree.Datum) tree.Datum } return d } + +// StreamManagerFactory stores methods that return the streaming managers. +type StreamManagerFactory interface { + GetReplicationStreamManager(ctx context.Context) (ReplicationStreamManager, error) + GetStreamIngestManager(ctx context.Context) (StreamIngestManager, error) +} + +// ReplicationStreamManager represents a collection of APIs that streaming replication supports +// on the production side. +type ReplicationStreamManager interface { + // StartReplicationStream starts a stream replication job for the specified tenant on the producer side. + StartReplicationStream( + ctx context.Context, + tenantID uint64, + ) (streampb.StreamID, error) + + // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating + // consumer has consumed until the given 'frontier' timestamp. This updates the producer job + // progress and extends its life, and the new producer progress will be returned. + // If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it. + HeartbeatReplicationStream( + ctx context.Context, + streamID streampb.StreamID, + frontier hlc.Timestamp, + ) (streampb.StreamReplicationStatus, error) + + // StreamPartition starts streaming replication on the producer side for the partition specified + // by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and + // returns a value generator which yields events for the specified partition. + StreamPartition( + streamID streampb.StreamID, + opaqueSpec []byte, + ) (ValueGenerator, error) + + // GetReplicationStreamSpec gets a stream replication spec on the producer side. + GetReplicationStreamSpec( + ctx context.Context, + streamID streampb.StreamID, + ) (*streampb.ReplicationStreamSpec, error) + + // CompleteReplicationStream completes a replication stream job on the producer side. + // 'successfulIngestion' indicates whether the stream ingestion finished successfully and + // determines the fate of the producer job, succeeded or canceled. + CompleteReplicationStream( + ctx context.Context, + streamID streampb.StreamID, + successfulIngestion bool, + ) error +} + +// StreamIngestManager represents a collection of APIs that streaming replication supports +// on the ingestion side. +type StreamIngestManager interface { + // CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side. + CompleteStreamIngestion( + ctx context.Context, + ingestionJobID jobspb.JobID, + cutoverTimestamp hlc.Timestamp, + ) error + + // GetStreamIngestionStats gets a statistics summary for a stream ingestion job. + GetStreamIngestionStats( + ctx context.Context, + ingestionJobID jobspb.JobID, + ) (*streampb.StreamIngestionStats, error) +} diff --git a/pkg/streaming/BUILD.bazel b/pkg/streaming/BUILD.bazel index 0edf9069f018..e1ce04c319c8 100644 --- a/pkg/streaming/BUILD.bazel +++ b/pkg/streaming/BUILD.bazel @@ -8,10 +8,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/streamingccl/streampb", - "//pkg/jobs/jobspb", "//pkg/kv", "//pkg/sql/sem/eval", - "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/streaming/api.go b/pkg/streaming/api.go index 2d96bbb468f4..2d70e4324c12 100644 --- a/pkg/streaming/api.go +++ b/pkg/streaming/api.go @@ -14,118 +14,38 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) // StreamID is the ID of a replication stream. -type StreamID int64 - -// SafeValue implements the redact.SafeValue interface. -func (j StreamID) SafeValue() {} - -// InvalidStreamID is the zero value for StreamID corresponding to no stream. -const InvalidStreamID StreamID = 0 +type StreamID = streampb.StreamID // GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs. // Used by builtin functions to trigger streaming replication. -var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context) (ReplicationStreamManager, error) +var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.ReplicationStreamManager, error) // GetStreamIngestManagerHook is the hook to get access to the ingestion side replication APIs. // Used by builtin functions to trigger streaming replication. -var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context) (StreamIngestManager, error) - -// ReplicationStreamManager represents a collection of APIs that streaming replication supports -// on the production side. -type ReplicationStreamManager interface { - // StartReplicationStream starts a stream replication job for the specified tenant on the producer side. - StartReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - tenantID uint64, - ) (StreamID, error) - - // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating - // consumer has consumed until the given 'frontier' timestamp. This updates the producer job - // progress and extends its life, and the new producer progress will be returned. - // If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it. - HeartbeatReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - streamID StreamID, - frontier hlc.Timestamp, - txn *kv.Txn) (streampb.StreamReplicationStatus, error) - - // StreamPartition starts streaming replication on the producer side for the partition specified - // by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and - // returns a value generator which yields events for the specified partition. - StreamPartition( - evalCtx *eval.Context, - streamID StreamID, - opaqueSpec []byte, - ) (eval.ValueGenerator, error) - - // GetReplicationStreamSpec gets a stream replication spec on the producer side. - GetReplicationStreamSpec( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID StreamID, - ) (*streampb.ReplicationStreamSpec, error) - - // CompleteReplicationStream completes a replication stream job on the producer side. - // 'successfulIngestion' indicates whether the stream ingestion finished successfully and - // determines the fate of the producer job, succeeded or canceled. - CompleteReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID StreamID, - successfulIngestion bool, - ) error -} - -// StreamIngestManager represents a collection of APIs that streaming replication supports -// on the ingestion side. -type StreamIngestManager interface { - // CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side. - CompleteStreamIngestion( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - cutoverTimestamp hlc.Timestamp, - ) error - - // GetStreamIngestionStats gets a statistics summary for a stream ingestion job. - GetStreamIngestionStats( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - ) (*streampb.StreamIngestionStats, error) -} +var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.StreamIngestManager, error) // GetReplicationStreamManager returns a ReplicationStreamManager if a CCL binary is loaded. func GetReplicationStreamManager( - ctx context.Context, evalCtx *eval.Context, -) (ReplicationStreamManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.ReplicationStreamManager, error) { if GetReplicationStreamManagerHook == nil { return nil, errors.New("replication streaming requires a CCL binary") } - return GetReplicationStreamManagerHook(ctx, evalCtx) + return GetReplicationStreamManagerHook(ctx, evalCtx, txn) } // GetStreamIngestManager returns a StreamIngestManager if a CCL binary is loaded. func GetStreamIngestManager( - ctx context.Context, evalCtx *eval.Context, -) (StreamIngestManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.StreamIngestManager, error) { if GetReplicationStreamManagerHook == nil { return nil, errors.New("replication streaming requires a CCL binary") } - return GetStreamIngestManagerHook(ctx, evalCtx) + return GetStreamIngestManagerHook(ctx, evalCtx, txn) } From 36a7f0a21867ab5be1dd7be0385b12fad49e59c5 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 7 Nov 2022 09:56:09 -0500 Subject: [PATCH 2/7] spanconfigsqltranslator: add sqlutil.InternalExecutor to SQLTranslator This commit is part of the effort of having an internal executor better bound to its outer txn if there's one. The next step after this commit is to replace the executor used in `s.ptsProvider.GetState()` in `SQLTranslator.Translate()` to the one hanging off `SQLTranslator`. Release note: None --- .../spanconfigsqltranslatorccl/BUILD.bazel | 1 + .../datadriven_test.go | 13 +++-- .../spanconfigreconciler/BUILD.bazel | 1 + .../spanconfigreconciler/reconciler.go | 54 ++++++++--------- .../spanconfigsqltranslator/BUILD.bazel | 1 + .../spanconfigsqltranslator/sqltranslator.go | 58 ++++++++++++++----- 6 files changed, 83 insertions(+), 45 deletions(-) diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel index 7e64bcc79c86..2090a487638e 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel @@ -30,6 +30,7 @@ go_test( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/tabledesc", + "//pkg/sql/sqlutil", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go index ac147ff34fab..c57c12403702 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -180,10 +181,10 @@ func TestDataDriven(t *testing.T) { var records []spanconfig.Record sqlTranslatorFactory := tenant.SpanConfigSQLTranslatorFactory().(*spanconfigsqltranslator.Factory) - err := sql.DescsTxn(ctx, &execCfg, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + err := execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { - sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, descsCol) + sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) var err error records, _, err = sqlTranslator.Translate(ctx, descIDs, generateSystemSpanConfigs) require.NoError(t, err) @@ -212,10 +213,10 @@ func TestDataDriven(t *testing.T) { case "full-translate": sqlTranslatorFactory := tenant.SpanConfigSQLTranslatorFactory().(*spanconfigsqltranslator.Factory) var records []spanconfig.Record - err := sql.DescsTxn(ctx, &execCfg, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + err := execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { - sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, descsCol) + sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) var err error records, _, err = spanconfig.FullTranslate(ctx, sqlTranslator) require.NoError(t, err) diff --git a/pkg/spanconfig/spanconfigreconciler/BUILD.bazel b/pkg/spanconfig/spanconfigreconciler/BUILD.bazel index ce01e4f0b07c..4ff015f80a00 100644 --- a/pkg/spanconfig/spanconfigreconciler/BUILD.bazel +++ b/pkg/spanconfig/spanconfigreconciler/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/retry", diff --git a/pkg/spanconfig/spanconfigreconciler/reconciler.go b/pkg/spanconfig/spanconfigreconciler/reconciler.go index 8b994665d762..db2c1bad9fcd 100644 --- a/pkg/spanconfig/spanconfigreconciler/reconciler.go +++ b/pkg/spanconfig/spanconfigreconciler/reconciler.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -230,10 +231,10 @@ func (f *fullReconciler) reconcile( // view of things. var records []spanconfig.Record - if err := sql.DescsTxn(ctx, f.execCfg, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + if err := f.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, f.execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { - translator := f.sqlTranslatorFactory.NewSQLTranslator(txn, descsCol) + translator := f.sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) records, reconciledUpUntil, err = spanconfig.FullTranslate(ctx, translator) return err }); err != nil { @@ -475,30 +476,31 @@ func (r *incrementalReconciler) reconcile( var missingProtectedTimestampTargets []spanconfig.SystemTarget var records []spanconfig.Record - if err := sql.DescsTxn(ctx, r.execCfg, - func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { - var err error - - // TODO(irfansharif): Instead of these filter methods for missing - // tables and system targets that live on the Reconciler, we could - // move this to the SQLTranslator instead, now that the SQLTranslator - // is transaction scoped. - missingTableIDs, err = r.filterForMissingTableIDs(ctx, txn, descsCol, sqlUpdates) - if err != nil { - return err - } - - missingProtectedTimestampTargets, err = r.filterForMissingProtectedTimestampSystemTargets( - ctx, txn, sqlUpdates, - ) - if err != nil { - return err - } - - translator := r.sqlTranslatorFactory.NewSQLTranslator(txn, descsCol) - records, _, err = translator.Translate(ctx, allIDs, generateSystemSpanConfigurations) + if err := r.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, r.execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, + ) error { + var err error + + // TODO(irfansharif): Instead of these filter methods for missing + // tables and system targets that live on the Reconciler, we could + // move this to the SQLTranslator instead, now that the SQLTranslator + // is transaction scoped. + missingTableIDs, err = r.filterForMissingTableIDs(ctx, txn, descsCol, sqlUpdates) + if err != nil { return err - }); err != nil { + } + + missingProtectedTimestampTargets, err = r.filterForMissingProtectedTimestampSystemTargets( + ctx, txn, sqlUpdates, + ) + if err != nil { + return err + } + + translator := r.sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) + records, _, err = translator.Translate(ctx, allIDs, generateSystemSpanConfigurations) + return err + }); err != nil { return err } diff --git a/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel b/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel index 9aa346822354..8b619ebcd433 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel +++ b/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go index 2db46ff6fb26..894d6aa4cb61 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go +++ b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -34,14 +35,25 @@ import ( // SQLTranslator implements the spanconfig.SQLTranslator interface. var _ spanconfig.SQLTranslator = &SQLTranslator{} +// txnBundle is created to emphasize that the SQL translator is correspond to +// a certain txn, and all fields here are a whole. It essentially keeps the +// semantics of “translate at a snapshot in time”. This means that this +// txnBundle should be written only in `NewTranslator`. +type txnBundle struct { + txn *kv.Txn + descsCol *descs.Collection + // TODO(janexing): we inject ie here is to replace the executor used in + // s.ptsProvider.GetState() in SQLTranslator.Translate(). + ie sqlutil.InternalExecutor +} + // SQLTranslator is the concrete implementation of spanconfig.SQLTranslator. type SQLTranslator struct { ptsProvider protectedts.Provider codec keys.SQLCodec knobs *spanconfig.TestingKnobs - txn *kv.Txn - descsCol *descs.Collection + txnBundle txnBundle } // Factory is used to construct transaction-scoped SQLTranslators. @@ -66,18 +78,38 @@ func NewFactory( } // NewSQLTranslator constructs and returns a transaction-scoped -// spanconfig.SQLTranslator. The caller must ensure that the collection passed -// in is associated with the supplied transaction. -func (f *Factory) NewSQLTranslator(txn *kv.Txn, descsCol *descs.Collection) *SQLTranslator { +// spanconfig.SQLTranslator. The caller must ensure that the collection and +// internal executor and the transaction are associated with each other. +func (f *Factory) NewSQLTranslator( + txn *kv.Txn, ie sqlutil.InternalExecutor, descsCol *descs.Collection, +) *SQLTranslator { return &SQLTranslator{ ptsProvider: f.ptsProvider, codec: f.codec, knobs: f.knobs, - txn: txn, - descsCol: descsCol, + txnBundle: txnBundle{ + txn: txn, + descsCol: descsCol, + ie: ie, + }, } } +// GetTxn returns the txn bound to this sql translator. +func (s *SQLTranslator) GetTxn() *kv.Txn { + return s.txnBundle.txn +} + +// GetDescsCollection returns the descriptor collection bound to this sql translator. +func (s *SQLTranslator) GetDescsCollection() *descs.Collection { + return s.txnBundle.descsCol +} + +// GetInternalExecutor returns the internal executor bound to this sql translator. +func (s *SQLTranslator) GetInternalExecutor() sqlutil.InternalExecutor { + return s.txnBundle.ie +} + // Translate is part of the spanconfig.SQLTranslator interface. func (s *SQLTranslator) Translate( ctx context.Context, ids descpb.IDs, generateSystemSpanConfigurations bool, @@ -91,7 +123,7 @@ func (s *SQLTranslator) Translate( // timestamp subsystem, and the internal limits to limit the size of this // table, there is scope for improvement in the future. One option could be // a rangefeed-backed materialized view of the system table. - ptsState, err := s.ptsProvider.GetState(ctx, s.txn) + ptsState, err := s.ptsProvider.GetState(ctx, s.GetTxn()) if err != nil { return nil, hlc.Timestamp{}, errors.Wrap(err, "failed to get protected timestamp state") } @@ -110,7 +142,7 @@ func (s *SQLTranslator) Translate( seen := make(map[descpb.ID]struct{}) var leafIDs descpb.IDs for _, id := range ids { - descendantLeafIDs, err := s.findDescendantLeafIDs(ctx, id, s.txn, s.descsCol) + descendantLeafIDs, err := s.findDescendantLeafIDs(ctx, id, s.GetTxn(), s.GetDescsCollection()) if err != nil { return nil, hlc.Timestamp{}, err } @@ -122,13 +154,13 @@ func (s *SQLTranslator) Translate( } } - pseudoTableRecords, err := s.maybeGeneratePseudoTableRecords(ctx, s.txn, ids) + pseudoTableRecords, err := s.maybeGeneratePseudoTableRecords(ctx, s.GetTxn(), ids) if err != nil { return nil, hlc.Timestamp{}, err } records = append(records, pseudoTableRecords...) - scratchRangeRecord, err := s.maybeGenerateScratchRangeRecord(ctx, s.txn, ids) + scratchRangeRecord, err := s.maybeGenerateScratchRangeRecord(ctx, s.GetTxn(), ids) if err != nil { return nil, hlc.Timestamp{}, err } @@ -138,14 +170,14 @@ func (s *SQLTranslator) Translate( // For every unique leaf ID, generate span configurations. for _, leafID := range leafIDs { - translatedRecords, err := s.generateSpanConfigurations(ctx, leafID, s.txn, s.descsCol, ptsStateReader) + translatedRecords, err := s.generateSpanConfigurations(ctx, leafID, s.GetTxn(), s.GetDescsCollection(), ptsStateReader) if err != nil { return nil, hlc.Timestamp{}, err } records = append(records, translatedRecords...) } - return records, s.txn.CommitTimestamp(), nil + return records, s.GetTxn().CommitTimestamp(), nil } // descLookupFlags is the set of look up flags used when fetching descriptors. From 1c3f5cce202a5e0fbb672edce1e1fc22f35a47fc Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Thu, 3 Nov 2022 17:00:14 -0400 Subject: [PATCH 3/7] ptcache.Cache.IeFactory --- .../kvserver/protectedts/ptcache/BUILD.bazel | 1 + pkg/kv/kvserver/protectedts/ptcache/cache.go | 14 +++-- .../protectedts/ptcache/cache_test.go | 56 ++++++++++++------- .../protectedts/ptprovider/provider.go | 20 ++++--- pkg/server/server.go | 9 +-- pkg/server/tenant.go | 9 +-- 6 files changed, 67 insertions(+), 42 deletions(-) diff --git a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel index df1b46288ab9..9516b7038637 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/settings/cluster", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache.go b/pkg/kv/kvserver/protectedts/ptcache/cache.go index 6d277ffb6561..564bab913bbb 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -32,6 +33,7 @@ import ( // Cache implements protectedts.Cache. type Cache struct { db *kv.DB + ief sqlutil.InternalExecutorFactory storage protectedts.Storage stopper *stop.Stopper settings *cluster.Settings @@ -56,15 +58,17 @@ type Cache struct { // Config configures a Cache. type Config struct { - DB *kv.DB - Storage protectedts.Storage - Settings *cluster.Settings + DB *kv.DB + IeFactory sqlutil.InternalExecutorFactory + Storage protectedts.Storage + Settings *cluster.Settings } // New returns a new cache. func New(config Config) *Cache { c := &Cache{ db: config.DB, + ief: config.IeFactory, storage: config.Storage, settings: config.Settings, } @@ -226,7 +230,9 @@ func (c *Cache) doUpdate(ctx context.Context) error { state ptpb.State ts hlc.Timestamp ) - err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + err := c.ief.TxnWithExecutor(ctx, c.db, nil /* sessionData */, func( + ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, + ) (err error) { // NB: because this is a read-only transaction, the commit will be a no-op; // returning nil here means the transaction will commit and will never need // to change its read timestamp. diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 64ec9b9ab781..6dd9cf1e5273 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -56,6 +56,7 @@ func TestCacheBasic(t *testing.T) { }) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), @@ -67,9 +68,10 @@ func TestCacheBasic(t *testing.T) { protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Microsecond) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -133,6 +135,7 @@ func TestRefresh(t *testing.T) { }) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New( s.ClusterSettings(), @@ -145,9 +148,10 @@ func TestRefresh(t *testing.T) { protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) t.Run("already up-to-date", func(t *testing.T) { @@ -254,13 +258,15 @@ func TestStart(t *testing.T) { }, }) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) return tc, c } @@ -286,6 +292,7 @@ func TestQueryRecord(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New( s.ClusterSettings(), @@ -296,9 +303,10 @@ func TestQueryRecord(t *testing.T) { // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -348,6 +356,7 @@ func TestIterate(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), @@ -359,9 +368,10 @@ func TestIterate(t *testing.T) { protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -426,6 +436,7 @@ func TestGetProtectionTimestamps(t *testing.T) { defer tc.Stopper().Stop(ctx) // Set the poll interval to be very long. s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) ts := func(nanos int) hlc.Timestamp { @@ -505,9 +516,10 @@ func TestGetProtectionTimestamps(t *testing.T) { ) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -525,6 +537,7 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), @@ -536,9 +549,10 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) diff --git a/pkg/kv/kvserver/protectedts/ptprovider/provider.go b/pkg/kv/kvserver/protectedts/ptprovider/provider.go index 377039e80c53..61c9226ee82c 100644 --- a/pkg/kv/kvserver/protectedts/ptprovider/provider.go +++ b/pkg/kv/kvserver/protectedts/ptprovider/provider.go @@ -30,12 +30,13 @@ import ( // Config configures the Provider. type Config struct { - Settings *cluster.Settings - DB *kv.DB - Stores *kvserver.Stores - ReconcileStatusFuncs ptreconcile.StatusFuncs - InternalExecutor sqlutil.InternalExecutor - Knobs *protectedts.TestingKnobs + Settings *cluster.Settings + DB *kv.DB + Stores *kvserver.Stores + ReconcileStatusFuncs ptreconcile.StatusFuncs + InternalExecutorFactory sqlutil.InternalExecutorFactory + InternalExecutor sqlutil.InternalExecutor + Knobs *protectedts.TestingKnobs } // Provider is the concrete implementation of protectedts.Provider interface. @@ -54,9 +55,10 @@ func New(cfg Config) (protectedts.Provider, error) { storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor, cfg.Knobs) reconciler := ptreconcile.New(cfg.Settings, cfg.DB, storage, cfg.ReconcileStatusFuncs) cache := ptcache.New(ptcache.Config{ - DB: cfg.DB, - Storage: storage, - Settings: cfg.Settings, + DB: cfg.DB, + IeFactory: cfg.InternalExecutorFactory, + Storage: storage, + Settings: cfg.Settings, }) return &Provider{ diff --git a/pkg/server/server.go b/pkg/server/server.go index 757145f1455d..db694b15cc88 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -499,10 +499,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { protectedtsKnobs, _ := cfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) protectedtsProvider, err := ptprovider.New(ptprovider.Config{ - DB: db, - InternalExecutor: internalExecutor, - Settings: st, - Knobs: protectedtsKnobs, + DB: db, + InternalExecutor: internalExecutor, + InternalExecutorFactory: internalExecutorFactory, + Settings: st, + Knobs: protectedtsKnobs, ReconcileStatusFuncs: ptreconcile.StatusFuncs{ jobsprotectedts.GetMetaType(jobsprotectedts.Jobs): jobsprotectedts.MakeStatusFunc( jobRegistry, internalExecutor, jobsprotectedts.Jobs), diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index b89c387fc85b..7a998cd820a0 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -822,10 +822,11 @@ func makeTenantSQLServerArgs( var protectedTSProvider protectedts.Provider protectedtsKnobs, _ := baseCfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) pp, err := ptprovider.New(ptprovider.Config{ - DB: db, - InternalExecutor: circularInternalExecutor, - Settings: st, - Knobs: protectedtsKnobs, + DB: db, + InternalExecutor: circularInternalExecutor, + InternalExecutorFactory: internalExecutorFactory, + Settings: st, + Knobs: protectedtsKnobs, ReconcileStatusFuncs: ptreconcile.StatusFuncs{ jobsprotectedts.GetMetaType(jobsprotectedts.Jobs): jobsprotectedts.MakeStatusFunc( circularJobRegistry, circularInternalExecutor, jobsprotectedts.Jobs), From 619d5d40a4a4e9ec6ccff29fbf44f0cd89e6d82c Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Thu, 3 Nov 2022 17:09:51 -0400 Subject: [PATCH 4/7] add ief to ptstorage.storageWithDatabase --- pkg/kv/kvserver/client_protectedts_test.go | 4 +++- pkg/kv/kvserver/protectedts/ptcache/cache_test.go | 6 ++++++ .../protectedts/ptstorage/storage_with_database.go | 12 ++++++++---- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go index d14d6424d28f..d90444cdf4d0 100644 --- a/pkg/kv/kvserver/client_protectedts_test.go +++ b/pkg/kv/kvserver/client_protectedts_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -61,6 +62,7 @@ func TestProtectedTimestamps(t *testing.T) { tc := testcluster.StartTestCluster(t, 3, args) defer tc.Stopper().Stop(ctx) s0 := tc.Server(0) + ief := s0.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) conn := tc.ServerConn(0) _, err := conn.Exec("CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)") @@ -182,7 +184,7 @@ func TestProtectedTimestamps(t *testing.T) { pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor), nil /* knobs */) - ptsWithDB := ptstorage.WithDatabase(pts, s0.DB()) + ptsWithDB := ptstorage.WithDatabase(pts, s0.DB(), ief) ptsRec := ptpb.Record{ ID: uuid.MakeV4().GetBytes(), Timestamp: s0.Clock().Now(), diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 6dd9cf1e5273..12dee8c37d86 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -62,6 +62,7 @@ func TestCacheBasic(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very short. @@ -142,6 +143,7 @@ func TestRefresh(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor), ptsKnobs), s.DB(), + ief, ) // Set the poll interval to be very long. @@ -299,6 +301,7 @@ func TestQueryRecord(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) @@ -362,6 +365,7 @@ func TestIterate(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very long. @@ -513,6 +517,7 @@ func TestGetProtectionTimestamps(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) c := ptcache.New(ptcache.Config{ @@ -543,6 +548,7 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very long. diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go index 8eb62e7650a9..6e9402bc35cc 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go @@ -16,19 +16,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // WithDatabase wraps s such that any calls made with a nil *Txn will be wrapped // in a call to db.Txn. This is often convenient in testing. -func WithDatabase(s protectedts.Storage, db *kv.DB) protectedts.Storage { - return &storageWithDatabase{s: s, db: db} +func WithDatabase( + s protectedts.Storage, db *kv.DB, ief sqlutil.InternalExecutorFactory, +) protectedts.Storage { + return &storageWithDatabase{s: s, db: db, ief: ief} } type storageWithDatabase struct { - db *kv.DB - s protectedts.Storage + db *kv.DB + ief sqlutil.InternalExecutorFactory + s protectedts.Storage } func (s *storageWithDatabase) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) error { From a6a5bafeebfaee8f3c8c5fd6b961fe2ece78042b Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Thu, 3 Nov 2022 17:25:54 -0400 Subject: [PATCH 5/7] add ief to Reconciler --- pkg/kv/kvserver/protectedts/ptprovider/provider.go | 2 +- pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel | 2 ++ pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go | 11 +++++++++-- .../protectedts/ptreconcile/reconciler_test.go | 4 +++- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/protectedts/ptprovider/provider.go b/pkg/kv/kvserver/protectedts/ptprovider/provider.go index 61c9226ee82c..f38ccc633ae0 100644 --- a/pkg/kv/kvserver/protectedts/ptprovider/provider.go +++ b/pkg/kv/kvserver/protectedts/ptprovider/provider.go @@ -53,7 +53,7 @@ func New(cfg Config) (protectedts.Provider, error) { return nil, err } storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor, cfg.Knobs) - reconciler := ptreconcile.New(cfg.Settings, cfg.DB, storage, cfg.ReconcileStatusFuncs) + reconciler := ptreconcile.New(cfg.Settings, cfg.DB, cfg.InternalExecutorFactory, storage, cfg.ReconcileStatusFuncs) cache := ptcache.New(ptcache.Config{ DB: cfg.DB, IeFactory: cfg.InternalExecutorFactory, diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel index f990a6b58c67..880d84a58a20 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/sqlutil", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/stop", @@ -45,6 +46,7 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/sqlutil", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go index 14a522a66c15..35e3d0a113ac 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -53,6 +54,7 @@ type StatusFuncs map[string]StatusFunc type Reconciler struct { settings *cluster.Settings db *kv.DB + ief sqlutil.InternalExecutorFactory pts protectedts.Storage metrics Metrics statusFuncs StatusFuncs @@ -60,11 +62,16 @@ type Reconciler struct { // New constructs a Reconciler. func New( - st *cluster.Settings, db *kv.DB, storage protectedts.Storage, statusFuncs StatusFuncs, + st *cluster.Settings, + db *kv.DB, + ief sqlutil.InternalExecutorFactory, + storage protectedts.Storage, + statusFuncs StatusFuncs, ) *Reconciler { return &Reconciler{ settings: st, db: db, + ief: ief, pts: storage, metrics: makeMetrics(), statusFuncs: statusFuncs, @@ -119,7 +126,7 @@ func (r *Reconciler) run(ctx context.Context, stopper *stop.Stopper) { func (r *Reconciler) reconcile(ctx context.Context) { // Load protected timestamp records. var state ptpb.State - if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := r.ief.TxnWithExecutor(ctx, r.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { var err error state, err = r.pts.GetState(ctx, txn) return err diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go index ca24342b7a52..6d1b4b893e35 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -59,7 +60,8 @@ func TestReconciler(t *testing.T) { toRemove map[string]struct{} }{} state.toRemove = map[string]struct{}{} - r := ptreconcile.New(settings, s0.DB(), ptp, + ief := s0.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + r := ptreconcile.New(settings, s0.DB(), ief, ptp, ptreconcile.StatusFuncs{ testTaskType: func( ctx context.Context, txn *kv.Txn, meta []byte, From 86f6189d4261267239e449949ab1d163e74f0c19 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Thu, 3 Nov 2022 17:21:08 -0400 Subject: [PATCH 6/7] add ie to GetState --- pkg/kv/kvserver/client_protectedts_test.go | 3 +- pkg/kv/kvserver/protectedts/BUILD.bazel | 1 + pkg/kv/kvserver/protectedts/protectedts.go | 3 +- pkg/kv/kvserver/protectedts/ptcache/cache.go | 2 +- .../protectedts/ptreconcile/reconciler.go | 2 +- .../kvserver/protectedts/ptstorage/storage.go | 12 +++++--- .../protectedts/ptstorage/storage_test.go | 29 +++++++++++-------- .../ptstorage/storage_with_database.go | 8 ++--- .../spanconfigreconciler/reconciler.go | 6 ++-- .../spanconfigsqltranslator/sqltranslator.go | 2 +- pkg/sql/gcjob/BUILD.bazel | 1 + pkg/sql/gcjob/refresh_statuses.go | 5 ++-- 12 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go index d90444cdf4d0..e3d9243752f5 100644 --- a/pkg/kv/kvserver/client_protectedts_test.go +++ b/pkg/kv/kvserver/client_protectedts_test.go @@ -276,7 +276,8 @@ func TestProtectedTimestamps(t *testing.T) { // Release the failed record. require.NoError(t, ptsWithDB.Release(ctx, nil, failedRec.ID.GetUUID())) require.NoError(t, ptsWithDB.Release(ctx, nil, laterRec.ID.GetUUID())) - state, err := ptsWithDB.GetState(ctx, nil) + ie := s0.InternalExecutorFactory().(sqlutil.InternalExecutorFactory).MakeInternalExecutorWithoutTxn() + state, err := ptsWithDB.GetState(ctx, nil /* txn */, ie) require.NoError(t, err) require.Len(t, state.Records, 0) require.Equal(t, int(state.NumRecords), len(state.Records)) diff --git a/pkg/kv/kvserver/protectedts/BUILD.bazel b/pkg/kv/kvserver/protectedts/BUILD.bazel index d28ca25f5e44..8a7d634c49a7 100644 --- a/pkg/kv/kvserver/protectedts/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/spanconfig", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "//pkg/util/metric", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/protectedts/protectedts.go b/pkg/kv/kvserver/protectedts/protectedts.go index 2fedb889f32f..d367de962177 100644 --- a/pkg/kv/kvserver/protectedts/protectedts.go +++ b/pkg/kv/kvserver/protectedts/protectedts.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -97,7 +98,7 @@ type Storage interface { // GetState retrieves the entire state of protectedts.Storage with the // provided Txn. - GetState(context.Context, *kv.Txn) (ptpb.State, error) + GetState(context.Context, *kv.Txn, sqlutil.InternalExecutor) (ptpb.State, error) // UpdateTimestamp updates the timestamp protected by the record with the // specified UUID. diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache.go b/pkg/kv/kvserver/protectedts/ptcache/cache.go index 564bab913bbb..ac1eb72970ff 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache.go @@ -248,7 +248,7 @@ func (c *Cache) doUpdate(ctx context.Context) error { if versionChanged = md.Version != prev.Version; !versionChanged { return nil } - if state, err = c.storage.GetState(ctx, txn); err != nil { + if state, err = c.storage.GetState(ctx, txn, ie); err != nil { return errors.Wrap(err, "failed to fetch protectedts state") } return nil diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go index 35e3d0a113ac..227f97baf589 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go @@ -128,7 +128,7 @@ func (r *Reconciler) reconcile(ctx context.Context) { var state ptpb.State if err := r.ief.TxnWithExecutor(ctx, r.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { var err error - state, err = r.pts.GetState(ctx, txn) + state, err = r.pts.GetState(ctx, txn, ie) return err }); err != nil { r.metrics.ReconciliationErrors.Inc(1) diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 995b58e435aa..53b38b296c1f 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -311,7 +311,9 @@ func (p *storage) GetMetadata(ctx context.Context, txn *kv.Txn) (ptpb.Metadata, }, nil } -func (p *storage) GetState(ctx context.Context, txn *kv.Txn) (ptpb.State, error) { +func (p *storage) GetState( + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, +) (ptpb.State, error) { if txn == nil { return ptpb.State{}, errNoTxn } @@ -319,7 +321,7 @@ func (p *storage) GetState(ctx context.Context, txn *kv.Txn) (ptpb.State, error) if err != nil { return ptpb.State{}, err } - records, err := p.getRecords(ctx, txn) + records, err := p.getRecords(ctx, txn, executor) if err != nil { return ptpb.State{}, err } @@ -352,12 +354,14 @@ func (p *storage) deprecatedGetRecords(ctx context.Context, txn *kv.Txn) ([]ptpb return records, nil } -func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) { +func (p *storage) getRecords( + ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, +) ([]ptpb.Record, error) { if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { return p.deprecatedGetRecords(ctx, txn) } - it, err := p.ex.QueryIteratorEx(ctx, "protectedts-GetRecords", txn, + it, err := ie.QueryIteratorEx(ctx, "protectedts-GetRecords", txn, sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, getRecordsQuery) if err != nil { return nil, errors.Wrap(err, "failed to read records") diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index be96b9d0aa60..0b0e3b892e32 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -301,6 +301,7 @@ var testCases = []testCase{ ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), tableSpan(42)) + ie := tCtx.tc.Server(0).InternalExecutorFactory().(sqlutil.InternalExecutorFactory).MakeInternalExecutorWithoutTxn() const msg = "must provide a non-nil transaction" require.Regexp(t, msg, tCtx.pts.Protect(ctx, nil /* txn */, &rec).Error()) require.Regexp(t, msg, tCtx.pts.Release(ctx, nil /* txn */, uuid.MakeV4()).Error()) @@ -309,7 +310,7 @@ var testCases = []testCase{ require.Regexp(t, msg, err.Error()) _, err = tCtx.pts.GetMetadata(ctx, nil /* txn */) require.Regexp(t, msg, err.Error()) - _, err = tCtx.pts.GetState(ctx, nil /* txn */) + _, err = tCtx.pts.GetState(ctx, nil, ie) require.Regexp(t, msg, err.Error()) }), }, @@ -483,6 +484,7 @@ func (test testCase) run(t *testing.T) { s := tc.Server(0) pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) db := s.DB() + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) tCtx := testContext{ pts: pts, db: db, @@ -491,12 +493,12 @@ func (test testCase) run(t *testing.T) { } verify := func(t *testing.T) { var state ptpb.State - require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - state, err = pts.GetState(ctx, txn) + require.NoError(t, ief.TxnWithExecutor(ctx, db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + state, err = pts.GetState(ctx, txn, ie) return err })) var md ptpb.Metadata - require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + require.NoError(t, ief.TxnWithExecutor(ctx, db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { md, err = pts.GetMetadata(ctx, txn) return err })) @@ -641,8 +643,9 @@ func TestCorruptData(t *testing.T) { return err }).Error()) require.Nil(t, got) - require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - _, err = pts.GetState(ctx, txn) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + require.NoError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + _, err = pts.GetState(ctx, txn, ie) return err })) log.Flush() @@ -731,8 +734,9 @@ func TestCorruptData(t *testing.T) { return err })) require.Nil(t, got) - require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - _, err = pts.GetState(ctx, txn) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + require.NoError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + _, err = pts.GetState(ctx, txn, ie) return err })) log.Flush() @@ -782,8 +786,9 @@ func TestErrorsFromSQL(t *testing.T) { _, err := pts.GetMetadata(ctx, txn) return err }), "failed to read metadata: boom") - require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := pts.GetState(ctx, txn) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + require.EqualError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + _, err := pts.GetState(ctx, txn, ie) return err }), "failed to read metadata: boom") // Test that we get an error retrieving the records in GetState. @@ -797,8 +802,8 @@ func TestErrorsFromSQL(t *testing.T) { } return errors.New("boom") }) - require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := pts.GetState(ctx, txn) + require.EqualError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + _, err := pts.GetState(ctx, txn, ie) return err }), "failed to read records: boom") } diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go index 6e9402bc35cc..fd04d0513fdc 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go @@ -89,16 +89,16 @@ func (s *storageWithDatabase) GetMetadata( } func (s *storageWithDatabase) GetState( - ctx context.Context, txn *kv.Txn, + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, ) (state ptpb.State, err error) { if txn == nil { - err = s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - state, err = s.s.GetState(ctx, txn) + err = s.ief.TxnWithExecutor(ctx, s.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + state, err = s.s.GetState(ctx, txn, ie) return err }) return state, err } - return s.s.GetState(ctx, txn) + return s.s.GetState(ctx, txn, executor) } func (s *storageWithDatabase) UpdateTimestamp( diff --git a/pkg/spanconfig/spanconfigreconciler/reconciler.go b/pkg/spanconfig/spanconfigreconciler/reconciler.go index db2c1bad9fcd..055975a86532 100644 --- a/pkg/spanconfig/spanconfigreconciler/reconciler.go +++ b/pkg/spanconfig/spanconfigreconciler/reconciler.go @@ -491,7 +491,7 @@ func (r *incrementalReconciler) reconcile( } missingProtectedTimestampTargets, err = r.filterForMissingProtectedTimestampSystemTargets( - ctx, txn, sqlUpdates, + ctx, txn, sqlUpdates, ie, ) if err != nil { return err @@ -549,7 +549,7 @@ func (r *incrementalReconciler) reconcile( // correspond to cluster or tenant target protected timestamp records that are // no longer found, because they've been released. func (r *incrementalReconciler) filterForMissingProtectedTimestampSystemTargets( - ctx context.Context, txn *kv.Txn, updates []spanconfig.SQLUpdate, + ctx context.Context, txn *kv.Txn, updates []spanconfig.SQLUpdate, ie sqlutil.InternalExecutor, ) ([]spanconfig.SystemTarget, error) { seen := make(map[spanconfig.SystemTarget]struct{}) var missingSystemTargets []spanconfig.SystemTarget @@ -568,7 +568,7 @@ func (r *incrementalReconciler) filterForMissingProtectedTimestampSystemTargets( // timestamp subsystem, and the internal limits to limit the size of this // table, there is scope for improvement in the future. One option could be // a rangefeed-backed materialized view of the system table. - ptsState, err := r.execCfg.ProtectedTimestampProvider.GetState(ctx, txn) + ptsState, err := r.execCfg.ProtectedTimestampProvider.GetState(ctx, txn, ie) if err != nil { return nil, errors.Wrap(err, "failed to get protected timestamp state") } diff --git a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go index 894d6aa4cb61..0551d909db1e 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go +++ b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go @@ -123,7 +123,7 @@ func (s *SQLTranslator) Translate( // timestamp subsystem, and the internal limits to limit the size of this // table, there is scope for improvement in the future. One option could be // a rangefeed-backed materialized view of the system table. - ptsState, err := s.ptsProvider.GetState(ctx, s.GetTxn()) + ptsState, err := s.ptsProvider.GetState(ctx, s.GetTxn(), s.GetInternalExecutor()) if err != nil { return nil, hlc.Timestamp{}, errors.Wrap(err, "failed to get protected timestamp state") } diff --git a/pkg/sql/gcjob/BUILD.bazel b/pkg/sql/gcjob/BUILD.bazel index 3f52e41061f2..e0d6e02a105c 100644 --- a/pkg/sql/gcjob/BUILD.bazel +++ b/pkg/sql/gcjob/BUILD.bazel @@ -39,6 +39,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/sql/sem/tree", "//pkg/sql/sqlerrors", + "//pkg/sql/sqlutil", "//pkg/storage", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index b4f29361de5a..ae6000d86593 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -410,8 +411,8 @@ func isTenantProtected( isProtected := false ptsProvider := execCfg.ProtectedTimestampProvider - if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ptsState, err := ptsProvider.GetState(ctx, txn) + if err := execCfg.InternalExecutorFactory.TxnWithExecutor(ctx, execCfg.DB, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + ptsState, err := ptsProvider.GetState(ctx, txn, ie) if err != nil { return errors.Wrap(err, "failed to get protectedts State") } From cccac5eb052249c16d69fc981815b712e72cfc13 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Fri, 4 Nov 2022 09:29:44 -0400 Subject: [PATCH 7/7] add ie to GetMetadata --- pkg/kv/kvserver/protectedts/protectedts.go | 2 +- pkg/kv/kvserver/protectedts/ptcache/cache.go | 2 +- pkg/kv/kvserver/protectedts/ptstorage/storage.go | 8 +++++--- .../kvserver/protectedts/ptstorage/storage_test.go | 12 ++++++------ .../protectedts/ptstorage/storage_with_database.go | 8 ++++---- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvserver/protectedts/protectedts.go b/pkg/kv/kvserver/protectedts/protectedts.go index d367de962177..003da6bebf67 100644 --- a/pkg/kv/kvserver/protectedts/protectedts.go +++ b/pkg/kv/kvserver/protectedts/protectedts.go @@ -94,7 +94,7 @@ type Storage interface { Release(context.Context, *kv.Txn, uuid.UUID) error // GetMetadata retrieves the metadata with the provided Txn. - GetMetadata(context.Context, *kv.Txn) (ptpb.Metadata, error) + GetMetadata(context.Context, *kv.Txn, sqlutil.InternalExecutor) (ptpb.Metadata, error) // GetState retrieves the entire state of protectedts.Storage with the // provided Txn. diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache.go b/pkg/kv/kvserver/protectedts/ptcache/cache.go index ac1eb72970ff..b94ceca966fe 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache.go @@ -241,7 +241,7 @@ func (c *Cache) doUpdate(ctx context.Context) error { ts = txn.ReadTimestamp() } }() - md, err := c.storage.GetMetadata(ctx, txn) + md, err := c.storage.GetMetadata(ctx, txn, ie) if err != nil { return errors.Wrap(err, "failed to fetch protectedts metadata") } diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 53b38b296c1f..c44ce56e7f13 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -290,11 +290,13 @@ func (p *storage) Release(ctx context.Context, txn *kv.Txn, id uuid.UUID) error return nil } -func (p *storage) GetMetadata(ctx context.Context, txn *kv.Txn) (ptpb.Metadata, error) { +func (p *storage) GetMetadata( + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, +) (ptpb.Metadata, error) { if txn == nil { return ptpb.Metadata{}, errNoTxn } - row, err := p.ex.QueryRowEx(ctx, "protectedts-GetMetadata", txn, + row, err := executor.QueryRowEx(ctx, "protectedts-GetMetadata", txn, sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, getMetadataQuery) if err != nil { @@ -317,7 +319,7 @@ func (p *storage) GetState( if txn == nil { return ptpb.State{}, errNoTxn } - md, err := p.GetMetadata(ctx, txn) + md, err := p.GetMetadata(ctx, txn, executor) if err != nil { return ptpb.State{}, err } diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index 0b0e3b892e32..c24789bf5fce 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -308,9 +308,9 @@ var testCases = []testCase{ require.Regexp(t, msg, tCtx.pts.MarkVerified(ctx, nil /* txn */, uuid.MakeV4()).Error()) _, err := tCtx.pts.GetRecord(ctx, nil /* txn */, uuid.MakeV4()) require.Regexp(t, msg, err.Error()) - _, err = tCtx.pts.GetMetadata(ctx, nil /* txn */) + _, err = tCtx.pts.GetMetadata(ctx, nil /* txn */, ie) require.Regexp(t, msg, err.Error()) - _, err = tCtx.pts.GetState(ctx, nil, ie) + _, err = tCtx.pts.GetState(ctx, nil /* txn */, ie) require.Regexp(t, msg, err.Error()) }), }, @@ -499,7 +499,7 @@ func (test testCase) run(t *testing.T) { })) var md ptpb.Metadata require.NoError(t, ief.TxnWithExecutor(ctx, db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { - md, err = pts.GetMetadata(ctx, txn) + md, err = pts.GetMetadata(ctx, txn, ie) return err })) require.EqualValues(t, tCtx.state, state) @@ -762,6 +762,7 @@ func TestErrorsFromSQL(t *testing.T) { s := tc.Server(0) ie := s.InternalExecutor().(sqlutil.InternalExecutor) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) wrappedIE := &wrappedInternalExecutor{wrapped: ie} pts := ptstorage.New(s.ClusterSettings(), wrappedIE, &protectedts.TestingKnobs{}) @@ -782,11 +783,10 @@ func TestErrorsFromSQL(t *testing.T) { require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Release(ctx, txn, rec.ID.GetUUID()) }), fmt.Sprintf("failed to release record %v: boom", rec.ID)) - require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := pts.GetMetadata(ctx, txn) + require.EqualError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + _, err := pts.GetMetadata(ctx, txn, ie) return err }), "failed to read metadata: boom") - ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) require.EqualError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { _, err := pts.GetState(ctx, txn, ie) return err diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go index fd04d0513fdc..e30b4258a213 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go @@ -76,16 +76,16 @@ func (s *storageWithDatabase) Release(ctx context.Context, txn *kv.Txn, id uuid. } func (s *storageWithDatabase) GetMetadata( - ctx context.Context, txn *kv.Txn, + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, ) (md ptpb.Metadata, err error) { if txn == nil { - err = s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - md, err = s.s.GetMetadata(ctx, txn) + err = s.ief.TxnWithExecutor(ctx, s.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + md, err = s.s.GetMetadata(ctx, txn, ie) return err }) return md, err } - return s.s.GetMetadata(ctx, txn) + return s.s.GetMetadata(ctx, txn, executor) } func (s *storageWithDatabase) GetState(