Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingccl: avoid passing evalCtx, txn as parameters to ingestion & replication funcs #90964

Merged
merged 1 commit into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/ccl/backupccl/backuppb/backup.go | `sz`
pkg/ccl/backupccl/backuppb/backup.go | `timing`
pkg/ccl/streamingccl/streampb/streamid.go | `StreamID`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/bulk_metrics.go | `sz`
Expand Down Expand Up @@ -51,7 +52,6 @@ pkg/storage/enginepb/mvcc.go | `TxnEpoch`
pkg/storage/enginepb/mvcc.go | `TxnSeq`
pkg/storage/enginepb/mvcc3.go | `*MVCCStats`
pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta`
pkg/streaming/api.go | `StreamID`
pkg/util/hlc/timestamp.go | `ClockTimestamp`
pkg/util/hlc/timestamp.go | `LegacyTimestamp`
pkg/util/hlc/timestamp.go | `Timestamp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *partitionedStreamClient) Create(
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
err := row.Scan(&streamID)
if err != nil {
return streaming.InvalidStreamID,
return streampb.InvalidStreamID,
errors.Wrapf(err, "error creating replication stream for tenant %s", tenantID.String())
}

Expand Down
23 changes: 11 additions & 12 deletions pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,28 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

type streamIngestManagerImpl struct{}
type streamIngestManagerImpl struct {
evalCtx *eval.Context
txn *kv.Txn
}

// CompleteStreamIngestion implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) CompleteStreamIngestion(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
ingestionJobID jobspb.JobID,
cutoverTimestamp hlc.Timestamp,
ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
) error {
return completeStreamIngestion(ctx, evalCtx, txn, ingestionJobID, cutoverTimestamp)
return completeStreamIngestion(ctx, r.evalCtx, r.txn, ingestionJobID, cutoverTimestamp)
}

// GetStreamIngestionStats implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) GetStreamIngestionStats(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
ctx context.Context, ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
return getStreamIngestionStats(ctx, evalCtx, txn, ingestionJobID)
return getStreamIngestionStats(ctx, r.evalCtx, r.txn, ingestionJobID)
}

func newStreamIngestManagerWithPrivilegesCheck(
ctx context.Context, evalCtx *eval.Context,
) (streaming.StreamIngestManager, error) {
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.StreamIngestManager, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
Expand All @@ -64,7 +63,7 @@ func newStreamIngestManagerWithPrivilegesCheck(
pgcode.InsufficientPrivilege, "replication requires enterprise license")
}

return &streamIngestManagerImpl{}, nil
return &streamIngestManagerImpl{evalCtx: evalCtx, txn: txn}, nil
}

func init() {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streampb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ go_proto_library(

go_library(
name = "streampb",
srcs = ["empty.go"],
srcs = [
"empty.go",
"streamid.go",
],
embed = [":streampb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb",
visibility = ["//visibility:public"],
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/streamingccl/streampb/streamid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streampb

// StreamID is the ID of a replication stream.
type StreamID int64

// SafeValue implements the redact.SafeValue interface.
func (j StreamID) SafeValue() {}

// InvalidStreamID is the zero value for StreamID corresponding to no stream.
const InvalidStreamID StreamID = 0
5 changes: 2 additions & 3 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -38,7 +37,7 @@ import (
)

type eventStream struct {
streamID streaming.StreamID
streamID streampb.StreamID
execCfg *sql.ExecutorConfig
spec streampb.StreamPartitionSpec
subscribedSpans roachpb.SpanGroup
Expand Down Expand Up @@ -515,7 +514,7 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) {
}

func streamPartition(
evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte,
evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte,
) (eval.ValueGenerator, error) {
if !evalCtx.SessionData().AvoidBuffering {
return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option")
Expand Down
42 changes: 19 additions & 23 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,49 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

type replicationStreamManagerImpl struct{}
type replicationStreamManagerImpl struct {
evalCtx *eval.Context
txn *kv.Txn
}

// StartReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StartReplicationStream(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
return startReplicationStreamJob(ctx, evalCtx, txn, tenantID)
ctx context.Context, tenantID uint64,
) (streampb.StreamID, error) {
return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID)
}

// HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) HeartbeatReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return heartbeatReplicationStream(ctx, evalCtx, streamID, frontier, txn)
return heartbeatReplicationStream(ctx, r.evalCtx, r.txn, streamID, frontier)
}

// StreamPartition implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StreamPartition(
evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte,
streamID streampb.StreamID, opaqueSpec []byte,
) (eval.ValueGenerator, error) {
return streamPartition(evalCtx, streamID, opaqueSpec)
return streamPartition(r.evalCtx, streamID, opaqueSpec)
}

// GetReplicationStreamSpec implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
return getReplicationStreamSpec(ctx, evalCtx, txn, streamID)
return getReplicationStreamSpec(ctx, r.evalCtx, streamID)
}

// CompleteReplicationStream implements ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) CompleteReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
return completeReplicationStream(ctx, evalCtx, txn, streamID, successfulIngestion)
return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion)
}

