Skip to content

Commit

Permalink
ingestion: add planHook for ingestion job
Browse files Browse the repository at this point in the history
This change adds the boiler plate plan hook that configures and starts
up the ingestion job.

Release note: None
  • Loading branch information
adityamaru committed Jan 27, 2021
1 parent 4269f1b commit 320a77b
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ go_library(
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/ingestion",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/ingestion"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
)
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"stream_ingestion_frontier_processor.go",
"stream_ingestion_job.go",
"stream_ingestion_planning.go",
"stream_ingestion_processor.go",
"stream_ingestion_processor_planning.go",
],
Expand All @@ -14,13 +15,16 @@ go_library(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/physicalplan",
Expand All @@ -30,9 +34,11 @@ go_library(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func ingest(
}

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(topology,
nodes, jobID)
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, nodes, jobID)
if err != nil {
return err
}
Expand Down
129 changes: 129 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

func streamIngestionJobDescription(
p sql.PlanHookState, streamIngestion *tree.StreamIngestion,
) (string, error) {
ann := p.ExtendedEvalContext().Annotations
return tree.AsStringWithFQNames(streamIngestion, ann), nil
}

func ingestionPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
ingestionStmt, ok := stmt.(*tree.StreamIngestion)
if !ok {
return nil, nil, nil, false, nil
}

fromFn, err := p.TypeAsStringArray(ctx, tree.Exprs(ingestionStmt.From), "INGESTION")
if err != nil {
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()

if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(),
"RESTORE FROM REPLICATION STREAM",
); err != nil {
return err
}

from, err := fromFn()
if err != nil {
return err
}

// We only support a TENANT target, so error out if that is nil.
if ingestionStmt.Targets.Tenant == (roachpb.TenantID{}) {
return errors.Newf("no tenant specified in ingestion query: %s", ingestionStmt.String())
}

if ingestionStmt.Targets.Types != nil || ingestionStmt.Targets.Databases != nil ||
ingestionStmt.Targets.Tables != nil || ingestionStmt.Targets.Schemas != nil {
return errors.Newf("unsupported target in ingestion query, "+
"only tenant ingestion is supported: %s", ingestionStmt.String())
}

// TODO(adityamaru): Add privileges checks. Probably the same as RESTORE.

prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant)
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: streamingccl.StreamAddress(from[0]),
Span: roachpb.Span{Key: prefix, EndKey: prefix.Next()},
// TODO: Figure out what the initial ts should be.
StartTime: hlc.Timestamp{},
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
if err != nil {
return err
}

jr := jobs.Record{
Description: jobDescription,
Username: p.User(),
Progress: jobspb.StreamIngestionProgress{},
Details: streamIngestionDetails,
}

var sj *jobs.StartableJob
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
return err
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
}
}
return err
}

return sj.Start(ctx)
}

return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil
}

func init() {
sql.AddPlanHook(ingestionPlanHook)
jobs.RegisterConstructor(
jobspb.TypeStreamIngestion,
func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return &streamIngestionResumer{
job: job,
}
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
)

func distStreamIngestionPlanSpecs(
topology streamingccl.Topology, nodes []roachpb.NodeID, jobID int64,
streamAddress streamingccl.StreamAddress,
topology streamingccl.Topology,
nodes []roachpb.NodeID,
jobID int64,
) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) {

// For each stream partition in the topology, assign it to a node.
Expand All @@ -40,6 +43,7 @@ func distStreamIngestionPlanSpecs(
// the partition addresses.
if i < len(nodes) {
spec := &execinfrapb.StreamIngestionDataSpec{
StreamAddress: streamAddress,
PartitionAddresses: make([]streamingccl.PartitionAddress, 0),
}
streamIngestionSpecs = append(streamIngestionSpecs, spec)
Expand Down Expand Up @@ -110,7 +114,7 @@ func distStreamIngest(
execinfrapb.ProcessorCoreUnion{StreamIngestionFrontier: streamIngestionFrontierSpec},
execinfrapb.PostProcessSpec{}, streamIngestionResultTypes)

// TODO(adityamaru): Once result types are updated, add PlanToStreamColMap.
p.PlanToStreamColMap = []int{0}
dsp.FinalizePlan(planCtx, p)

rw := makeStreamIngestionResultWriter(ctx, jobID, execCfg.JobRegistry)
Expand All @@ -129,7 +133,7 @@ func distStreamIngest(
// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
return nil
return rw.Err()
}

type streamIngestionResultWriter struct {
Expand Down

0 comments on commit 320a77b

Please sign in to comment.