diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index 6f0aa1e545a1..d65990bd80ef 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -7,6 +7,7 @@ pkg/base/node_id.go | `*SQLIDContainer` pkg/base/node_id.go | `*StoreIDContainer` pkg/ccl/backupccl/backuppb/backup.go | `sz` pkg/ccl/backupccl/backuppb/backup.go | `timing` +pkg/ccl/streamingccl/streampb/streamid.go | `StreamID` pkg/cli/exit/exit.go | `Code` pkg/jobs/jobspb/wrap.go | `Type` pkg/kv/bulk/bulk_metrics.go | `sz` @@ -51,7 +52,6 @@ pkg/storage/enginepb/mvcc.go | `TxnEpoch` pkg/storage/enginepb/mvcc.go | `TxnSeq` pkg/storage/enginepb/mvcc3.go | `*MVCCStats` pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta` -pkg/streaming/api.go | `StreamID` pkg/util/hlc/timestamp.go | `ClockTimestamp` pkg/util/hlc/timestamp.go | `LegacyTimestamp` pkg/util/hlc/timestamp.go | `Timestamp` diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 7337b8943c4e..44cddf7e9ddc 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -75,7 +75,7 @@ func (p *partitionedStreamClient) Create( row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64()) err := row.Scan(&streamID) if err != nil { - return streaming.InvalidStreamID, + return streampb.InvalidStreamID, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantID.String()) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 9ed03a234d4d..db7bdcc457c0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -23,29 +23,28 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -type streamIngestManagerImpl struct{} +type streamIngestManagerImpl struct { + evalCtx *eval.Context + txn *kv.Txn +} // CompleteStreamIngestion implements streaming.StreamIngestManager interface. func (r *streamIngestManagerImpl) CompleteStreamIngestion( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - cutoverTimestamp hlc.Timestamp, + ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp, ) error { - return completeStreamIngestion(ctx, evalCtx, txn, ingestionJobID, cutoverTimestamp) + return completeStreamIngestion(ctx, r.evalCtx, r.txn, ingestionJobID, cutoverTimestamp) } // GetStreamIngestionStats implements streaming.StreamIngestManager interface. func (r *streamIngestManagerImpl) GetStreamIngestionStats( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID, + ctx context.Context, ingestionJobID jobspb.JobID, ) (*streampb.StreamIngestionStats, error) { - return getStreamIngestionStats(ctx, evalCtx, txn, ingestionJobID) + return getStreamIngestionStats(ctx, r.evalCtx, r.txn, ingestionJobID) } func newStreamIngestManagerWithPrivilegesCheck( - ctx context.Context, evalCtx *eval.Context, -) (streaming.StreamIngestManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.StreamIngestManager, error) { isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { return nil, err @@ -64,7 +63,7 @@ func newStreamIngestManagerWithPrivilegesCheck( pgcode.InsufficientPrivilege, "replication requires enterprise license") } - return &streamIngestManagerImpl{}, nil + return &streamIngestManagerImpl{evalCtx: evalCtx, txn: txn}, nil } func init() { diff --git a/pkg/ccl/streamingccl/streampb/BUILD.bazel b/pkg/ccl/streamingccl/streampb/BUILD.bazel index fcf080739dc9..7ed7afceeada 100644 --- a/pkg/ccl/streamingccl/streampb/BUILD.bazel +++ b/pkg/ccl/streamingccl/streampb/BUILD.bazel @@ -35,7 +35,10 @@ go_proto_library( go_library( name = "streampb", - srcs = ["empty.go"], + srcs = [ + "empty.go", + "streamid.go", + ], embed = [":streampb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb", visibility = ["//visibility:public"], diff --git a/pkg/ccl/streamingccl/streampb/streamid.go b/pkg/ccl/streamingccl/streampb/streamid.go new file mode 100644 index 000000000000..4521d2b7400d --- /dev/null +++ b/pkg/ccl/streamingccl/streampb/streamid.go @@ -0,0 +1,18 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streampb + +// StreamID is the ID of a replication stream. +type StreamID int64 + +// SafeValue implements the redact.SafeValue interface. +func (j StreamID) SafeValue() {} + +// InvalidStreamID is the zero value for StreamID corresponding to no stream. +const InvalidStreamID StreamID = 0 diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index a6aaf392b069..6d0086cfe73a 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -38,7 +37,7 @@ import ( ) type eventStream struct { - streamID streaming.StreamID + streamID streampb.StreamID execCfg *sql.ExecutorConfig spec streampb.StreamPartitionSpec subscribedSpans roachpb.SpanGroup @@ -515,7 +514,7 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) { } func streamPartition( - evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte, + evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte, ) (eval.ValueGenerator, error) { if !evalCtx.SessionData().AvoidBuffering { return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option") diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index d7f156eb39b7..be8f04a7cfb8 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -22,54 +22,49 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -type replicationStreamManagerImpl struct{} +type replicationStreamManagerImpl struct { + evalCtx *eval.Context + txn *kv.Txn +} // StartReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StartReplicationStream( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64, -) (streaming.StreamID, error) { - return startReplicationStreamJob(ctx, evalCtx, txn, tenantID) + ctx context.Context, tenantID uint64, +) (streampb.StreamID, error) { + return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID) } // HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) HeartbeatReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - streamID streaming.StreamID, - frontier hlc.Timestamp, - txn *kv.Txn, + ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { - return heartbeatReplicationStream(ctx, evalCtx, streamID, frontier, txn) + return heartbeatReplicationStream(ctx, r.evalCtx, r.txn, streamID, frontier) } // StreamPartition implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StreamPartition( - evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte, + streamID streampb.StreamID, opaqueSpec []byte, ) (eval.ValueGenerator, error) { - return streamPartition(evalCtx, streamID, opaqueSpec) + return streamPartition(r.evalCtx, streamID, opaqueSpec) } // GetReplicationStreamSpec implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) GetReplicationStreamSpec( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, + ctx context.Context, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { - return getReplicationStreamSpec(ctx, evalCtx, txn, streamID) + return getReplicationStreamSpec(ctx, r.evalCtx, streamID) } // CompleteReplicationStream implements ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) CompleteReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID streaming.StreamID, - successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { - return completeReplicationStream(ctx, evalCtx, txn, streamID, successfulIngestion) + return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion) } func newReplicationStreamManagerWithPrivilegesCheck( - ctx context.Context, evalCtx *eval.Context, -) (streaming.ReplicationStreamManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.ReplicationStreamManager, error) { isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { return nil, err @@ -81,6 +76,7 @@ func newReplicationStreamManagerWithPrivilegesCheck( } execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) + enterpriseCheckErr := utilccl.CheckEnterpriseEnabled( execCfg.Settings, execCfg.NodeInfo.LogicalClusterID(), execCfg.Organization(), "REPLICATION") if enterpriseCheckErr != nil { @@ -88,7 +84,7 @@ func newReplicationStreamManagerWithPrivilegesCheck( pgcode.InsufficientPrivilege, "replication requires enterprise license") } - return &replicationStreamManagerImpl{}, nil + return &replicationStreamManagerImpl{evalCtx: evalCtx, txn: txn}, nil } func init() { diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go b/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go index d893ed2e0dde..2090ae5cdc1c 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -44,7 +43,7 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) { require.NoError(t, protoutil.Unmarshal(sessionSerialized, &sessionData)) } - getManagerForUser := func(u string) (streaming.ReplicationStreamManager, error) { + getManagerForUser := func(u string) (eval.ReplicationStreamManager, error) { sqlUser, err := username.MakeSQLUsernameFromUserInput(u, username.PurposeValidation) require.NoError(t, err) execCfg := s.ExecutorConfig().(sql.ExecutorConfig) @@ -52,7 +51,7 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) { p, cleanup := sql.NewInternalPlanner("test", txn, sqlUser, &sql.MemoryMetrics{}, &execCfg, sessionData) defer cleanup() ec := p.(interface{ EvalContext() *eval.Context }).EvalContext() - return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec) + return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec, txn) } for _, tc := range []struct { diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 357f9b9253cb..652de91ec09f 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -36,16 +35,16 @@ import ( // 2. TODO(casper): Updates the protected timestamp for spans being replicated func startReplicationStreamJob( ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64, -) (streaming.StreamID, error) { +) (streampb.StreamID, error) { execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx) if err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } if !hasAdminRole { - return streaming.InvalidStreamID, errors.New("admin role required to start stream replication jobs") + return streampb.InvalidStreamID, errors.New("admin role required to start stream replication jobs") } registry := execConfig.JobRegistry @@ -53,7 +52,7 @@ func startReplicationStreamJob( ptsID := uuid.MakeV4() jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID) if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } ptp := execConfig.ProtectedTimestampProvider @@ -68,9 +67,9 @@ func startReplicationStreamJob( deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect) if err := ptp.Protect(ctx, txn, pts); err != nil { - return streaming.InvalidStreamID, err + return streampb.InvalidStreamID, err } - return streaming.StreamID(jr.JobID), nil + return streampb.StreamID(jr.JobID), nil } // Convert the producer job's status into corresponding replication @@ -99,7 +98,7 @@ func updateReplicationStreamProgress( expiration time.Time, ptsProvider protectedts.Provider, registry *jobs.Registry, - streamID streaming.StreamID, + streamID streampb.StreamID, consumedTime hlc.Timestamp, txn *kv.Txn, ) (status streampb.StreamReplicationStatus, err error) { @@ -157,9 +156,9 @@ func updateReplicationStreamProgress( func heartbeatReplicationStream( ctx context.Context, evalCtx *eval.Context, - streamID streaming.StreamID, - frontier hlc.Timestamp, txn *kv.Txn, + streamID streampb.StreamID, + frontier hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV) @@ -198,7 +197,7 @@ func heartbeatReplicationStream( // getReplicationStreamSpec gets a replication stream specification for the specified stream. func getReplicationStreamSpec( - ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, + ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext) // Returns error if the replication stream is not active @@ -257,7 +256,7 @@ func completeReplicationStream( ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, - streamID streaming.StreamID, + streamID streampb.StreamID, successfulIngestion bool, ) error { registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index e82848c50e35..f4591543ec01 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -456,6 +456,7 @@ go_library( "//pkg/sql/vtable", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/streaming", "//pkg/testutils/serverutils", "//pkg/upgrade", "//pkg/util", diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go index 3b160e12936c..d88e9bd5f2e4 100644 --- a/pkg/sql/apply_join.go +++ b/pkg/sql/apply_join.go @@ -329,6 +329,7 @@ func runPlanInsidePlan( ctx, evalCtx, &plannerCopy, params.p.txn, distributeType) planCtx.planner.curPlan.planComponents = *plan planCtx.ExtendedEvalCtx.Planner = &plannerCopy + planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy planCtx.stmtType = recv.stmtType params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun( diff --git a/pkg/sql/colexec/colexecbase/cast_test.go b/pkg/sql/colexec/colexecbase/cast_test.go index 85f1448ad660..deda61f49bd6 100644 --- a/pkg/sql/colexec/colexecbase/cast_test.go +++ b/pkg/sql/colexec/colexecbase/cast_test.go @@ -41,6 +41,7 @@ func TestRandomizedCast(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(st) defer evalCtx.Stop(ctx) evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} rng, _ := randutil.NewTestRand() getValidSupportedCast := func() (from, to *types.T) { diff --git a/pkg/sql/colexec/colexecbase/main_test.go b/pkg/sql/colexec/colexecbase/main_test.go index 61be63cc423c..e44b9d36ffe8 100644 --- a/pkg/sql/colexec/colexecbase/main_test.go +++ b/pkg/sql/colexec/colexecbase/main_test.go @@ -58,6 +58,7 @@ func TestMain(m *testing.M) { testMemAcc = &memAcc evalCtx := eval.MakeTestingEvalContext(st) evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} testColumnFactory = coldataext.NewExtendedColumnFactory(&evalCtx) testAllocator = colmem.NewAllocator(ctx, testMemAcc, testColumnFactory) defer testMemAcc.Close(ctx) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ea35df531a19..8cc2ce0e56eb 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2762,6 +2762,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo *evalCtx = extendedEvalContext{ Context: eval.Context{ Planner: p, + StreamManagerFactory: p, PrivilegedAccessor: p, SessionAccessor: p, JobExecContext: p, diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 817a88703e88..0e0d49ded9c1 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -356,6 +356,7 @@ func (ds *ServerImpl) setupFlow( Locality: ds.ServerConfig.Locality, Tracer: ds.ServerConfig.Tracer, Planner: &faketreeeval.DummyEvalPlanner{Monitor: monitor}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, SessionAccessor: &faketreeeval.DummySessionAccessor{}, ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index d98a45bb6f83..aa84ef39ceb6 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -156,6 +156,24 @@ func (so *DummyRegionOperator) ResetMultiRegionZoneConfigsForDatabase( return errors.WithStack(errRegionOperator) } +// DummyStreamManagerFactory implements the eval.StreamManagerFactory interface by +// returning errors. +type DummyStreamManagerFactory struct{} + +// GetReplicationStreamManager implements the eval.StreamManagerFactory interface. +func (smf *DummyStreamManagerFactory) GetReplicationStreamManager( + ctx context.Context, +) (eval.ReplicationStreamManager, error) { + return nil, errors.WithStack(errors.New("Stream manager factory not implemented")) +} + +// GetStreamIngestManager implements the eval.StreamManagerFactory interface. +func (smf *DummyStreamManagerFactory) GetStreamIngestManager( + ctx context.Context, +) (eval.StreamIngestManager, error) { + return nil, errors.WithStack(errors.New("Stream manager factory not implemented")) +} + // DummyEvalPlanner implements the eval.Planner interface by returning // errors. type DummyEvalPlanner struct { diff --git a/pkg/sql/opt/memo/memo_test.go b/pkg/sql/opt/memo/memo_test.go index 2d577d74c012..3b5cb8bf5b42 100644 --- a/pkg/sql/opt/memo/memo_test.go +++ b/pkg/sql/opt/memo/memo_test.go @@ -152,6 +152,7 @@ func TestMemoIsStale(t *testing.T) { // which can handle a case of nil planner but cannot a case when the // planner's GetMultiregionConfig is nil, so we nil out the planner. evalCtx.Planner = nil + evalCtx.StreamManagerFactory = nil var o xform.Optimizer opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT a, b+1 FROM abcview WHERE c='foo'") diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 41c2d631e38f..83586f27a10f 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -380,6 +381,7 @@ func newInternalPlanner( p.extendedEvalCtx = internalExtendedEvalCtx(ctx, sds, params.collection, txn, ts, ts, execCfg) p.extendedEvalCtx.Planner = p + p.extendedEvalCtx.StreamManagerFactory = p p.extendedEvalCtx.PrivilegedAccessor = p p.extendedEvalCtx.SessionAccessor = p p.extendedEvalCtx.ClientNoticeSender = p @@ -828,3 +830,15 @@ func (p *planner) resetPlanner( p.skipDescriptorCache = false p.typeResolutionDbID = descpb.InvalidID } + +// GetReplicationStreamManager returns a ReplicationStreamManager. +func (p *planner) GetReplicationStreamManager( + ctx context.Context, +) (eval.ReplicationStreamManager, error) { + return streaming.GetReplicationStreamManager(ctx, p.EvalContext(), p.Txn()) +} + +// GetStreamIngestManager returns a StreamIngestManager. +func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngestManager, error) { + return streaming.GetStreamIngestManager(ctx, p.EvalContext(), p.Txn()) +} diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 8251bc12c639..f04bc67b2940 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2501,22 +2501,23 @@ func createSchemaChangeEvalCtx( ExecCfg: execCfg, Descs: descriptors, Context: eval.Context{ - SessionDataStack: sessiondata.NewStack(sd), - Planner: &faketreeeval.DummyEvalPlanner{}, - PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, - SessionAccessor: &faketreeeval.DummySessionAccessor{}, - ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, - Sequence: &faketreeeval.DummySequenceOperators{}, - Tenant: &faketreeeval.DummyTenantOperator{}, - Regions: &faketreeeval.DummyRegionOperator{}, - Settings: execCfg.Settings, - TestingKnobs: execCfg.EvalContextTestingKnobs, - ClusterID: execCfg.NodeInfo.LogicalClusterID(), - ClusterName: execCfg.RPCContext.ClusterName(), - NodeID: execCfg.NodeInfo.NodeID, - Codec: execCfg.Codec, - Locality: execCfg.Locality, - Tracer: execCfg.AmbientCtx.Tracer, + SessionDataStack: sessiondata.NewStack(sd), + Planner: &faketreeeval.DummyEvalPlanner{}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, + PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, + SessionAccessor: &faketreeeval.DummySessionAccessor{}, + ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{}, + Sequence: &faketreeeval.DummySequenceOperators{}, + Tenant: &faketreeeval.DummyTenantOperator{}, + Regions: &faketreeeval.DummyRegionOperator{}, + Settings: execCfg.Settings, + TestingKnobs: execCfg.EvalContextTestingKnobs, + ClusterID: execCfg.NodeInfo.LogicalClusterID(), + ClusterName: execCfg.RPCContext.ClusterName(), + NodeID: execCfg.NodeInfo.NodeID, + Codec: execCfg.Codec, + Locality: execCfg.Locality, + Tracer: execCfg.AmbientCtx.Tracer, }, } // TODO(andrei): This is wrong (just like on the main code path on diff --git a/pkg/sql/schemachanger/scbuild/tree_context_builder.go b/pkg/sql/schemachanger/scbuild/tree_context_builder.go index 769110d4f742..8cf9cdafd886 100644 --- a/pkg/sql/schemachanger/scbuild/tree_context_builder.go +++ b/pkg/sql/schemachanger/scbuild/tree_context_builder.go @@ -46,17 +46,18 @@ func (b buildCtx) EvalCtx() *eval.Context { func newEvalCtx(ctx context.Context, d Dependencies) *eval.Context { evalCtx := &eval.Context{ - ClusterID: d.ClusterID(), - SessionDataStack: sessiondata.NewStack(d.SessionData()), - Planner: &faketreeeval.DummyEvalPlanner{}, - PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, - SessionAccessor: &faketreeeval.DummySessionAccessor{}, - ClientNoticeSender: d.ClientNoticeSender(), - Sequence: &faketreeeval.DummySequenceOperators{}, - Tenant: &faketreeeval.DummyTenantOperator{}, - Regions: &faketreeeval.DummyRegionOperator{}, - Settings: d.ClusterSettings(), - Codec: d.Codec(), + ClusterID: d.ClusterID(), + SessionDataStack: sessiondata.NewStack(d.SessionData()), + Planner: &faketreeeval.DummyEvalPlanner{}, + StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{}, + PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{}, + SessionAccessor: &faketreeeval.DummySessionAccessor{}, + ClientNoticeSender: d.ClientNoticeSender(), + Sequence: &faketreeeval.DummySequenceOperators{}, + Tenant: &faketreeeval.DummyTenantOperator{}, + Regions: &faketreeeval.DummyRegionOperator{}, + Settings: d.ClusterSettings(), + Codec: d.Codec(), } evalCtx.SetDeprecatedContext(ctx) return evalCtx diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index 2c19204b3464..e864ee6822ee 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -30,6 +30,7 @@ go_library( deps = [ "//pkg/base", "//pkg/build", + "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/config/zonepb", "//pkg/geo", @@ -86,7 +87,6 @@ go_library( "//pkg/sql/storageparam", "//pkg/sql/storageparam/indexstorageparam", "//pkg/sql/types", - "//pkg/streaming", "//pkg/util", "//pkg/util/arith", "//pkg/util/bitarray", diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go index f01a2a99b42f..346d2b0596d8 100644 --- a/pkg/sql/sem/builtins/replication_builtins.go +++ b/pkg/sql/sem/builtins/replication_builtins.go @@ -14,13 +14,13 @@ import ( "context" gojson "encoding/json" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" @@ -50,7 +50,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } @@ -58,7 +58,7 @@ var replicationBuiltins = map[string]builtinDefinition{ ingestionJobID := jobspb.JobID(*args[0].(*tree.DInt)) cutoverTime := args[1].(*tree.DTimestampTZ).Time cutoverTimestamp := hlc.Timestamp{WallTime: cutoverTime.UnixNano()} - err = mgr.CompleteStreamIngestion(ctx, evalCtx, evalCtx.Txn, ingestionJobID, cutoverTimestamp) + err = mgr.CompleteStreamIngestion(ctx, ingestionJobID, cutoverTimestamp) if err != nil { return nil, err } @@ -91,12 +91,12 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull { return tree.DNull, errors.New("job_id cannot be specified with null argument") } - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } ingestionJobID := int64(tree.MustBeDInt(args[0])) - stats, err := mgr.GetStreamIngestionStats(ctx, evalCtx, evalCtx.Txn, jobspb.JobID(ingestionJobID)) + stats, err := mgr.GetStreamIngestionStats(ctx, jobspb.JobID(ingestionJobID)) if err != nil { return nil, err } @@ -131,12 +131,12 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull { return tree.DNull, errors.New("job_id cannot be specified with null argument") } - mgr, err := streaming.GetStreamIngestManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetStreamIngestManager(ctx) if err != nil { return nil, err } ingestionJobID := int64(tree.MustBeDInt(args[0])) - stats, err := mgr.GetStreamIngestionStats(ctx, evalCtx, evalCtx.Txn, jobspb.JobID(ingestionJobID)) + stats, err := mgr.GetStreamIngestionStats(ctx, jobspb.JobID(ingestionJobID)) if err != nil { return nil, err } @@ -164,7 +164,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -172,7 +172,7 @@ var replicationBuiltins = map[string]builtinDefinition{ if err != nil { return nil, err } - jobID, err := mgr.StartReplicationStream(ctx, evalCtx, evalCtx.Txn, uint64(tenantID)) + jobID, err := mgr.StartReplicationStream(ctx, uint64(tenantID)) if err != nil { return nil, err } @@ -201,7 +201,7 @@ var replicationBuiltins = map[string]builtinDefinition{ if args[0] == tree.DNull || args[1] == tree.DNull { return tree.DNull, errors.New("stream_id or frontier_ts cannot be specified with null argument") } - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -209,8 +209,8 @@ var replicationBuiltins = map[string]builtinDefinition{ if err != nil { return nil, err } - streamID := streaming.StreamID(int(tree.MustBeDInt(args[0]))) - sps, err := mgr.HeartbeatReplicationStream(ctx, evalCtx, streamID, frontier, evalCtx.Txn) + streamID := streampb.StreamID(int(tree.MustBeDInt(args[0]))) + sps, err := mgr.HeartbeatReplicationStream(ctx, streamID, frontier) if err != nil { return nil, err } @@ -243,13 +243,12 @@ var replicationBuiltins = map[string]builtinDefinition{ []string{"stream_event"}, ), func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (eval.ValueGenerator, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } return mgr.StreamPartition( - evalCtx, - streaming.StreamID(tree.MustBeDInt(args[0])), + streampb.StreamID(tree.MustBeDInt(args[0])), []byte(tree.MustBeDBytes(args[1])), ) }, @@ -269,13 +268,13 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Bytes), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } streamID := int64(tree.MustBeDInt(args[0])) - spec, err := mgr.GetReplicationStreamSpec(ctx, evalCtx, evalCtx.Txn, streaming.StreamID(streamID)) + spec, err := mgr.GetReplicationStreamSpec(ctx, streampb.StreamID(streamID)) if err != nil { return nil, err } @@ -304,7 +303,7 @@ var replicationBuiltins = map[string]builtinDefinition{ }, ReturnType: tree.FixedReturnType(types.Int), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - mgr, err := streaming.GetReplicationStreamManager(ctx, evalCtx) + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) if err != nil { return nil, err } @@ -312,7 +311,7 @@ var replicationBuiltins = map[string]builtinDefinition{ streamID := int64(tree.MustBeDInt(args[0])) successfulIngestion := bool(tree.MustBeDBool(args[1])) if err := mgr.CompleteReplicationStream( - ctx, evalCtx, evalCtx.Txn, streaming.StreamID(streamID), successfulIngestion, + ctx, streampb.StreamID(streamID), successfulIngestion, ); err != nil { return nil, err } diff --git a/pkg/sql/sem/eval/BUILD.bazel b/pkg/sql/sem/eval/BUILD.bazel index baf6d6676a1e..02e5413c29de 100644 --- a/pkg/sql/sem/eval/BUILD.bazel +++ b/pkg/sql/sem/eval/BUILD.bazel @@ -37,9 +37,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/geo", "//pkg/geo/geopb", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/sql/sem/eval/cast_map_test.go b/pkg/sql/sem/eval/cast_map_test.go index ebf1f92c8569..f456070a1fb0 100644 --- a/pkg/sql/sem/eval/cast_map_test.go +++ b/pkg/sql/sem/eval/cast_map_test.go @@ -39,6 +39,7 @@ func TestCastMap(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) rng, _ := randutil.NewTestRand() evalCtx.Planner = &faketreeeval.DummyEvalPlanner{} + evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{} cast.ForEachCast(func(src, tgt oid.Oid, _ cast.Context, _ cast.ContextOrigin, _ volatility.V) { srcType := types.OidToType[src] diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index 4cbcad3c480f..fd34c2dd5c1c 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -17,6 +17,8 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -138,6 +140,8 @@ type Context struct { Planner Planner + StreamManagerFactory StreamManagerFactory + // Not using sql.JobExecContext type to avoid cycle dependency with sql package JobExecContext interface{} @@ -335,6 +339,7 @@ func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonit monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) ctx.TestingMon = monitor ctx.Planner = &fakePlannerWithMonitor{monitor: monitor} + ctx.StreamManagerFactory = &fakeStreamManagerFactory{} ctx.deprecatedContext = context.TODO() now := timeutil.Now() ctx.SetTxnTimestamp(now) @@ -352,6 +357,10 @@ func (p *fakePlannerWithMonitor) Mon() *mon.BytesMonitor { return p.monitor } +type fakeStreamManagerFactory struct { + StreamManagerFactory +} + // SessionData returns the SessionData the current EvalCtx should use to eval. func (ec *Context) SessionData() *sessiondata.SessionData { if ec.SessionDataStack == nil { @@ -672,3 +681,69 @@ func UnwrapDatum(ctx context.Context, evalCtx *Context, d tree.Datum) tree.Datum } return d } + +// StreamManagerFactory stores methods that return the streaming managers. +type StreamManagerFactory interface { + GetReplicationStreamManager(ctx context.Context) (ReplicationStreamManager, error) + GetStreamIngestManager(ctx context.Context) (StreamIngestManager, error) +} + +// ReplicationStreamManager represents a collection of APIs that streaming replication supports +// on the production side. +type ReplicationStreamManager interface { + // StartReplicationStream starts a stream replication job for the specified tenant on the producer side. + StartReplicationStream( + ctx context.Context, + tenantID uint64, + ) (streampb.StreamID, error) + + // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating + // consumer has consumed until the given 'frontier' timestamp. This updates the producer job + // progress and extends its life, and the new producer progress will be returned. + // If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it. + HeartbeatReplicationStream( + ctx context.Context, + streamID streampb.StreamID, + frontier hlc.Timestamp, + ) (streampb.StreamReplicationStatus, error) + + // StreamPartition starts streaming replication on the producer side for the partition specified + // by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and + // returns a value generator which yields events for the specified partition. + StreamPartition( + streamID streampb.StreamID, + opaqueSpec []byte, + ) (ValueGenerator, error) + + // GetReplicationStreamSpec gets a stream replication spec on the producer side. + GetReplicationStreamSpec( + ctx context.Context, + streamID streampb.StreamID, + ) (*streampb.ReplicationStreamSpec, error) + + // CompleteReplicationStream completes a replication stream job on the producer side. + // 'successfulIngestion' indicates whether the stream ingestion finished successfully and + // determines the fate of the producer job, succeeded or canceled. + CompleteReplicationStream( + ctx context.Context, + streamID streampb.StreamID, + successfulIngestion bool, + ) error +} + +// StreamIngestManager represents a collection of APIs that streaming replication supports +// on the ingestion side. +type StreamIngestManager interface { + // CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side. + CompleteStreamIngestion( + ctx context.Context, + ingestionJobID jobspb.JobID, + cutoverTimestamp hlc.Timestamp, + ) error + + // GetStreamIngestionStats gets a statistics summary for a stream ingestion job. + GetStreamIngestionStats( + ctx context.Context, + ingestionJobID jobspb.JobID, + ) (*streampb.StreamIngestionStats, error) +} diff --git a/pkg/streaming/BUILD.bazel b/pkg/streaming/BUILD.bazel index 0edf9069f018..e1ce04c319c8 100644 --- a/pkg/streaming/BUILD.bazel +++ b/pkg/streaming/BUILD.bazel @@ -8,10 +8,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/streamingccl/streampb", - "//pkg/jobs/jobspb", "//pkg/kv", "//pkg/sql/sem/eval", - "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/streaming/api.go b/pkg/streaming/api.go index 2d96bbb468f4..2d70e4324c12 100644 --- a/pkg/streaming/api.go +++ b/pkg/streaming/api.go @@ -14,118 +14,38 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) // StreamID is the ID of a replication stream. -type StreamID int64 - -// SafeValue implements the redact.SafeValue interface. -func (j StreamID) SafeValue() {} - -// InvalidStreamID is the zero value for StreamID corresponding to no stream. -const InvalidStreamID StreamID = 0 +type StreamID = streampb.StreamID // GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs. // Used by builtin functions to trigger streaming replication. -var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context) (ReplicationStreamManager, error) +var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.ReplicationStreamManager, error) // GetStreamIngestManagerHook is the hook to get access to the ingestion side replication APIs. // Used by builtin functions to trigger streaming replication. -var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context) (StreamIngestManager, error) - -// ReplicationStreamManager represents a collection of APIs that streaming replication supports -// on the production side. -type ReplicationStreamManager interface { - // StartReplicationStream starts a stream replication job for the specified tenant on the producer side. - StartReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - tenantID uint64, - ) (StreamID, error) - - // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating - // consumer has consumed until the given 'frontier' timestamp. This updates the producer job - // progress and extends its life, and the new producer progress will be returned. - // If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it. - HeartbeatReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - streamID StreamID, - frontier hlc.Timestamp, - txn *kv.Txn) (streampb.StreamReplicationStatus, error) - - // StreamPartition starts streaming replication on the producer side for the partition specified - // by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and - // returns a value generator which yields events for the specified partition. - StreamPartition( - evalCtx *eval.Context, - streamID StreamID, - opaqueSpec []byte, - ) (eval.ValueGenerator, error) - - // GetReplicationStreamSpec gets a stream replication spec on the producer side. - GetReplicationStreamSpec( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID StreamID, - ) (*streampb.ReplicationStreamSpec, error) - - // CompleteReplicationStream completes a replication stream job on the producer side. - // 'successfulIngestion' indicates whether the stream ingestion finished successfully and - // determines the fate of the producer job, succeeded or canceled. - CompleteReplicationStream( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - streamID StreamID, - successfulIngestion bool, - ) error -} - -// StreamIngestManager represents a collection of APIs that streaming replication supports -// on the ingestion side. -type StreamIngestManager interface { - // CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side. - CompleteStreamIngestion( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - cutoverTimestamp hlc.Timestamp, - ) error - - // GetStreamIngestionStats gets a statistics summary for a stream ingestion job. - GetStreamIngestionStats( - ctx context.Context, - evalCtx *eval.Context, - txn *kv.Txn, - ingestionJobID jobspb.JobID, - ) (*streampb.StreamIngestionStats, error) -} +var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.StreamIngestManager, error) // GetReplicationStreamManager returns a ReplicationStreamManager if a CCL binary is loaded. func GetReplicationStreamManager( - ctx context.Context, evalCtx *eval.Context, -) (ReplicationStreamManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.ReplicationStreamManager, error) { if GetReplicationStreamManagerHook == nil { return nil, errors.New("replication streaming requires a CCL binary") } - return GetReplicationStreamManagerHook(ctx, evalCtx) + return GetReplicationStreamManagerHook(ctx, evalCtx, txn) } // GetStreamIngestManager returns a StreamIngestManager if a CCL binary is loaded. func GetStreamIngestManager( - ctx context.Context, evalCtx *eval.Context, -) (StreamIngestManager, error) { + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, +) (eval.StreamIngestManager, error) { if GetReplicationStreamManagerHook == nil { return nil, errors.New("replication streaming requires a CCL binary") } - return GetStreamIngestManagerHook(ctx, evalCtx) + return GetStreamIngestManagerHook(ctx, evalCtx, txn) }