Skip to content

Commit

Permalink
Merge #59112
Browse files Browse the repository at this point in the history
59112: streamingccl,sql: add sql syntax and ingestion planning logic r=pbardea a=adityamaru

commit 1 - add ingestion job SQL syntax

commit 2 - add ingestion job plan hook and hook up planning, job, and processors.

Co-authored-by: Aditya Maru <adityamaru@gmail.com>
  • Loading branch information
craig[bot] and adityamaru committed Jan 29, 2021
2 parents f955d8d + 320a77b commit 58a7d76
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 7 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/restore.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ restore_stmt ::=
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' restore_options_list
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' 'OPTIONS' '(' restore_options_list ')'
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*)
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ restore_stmt ::=
| 'RESTORE' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list

resume_stmt ::=
resume_jobs_stmt
Expand Down Expand Up @@ -1013,6 +1014,7 @@ unreserved_keyword ::=
| 'RENAME'
| 'REPEATABLE'
| 'REPLACE'
| 'REPLICATION'
| 'RESET'
| 'RESTORE'
| 'RESTRICT'
Expand Down Expand Up @@ -1066,6 +1068,7 @@ unreserved_keyword ::=
| 'STORE'
| 'STORED'
| 'STORING'
| 'STREAM'
| 'STRICT'
| 'SUBSCRIPTION'
| 'SURVIVE'
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
],
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
)
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"stream_ingestion_frontier_processor.go",
"stream_ingestion_job.go",
"stream_ingestion_planning.go",
"stream_ingestion_processor.go",
"stream_ingestion_processor_planning.go",
],
Expand All @@ -14,13 +15,16 @@ go_library(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/physicalplan",
Expand All @@ -30,9 +34,11 @@ go_library(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func ingest(
}

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(topology,
nodes, jobID)
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, nodes, jobID)
if err != nil {
return err
}
Expand Down
129 changes: 129 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

// streamIngestionJobDescription builds the description string stored on the
// stream ingestion job. The statement is rendered with
// tree.AsStringWithFQNames using the annotations taken from the planner's
// extended eval context. The returned error is currently always nil.
func streamIngestionJobDescription(
	p sql.PlanHookState, streamIngestion *tree.StreamIngestion,
) (string, error) {
	description := tree.AsStringWithFQNames(
		streamIngestion, p.ExtendedEvalContext().Annotations,
	)
	return description, nil
}

// ingestionPlanHook is the plan hook for RESTORE ... FROM REPLICATION STREAM
// statements (tree.StreamIngestion).
//
// If stmt is not a *tree.StreamIngestion the hook declines by returning nil
// values and a nil error. Otherwise it type-checks the FROM expressions as a
// string array and returns a row function together with
// utilccl.BulkJobExecutionResultHeader as the result columns.
//
// When executed, the row function:
//   - verifies that the enterprise feature is enabled,
//   - evaluates the FROM addresses,
//   - rejects statements without a tenant target or with any non-tenant
//     target (types, databases, tables, schemas),
//   - builds the StreamIngestionDetails for the tenant's key span,
//   - creates a startable job inside a transaction (cleaning it up if the
//     transaction fails) and starts it.
func ingestionPlanHook(
	ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
	ingestionStmt, ok := stmt.(*tree.StreamIngestion)
	if !ok {
		return nil, nil, nil, false, nil
	}

	fromFn, err := p.TypeAsStringArray(ctx, tree.Exprs(ingestionStmt.From), "INGESTION")
	if err != nil {
		return nil, nil, nil, false, err
	}

	fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
		ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
		defer span.Finish()

		if err := utilccl.CheckEnterpriseEnabled(
			p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(),
			"RESTORE FROM REPLICATION STREAM",
		); err != nil {
			return err
		}

		from, err := fromFn()
		if err != nil {
			return err
		}

		// Only a TENANT target is supported, so error out if none was given.
		if ingestionStmt.Targets.Tenant == (roachpb.TenantID{}) {
			return errors.Newf("no tenant specified in ingestion query: %s", ingestionStmt.String())
		}

		if ingestionStmt.Targets.Types != nil || ingestionStmt.Targets.Databases != nil ||
			ingestionStmt.Targets.Tables != nil || ingestionStmt.Targets.Schemas != nil {
			return errors.Newf("unsupported target in ingestion query, "+
				"only tenant ingestion is supported: %s", ingestionStmt.String())
		}

		// Guard the from[0] access below: an empty address list must produce an
		// error rather than an index-out-of-range panic.
		if len(from) == 0 {
			return errors.Newf("no stream address specified in ingestion query: %s",
				ingestionStmt.String())
		}

		// TODO(adityamaru): Add privileges checks. Probably the same as RESTORE.

		prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant)
		streamIngestionDetails := jobspb.StreamIngestionDetails{
			// NOTE: only the first address is used; any additional addresses in
			// the FROM list are currently ignored.
			StreamAddress: streamingccl.StreamAddress(from[0]),
			// The span must cover every key that starts with the tenant prefix,
			// so the end key is PrefixEnd(). Key.Next() is only the immediate
			// successor of the prefix key itself and would exclude the tenant's
			// data keys.
			Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
			// TODO: Figure out what the initial ts should be.
			StartTime: hlc.Timestamp{},
		}

		jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
		if err != nil {
			return err
		}

		jr := jobs.Record{
			Description: jobDescription,
			Username:    p.User(),
			Progress:    jobspb.StreamIngestionProgress{},
			Details:     streamIngestionDetails,
		}

		var sj *jobs.StartableJob
		if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
			sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
			return err
		}); err != nil {
			// The transaction failed; release whatever the startable job holds so
			// that it does not leak. A cleanup failure is only logged because the
			// transaction error is the one the caller needs to see.
			if sj != nil {
				if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
					log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
				}
			}
			return err
		}

		return sj.Start(ctx)
	}

	return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil
}

