Skip to content

Commit

Permalink
sql: add new RESTORE FROM REPLICATION STREAM syntax
Browse files Browse the repository at this point in the history
This change adds basic SQL syntax to start the ingestion job which will
read from the replication stream it is pointed to, and ingest the KVs
into the destination tenant's keyspace.

Release note (sql change): add SQL syntax for `RESTORE tenant x FROM
REPLICATION STREAM FROM 'replication_stream'`. This allows the user to
start an ingestion job to ingest KVs from the replication stream into
the destination tenant's keyspace.
  • Loading branch information
adityamaru committed Jan 27, 2021
1 parent 59e6d29 commit 4269f1b
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/restore.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ restore_stmt ::=
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' restore_options_list
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' 'OPTIONS' '(' restore_options_list ')'
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*)
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ restore_stmt ::=
| 'RESTORE' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list

resume_stmt ::=
resume_jobs_stmt
Expand Down Expand Up @@ -1013,6 +1014,7 @@ unreserved_keyword ::=
| 'RENAME'
| 'REPEATABLE'
| 'REPLACE'
| 'REPLICATION'
| 'RESET'
| 'RESTORE'
| 'RESTRICT'
Expand Down Expand Up @@ -1066,6 +1068,7 @@ unreserved_keyword ::=
| 'STORE'
| 'STORED'
| 'STORING'
| 'STREAM'
| 'STRICT'
| 'SUBSCRIPTION'
| 'SURVIVE'
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/ingestion",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
],
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/ingestion"
_ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
)
1 change: 1 addition & 0 deletions pkg/sql/opaque.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func init() {
&tree.CreateChangefeed{},
&tree.Import{},
&tree.ScheduledBackup{},
&tree.StreamIngestion{},
} {
typ := optbuilder.OpaqueReadOnly
if tree.CanModifySchema(stmt) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,9 @@ func TestParse(t *testing.T) {

{`RESTORE TENANT 36 FROM ($1, $2) AS OF SYSTEM TIME '1'`},

{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`},

{`BACKUP TABLE foo TO 'bar' WITH revision_history, detached`},
{`RESTORE TABLE foo FROM 'bar' WITH skip_missing_foreign_keys, skip_missing_sequences, detached`},

Expand Down
13 changes: 11 additions & 2 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList {

%token <str> RANGE RANGES READ REAL REASSIGN RECURSIVE RECURRING REF REFERENCES REFRESH
%token <str> REGCLASS REGION REGIONAL REGIONS REGPROC REGPROCEDURE REGNAMESPACE REGTYPE REINDEX
%token <str> REMOVE_PATH RENAME REPEATABLE REPLACE
%token <str> REMOVE_PATH RENAME REPEATABLE REPLACE REPLICATION
%token <str> RELEASE RESET RESTORE RESTRICT RESUME RETURNING RETRY REVISION_HISTORY REVOKE RIGHT
%token <str> ROLE ROLES ROLLBACK ROLLUP ROW ROWS RSHIFT RULE RUNNING

Expand All @@ -684,7 +684,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList {
%token <str> SHARE SHOW SIMILAR SIMPLE SKIP SKIP_MISSING_FOREIGN_KEYS
%token <str> SKIP_MISSING_SEQUENCES SKIP_MISSING_SEQUENCE_OWNERS SKIP_MISSING_VIEWS SMALLINT SMALLSERIAL SNAPSHOT SOME SPLIT SQL

%token <str> START STATISTICS STATUS STDIN STRICT STRING STORAGE STORE STORED STORING SUBSTRING
%token <str> START STATISTICS STATUS STDIN STRICT STRING STORAGE STORE STORED STORING STREAM SUBSTRING
%token <str> SURVIVE SURVIVAL SYMMETRIC SYNTAX SYSTEM SQRT SUBSCRIPTION STATEMENTS

%token <str> TABLE TABLES TABLESPACE TEMP TEMPLATE TEMPORARY TENANT TESTING_RELOCATE EXPERIMENTAL_RELOCATE TEXT THEN
Expand Down Expand Up @@ -2612,6 +2612,13 @@ restore_stmt:
Options: *($8.restoreOptions()),
}
}
| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list
{
$$.val = &tree.StreamIngestion{
Targets: $2.targetList(),
From: $7.stringOrPlaceholderOptList(),
}
}
| RESTORE error // SHOW HELP: RESTORE

string_or_placeholder_opt_list:
Expand Down Expand Up @@ -12417,6 +12424,7 @@ unreserved_keyword:
| RENAME
| REPEATABLE
| REPLACE
| REPLICATION
| RESET
| RESTORE
| RESTRICT
Expand Down Expand Up @@ -12470,6 +12478,7 @@ unreserved_keyword:
| STORE
| STORED
| STORING
| STREAM
| STRICT
| SUBSCRIPTION
| SURVIVE
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/tree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ go_library(
"show.go",
"split.go",
"stmt.go",
"stream_ingestion.go",
"survival_goal.go",
"table_name.go",
"table_pattern.go",
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ var _ CCLOnlyStatement = &CreateChangefeed{}
var _ CCLOnlyStatement = &Import{}
var _ CCLOnlyStatement = &Export{}
var _ CCLOnlyStatement = &ScheduledBackup{}
var _ CCLOnlyStatement = &StreamIngestion{}

// StatementType implements the Statement interface.
func (*AlterDatabaseOwner) StatementType() StatementType { return DDL }
Expand Down Expand Up @@ -1027,6 +1028,14 @@ func (*Split) StatementType() StatementType { return Rows }
// StatementTag returns a short string identifying the type of statement.
func (*Split) StatementTag() string { return "SPLIT" }

// StatementType implements the Statement interface.
func (*StreamIngestion) StatementType() StatementType { return Rows }

// StatementTag returns a short string identifying the type of statement.
func (*StreamIngestion) StatementTag() string { return "RESTORE FROM REPLICATION STREAM" }

func (*StreamIngestion) cclOnlyStatement() {}

// StatementType implements the Statement interface.
func (*Unsplit) StatementType() StatementType { return Rows }

Expand Down Expand Up @@ -1196,6 +1205,7 @@ func (n *ShowVar) String() string { return AsString(n) }
func (n *ShowZoneConfig) String() string { return AsString(n) }
func (n *ShowFingerprints) String() string { return AsString(n) }
func (n *Split) String() string { return AsString(n) }
func (n *StreamIngestion) String() string { return AsString(n) }
func (n *Unsplit) String() string { return AsString(n) }
func (n *Truncate) String() string { return AsString(n) }
func (n *UnionClause) String() string { return AsString(n) }
Expand Down
28 changes: 28 additions & 0 deletions pkg/sql/sem/tree/stream_ingestion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tree

// StreamIngestion represents a RESTORE FROM REPLICATION STREAM statement.
type StreamIngestion struct {
Targets TargetList
From StringOrPlaceholderOptList
}

var _ Statement = &StreamIngestion{}

// Format implements the NodeFormatter interface.
func (node *StreamIngestion) Format(ctx *FmtCtx) {
ctx.WriteString("RESTORE ")
ctx.FormatNode(&node.Targets)
ctx.WriteString(" ")
ctx.WriteString("FROM REPLICATION STREAM FROM ")
ctx.FormatNode(&node.From)
}

0 comments on commit 4269f1b

Please sign in to comment.