diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index 6f0aa1e545a1..d65990bd80ef 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -7,6 +7,7 @@ pkg/base/node_id.go | `*SQLIDContainer` pkg/base/node_id.go | `*StoreIDContainer` pkg/ccl/backupccl/backuppb/backup.go | `sz` pkg/ccl/backupccl/backuppb/backup.go | `timing` +pkg/ccl/streamingccl/streampb/streamid.go | `StreamID` pkg/cli/exit/exit.go | `Code` pkg/jobs/jobspb/wrap.go | `Type` pkg/kv/bulk/bulk_metrics.go | `sz` @@ -51,7 +52,6 @@ pkg/storage/enginepb/mvcc.go | `TxnEpoch` pkg/storage/enginepb/mvcc.go | `TxnSeq` pkg/storage/enginepb/mvcc3.go | `*MVCCStats` pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta` -pkg/streaming/api.go | `StreamID` pkg/util/hlc/timestamp.go | `ClockTimestamp` pkg/util/hlc/timestamp.go | `LegacyTimestamp` pkg/util/hlc/timestamp.go | `Timestamp` diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel index 7e64bcc79c86..2090a487638e 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel @@ -30,6 +30,7 @@ go_test( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/tabledesc", + "//pkg/sql/sqlutil", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go index ac147ff34fab..c57c12403702 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -180,10 +181,10 @@ func TestDataDriven(t *testing.T) { var records []spanconfig.Record sqlTranslatorFactory := tenant.SpanConfigSQLTranslatorFactory().(*spanconfigsqltranslator.Factory) - err := sql.DescsTxn(ctx, &execCfg, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + err := execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { - sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, descsCol) + sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) var err error records, _, err = sqlTranslator.Translate(ctx, descIDs, generateSystemSpanConfigs) require.NoError(t, err) @@ -212,10 +213,10 @@ func TestDataDriven(t *testing.T) { case "full-translate": sqlTranslatorFactory := tenant.SpanConfigSQLTranslatorFactory().(*spanconfigsqltranslator.Factory) var records []spanconfig.Record - err := sql.DescsTxn(ctx, &execCfg, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + err := execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { - sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, 
descsCol) + sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) var err error records, _, err = spanconfig.FullTranslate(ctx, sqlTranslator) require.NoError(t, err) diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 7337b8943c4e..44cddf7e9ddc 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -75,7 +75,7 @@ func (p *partitionedStreamClient) Create( row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64()) err := row.Scan(&streamID) if err != nil { - return streaming.InvalidStreamID, + return streampb.InvalidStreamID, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantID.String()) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 9ed03a234d4d..db7bdcc457c0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -23,29 +23,28 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -type streamIngestManagerImpl struct{} +type streamIngestManagerImpl struct { + evalCtx *eval.Context + txn *kv.Txn +} // CompleteStreamIngestion implements streaming.StreamIngestManager interface. func (r *streamIngestManagerImpl) CompleteStreamIngestion( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - cutoverTimestamp hlc.Timestamp, + ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp, ) error { - return completeStreamIngestion(ctx, evalCtx, txn, ingestionJobID, cutoverTimestamp) + return completeStreamIngestion(ctx, r.evalCtx, r.txn, ingestionJobID, cutoverTimestamp) } // GetStreamIngestionStats implements streaming.StreamIngestManager interface. 
func (r *streamIngestManagerImpl) GetStreamIngestionStats( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID, + ctx context.Context, ingestionJobID jobspb.JobID, ) (*streampb.StreamIngestionStats, error) { - return getStreamIngestionStats(ctx, evalCtx, txn, ingestionJobID) + return getStreamIngestionStats(ctx, r.evalCtx, r.txn, ingestionJobID) } func newStreamIngestManagerWithPrivilegesCheck( - ctx context.Context, evalCtx *eval.Context, -) (streaming.StreamIngestManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.StreamIngestManager, error) { isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { return nil, err @@ -64,7 +63,7 @@ func newStreamIngestManagerWithPrivilegesCheck( pgcode.InsufficientPrivilege, "replication requires enterprise license") } - return &streamIngestManagerImpl{}, nil + return &streamIngestManagerImpl{evalCtx: evalCtx, txn: txn}, nil } func init() { diff --git a/pkg/ccl/streamingccl/streampb/BUILD.bazel b/pkg/ccl/streamingccl/streampb/BUILD.bazel index fcf080739dc9..7ed7afceeada 100644 --- a/pkg/ccl/streamingccl/streampb/BUILD.bazel +++ b/pkg/ccl/streamingccl/streampb/BUILD.bazel @@ -35,7 +35,10 @@ go_proto_library( go_library( name = "streampb", - srcs = ["empty.go"], + srcs = [ + "empty.go", + "streamid.go", + ], embed = [":streampb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb", visibility = ["//visibility:public"], diff --git a/pkg/ccl/streamingccl/streampb/streamid.go b/pkg/ccl/streamingccl/streampb/streamid.go new file mode 100644 index 000000000000..4521d2b7400d --- /dev/null +++ b/pkg/ccl/streamingccl/streampb/streamid.go @@ -0,0 +1,18 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streampb + +// StreamID is the ID of a replication stream. +type StreamID int64 + +// SafeValue implements the redact.SafeValue interface. +func (j StreamID) SafeValue() {} + +// InvalidStreamID is the zero value for StreamID corresponding to no stream. 
+const InvalidStreamID StreamID = 0 diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index a6aaf392b069..6d0086cfe73a 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -38,7 +37,7 @@ import ( ) type eventStream struct { - streamID streaming.StreamID + streamID streampb.StreamID execCfg *sql.ExecutorConfig spec streampb.StreamPartitionSpec subscribedSpans roachpb.SpanGroup @@ -515,7 +514,7 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) { } func streamPartition( - evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte, + evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte, ) (eval.ValueGenerator, error) { if !evalCtx.SessionData().AvoidBuffering { return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option") diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index d7f156eb39b7..be8f04a7cfb8 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -22,54 +22,49 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -type replicationStreamManagerImpl struct{} +type replicationStreamManagerImpl struct { + evalCtx *eval.Context + txn *kv.Txn +} // StartReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StartReplicationStream( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64, -) (streaming.StreamID, error) { - return startReplicationStreamJob(ctx, evalCtx, txn, tenantID) + ctx context.Context, tenantID uint64, +) (streampb.StreamID, error) { + return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID) } // HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) HeartbeatReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - streamID streaming.StreamID, - frontier hlc.Timestamp, - txn *kv.Txn, + ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { - return heartbeatReplicationStream(ctx, evalCtx, streamID, frontier, txn) + return heartbeatReplicationStream(ctx, r.evalCtx, r.txn, streamID, frontier) } // StreamPartition implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StreamPartition( - evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte, + streamID streampb.StreamID, opaqueSpec []byte, ) (eval.ValueGenerator, error) { - return streamPartition(evalCtx, streamID, opaqueSpec) + return streamPartition(r.evalCtx, streamID, opaqueSpec) } // GetReplicationStreamSpec implements streaming.ReplicationStreamManager interface. 
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, + ctx context.Context, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { - return getReplicationStreamSpec(ctx, evalCtx, txn, streamID) + return getReplicationStreamSpec(ctx, r.evalCtx, streamID) } // CompleteReplicationStream implements ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) CompleteReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID streaming.StreamID, - successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { - return completeReplicationStream(ctx, evalCtx, txn, streamID, successfulIngestion) + return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion) } func newReplicationStreamManagerWithPrivilegesCheck( - ctx context.Context, evalCtx *eval.Context, -) (streaming.ReplicationStreamManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.ReplicationStreamManager, error) { isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { return nil, err @@ -81,6 +76,7 @@ func newReplicationStreamManagerWithPrivilegesCheck( } execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) + enterpriseCheckErr := utilccl.CheckEnterpriseEnabled( execCfg.Settings, execCfg.NodeInfo.LogicalClusterID(), execCfg.Organization(), "REPLICATION") if enterpriseCheckErr != nil { @@ -88,7 +84,7 @@ func newReplicationStreamManagerWithPrivilegesCheck( pgcode.InsufficientPrivilege, "replication requires enterprise license") } - return &replicationStreamManagerImpl{}, nil + return &replicationStreamManagerImpl{evalCtx: evalCtx, txn: txn}, nil } func init() { diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go b/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go index d893ed2e0dde..2090ae5cdc1c 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -44,7 +43,7 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) { require.NoError(t, protoutil.Unmarshal(sessionSerialized, &sessionData)) } - getManagerForUser := func(u string) (streaming.ReplicationStreamManager, error) { + getManagerForUser := func(u string) (eval.ReplicationStreamManager, error) { sqlUser, err := username.MakeSQLUsernameFromUserInput(u, username.PurposeValidation) require.NoError(t, err) execCfg := s.ExecutorConfig().(sql.ExecutorConfig) @@ -52,7 +51,7 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) { p, cleanup := sql.NewInternalPlanner("test", txn, sqlUser, &sql.MemoryMetrics{}, &execCfg, sessionData) defer cleanup() ec := p.(interface{ EvalContext() *eval.Context }).EvalContext() - return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec) + return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec, txn) } for _, tc := range []struct { diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go 
b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 357f9b9253cb..652de91ec09f 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -36,16 +35,16 @@ import ( // 2. TODO(casper): Updates the protected timestamp for spans being replicated func startReplicationStreamJob( ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64, -) (streaming.StreamID, error) { +) (streampb.StreamID, error) { execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } if !hasAdminRole { - return streaming.InvalidStreamID, errors.New("admin role required to start stream replication jobs") + return streampb.InvalidStreamID, errors.New("admin role required to start stream replication jobs") } registry := execConfig.JobRegistry @@ -53,7 +52,7 @@ func startReplicationStreamJob( ptsID := uuid.MakeV4() jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID) if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } ptp := execConfig.ProtectedTimestampProvider @@ -68,9 +67,9 @@ func startReplicationStreamJob( deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect) if err := ptp.Protect(ctx, txn, pts); err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } - return streaming.StreamID(jr.JobID), nil + return streampb.StreamID(jr.JobID), nil } // Convert the producer job's status into corresponding replication @@ -99,7 +98,7 @@ func updateReplicationStreamProgress( expiration time.Time, ptsProvider protectedts.Provider, registry *jobs.Registry, - streamID streaming.StreamID, + streamID streampb.StreamID, consumedTime hlc.Timestamp, txn *kv.Txn, ) (status streampb.StreamReplicationStatus, err error) { @@ -157,9 +156,9 @@ func updateReplicationStreamProgress( func heartbeatReplicationStream( ctx context.Context, evalCtx *eval.Context, - streamID streaming.StreamID, - frontier hlc.Timestamp, txn *kv.Txn, + streamID streampb.StreamID, + frontier hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV) @@ -198,7 +197,7 @@ func heartbeatReplicationStream( // getReplicationStreamSpec gets a replication stream specification for the specified stream. 
func getReplicationStreamSpec( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, + ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext) // Returns error if the replication stream is not active @@ -257,7 +256,7 @@ func completeReplicationStream( ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, - streamID streaming.StreamID, + streamID streampb.StreamID, successfulIngestion bool, ) error { registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go index d14d6424d28f..e3d9243752f5 100644 --- a/pkg/kv/kvserver/client_protectedts_test.go +++ b/pkg/kv/kvserver/client_protectedts_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -61,6 +62,7 @@ func TestProtectedTimestamps(t *testing.T) { tc := testcluster.StartTestCluster(t, 3, args) defer tc.Stopper().Stop(ctx) s0 := tc.Server(0) + ief := s0.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) conn := tc.ServerConn(0) _, err := conn.Exec("CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)") @@ -182,7 +184,7 @@ func TestProtectedTimestamps(t *testing.T) { pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor), nil /* knobs */) - ptsWithDB := ptstorage.WithDatabase(pts, s0.DB()) + ptsWithDB := ptstorage.WithDatabase(pts, s0.DB(), ief) ptsRec := ptpb.Record{ ID: uuid.MakeV4().GetBytes(), Timestamp: s0.Clock().Now(), @@ -274,7 +276,8 @@ func TestProtectedTimestamps(t *testing.T) { // Release the failed record. 
require.NoError(t, ptsWithDB.Release(ctx, nil, failedRec.ID.GetUUID())) require.NoError(t, ptsWithDB.Release(ctx, nil, laterRec.ID.GetUUID())) - state, err := ptsWithDB.GetState(ctx, nil) + ie := s0.InternalExecutorFactory().(sqlutil.InternalExecutorFactory).MakeInternalExecutorWithoutTxn() + state, err := ptsWithDB.GetState(ctx, nil /* txn */, ie) require.NoError(t, err) require.Len(t, state.Records, 0) require.Equal(t, int(state.NumRecords), len(state.Records)) diff --git a/pkg/kv/kvserver/protectedts/BUILD.bazel b/pkg/kv/kvserver/protectedts/BUILD.bazel index d28ca25f5e44..8a7d634c49a7 100644 --- a/pkg/kv/kvserver/protectedts/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/spanconfig", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "//pkg/util/metric", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/protectedts/protectedts.go b/pkg/kv/kvserver/protectedts/protectedts.go index 2fedb889f32f..003da6bebf67 100644 --- a/pkg/kv/kvserver/protectedts/protectedts.go +++ b/pkg/kv/kvserver/protectedts/protectedts.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -93,11 +94,11 @@ type Storage interface { Release(context.Context, *kv.Txn, uuid.UUID) error // GetMetadata retrieves the metadata with the provided Txn. - GetMetadata(context.Context, *kv.Txn) (ptpb.Metadata, error) + GetMetadata(context.Context, *kv.Txn, sqlutil.InternalExecutor) (ptpb.Metadata, error) // GetState retrieves the entire state of protectedts.Storage with the // provided Txn. - GetState(context.Context, *kv.Txn) (ptpb.State, error) + GetState(context.Context, *kv.Txn, sqlutil.InternalExecutor) (ptpb.State, error) // UpdateTimestamp updates the timestamp protected by the record with the // specified UUID. diff --git a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel index df1b46288ab9..9516b7038637 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/settings/cluster", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache.go b/pkg/kv/kvserver/protectedts/ptcache/cache.go index 6d277ffb6561..b94ceca966fe 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -32,6 +33,7 @@ import ( // Cache implements protectedts.Cache. type Cache struct { db *kv.DB + ief sqlutil.InternalExecutorFactory storage protectedts.Storage stopper *stop.Stopper settings *cluster.Settings @@ -56,15 +58,17 @@ type Cache struct { // Config configures a Cache. 
type Config struct { - DB *kv.DB - Storage protectedts.Storage - Settings *cluster.Settings + DB *kv.DB + IeFactory sqlutil.InternalExecutorFactory + Storage protectedts.Storage + Settings *cluster.Settings } // New returns a new cache. func New(config Config) *Cache { c := &Cache{ db: config.DB, + ief: config.IeFactory, storage: config.Storage, settings: config.Settings, } @@ -226,7 +230,9 @@ func (c *Cache) doUpdate(ctx context.Context) error { state ptpb.State ts hlc.Timestamp ) - err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + err := c.ief.TxnWithExecutor(ctx, c.db, nil /* sessionData */, func( + ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, + ) (err error) { // NB: because this is a read-only transaction, the commit will be a no-op; // returning nil here means the transaction will commit and will never need // to change its read timestamp. @@ -235,14 +241,14 @@ func (c *Cache) doUpdate(ctx context.Context) error { ts = txn.ReadTimestamp() } }() - md, err := c.storage.GetMetadata(ctx, txn) + md, err := c.storage.GetMetadata(ctx, txn, ie) if err != nil { return errors.Wrap(err, "failed to fetch protectedts metadata") } if versionChanged = md.Version != prev.Version; !versionChanged { return nil } - if state, err = c.storage.GetState(ctx, txn); err != nil { + if state, err = c.storage.GetState(ctx, txn, ie); err != nil { return errors.Wrap(err, "failed to fetch protectedts state") } return nil diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 64ec9b9ab781..12dee8c37d86 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -56,20 +56,23 @@ func TestCacheBasic(t *testing.T) { }) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very short. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Microsecond) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -133,21 +136,24 @@ func TestRefresh(t *testing.T) { }) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New( s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), ptsKnobs), s.DB(), + ief, ) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) t.Run("already up-to-date", func(t *testing.T) { @@ -254,13 +260,15 @@ func TestStart(t *testing.T) { }, }) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider // Set the poll interval to be very long. 
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) return tc, c } @@ -286,19 +294,22 @@ func TestQueryRecord(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New( s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -348,20 +359,23 @@ func TestIterate(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -426,6 +440,7 @@ func TestGetProtectionTimestamps(t *testing.T) { defer tc.Stopper().Stop(ctx) // Set the poll interval to be very long. s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) ts := func(nanos int) hlc.Timestamp { @@ -502,12 +517,14 @@ func TestGetProtectionTimestamps(t *testing.T) { s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) @@ -525,20 +542,23 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) p := ptstorage.WithDatabase( ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB(), + ief, ) // Set the poll interval to be very long. 
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ - Settings: s.ClusterSettings(), - DB: s.DB(), - Storage: p, + Settings: s.ClusterSettings(), + DB: s.DB(), + IeFactory: ief, + Storage: p, }) require.NoError(t, c.Start(ctx, tc.Stopper())) diff --git a/pkg/kv/kvserver/protectedts/ptprovider/provider.go b/pkg/kv/kvserver/protectedts/ptprovider/provider.go index 377039e80c53..f38ccc633ae0 100644 --- a/pkg/kv/kvserver/protectedts/ptprovider/provider.go +++ b/pkg/kv/kvserver/protectedts/ptprovider/provider.go @@ -30,12 +30,13 @@ import ( // Config configures the Provider. type Config struct { - Settings *cluster.Settings - DB *kv.DB - Stores *kvserver.Stores - ReconcileStatusFuncs ptreconcile.StatusFuncs - InternalExecutor sqlutil.InternalExecutor - Knobs *protectedts.TestingKnobs + Settings *cluster.Settings + DB *kv.DB + Stores *kvserver.Stores + ReconcileStatusFuncs ptreconcile.StatusFuncs + InternalExecutorFactory sqlutil.InternalExecutorFactory + InternalExecutor sqlutil.InternalExecutor + Knobs *protectedts.TestingKnobs } // Provider is the concrete implementation of protectedts.Provider interface. @@ -52,11 +53,12 @@ func New(cfg Config) (protectedts.Provider, error) { return nil, err } storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor, cfg.Knobs) - reconciler := ptreconcile.New(cfg.Settings, cfg.DB, storage, cfg.ReconcileStatusFuncs) + reconciler := ptreconcile.New(cfg.Settings, cfg.DB, cfg.InternalExecutorFactory, storage, cfg.ReconcileStatusFuncs) cache := ptcache.New(ptcache.Config{ - DB: cfg.DB, - Storage: storage, - Settings: cfg.Settings, + DB: cfg.DB, + IeFactory: cfg.InternalExecutorFactory, + Storage: storage, + Settings: cfg.Settings, }) return &Provider{ diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel index f990a6b58c67..880d84a58a20 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptreconcile/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/sqlutil", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/stop", @@ -45,6 +46,7 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/sqlutil", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go index 14a522a66c15..227f97baf589 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -53,6 +54,7 @@ type StatusFuncs map[string]StatusFunc type Reconciler struct { settings *cluster.Settings db *kv.DB + ief sqlutil.InternalExecutorFactory pts protectedts.Storage metrics Metrics statusFuncs StatusFuncs @@ -60,11 +62,16 @@ type Reconciler struct { // New constructs a Reconciler. 
func New( - st *cluster.Settings, db *kv.DB, storage protectedts.Storage, statusFuncs StatusFuncs, + st *cluster.Settings, + db *kv.DB, + ief sqlutil.InternalExecutorFactory, + storage protectedts.Storage, + statusFuncs StatusFuncs, ) *Reconciler { return &Reconciler{ settings: st, db: db, + ief: ief, pts: storage, metrics: makeMetrics(), statusFuncs: statusFuncs, @@ -119,9 +126,9 @@ func (r *Reconciler) run(ctx context.Context, stopper *stop.Stopper) { func (r *Reconciler) reconcile(ctx context.Context) { // Load protected timestamp records. var state ptpb.State - if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := r.ief.TxnWithExecutor(ctx, r.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { var err error - state, err = r.pts.GetState(ctx, txn) + state, err = r.pts.GetState(ctx, txn, ie) return err }); err != nil { r.metrics.ReconciliationErrors.Inc(1) diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go index ca24342b7a52..6d1b4b893e35 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -59,7 +60,8 @@ func TestReconciler(t *testing.T) { toRemove map[string]struct{} }{} state.toRemove = map[string]struct{}{} - r := ptreconcile.New(settings, s0.DB(), ptp, + ief := s0.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + r := ptreconcile.New(settings, s0.DB(), ief, ptp, ptreconcile.StatusFuncs{ testTaskType: func( ctx context.Context, txn *kv.Txn, meta []byte, diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 995b58e435aa..c44ce56e7f13 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -290,11 +290,13 @@ func (p *storage) Release(ctx context.Context, txn *kv.Txn, id uuid.UUID) error return nil } -func (p *storage) GetMetadata(ctx context.Context, txn *kv.Txn) (ptpb.Metadata, error) { +func (p *storage) GetMetadata( + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, +) (ptpb.Metadata, error) { if txn == nil { return ptpb.Metadata{}, errNoTxn } - row, err := p.ex.QueryRowEx(ctx, "protectedts-GetMetadata", txn, + row, err := executor.QueryRowEx(ctx, "protectedts-GetMetadata", txn, sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, getMetadataQuery) if err != nil { @@ -311,15 +313,17 @@ func (p *storage) GetMetadata(ctx context.Context, txn *kv.Txn) (ptpb.Metadata, }, nil } -func (p *storage) GetState(ctx context.Context, txn *kv.Txn) (ptpb.State, error) { +func (p *storage) GetState( + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, +) (ptpb.State, error) { if txn == nil { return ptpb.State{}, errNoTxn } - md, err := p.GetMetadata(ctx, txn) + md, err := p.GetMetadata(ctx, txn, executor) if err != nil { return ptpb.State{}, err } - records, err := p.getRecords(ctx, txn) + records, err := p.getRecords(ctx, txn, executor) if err != nil { return ptpb.State{}, err } 
@@ -352,12 +356,14 @@ func (p *storage) deprecatedGetRecords(ctx context.Context, txn *kv.Txn) ([]ptpb return records, nil } -func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) { +func (p *storage) getRecords( + ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, +) ([]ptpb.Record, error) { if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { return p.deprecatedGetRecords(ctx, txn) } - it, err := p.ex.QueryIteratorEx(ctx, "protectedts-GetRecords", txn, + it, err := ie.QueryIteratorEx(ctx, "protectedts-GetRecords", txn, sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, getRecordsQuery) if err != nil { return nil, errors.Wrap(err, "failed to read records") diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index be96b9d0aa60..c24789bf5fce 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -301,15 +301,16 @@ var testCases = []testCase{ ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), tableSpan(42)) + ie := tCtx.tc.Server(0).InternalExecutorFactory().(sqlutil.InternalExecutorFactory).MakeInternalExecutorWithoutTxn() const msg = "must provide a non-nil transaction" require.Regexp(t, msg, tCtx.pts.Protect(ctx, nil /* txn */, &rec).Error()) require.Regexp(t, msg, tCtx.pts.Release(ctx, nil /* txn */, uuid.MakeV4()).Error()) require.Regexp(t, msg, tCtx.pts.MarkVerified(ctx, nil /* txn */, uuid.MakeV4()).Error()) _, err := tCtx.pts.GetRecord(ctx, nil /* txn */, uuid.MakeV4()) require.Regexp(t, msg, err.Error()) - _, err = tCtx.pts.GetMetadata(ctx, nil /* txn */) + _, err = tCtx.pts.GetMetadata(ctx, nil /* txn */, ie) require.Regexp(t, msg, err.Error()) - _, err = tCtx.pts.GetState(ctx, nil /* txn */) + _, err = tCtx.pts.GetState(ctx, nil /* txn */, ie) require.Regexp(t, msg, err.Error()) }), }, @@ -483,6 +484,7 @@ func (test testCase) run(t *testing.T) { s := tc.Server(0) pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) db := s.DB() + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) tCtx := testContext{ pts: pts, db: db, @@ -491,13 +493,13 @@ func (test testCase) run(t *testing.T) { } verify := func(t *testing.T) { var state ptpb.State - require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - state, err = pts.GetState(ctx, txn) + require.NoError(t, ief.TxnWithExecutor(ctx, db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + state, err = pts.GetState(ctx, txn, ie) return err })) var md ptpb.Metadata - require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - md, err = pts.GetMetadata(ctx, txn) + require.NoError(t, ief.TxnWithExecutor(ctx, db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + md, err = pts.GetMetadata(ctx, txn, ie) return err })) require.EqualValues(t, tCtx.state, state) @@ -641,8 +643,9 @@ func TestCorruptData(t *testing.T) { return err }).Error()) require.Nil(t, got) - require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - _, err = pts.GetState(ctx, txn) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + require.NoError(t, ief.TxnWithExecutor(ctx, s.DB(), 
nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + _, err = pts.GetState(ctx, txn, ie) return err })) log.Flush() @@ -731,8 +734,9 @@ func TestCorruptData(t *testing.T) { return err })) require.Nil(t, got) - require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - _, err = pts.GetState(ctx, txn) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + require.NoError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + _, err = pts.GetState(ctx, txn, ie) return err })) log.Flush() @@ -758,6 +762,7 @@ func TestErrorsFromSQL(t *testing.T) { s := tc.Server(0) ie := s.InternalExecutor().(sqlutil.InternalExecutor) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) wrappedIE := &wrappedInternalExecutor{wrapped: ie} pts := ptstorage.New(s.ClusterSettings(), wrappedIE, &protectedts.TestingKnobs{}) @@ -778,12 +783,12 @@ func TestErrorsFromSQL(t *testing.T) { require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Release(ctx, txn, rec.ID.GetUUID()) }), fmt.Sprintf("failed to release record %v: boom", rec.ID)) - require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := pts.GetMetadata(ctx, txn) + require.EqualError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + _, err := pts.GetMetadata(ctx, txn, ie) return err }), "failed to read metadata: boom") - require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := pts.GetState(ctx, txn) + require.EqualError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + _, err := pts.GetState(ctx, txn, ie) return err }), "failed to read metadata: boom") // Test that we get an error retrieving the records in GetState. @@ -797,8 +802,8 @@ func TestErrorsFromSQL(t *testing.T) { } return errors.New("boom") }) - require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := pts.GetState(ctx, txn) + require.EqualError(t, ief.TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + _, err := pts.GetState(ctx, txn, ie) return err }), "failed to read records: boom") } diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go index 8eb62e7650a9..e30b4258a213 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_with_database.go @@ -16,19 +16,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // WithDatabase wraps s such that any calls made with a nil *Txn will be wrapped // in a call to db.Txn. This is often convenient in testing. 
-func WithDatabase(s protectedts.Storage, db *kv.DB) protectedts.Storage { - return &storageWithDatabase{s: s, db: db} +func WithDatabase( + s protectedts.Storage, db *kv.DB, ief sqlutil.InternalExecutorFactory, +) protectedts.Storage { + return &storageWithDatabase{s: s, db: db, ief: ief} } type storageWithDatabase struct { - db *kv.DB - s protectedts.Storage + db *kv.DB + ief sqlutil.InternalExecutorFactory + s protectedts.Storage } func (s *storageWithDatabase) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) error { @@ -72,29 +76,29 @@ func (s *storageWithDatabase) Release(ctx context.Context, txn *kv.Txn, id uuid. } func (s *storageWithDatabase) GetMetadata( - ctx context.Context, txn *kv.Txn, + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, ) (md ptpb.Metadata, err error) { if txn == nil { - err = s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - md, err = s.s.GetMetadata(ctx, txn) + err = s.ief.TxnWithExecutor(ctx, s.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + md, err = s.s.GetMetadata(ctx, txn, ie) return err }) return md, err } - return s.s.GetMetadata(ctx, txn) + return s.s.GetMetadata(ctx, txn, executor) } func (s *storageWithDatabase) GetState( - ctx context.Context, txn *kv.Txn, + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, ) (state ptpb.State, err error) { if txn == nil { - err = s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - state, err = s.s.GetState(ctx, txn) + err = s.ief.TxnWithExecutor(ctx, s.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) { + state, err = s.s.GetState(ctx, txn, ie) return err }) return state, err } - return s.s.GetState(ctx, txn) + return s.s.GetState(ctx, txn, executor) } func (s *storageWithDatabase) UpdateTimestamp( diff --git a/pkg/server/server.go b/pkg/server/server.go index 757145f1455d..db694b15cc88 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -499,10 +499,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { protectedtsKnobs, _ := cfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) protectedtsProvider, err := ptprovider.New(ptprovider.Config{ - DB: db, - InternalExecutor: internalExecutor, - Settings: st, - Knobs: protectedtsKnobs, + DB: db, + InternalExecutor: internalExecutor, + InternalExecutorFactory: internalExecutorFactory, + Settings: st, + Knobs: protectedtsKnobs, ReconcileStatusFuncs: ptreconcile.StatusFuncs{ jobsprotectedts.GetMetaType(jobsprotectedts.Jobs): jobsprotectedts.MakeStatusFunc( jobRegistry, internalExecutor, jobsprotectedts.Jobs), diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index b89c387fc85b..7a998cd820a0 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -822,10 +822,11 @@ func makeTenantSQLServerArgs( var protectedTSProvider protectedts.Provider protectedtsKnobs, _ := baseCfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) pp, err := ptprovider.New(ptprovider.Config{ - DB: db, - InternalExecutor: circularInternalExecutor, - Settings: st, - Knobs: protectedtsKnobs, + DB: db, + InternalExecutor: circularInternalExecutor, + InternalExecutorFactory: internalExecutorFactory, + Settings: st, + Knobs: protectedtsKnobs, ReconcileStatusFuncs: ptreconcile.StatusFuncs{ jobsprotectedts.GetMetaType(jobsprotectedts.Jobs): jobsprotectedts.MakeStatusFunc( circularJobRegistry, circularInternalExecutor, jobsprotectedts.Jobs), diff --git 
a/pkg/spanconfig/spanconfigreconciler/BUILD.bazel b/pkg/spanconfig/spanconfigreconciler/BUILD.bazel index ce01e4f0b07c..4ff015f80a00 100644 --- a/pkg/spanconfig/spanconfigreconciler/BUILD.bazel +++ b/pkg/spanconfig/spanconfigreconciler/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/retry", diff --git a/pkg/spanconfig/spanconfigreconciler/reconciler.go b/pkg/spanconfig/spanconfigreconciler/reconciler.go index 8b994665d762..055975a86532 100644 --- a/pkg/spanconfig/spanconfigreconciler/reconciler.go +++ b/pkg/spanconfig/spanconfigreconciler/reconciler.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -230,10 +231,10 @@ func (f *fullReconciler) reconcile( // view of things. var records []spanconfig.Record - if err := sql.DescsTxn(ctx, f.execCfg, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + if err := f.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, f.execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { - translator := f.sqlTranslatorFactory.NewSQLTranslator(txn, descsCol) + translator := f.sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) records, reconciledUpUntil, err = spanconfig.FullTranslate(ctx, translator) return err }); err != nil { @@ -475,30 +476,31 @@ func (r *incrementalReconciler) reconcile( var missingProtectedTimestampTargets []spanconfig.SystemTarget var records []spanconfig.Record - if err := sql.DescsTxn(ctx, r.execCfg, - func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { - var err error - - // TODO(irfansharif): Instead of these filter methods for missing - // tables and system targets that live on the Reconciler, we could - // move this to the SQLTranslator instead, now that the SQLTranslator - // is transaction scoped. - missingTableIDs, err = r.filterForMissingTableIDs(ctx, txn, descsCol, sqlUpdates) - if err != nil { - return err - } - - missingProtectedTimestampTargets, err = r.filterForMissingProtectedTimestampSystemTargets( - ctx, txn, sqlUpdates, - ) - if err != nil { - return err - } - - translator := r.sqlTranslatorFactory.NewSQLTranslator(txn, descsCol) - records, _, err = translator.Translate(ctx, allIDs, generateSystemSpanConfigurations) + if err := r.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, r.execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, + ) error { + var err error + + // TODO(irfansharif): Instead of these filter methods for missing + // tables and system targets that live on the Reconciler, we could + // move this to the SQLTranslator instead, now that the SQLTranslator + // is transaction scoped. 
+ missingTableIDs, err = r.filterForMissingTableIDs(ctx, txn, descsCol, sqlUpdates) + if err != nil { return err - }); err != nil { + } + + missingProtectedTimestampTargets, err = r.filterForMissingProtectedTimestampSystemTargets( + ctx, txn, sqlUpdates, ie, + ) + if err != nil { + return err + } + + translator := r.sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol) + records, _, err = translator.Translate(ctx, allIDs, generateSystemSpanConfigurations) + return err + }); err != nil { return err } @@ -547,7 +549,7 @@ // correspond to cluster or tenant target protected timestamp records that are // no longer found, because they've been released. func (r *incrementalReconciler) filterForMissingProtectedTimestampSystemTargets( - ctx context.Context, txn *kv.Txn, updates []spanconfig.SQLUpdate, + ctx context.Context, txn *kv.Txn, updates []spanconfig.SQLUpdate, ie sqlutil.InternalExecutor, ) ([]spanconfig.SystemTarget, error) { seen := make(map[spanconfig.SystemTarget]struct{}) var missingSystemTargets []spanconfig.SystemTarget @@ -566,7 +568,7 @@ func (r *incrementalReconciler) filterForMissingProtectedTimestampSystemTargets( // timestamp subsystem, and the internal limits to limit the size of this // table, there is scope for improvement in the future. One option could be // a rangefeed-backed materialized view of the system table. - ptsState, err := r.execCfg.ProtectedTimestampProvider.GetState(ctx, txn) + ptsState, err := r.execCfg.ProtectedTimestampProvider.GetState(ctx, txn, ie) if err != nil { return nil, errors.Wrap(err, "failed to get protected timestamp state") } diff --git a/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel b/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel index 9aa346822354..8b619ebcd433 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel +++ b/pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go index 2db46ff6fb26..0551d909db1e 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go +++ b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -34,14 +35,25 @@ import ( // SQLTranslator implements the spanconfig.SQLTranslator interface. var _ spanconfig.SQLTranslator = &SQLTranslator{} +// txnBundle is created to emphasize that the SQL translator corresponds to +// a certain txn, and that all fields here form a single unit. It essentially keeps the +// semantics of “translate at a snapshot in time”. This means that this +// txnBundle should be written only in `NewSQLTranslator`. +type txnBundle struct { + txn *kv.Txn + descsCol *descs.Collection + // TODO(janexing): we inject ie here to replace the executor used in + // s.ptsProvider.GetState() in SQLTranslator.Translate(). + ie sqlutil.InternalExecutor +} + // SQLTranslator is the concrete implementation of spanconfig.SQLTranslator. 
type SQLTranslator struct { ptsProvider protectedts.Provider codec keys.SQLCodec knobs *spanconfig.TestingKnobs - txn *kv.Txn - descsCol *descs.Collection + txnBundle txnBundle } // Factory is used to construct transaction-scoped SQLTranslators. @@ -66,18 +78,38 @@ func NewFactory( } // NewSQLTranslator constructs and returns a transaction-scoped -// spanconfig.SQLTranslator. The caller must ensure that the collection passed -// in is associated with the supplied transaction. -func (f *Factory) NewSQLTranslator(txn *kv.Txn, descsCol *descs.Collection) *SQLTranslator { +// spanconfig.SQLTranslator. The caller must ensure that the collection and +// internal executor and the transaction are associated with each other. +func (f *Factory) NewSQLTranslator( + txn *kv.Txn, ie sqlutil.InternalExecutor, descsCol *descs.Collection, +) *SQLTranslator { return &SQLTranslator{ ptsProvider: f.ptsProvider, codec: f.codec, knobs: f.knobs, - txn: txn, - descsCol: descsCol, + txnBundle: txnBundle{ + txn: txn, + descsCol: descsCol, + ie: ie, + }, } } +// GetTxn returns the txn bound to this sql translator. +func (s *SQLTranslator) GetTxn() *kv.Txn { + return s.txnBundle.txn +} + +// GetDescsCollection returns the descriptor collection bound to this sql translator. +func (s *SQLTranslator) GetDescsCollection() *descs.Collection { + return s.txnBundle.descsCol +} + +// GetInternalExecutor returns the internal executor bound to this sql translator. +func (s *SQLTranslator) GetInternalExecutor() sqlutil.InternalExecutor { + return s.txnBundle.ie +} + // Translate is part of the spanconfig.SQLTranslator interface. func (s *SQLTranslator) Translate( ctx context.Context, ids descpb.IDs, generateSystemSpanConfigurations bool, @@ -91,7 +123,7 @@ func (s *SQLTranslator) Translate( // timestamp subsystem, and the internal limits to limit the size of this // table, there is scope for improvement in the future. One option could be // a rangefeed-backed materialized view of the system table. - ptsState, err := s.ptsProvider.GetState(ctx, s.txn) + ptsState, err := s.ptsProvider.GetState(ctx, s.GetTxn(), s.GetInternalExecutor()) if err != nil { return nil, hlc.Timestamp{}, errors.Wrap(err, "failed to get protected timestamp state") } @@ -110,7 +142,7 @@ func (s *SQLTranslator) Translate( seen := make(map[descpb.ID]struct{}) var leafIDs descpb.IDs for _, id := range ids { - descendantLeafIDs, err := s.findDescendantLeafIDs(ctx, id, s.txn, s.descsCol) + descendantLeafIDs, err := s.findDescendantLeafIDs(ctx, id, s.GetTxn(), s.GetDescsCollection()) if err != nil { return nil, hlc.Timestamp{}, err } @@ -122,13 +154,13 @@ func (s *SQLTranslator) Translate( } } - pseudoTableRecords, err := s.maybeGeneratePseudoTableRecords(ctx, s.txn, ids) + pseudoTableRecords, err := s.maybeGeneratePseudoTableRecords(ctx, s.GetTxn(), ids) if err != nil { return nil, hlc.Timestamp{}, err } records = append(records, pseudoTableRecords...) - scratchRangeRecord, err := s.maybeGenerateScratchRangeRecord(ctx, s.txn, ids) + scratchRangeRecord, err := s.maybeGenerateScratchRangeRecord(ctx, s.GetTxn(), ids) if err != nil { return nil, hlc.Timestamp{}, err } @@ -138,14 +170,14 @@ func (s *SQLTranslator) Translate( // For every unique leaf ID, generate span configurations. 
for _, leafID := range leafIDs { - translatedRecords, err := s.generateSpanConfigurations(ctx, leafID, s.txn, s.descsCol, ptsStateReader) + translatedRecords, err := s.generateSpanConfigurations(ctx, leafID, s.GetTxn(), s.GetDescsCollection(), ptsStateReader) if err != nil { return nil, hlc.Timestamp{}, err } records = append(records, translatedRecords...) } - return records, s.txn.CommitTimestamp(), nil + return records, s.GetTxn().CommitTimestamp(), nil } // descLookupFlags is the set of look up flags used when fetching descriptors. diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 3dc104bc4cc0..4f4e883f9f6b 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -457,6 +457,7 @@ go_library( "//pkg/sql/vtable", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/streaming", "//pkg/testutils/serverutils", "//pkg/upgrade", "//pkg/util", diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go index ebedc697f156..4d648255142b 100644 --- a/pkg/sql/apply_join.go +++ b/pkg/sql/apply_join.go @@ -328,6 +328,7 @@ func runPlanInsidePlan( ctx, evalCtx, &plannerCopy, params.p.txn, distributeType) planCtx.planner.curPlan.planComponents = *plan planCtx.ExtendedEvalCtx.Planner = &plannerCopy + planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy planCtx.stmtType = recv.stmtType params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun( diff --git a/pkg/sql/colexec/colexecbase/cast_test.go b/pkg/sql/colexec/colexecbase/cast_test.go index 85f1448ad660..deda61f49bd6 100644 --- a/pkg/sql/colexec/colexecbase/cast_test.go +++ b/pkg/sql/colexec/colexecbase/cast_test.go @@ -41,6 +41,7 @@ func TestRandomizedCast(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(st) defer evalCtx.Stop(ctx) evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} rng, _ := randutil.NewTestRand() getValidSupportedCast := func() (from, to *types.T) { diff --git a/pkg/sql/colexec/colexecbase/main_test.go b/pkg/sql/colexec/colexecbase/main_test.go index 61be63cc423c..e44b9d36ffe8 100644 --- a/pkg/sql/colexec/colexecbase/main_test.go +++ b/pkg/sql/colexec/colexecbase/main_test.go @@ -58,6 +58,7 @@ func TestMain(m *testing.M) { testMemAcc = &memAcc evalCtx := eval.MakeTestingEvalContext(st) evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} testColumnFactory = coldataext.NewExtendedColumnFactory(&evalCtx) testAllocator = colmem.NewAllocator(ctx, testMemAcc, testColumnFactory) defer testMemAcc.Close(ctx) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 93daf681086a..f35f09d7731c 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2763,6 +2763,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo *evalCtx = extendedEvalContext{ Context: eval.Context{ Planner: p, + StreamManagerFactory: p, PrivilegedAccessor: p, SessionAccessor: p, JobExecContext: p, diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 817a88703e88..0e0d49ded9c1 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -356,6 +356,7 @@ func (ds *ServerImpl) setupFlow( Locality: ds.ServerConfig.Locality, Tracer: ds.ServerConfig.Tracer, Planner: &faketreeeval.DummyEvalPlanner{Monitor: monitor}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, SessionAccessor: &faketreeeval.DummySessionAccessor{}, 
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index d98a45bb6f83..aa84ef39ceb6 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -156,6 +156,24 @@ func (so *DummyRegionOperator) ResetMultiRegionZoneConfigsForDatabase( return errors.WithStack(errRegionOperator) } +// DummyStreamManagerFactory implements the eval.StreamManagerFactory interface by +// returning errors. +type DummyStreamManagerFactory struct{} + +// GetReplicationStreamManager implements the eval.StreamManagerFactory interface. +func (smf *DummyStreamManagerFactory) GetReplicationStreamManager( + ctx context.Context, +) (eval.ReplicationStreamManager, error) { + return nil, errors.WithStack(errors.New("Stream manager factory not implemented")) +} + +// GetStreamIngestManager implements the eval.StreamManagerFactory interface. +func (smf *DummyStreamManagerFactory) GetStreamIngestManager( + ctx context.Context, +) (eval.StreamIngestManager, error) { + return nil, errors.WithStack(errors.New("Stream manager factory not implemented")) +} + // DummyEvalPlanner implements the eval.Planner interface by returning // errors. type DummyEvalPlanner struct { diff --git a/pkg/sql/gcjob/BUILD.bazel b/pkg/sql/gcjob/BUILD.bazel index 3f52e41061f2..e0d6e02a105c 100644 --- a/pkg/sql/gcjob/BUILD.bazel +++ b/pkg/sql/gcjob/BUILD.bazel @@ -39,6 +39,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/sql/sem/tree", "//pkg/sql/sqlerrors", + "//pkg/sql/sqlutil", "//pkg/storage", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index b4f29361de5a..ae6000d86593 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -410,8 +411,8 @@ func isTenantProtected( isProtected := false ptsProvider := execCfg.ProtectedTimestampProvider - if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ptsState, err := ptsProvider.GetState(ctx, txn) + if err := execCfg.InternalExecutorFactory.TxnWithExecutor(ctx, execCfg.DB, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + ptsState, err := ptsProvider.GetState(ctx, txn, ie) if err != nil { return errors.Wrap(err, "failed to get protectedts State") } diff --git a/pkg/sql/opt/memo/memo_test.go b/pkg/sql/opt/memo/memo_test.go index 2d577d74c012..3b5cb8bf5b42 100644 --- a/pkg/sql/opt/memo/memo_test.go +++ b/pkg/sql/opt/memo/memo_test.go @@ -152,6 +152,7 @@ func TestMemoIsStale(t *testing.T) { // which can handle a case of nil planner but cannot a case when the // planner's GetMultiregionConfig is nil, so we nil out the planner. 
evalCtx.Planner = nil + evalCtx.StreamManagerFactory = nil var o xform.Optimizer opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT a, b+1 FROM abcview WHERE c='foo'") diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 3e8d8964ad0f..89a1632ece9e 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -380,6 +381,7 @@ func newInternalPlanner( p.extendedEvalCtx = internalExtendedEvalCtx(ctx, sds, params.collection, txn, ts, ts, execCfg) p.extendedEvalCtx.Planner = p + p.extendedEvalCtx.StreamManagerFactory = p p.extendedEvalCtx.PrivilegedAccessor = p p.extendedEvalCtx.SessionAccessor = p p.extendedEvalCtx.ClientNoticeSender = p @@ -828,3 +830,15 @@ func (p *planner) resetPlanner( p.skipDescriptorCache = false p.typeResolutionDbID = descpb.InvalidID } + +// GetReplicationStreamManager returns a ReplicationStreamManager. +func (p *planner) GetReplicationStreamManager( + ctx context.Context, +) (eval.ReplicationStreamManager, error) { + return streaming.GetReplicationStreamManager(ctx, p.EvalContext(), p.Txn()) +} + +// GetStreamIngestManager returns a StreamIngestManager. +func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngestManager, error) { + return streaming.GetStreamIngestManager(ctx, p.EvalContext(), p.Txn()) +} diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 51c8531be7e0..5863cc4b82cb 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2501,22 +2501,23 @@ func createSchemaChangeEvalCtx( ExecCfg: execCfg, Descs: descriptors, Context: eval.Context{ - SessionDataStack: sessiondata.NewStack(sd), - Planner: &faketreeeval.DummyEvalPlanner{}, - PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, - SessionAccessor: &faketreeeval.DummySessionAccessor{}, - ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, - Sequence: &faketreeeval.DummySequenceOperators{}, - Tenant: &faketreeeval.DummyTenantOperator{}, - Regions: &faketreeeval.DummyRegionOperator{}, - Settings: execCfg.Settings, - TestingKnobs: execCfg.EvalContextTestingKnobs, - ClusterID: execCfg.NodeInfo.LogicalClusterID(), - ClusterName: execCfg.RPCContext.ClusterName(), - NodeID: execCfg.NodeInfo.NodeID, - Codec: execCfg.Codec, - Locality: execCfg.Locality, - Tracer: execCfg.AmbientCtx.Tracer, + SessionDataStack: sessiondata.NewStack(sd), + Planner: &faketreeeval.DummyEvalPlanner{}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, + PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, + SessionAccessor: &faketreeeval.DummySessionAccessor{}, + ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, + Sequence: &faketreeeval.DummySequenceOperators{}, + Tenant: &faketreeeval.DummyTenantOperator{}, + Regions: &faketreeeval.DummyRegionOperator{}, + Settings: execCfg.Settings, + TestingKnobs: execCfg.EvalContextTestingKnobs, + ClusterID: execCfg.NodeInfo.LogicalClusterID(), + ClusterName: execCfg.RPCContext.ClusterName(), + NodeID: execCfg.NodeInfo.NodeID, + Codec: execCfg.Codec, + Locality: execCfg.Locality, + Tracer: execCfg.AmbientCtx.Tracer, }, } // TODO(andrei): This is wrong (just like on the main code 
path on diff --git a/pkg/sql/schemachanger/scbuild/tree_context_builder.go b/pkg/sql/schemachanger/scbuild/tree_context_builder.go index 769110d4f742..8cf9cdafd886 100644 --- a/pkg/sql/schemachanger/scbuild/tree_context_builder.go +++ b/pkg/sql/schemachanger/scbuild/tree_context_builder.go @@ -46,17 +46,18 @@ func (b buildCtx) EvalCtx() *eval.Context { func newEvalCtx(ctx context.Context, d Dependencies) *eval.Context { evalCtx := &eval.Context{ - ClusterID: d.ClusterID(), - SessionDataStack: sessiondata.NewStack(d.SessionData()), - Planner: &faketreeeval.DummyEvalPlanner{}, - PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, - SessionAccessor: &faketreeeval.DummySessionAccessor{}, - ClientNoticeSender: d.ClientNoticeSender(), - Sequence: &faketreeeval.DummySequenceOperators{}, - Tenant: &faketreeeval.DummyTenantOperator{}, - Regions: &faketreeeval.DummyRegionOperator{}, - Settings: d.ClusterSettings(), - Codec: d.Codec(), + ClusterID: d.ClusterID(), + SessionDataStack: sessiondata.NewStack(d.SessionData()), + Planner: &faketreeeval.DummyEvalPlanner{}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, + PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, + SessionAccessor: &faketreeeval.DummySessionAccessor{}, + ClientNoticeSender: d.ClientNoticeSender(), + Sequence: &faketreeeval.DummySequenceOperators{}, + Tenant: &faketreeeval.DummyTenantOperator{}, + Regions: &faketreeeval.DummyRegionOperator{}, + Settings: d.ClusterSettings(), + Codec: d.Codec(), } evalCtx.SetDeprecatedContext(ctx) return evalCtx diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index 2c19204b3464..e864ee6822ee 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -30,6 +30,7 @@ go_library( deps = [ "//pkg/base", "//pkg/build", + "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/config/zonepb", "//pkg/geo", @@ -86,7 +87,6 @@ go_library( "//pkg/sql/storageparam", "//pkg/sql/storageparam/indexstorageparam", "//pkg/sql/types", - "//pkg/streaming", "//pkg/util", "//pkg/util/arith", "//pkg/util/bitarray", diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go index f01a2a99b42f..346d2b0596d8 100644 --- a/pkg/sql/sem/builtins/replication_builtins.go +++ b/pkg/sql/sem/builtins/replication_builtins.go @@ -14,13 +14,13 @@ import ( "context" gojson "encoding/json" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" @@ -50,7 +50,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } @@ -58,7 +58,7 @@ var replicationBuiltins = map[string]builtinDefinition{ ingestionJobID := jobspb.JobID(*args[0].(*tree.DInt)) cutoverTime := 
args[1].(*tree.DTimestampTZ).Time cutoverTimestamp := hlc.Timestamp{WallTime: cutoverTime.UnixNano()} - err = mgr.CompleteStreamIngestion(ctx, evalCtx, evalCtx.Txn, ingestionJobID, cutoverTimestamp) + err = mgr.CompleteStreamIngestion(ctx, ingestionJobID, cutoverTimestamp) if err != nil { return nil, err } @@ -91,12 +91,12 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull { return tree.DNull, errors.New("job_id cannot be specified with null argument") } - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } ingestionJobID := int64(tree.MustBeDInt(args[0])) - stats, err := mgr.GetStreamIngestionStats(ctx, evalCtx, evalCtx.Txn, jobspb.JobID(ingestionJobID)) + stats, err := mgr.GetStreamIngestionStats(ctx, jobspb.JobID(ingestionJobID)) if err != nil { return nil, err } @@ -131,12 +131,12 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull { return tree.DNull, errors.New("job_id cannot be specified with null argument") } - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } ingestionJobID := int64(tree.MustBeDInt(args[0])) - stats, err := mgr.GetStreamIngestionStats(ctx, evalCtx, evalCtx.Txn, jobspb.JobID(ingestionJobID)) + stats, err := mgr.GetStreamIngestionStats(ctx, jobspb.JobID(ingestionJobID)) if err != nil { return nil, err } @@ -164,7 +164,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -172,7 +172,7 @@ var replicationBuiltins = map[string]builtinDefinition{ if err != nil { return nil, err } - jobID, err := mgr.StartReplicationStream(ctx, evalCtx, evalCtx.Txn, uint64(tenantID)) + jobID, err := mgr.StartReplicationStream(ctx, uint64(tenantID)) if err != nil { return nil, err } @@ -201,7 +201,7 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull || args[1] == tree.DNull { return tree.DNull, errors.New("stream_id or frontier_ts cannot be specified with null argument") } - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -209,8 +209,8 @@ var replicationBuiltins = map[string]builtinDefinition{ if err != nil { return nil, err } - streamID := streaming.StreamID(int(tree.MustBeDInt(args[0]))) - sps, err := mgr.HeartbeatReplicationStream(ctx, evalCtx, streamID, frontier, evalCtx.Txn) + streamID := streampb.StreamID(int(tree.MustBeDInt(args[0]))) + sps, err := mgr.HeartbeatReplicationStream(ctx, streamID, frontier) if err != nil { return nil, err } @@ -243,13 +243,12 @@ var replicationBuiltins = map[string]builtinDefinition{ []string{"stream_event"}, ), func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (eval.ValueGenerator, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } return mgr.StreamPartition( - evalCtx, - streaming.StreamID(tree.MustBeDInt(args[0])), + 
streampb.StreamID(tree.MustBeDInt(args[0])), []byte(tree.MustBeDBytes(args[1])), ) }, @@ -269,13 +268,13 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Bytes), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } streamID := int64(tree.MustBeDInt(args[0])) - spec, err := mgr.GetReplicationStreamSpec(ctx, evalCtx, evalCtx.Txn, streaming.StreamID(streamID)) + spec, err := mgr.GetReplicationStreamSpec(ctx, streampb.StreamID(streamID)) if err != nil { return nil, err } @@ -304,7 +303,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -312,7 +311,7 @@ var replicationBuiltins = map[string]builtinDefinition{ streamID := int64(tree.MustBeDInt(args[0])) successfulIngestion := bool(tree.MustBeDBool(args[1])) if err := mgr.CompleteReplicationStream( - ctx, evalCtx, evalCtx.Txn, streaming.StreamID(streamID), successfulIngestion, + ctx, streampb.StreamID(streamID), successfulIngestion, ); err != nil { return nil, err } diff --git a/pkg/sql/sem/eval/BUILD.bazel b/pkg/sql/sem/eval/BUILD.bazel index baf6d6676a1e..02e5413c29de 100644 --- a/pkg/sql/sem/eval/BUILD.bazel +++ b/pkg/sql/sem/eval/BUILD.bazel @@ -37,9 +37,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/geo", "//pkg/geo/geopb", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/sql/sem/eval/cast_map_test.go b/pkg/sql/sem/eval/cast_map_test.go index ebf1f92c8569..f456070a1fb0 100644 --- a/pkg/sql/sem/eval/cast_map_test.go +++ b/pkg/sql/sem/eval/cast_map_test.go @@ -39,6 +39,7 @@ func TestCastMap(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) rng, _ := randutil.NewTestRand() evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} cast.ForEachCast(func(src, tgt oid.Oid, _ cast.Context, _ cast.ContextOrigin, _ volatility.V) { srcType := types.OidToType[src] diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index e45486b733ea..2439dc0ff821 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -17,6 +17,8 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -138,6 +140,8 @@ type Context struct { Planner Planner + StreamManagerFactory StreamManagerFactory + // Not using sql.JobExecContext type to avoid cycle dependency with sql package JobExecContext interface{} @@ -345,6 +349,7 @@ func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonit monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) 
ctx.TestingMon = monitor ctx.Planner = &fakePlannerWithMonitor{monitor: monitor} + ctx.StreamManagerFactory = &fakeStreamManagerFactory{} ctx.deprecatedContext = context.TODO() now := timeutil.Now() ctx.SetTxnTimestamp(now) @@ -362,6 +367,10 @@ func (p *fakePlannerWithMonitor) Mon() *mon.BytesMonitor { return p.monitor } +type fakeStreamManagerFactory struct { + StreamManagerFactory +} + // SessionData returns the SessionData the current EvalCtx should use to eval. func (ec *Context) SessionData() *sessiondata.SessionData { if ec.SessionDataStack == nil { @@ -682,3 +691,69 @@ func UnwrapDatum(ctx context.Context, evalCtx *Context, d tree.Datum) tree.Datum } return d } + +// StreamManagerFactory stores methods that return the streaming managers. +type StreamManagerFactory interface { + GetReplicationStreamManager(ctx context.Context) (ReplicationStreamManager, error) + GetStreamIngestManager(ctx context.Context) (StreamIngestManager, error) +} + +// ReplicationStreamManager represents a collection of APIs that streaming replication supports +// on the production side. +type ReplicationStreamManager interface { + // StartReplicationStream starts a stream replication job for the specified tenant on the producer side. + StartReplicationStream( + ctx context.Context, + tenantID uint64, + ) (streampb.StreamID, error) + + // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating + // consumer has consumed until the given 'frontier' timestamp. This updates the producer job + // progress and extends its life, and the new producer progress will be returned. + // If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it. + HeartbeatReplicationStream( + ctx context.Context, + streamID streampb.StreamID, + frontier hlc.Timestamp, + ) (streampb.StreamReplicationStatus, error) + + // StreamPartition starts streaming replication on the producer side for the partition specified + // by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and + // returns a value generator which yields events for the specified partition. + StreamPartition( + streamID streampb.StreamID, + opaqueSpec []byte, + ) (ValueGenerator, error) + + // GetReplicationStreamSpec gets a stream replication spec on the producer side. + GetReplicationStreamSpec( + ctx context.Context, + streamID streampb.StreamID, + ) (*streampb.ReplicationStreamSpec, error) + + // CompleteReplicationStream completes a replication stream job on the producer side. + // 'successfulIngestion' indicates whether the stream ingestion finished successfully and + // determines the fate of the producer job, succeeded or canceled. + CompleteReplicationStream( + ctx context.Context, + streamID streampb.StreamID, + successfulIngestion bool, + ) error +} + +// StreamIngestManager represents a collection of APIs that streaming replication supports +// on the ingestion side. +type StreamIngestManager interface { + // CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side. + CompleteStreamIngestion( + ctx context.Context, + ingestionJobID jobspb.JobID, + cutoverTimestamp hlc.Timestamp, + ) error + + // GetStreamIngestionStats gets a statistics summary for a stream ingestion job. 
+ GetStreamIngestionStats( + ctx context.Context, + ingestionJobID jobspb.JobID, + ) (*streampb.StreamIngestionStats, error) +} diff --git a/pkg/streaming/BUILD.bazel b/pkg/streaming/BUILD.bazel index 0edf9069f018..e1ce04c319c8 100644 --- a/pkg/streaming/BUILD.bazel +++ b/pkg/streaming/BUILD.bazel @@ -8,10 +8,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/streamingccl/streampb", - "//pkg/jobs/jobspb", "//pkg/kv", "//pkg/sql/sem/eval", - "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/streaming/api.go b/pkg/streaming/api.go index 2d96bbb468f4..2d70e4324c12 100644 --- a/pkg/streaming/api.go +++ b/pkg/streaming/api.go @@ -14,118 +14,38 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) // StreamID is the ID of a replication stream. -type StreamID int64 - -// SafeValue implements the redact.SafeValue interface. -func (j StreamID) SafeValue() {} - -// InvalidStreamID is the zero value for StreamID corresponding to no stream. -const InvalidStreamID StreamID = 0 +type StreamID = streampb.StreamID // GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs. // Used by builtin functions to trigger streaming replication. -var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context) (ReplicationStreamManager, error) +var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.ReplicationStreamManager, error) // GetStreamIngestManagerHook is the hook to get access to the ingestion side replication APIs. // Used by builtin functions to trigger streaming replication. -var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context) (StreamIngestManager, error) - -// ReplicationStreamManager represents a collection of APIs that streaming replication supports -// on the production side. -type ReplicationStreamManager interface { - // StartReplicationStream starts a stream replication job for the specified tenant on the producer side. - StartReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - tenantID uint64, - ) (StreamID, error) - - // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating - // consumer has consumed until the given 'frontier' timestamp. This updates the producer job - // progress and extends its life, and the new producer progress will be returned. - // If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it. - HeartbeatReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - streamID StreamID, - frontier hlc.Timestamp, - txn *kv.Txn) (streampb.StreamReplicationStatus, error) - - // StreamPartition starts streaming replication on the producer side for the partition specified - // by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and - // returns a value generator which yields events for the specified partition. - StreamPartition( - evalCtx *eval.Context, - streamID StreamID, - opaqueSpec []byte, - ) (eval.ValueGenerator, error) - - // GetReplicationStreamSpec gets a stream replication spec on the producer side. 
- GetReplicationStreamSpec( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID StreamID, - ) (*streampb.ReplicationStreamSpec, error) - - // CompleteReplicationStream completes a replication stream job on the producer side. - // 'successfulIngestion' indicates whether the stream ingestion finished successfully and - // determines the fate of the producer job, succeeded or canceled. - CompleteReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID StreamID, - successfulIngestion bool, - ) error -} - -// StreamIngestManager represents a collection of APIs that streaming replication supports -// on the ingestion side. -type StreamIngestManager interface { - // CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side. - CompleteStreamIngestion( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - cutoverTimestamp hlc.Timestamp, - ) error - - // GetStreamIngestionStats gets a statistics summary for a stream ingestion job. - GetStreamIngestionStats( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - ) (*streampb.StreamIngestionStats, error) -} +var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.StreamIngestManager, error) // GetReplicationStreamManager returns a ReplicationStreamManager if a CCL binary is loaded. func GetReplicationStreamManager( - ctx context.Context, evalCtx *eval.Context, -) (ReplicationStreamManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.ReplicationStreamManager, error) { if GetReplicationStreamManagerHook == nil { return nil, errors.New("replication streaming requires a CCL binary") } - return GetReplicationStreamManagerHook(ctx, evalCtx) + return GetReplicationStreamManagerHook(ctx, evalCtx, txn) } // GetStreamIngestManager returns a StreamIngestManager if a CCL binary is loaded. func GetStreamIngestManager( - ctx context.Context, evalCtx *eval.Context, -) (StreamIngestManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.StreamIngestManager, error) { if GetReplicationStreamManagerHook == nil { return nil, errors.New("replication streaming requires a CCL binary") } - return GetStreamIngestManagerHook(ctx, evalCtx) + return GetStreamIngestManagerHook(ctx, evalCtx, txn) }
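
Illustrative usage (not part of the patch): a minimal Go sketch of how a caller might drive the transaction-scoped SQLTranslator after the spanconfigsqltranslator change above. The helper name translateIDs and its parameters are assumptions for illustration; the caller must supply a txn, descriptor collection, and internal executor that belong to the same transaction, per the new NewSQLTranslator contract.

package spanconfigexample // hypothetical package for this sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/spanconfig"
	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqltranslator"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// translateIDs builds a transaction-scoped SQLTranslator and translates the
// given descriptor IDs. txn, descsCol, and ie must all be tied to the same
// transaction so that every read happens at a single snapshot.
func translateIDs(
	ctx context.Context,
	factory *spanconfigsqltranslator.Factory,
	txn *kv.Txn,
	ie sqlutil.InternalExecutor,
	descsCol *descs.Collection,
	ids descpb.IDs,
) ([]spanconfig.Record, hlc.Timestamp, error) {
	// The three pieces of transaction-scoped state travel together in the
	// translator's txnBundle.
	translator := factory.NewSQLTranslator(txn, ie, descsCol)
	// The returned timestamp is the transaction's commit timestamp: the
	// snapshot at which the translation is valid.
	return translator.Translate(ctx, ids, false /* generateSystemSpanConfigurations */)
}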
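
Similarly, a hedged sketch of the call pattern the replication builtins follow after this change: the manager is resolved from the eval.Context's StreamManagerFactory, and the manager methods no longer take the eval context or transaction because the planner-backed factory already captured them. The helper name completeIngestion is hypothetical.

package replicationexample // hypothetical package for this sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// completeIngestion mirrors the new builtin call shape: obtain the
// StreamIngestManager from the eval.Context, then call the slimmer method.
func completeIngestion(
	ctx context.Context,
	evalCtx *eval.Context,
	ingestionJobID jobspb.JobID,
	cutoverTimestamp hlc.Timestamp,
) error {
	mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx)
	if err != nil {
		return err
	}
	// No evalCtx/txn arguments here: the concrete manager already holds them.
	return mgr.CompleteStreamIngestion(ctx, ingestionJobID, cutoverTimestamp)
}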
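
Finally, a sketch of the protectedts GetState change that threads through the rest of the patch: callers that previously opened a plain kv transaction now go through the InternalExecutorFactory so an internal executor can be handed to GetState, as the gcjob hunk above does. The helper readPTSState and the use of *sql.ExecutorConfig are assumptions for illustration.

package gcjobexample // hypothetical package for this sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/errors"
)

// readPTSState opens a transaction together with an internal executor and
// reads the protected timestamp state, matching the new GetState signature.
func readPTSState(ctx context.Context, execCfg *sql.ExecutorConfig) error {
	return execCfg.InternalExecutorFactory.TxnWithExecutor(ctx, execCfg.DB, nil /* sessionData */, func(
		ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor,
	) error {
		// GetState now takes the internal executor alongside the txn.
		ptsState, err := execCfg.ProtectedTimestampProvider.GetState(ctx, txn, ie)
		if err != nil {
			return errors.Wrap(err, "failed to get protected timestamp state")
		}
		_ = ptsState // consume the state as needed
		return nil
	})
}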