// init wires stream ingestion into the SQL layer and the jobs system: it
// installs ingestionPlanHook as a plan hook and registers the constructor
// that builds a streamIngestionResumer for jobs of type TypeStreamIngestion.
func init() {
	sql.AddPlanHook(ingestionPlanHook)

	// The cluster settings argument is not needed to build the resumer.
	newResumer := func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
		return &streamIngestionResumer{job: job}
	}
	jobs.RegisterConstructor(jobspb.TypeStreamIngestion, newResumer)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
)

func distStreamIngestionPlanSpecs(
topology streamingccl.Topology, nodes []roachpb.NodeID, jobID int64,
streamAddress streamingccl.StreamAddress,
topology streamingccl.Topology,
nodes []roachpb.NodeID,
jobID int64,
) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) {

// For each stream partition in the topology, assign it to a node.
Expand All @@ -40,6 +43,7 @@ func distStreamIngestionPlanSpecs(
// the partition addresses.
if i < len(nodes) {
spec := &execinfrapb.StreamIngestionDataSpec{
StreamAddress: streamAddress,
PartitionAddresses: make([]streamingccl.PartitionAddress, 0),
}
streamIngestionSpecs = append(streamIngestionSpecs, spec)
Expand Down Expand Up @@ -110,7 +114,7 @@ func distStreamIngest(
execinfrapb.ProcessorCoreUnion{StreamIngestionFrontier: streamIngestionFrontierSpec},
execinfrapb.PostProcessSpec{}, streamIngestionResultTypes)

// TODO(adityamaru): Once result types are updated, add PlanToStreamColMap.
p.PlanToStreamColMap = []int{0}
dsp.FinalizePlan(planCtx, p)

rw := makeStreamIngestionResultWriter(ctx, jobID, execCfg.JobRegistry)
Expand All @@ -129,7 +133,7 @@ func distStreamIngest(
// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
return nil
return rw.Err()
}

type streamIngestionResultWriter struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opaque.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func init() {
&tree.CreateChangefeed{},
&tree.Import{},
&tree.ScheduledBackup{},
&tree.StreamIngestion{},
} {
typ := optbuilder.OpaqueReadOnly
if tree.CanModifySchema(stmt) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,9 @@ func TestParse(t *testing.T) {

{`RESTORE TENANT 36 FROM ($1, $2) AS OF SYSTEM TIME '1'`},

{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`},

{`BACKUP TABLE foo TO 'bar' WITH revision_history, detached`},
{`RESTORE TABLE foo FROM 'bar' WITH skip_missing_foreign_keys, skip_missing_sequences, detached`},

Expand Down
13 changes: 11 additions & 2 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList {

%token <str> RANGE RANGES READ REAL REASSIGN RECURSIVE RECURRING REF REFERENCES REFRESH
%token <str> REGCLASS REGION REGIONAL REGIONS REGPROC REGPROCEDURE REGNAMESPACE REGTYPE REINDEX
%token <str> REMOVE_PATH RENAME REPEATABLE REPLACE
%token <str> REMOVE_PATH RENAME REPEATABLE REPLACE REPLICATION
%token <str> RELEASE RESET RESTORE RESTRICT RESUME RETURNING RETRY REVISION_HISTORY REVOKE RIGHT
%token <str> ROLE ROLES ROLLBACK ROLLUP ROW ROWS RSHIFT RULE RUNNING

Expand All @@ -684,7 +684,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList {
%token <str> SHARE SHOW SIMILAR SIMPLE SKIP SKIP_MISSING_FOREIGN_KEYS
%token <str> SKIP_MISSING_SEQUENCES SKIP_MISSING_SEQUENCE_OWNERS SKIP_MISSING_VIEWS SMALLINT SMALLSERIAL SNAPSHOT SOME SPLIT SQL

%token <str> START STATISTICS STATUS STDIN STRICT STRING STORAGE STORE STORED STORING SUBSTRING
%token <str> START STATISTICS STATUS STDIN STRICT STRING STORAGE STORE STORED STORING STREAM SUBSTRING
%token <str> SURVIVE SURVIVAL SYMMETRIC SYNTAX SYSTEM SQRT SUBSCRIPTION STATEMENTS

%token <str> TABLE TABLES TABLESPACE TEMP TEMPLATE TEMPORARY TENANT TESTING_RELOCATE EXPERIMENTAL_RELOCATE TEXT THEN
Expand Down Expand Up @@ -2612,6 +2612,13 @@ restore_stmt:
Options: *($8.restoreOptions()),
}
}
| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list
{
$$.val = &tree.StreamIngestion{
Targets: $2.targetList(),
From: $7.stringOrPlaceholderOptList(),
}
}
| RESTORE error // SHOW HELP: RESTORE

string_or_placeholder_opt_list:
Expand Down Expand Up @@ -12417,6 +12424,7 @@ unreserved_keyword:
| RENAME
| REPEATABLE
| REPLACE
| REPLICATION
| RESET
| RESTORE
| RESTRICT
Expand Down Expand Up @@ -12470,6 +12478,7 @@ unreserved_keyword:
| STORE
| STORED
| STORING
| STREAM
| STRICT
| SUBSCRIPTION
| SURVIVE
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/tree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ go_library(
"show.go",
"split.go",
"stmt.go",
"stream_ingestion.go",
"survival_goal.go",
"table_name.go",
"table_pattern.go",
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ var _ CCLOnlyStatement = &CreateChangefeed{}
var _ CCLOnlyStatement = &Import{}
var _ CCLOnlyStatement = &Export{}
var _ CCLOnlyStatement = &ScheduledBackup{}
var _ CCLOnlyStatement = &StreamIngestion{}

// StatementType implements the Statement interface.
func (*AlterDatabaseOwner) StatementType() StatementType { return DDL }
Expand Down Expand Up @@ -1027,6 +1028,14 @@ func (*Split) StatementType() StatementType { return Rows }
// StatementTag returns a short string identifying the type of statement.
func (*Split) StatementTag() string { return "SPLIT" }

// StatementType implements the Statement interface.
func (*StreamIngestion) StatementType() StatementType { return Rows }

// StatementTag returns a short string identifying the type of statement.
func (*StreamIngestion) StatementTag() string { return "RESTORE FROM REPLICATION STREAM" }

func (*StreamIngestion) cclOnlyStatement() {}

// StatementType implements the Statement interface.
func (*Unsplit) StatementType() StatementType { return Rows }

Expand Down Expand Up @@ -1196,6 +1205,7 @@ func (n *ShowVar) String() string { return AsString(n) }
func (n *ShowZoneConfig) String() string { return AsString(n) }
func (n *ShowFingerprints) String() string { return AsString(n) }
func (n *Split) String() string { return AsString(n) }
func (n *StreamIngestion) String() string { return AsString(n) }
func (n *Unsplit) String() string { return AsString(n) }
func (n *Truncate) String() string { return AsString(n) }
func (n *UnionClause) String() string { return AsString(n) }
Expand Down
28 changes: 28 additions & 0 deletions pkg/sql/sem/tree/stream_ingestion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tree

// StreamIngestion represents a RESTORE FROM REPLICATION STREAM statement.
// It is formatted as
// "RESTORE <Targets> FROM REPLICATION STREAM FROM <From>".
type StreamIngestion struct {
// Targets is the target list rendered between RESTORE and
// FROM REPLICATION STREAM.
Targets TargetList
// From is the string-or-placeholder list rendered after the final FROM.
From StringOrPlaceholderOptList
}

var _ Statement = &StreamIngestion{}

// Format implements the NodeFormatter interface. It writes the statement as
// "RESTORE <targets> FROM REPLICATION STREAM FROM <from>".
func (node *StreamIngestion) Format(ctx *FmtCtx) {
	targets, from := &node.Targets, &node.From

	// RESTORE <targets>
	ctx.WriteString("RESTORE ")
	ctx.FormatNode(targets)

	// FROM REPLICATION STREAM FROM <from>
	ctx.WriteString(" ")
	ctx.WriteString("FROM REPLICATION STREAM FROM ")
	ctx.FormatNode(from)
}

0 comments on commit 58a7d76

Please sign in to comment.