diff --git a/pkg/ccl/BUILD.bazel b/pkg/ccl/BUILD.bazel index f8e9566761e7..2eabdbd72da4 100644 --- a/pkg/ccl/BUILD.bazel +++ b/pkg/ccl/BUILD.bazel @@ -17,7 +17,7 @@ go_library( "//pkg/ccl/partitionccl", "//pkg/ccl/storageccl", "//pkg/ccl/storageccl/engineccl", - "//pkg/ccl/streamingccl/ingestion", + "//pkg/ccl/streamingccl/streamingest", "//pkg/ccl/utilccl", "//pkg/ccl/workloadccl", ], diff --git a/pkg/ccl/ccl_init.go b/pkg/ccl/ccl_init.go index 9c603f1b9f64..226a3e1fad2c 100644 --- a/pkg/ccl/ccl_init.go +++ b/pkg/ccl/ccl_init.go @@ -25,7 +25,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" - _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/ingestion" + _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest" _ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl" ) diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 578e41793c7f..10e641f42fd3 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "stream_ingestion_frontier_processor.go", "stream_ingestion_job.go", + "stream_ingestion_planning.go", "stream_ingestion_processor.go", "stream_ingestion_processor_planning.go", ], @@ -14,13 +15,16 @@ go_library( "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", + "//pkg/ccl/utilccl", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/keys", "//pkg/kv", "//pkg/kv/bulk", "//pkg/roachpb", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/catalog/colinfo", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/physicalplan", @@ -30,9 +34,11 @@ go_library( "//pkg/sql/types", "//pkg/storage", "//pkg/util/hlc", + "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/span", "//pkg/util/timeutil", + "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 0263c4367b2b..b9dbfdf6da13 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -58,8 +58,8 @@ func ingest( } // Construct stream ingestion processor specs. - streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(topology, - nodes, jobID) + streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs( + streamAddress, topology, nodes, jobID) if err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go new file mode 100644 index 000000000000..dc4b2bff4afe --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -0,0 +1,129 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" +) + +func streamIngestionJobDescription( + p sql.PlanHookState, streamIngestion *tree.StreamIngestion, +) (string, error) { + ann := p.ExtendedEvalContext().Annotations + return tree.AsStringWithFQNames(streamIngestion, ann), nil +} + +func ingestionPlanHook( + ctx context.Context, stmt tree.Statement, p sql.PlanHookState, +) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { + ingestionStmt, ok := stmt.(*tree.StreamIngestion) + if !ok { + return nil, nil, nil, false, nil + } + + fromFn, err := p.TypeAsStringArray(ctx, tree.Exprs(ingestionStmt.From), "INGESTION") + if err != nil { + return nil, nil, nil, false, err + } + + fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { + ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) + defer span.Finish() + + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), + "RESTORE FROM REPLICATION STREAM", + ); err != nil { + return err + } + + from, err := fromFn() + if err != nil { + return err + } + + // We only support a TENANT target, so error out if that is nil. + if ingestionStmt.Targets.Tenant == (roachpb.TenantID{}) { + return errors.Newf("no tenant specified in ingestion query: %s", ingestionStmt.String()) + } + + if ingestionStmt.Targets.Types != nil || ingestionStmt.Targets.Databases != nil || + ingestionStmt.Targets.Tables != nil || ingestionStmt.Targets.Schemas != nil { + return errors.Newf("unsupported target in ingestion query, "+ + "only tenant ingestion is supported: %s", ingestionStmt.String()) + } + + // TODO(adityamaru): Add privileges checks. Probably the same as RESTORE. + + prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant) + streamIngestionDetails := jobspb.StreamIngestionDetails{ + StreamAddress: streamingccl.StreamAddress(from[0]), + Span: roachpb.Span{Key: prefix, EndKey: prefix.Next()}, + // TODO: Figure out what the initial ts should be. + StartTime: hlc.Timestamp{}, + } + + jobDescription, err := streamIngestionJobDescription(p, ingestionStmt) + if err != nil { + return err + } + + jr := jobs.Record{ + Description: jobDescription, + Username: p.User(), + Progress: jobspb.StreamIngestionProgress{}, + Details: streamIngestionDetails, + } + + var sj *jobs.StartableJob + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn) + return err + }); err != nil { + if sj != nil { + if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil { + log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr) + } + } + return err + } + + return sj.Start(ctx) + } + + return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil +} + +func init() { + sql.AddPlanHook(ingestionPlanHook) + jobs.RegisterConstructor( + jobspb.TypeStreamIngestion, + func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + return &streamIngestionResumer{ + job: job, + } + }, + ) +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index 069695ceb03d..712e74766dd5 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -27,7 +27,10 @@ import ( ) func distStreamIngestionPlanSpecs( - topology streamingccl.Topology, nodes []roachpb.NodeID, jobID int64, + streamAddress streamingccl.StreamAddress, + topology streamingccl.Topology, + nodes []roachpb.NodeID, + jobID int64, ) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) { // For each stream partition in the topology, assign it to a node. @@ -40,6 +43,7 @@ func distStreamIngestionPlanSpecs( // the partition addresses. if i < len(nodes) { spec := &execinfrapb.StreamIngestionDataSpec{ + StreamAddress: streamAddress, PartitionAddresses: make([]streamingccl.PartitionAddress, 0), } streamIngestionSpecs = append(streamIngestionSpecs, spec) @@ -110,7 +114,7 @@ func distStreamIngest( execinfrapb.ProcessorCoreUnion{StreamIngestionFrontier: streamIngestionFrontierSpec}, execinfrapb.PostProcessSpec{}, streamIngestionResultTypes) - // TODO(adityamaru): Once result types are updated, add PlanToStreamColMap. + p.PlanToStreamColMap = []int{0} dsp.FinalizePlan(planCtx, p) rw := makeStreamIngestionResultWriter(ctx, jobID, execCfg.JobRegistry) @@ -129,7 +133,7 @@ func distStreamIngest( // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) - return nil + return rw.Err() } type streamIngestionResultWriter struct {