func newReplicationStreamManagerWithPrivilegesCheck(
ctx context.Context, evalCtx *eval.Context,
) (streaming.ReplicationStreamManager, error) {
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.ReplicationStreamManager, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
Expand All @@ -81,14 +76,15 @@ func newReplicationStreamManagerWithPrivilegesCheck(
}

execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)

enterpriseCheckErr := utilccl.CheckEnterpriseEnabled(
execCfg.Settings, execCfg.NodeInfo.LogicalClusterID(), execCfg.Organization(), "REPLICATION")
if enterpriseCheckErr != nil {
return nil, pgerror.Wrap(enterpriseCheckErr,
pgcode.InsufficientPrivilege, "replication requires enterprise license")
}

return &replicationStreamManagerImpl{}, nil
return &replicationStreamManagerImpl{evalCtx: evalCtx, txn: txn}, nil
}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -44,15 +43,15 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) {
require.NoError(t, protoutil.Unmarshal(sessionSerialized, &sessionData))
}

getManagerForUser := func(u string) (streaming.ReplicationStreamManager, error) {
getManagerForUser := func(u string) (eval.ReplicationStreamManager, error) {
sqlUser, err := username.MakeSQLUsernameFromUserInput(u, username.PurposeValidation)
require.NoError(t, err)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
txn := kvDB.NewTxn(ctx, "test")
p, cleanup := sql.NewInternalPlanner("test", txn, sqlUser, &sql.MemoryMetrics{}, &execCfg, sessionData)
defer cleanup()
ec := p.(interface{ EvalContext() *eval.Context }).EvalContext()
return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec)
return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec, txn)
}

for _, tc := range []struct {
Expand Down
23 changes: 11 additions & 12 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -36,24 +35,24 @@ import (
// 2. TODO(casper): Updates the protected timestamp for spans being replicated
func startReplicationStreamJob(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
) (streampb.StreamID, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx)

if err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}

if !hasAdminRole {
return streaming.InvalidStreamID, errors.New("admin role required to start stream replication jobs")
return streampb.InvalidStreamID, errors.New("admin role required to start stream replication jobs")
}

registry := execConfig.JobRegistry
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
ptsID := uuid.MakeV4()
jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID)
if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}

ptp := execConfig.ProtectedTimestampProvider
Expand All @@ -68,9 +67,9 @@ func startReplicationStreamJob(
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)

if err := ptp.Protect(ctx, txn, pts); err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}
return streaming.StreamID(jr.JobID), nil
return streampb.StreamID(jr.JobID), nil
}

// Convert the producer job's status into corresponding replication
Expand Down Expand Up @@ -99,7 +98,7 @@ func updateReplicationStreamProgress(
expiration time.Time,
ptsProvider protectedts.Provider,
registry *jobs.Registry,
streamID streaming.StreamID,
streamID streampb.StreamID,
consumedTime hlc.Timestamp,
txn *kv.Txn,
) (status streampb.StreamReplicationStatus, err error) {
Expand Down Expand Up @@ -157,9 +156,9 @@ func updateReplicationStreamProgress(
func heartbeatReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
streamID streampb.StreamID,
frontier hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
Expand Down Expand Up @@ -198,7 +197,7 @@ func heartbeatReplicationStream(

// getReplicationStreamSpec gets a replication stream specification for the specified stream.
func getReplicationStreamSpec(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
// Returns error if the replication stream is not active
Expand Down Expand Up @@ -257,7 +256,7 @@ func completeReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
streamID streampb.StreamID,
successfulIngestion bool,
) error {
registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ go_library(
"//pkg/sql/vtable",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/streaming",
"//pkg/testutils/serverutils",
"//pkg/upgrade",
"//pkg/util",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func runPlanInsidePlan(
ctx, evalCtx, &plannerCopy, params.p.txn, distributeType)
planCtx.planner.curPlan.planComponents = *plan
planCtx.ExtendedEvalCtx.Planner = &plannerCopy
planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy
planCtx.stmtType = recv.stmtType

params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun(
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestRandomizedCast(t *testing.T) {
evalCtx := eval.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
evalCtx.Planner = &faketreeeval.DummyEvalPlanner{}
evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{}
rng, _ := randutil.NewTestRand()

getValidSupportedCast := func() (from, to *types.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestMain(m *testing.M) {
testMemAcc = &memAcc
evalCtx := eval.MakeTestingEvalContext(st)
evalCtx.Planner = &faketreeeval.DummyEvalPlanner{}
evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{}
testColumnFactory = coldataext.NewExtendedColumnFactory(&evalCtx)
testAllocator = colmem.NewAllocator(ctx, testMemAcc, testColumnFactory)
defer testMemAcc.Close(ctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
*evalCtx = extendedEvalContext{
Context: eval.Context{
Planner: p,
StreamManagerFactory: p,
PrivilegedAccessor: p,
SessionAccessor: p,
JobExecContext: p,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func (ds *ServerImpl) setupFlow(
Locality: ds.ServerConfig.Locality,
Tracer: ds.ServerConfig.Tracer,
Planner: &faketreeeval.DummyEvalPlanner{Monitor: monitor},
StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{},
PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{},
SessionAccessor: &faketreeeval.DummySessionAccessor{},
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
Expand Down
Loading