129056: logical: Optionally filter rangefeed events to avoid replicating TTL deletes r=msbutler a=navsetlur

Some of our users want the ability to avoid replicating TTL deletes from one
side of LDR to the other. This commit adds the ability to filter rangefeed
events when that bit is set on the other side. For testing, ttljob_test.go
does a lot of job-scheduling wrangling to get the row TTL job to run, and
that feels unnecessary for a unit test of this change. This code would benefit
from a roachtest that explicitly tests the TTL delete functionality with
admission control loosened.

Release note (sql change): The option IGNORE_CDC_IGNORED_TTL_DELETES
can be set when creating a logical replication stream to avoid
replicating row TTL deletes. Note that the corresponding CDC flag
ttl_disable_changefeed_replication must be set for this to have
any effect.
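
For illustration only, a sketch of how the option and flag might be used
together. The database, table, and URI names are placeholders, and setting
ttl_disable_changefeed_replication as a table storage parameter is an
assumption about the flag's surface area, not something this commit specifies:

-- On the source: mark the table's row-TTL deletes so changefeeds/LDR may skip them.
-- (Assumes ttl_disable_changefeed_replication is settable as a TTL storage parameter.)
ALTER TABLE db.events SET (ttl_disable_changefeed_replication = true);

-- On the destination: create the stream and opt out of replicating those deletes.
CREATE LOGICAL REPLICATION STREAM FROM TABLE events
  ON 'postgresql://source-host:26257/db?sslmode=verify-full'
  INTO TABLE events
  WITH MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES;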

Epic: CRDB-41223

Co-authored-by: Naveen Setlur <naveen.setlur@cockroachlabs.com>
craig[bot] and navsetlur committed Aug 21, 2024
2 parents 5c51c69 + 60472c1 commit 20a4b27
Showing 16 changed files with 136 additions and 15 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1306,6 +1306,7 @@ unreserved_keyword ::=
| 'NOWAIT'
| 'NULLS'
| 'IGNORE_FOREIGN_KEYS'
| 'IGNORE_CDC_IGNORED_TTL_DELETES'
| 'INSENSITIVE'
| 'OF'
| 'OFF'
@@ -3874,6 +3875,7 @@ bare_label_keywords ::=
| 'IFERROR'
| 'IFNULL'
| 'IGNORE_FOREIGN_KEYS'
| 'IGNORE_CDC_IGNORED_TTL_DELETES'
| 'ILIKE'
| 'IMMEDIATE'
| 'IMMEDIATELY'
18 changes: 17 additions & 1 deletion pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
@@ -220,6 +220,7 @@ func createLogicalReplicationStreamPlanHook(
ReplicationPairs: repPairs,
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
FilterRangefeed: options.GetFilterRangefeed(),
},
Progress: progress,
}
@@ -248,6 +249,9 @@ func createLogicalReplicationStreamTypeCheck(
stmt.Options.DefaultFunction,
stmt.Options.Mode,
},
exprutil.Bools{
stmt.Options.FilterRangefeed,
},
}
if err := exprutil.TypeCheck(ctx, "LOGICAL REPLICATION STREAM", p.SemaCtx(),
toTypeCheck...,
@@ -263,7 +267,8 @@ type resolvedLogicalReplicationOptions struct {
mode *string
defaultFunction *jobspb.LogicalReplicationDetails_DefaultConflictResolution
// Mapping of table name to function descriptor
userFunctions map[string]int32
userFunctions map[string]int32
filterRangefeed bool
}

func evalLogicalReplicationOptions(
Expand Down Expand Up @@ -337,6 +342,10 @@ func evalLogicalReplicationOptions(
r.userFunctions[objName.String()] = descID
}
}

if options.FilterRangefeed == tree.DBoolTrue {
r.filterRangefeed = true
}
return r, nil
}

Expand Down Expand Up @@ -388,3 +397,10 @@ func (r *resolvedLogicalReplicationOptions) GetUserFunctions() (map[string]int32
}
return r.userFunctions, true
}

func (r *resolvedLogicalReplicationOptions) GetFilterRangefeed() bool {
if r == nil {
return false
}
return r.filterRangefeed
}
2 changes: 2 additions & 0 deletions pkg/ccl/crosscluster/logical/logical_replication_dist.go
@@ -35,6 +35,7 @@ func constructLogicalReplicationWriterSpecs(
tableMd map[int32]execinfrapb.TableReplicationMetadata,
jobID jobspb.JobID,
streamID streampb.StreamID,
filterRangefeed bool,
) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) {
spanGroup := roachpb.SpanGroup{}
baseSpec := execinfrapb.LogicalReplicationWriterSpec{
@@ -45,6 +46,7 @@
Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info
StreamAddress: string(streamAddress),
TableMetadata: tableMd,
FilterRangefeed: filterRangefeed,
}

writerSpecs := make(map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, len(destSQLInstances))
3 changes: 2 additions & 1 deletion pkg/ccl/crosscluster/logical/logical_replication_job.go
@@ -420,7 +420,8 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
progress.Checkpoint,
tablesMd,
p.job.ID(),
streampb.StreamID(payload.StreamID))
streampb.StreamID(payload.StreamID),
payload.FilterRangefeed)
if err != nil {
return nil, nil, info, err
}
66 changes: 65 additions & 1 deletion pkg/ccl/crosscluster/logical/logical_replication_job_test.go
@@ -528,6 +528,70 @@ family f2(other_payload, v2))
serverASQL.CheckQueryResults(t, "SELECT * from tab_with_cf", expectedRows)
}

func TestFilterRangefeedInReplicationStream(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "multi cluster/node config exhausts hardware")

ctx := context.Background()

filterVal := []bool{}
var filterValLock syncutil.Mutex

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
DistSQL: &execinfra.TestingKnobs{
StreamingTestingKnobs: &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(_ string, _ string, _ span.Frontier, filterRangefeed bool) {
filterValLock.Lock()
defer filterValLock.Unlock()
filterVal = append(filterVal, filterRangefeed)
},
},
},
},
},
}

server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 1)
defer server.Stopper().Stop(ctx)

dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a"))
defer cleanup()
dbBURL, cleanupB := s.PGUrl(t, serverutils.DBName("b"))
defer cleanupB()

var (
jobAID jobspb.JobID
jobBID jobspb.JobID
)

dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String()).Scan(&jobAID)
dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH IGNORE_CDC_IGNORED_TTL_DELETES", dbAURL.String()).Scan(&jobBID)

now := server.Server(0).Clock().Now()
t.Logf("waiting for replication job %d", jobAID)
WaitUntilReplicatedTime(t, now, dbA, jobAID)
t.Logf("waiting for replication job %d", jobBID)
WaitUntilReplicatedTime(t, now, dbB, jobBID)

// Verify that the job payload contains FilterRangefeed.
details := jobutils.GetJobPayload(t, dbA, jobAID).GetLogicalReplicationDetails()
require.False(t, details.FilterRangefeed)

details = jobutils.GetJobPayload(t, dbB, jobBID).GetLogicalReplicationDetails()
require.True(t, details.FilterRangefeed)

require.Equal(t, len(filterVal), 2)

// Only one of the two observed values should be true.
require.True(t, filterVal[0] != filterVal[1])
}

func TestRandomTables(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -754,7 +818,7 @@ func TestLogicalAutoReplan(t *testing.T) {
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
DistSQL: &execinfra.TestingKnobs{
StreamingTestingKnobs: &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier) {
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier, _ bool) {
addressesMu.Lock()
defer addressesMu.Unlock()
clientAddresses[addr] = struct{}{}
pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go
@@ -291,15 +291,15 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {

if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier)
streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.FilterRangefeed)
}
}
sub, err := streamClient.Subscribe(ctx,
streampb.StreamID(lrw.spec.StreamID),
int32(lrw.FlowCtx.NodeID.SQLInstanceID()), lrw.ProcessorID,
token,
lrw.spec.InitialScanTimestamp, lrw.frontier,
streamclient.WithFiltering(true),
streamclient.WithFiltering(lrw.spec.FilterRangefeed),
streamclient.WithDiff(true),
)
if err != nil {
8 changes: 4 additions & 4 deletions pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go
@@ -270,7 +270,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
lastClientStart := make(map[string]hlc.Timestamp)
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.TestingKnobs = &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(addr string, token string, clientStartTimes span.Frontier) {
BeforeClientSubscribe: func(addr string, token string, clientStartTimes span.Frontier, _ bool) {
lastClientStart[token] = clientStartTimes.Frontier()
},
}
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
clientAddresses := make(map[string]struct{})
var addressesMu syncutil.Mutex
args.TestingKnobs = &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier) {
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier, _ bool) {
addressesMu.Lock()
defer addressesMu.Unlock()
clientAddresses[addr] = struct{}{}
@@ -797,7 +797,7 @@ func TestStreamingAutoReplan(t *testing.T) {
clientAddresses := make(map[string]struct{})
var addressesMu syncutil.Mutex
args.TestingKnobs = &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier) {
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier, _ bool) {
addressesMu.Lock()
defer addressesMu.Unlock()
clientAddresses[addr] = struct{}{}
@@ -880,7 +880,7 @@ func TestStreamingReplanOnLag(t *testing.T) {
clientAddresses := make(map[string]struct{})
var addressesMu syncutil.Mutex
args.TestingKnobs = &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier) {
BeforeClientSubscribe: func(addr string, token string, _ span.Frontier, _ bool) {
addressesMu.Lock()
defer addressesMu.Unlock()
clientAddresses[addr] = struct{}{}
pkg/ccl/crosscluster/physical/stream_ingestion_processor.go
@@ -452,7 +452,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
streamingKnobs.BeforeClientSubscribe(addr, string(token), sip.frontier)
streamingKnobs.BeforeClientSubscribe(addr, string(token), sip.frontier, false)
}
}

pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go
@@ -343,7 +343,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

lastClientStart := make(map[string]hlc.Timestamp)
streamingTestingKnobs := &sql.StreamingTestingKnobs{BeforeClientSubscribe: func(addr string, token string, clientStartTimes span.Frontier) {
streamingTestingKnobs := &sql.StreamingTestingKnobs{BeforeClientSubscribe: func(addr string, token string, clientStartTimes span.Frontier, _ bool) {
sp := p1Span
if token == string(p2) {
sp = p2Span
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/producer/event_stream.go
@@ -150,6 +150,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
rangefeed.WithOnValues(s.onValues),
rangefeed.WithDiff(s.spec.WithDiff),
rangefeed.WithInvoker(func(fn func() error) error { return fn() }),
rangefeed.WithFiltering(s.spec.WithFiltering),
}
if emitMetadata.Get(&s.execCfg.Settings.SV) {
opts = append(opts, rangefeed.WithOnMetadata(s.onMetadata))
2 changes: 2 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -251,6 +251,8 @@ message LogicalReplicationDetails {
int32 function_id = 2;
}
DefaultConflictResolution default_conflict_resolution = 7 [(gogoproto.nullable) = false];

bool filter_rangefeed = 8;
}

message LogicalReplicationProgress {
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
@@ -1820,7 +1820,7 @@ type StreamingTestingKnobs struct {

// BeforeClientSubscribe allows observation of parameters about to be passed
// to a streaming client
BeforeClientSubscribe func(addr string, token string, frontier span.Frontier)
BeforeClientSubscribe func(addr string, token string, frontier span.Frontier, filterRangefeed bool)

// BeforeIngestionStart allows blocking the stream ingestion job
// before a stream ingestion happens.
4 changes: 4 additions & 0 deletions pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -530,4 +530,8 @@ message LogicalReplicationWriterSpec {
// TableReplicationMetadata is a map from destination table IDs to metadata
// containing source table descriptors and fully qualified destination table names.
map<int32, TableReplicationMetadata> table_metadata = 8 [(gogoproto.nullable) = false];

// FilterRangefeed is an option that controls whether to filter out 'OmitInRangefeeds' events
// when processing changes in LDR.
optional bool filter_rangefeed = 9 [(gogoproto.nullable) = false];
}
11 changes: 9 additions & 2 deletions pkg/sql/parser/sql.y
@@ -977,7 +977,7 @@ func (u *sqlSymUnion) triggerForEach() tree.TriggerForEach {
%token <str> HAVING HASH HEADER HIGH HISTOGRAM HOLD HOUR

%token <str> IDENTITY
%token <str> IF IFERROR IFNULL IGNORE_FOREIGN_KEYS ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE
%token <str> IF IFERROR IFNULL IGNORE_FOREIGN_KEYS IGNORE_CDC_IGNORED_TTL_DELETES ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE
%token <str> INCLUDING INCLUDE_ALL_SECONDARY_TENANTS INCLUDE_ALL_VIRTUAL_CLUSTERS INCREMENT INCREMENTAL INCREMENTAL_LOCATION
%token <str> INET INET_CONTAINED_BY_OR_EQUALS
%token <str> INET_CONTAINS_OR_EQUALS INDEX INDEXES INHERITS INJECT INITIALLY
@@ -4637,7 +4637,8 @@ create_stmt:
// < MODE = immediate | transactional > |
// < CURSOR = start_time > |
// < DEFAULT FUNCTION = lww | dlq | udf
// < FUNCTION 'udf' FOR TABLE local_name , ... >
// < FUNCTION 'udf' FOR TABLE local_name , ... > |
// < IGNORE_CDC_IGNORED_TTL_DELETES >
// ]
create_logical_replication_stream_stmt:
CREATE LOGICAL REPLICATION STREAM FROM logical_replication_resources ON string_or_placeholder INTO logical_replication_resources opt_logical_replication_options
@@ -4735,6 +4736,10 @@ logical_replication_options:
{
$$.val = &tree.LogicalReplicationOptions{UserFunctions: map[tree.UnresolvedName]tree.RoutineName{*$5.unresolvedObjectName().ToUnresolvedName():$2.unresolvedObjectName().ToRoutineName()}}
}
| IGNORE_CDC_IGNORED_TTL_DELETES
{
$$.val = &tree.LogicalReplicationOptions{FilterRangefeed: tree.MakeDBool(true)}
}

// %Help: CREATE VIRTUAL CLUSTER - create a new virtual cluster
// %Category: Experimental
@@ -17742,6 +17747,7 @@ unreserved_keyword:
| NOWAIT
| NULLS
| IGNORE_FOREIGN_KEYS
| IGNORE_CDC_IGNORED_TTL_DELETES
| INSENSITIVE
| OF
| OFF
@@ -18169,6 +18175,7 @@ bare_label_keywords:
| IFERROR
| IFNULL
| IGNORE_FOREIGN_KEYS
| IGNORE_CDC_IGNORED_TTL_DELETES
| ILIKE
| IMMEDIATE
| IMMEDIATELY
8 changes: 8 additions & 0 deletions pkg/sql/parser/testdata/create_logical_replication
@@ -54,6 +54,14 @@ CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo) ON ('uri') INTO TABLE (foo) W
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON '_' INTO TABLE foo WITH OPTIONS (CURSOR = '_', DEFAULT FUNCTION = '_', MODE = '_', FUNCTION a FOR TABLE b, FUNCTION c FOR TABLE d) -- literals removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE _ ON 'uri' INTO TABLE _ WITH OPTIONS (CURSOR = '1536242855577149065.0000000000', DEFAULT FUNCTION = 'lww', MODE = 'immediate', FUNCTION _ FOR TABLE _, FUNCTION _ FOR TABLE _) -- identifiers removed

parse
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES;
----
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- normalized!
CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), IGNORE_CDC_IGNORED_TTL_DELETES) -- fully parenthesized
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', IGNORE_CDC_IGNORED_TTL_DELETES) -- literals removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- identifiers removed

error
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo, bar ON 'uri' INTO TABLE foo, bar;
----
16 changes: 15 additions & 1 deletion pkg/sql/sem/tree/create_logical_replication.go
@@ -34,6 +34,7 @@ type LogicalReplicationOptions struct {
Cursor Expr
Mode Expr
DefaultFunction Expr
FilterRangefeed *DBool
}

var _ Statement = &CreateLogicalReplicationStream{}
@@ -124,6 +125,10 @@ func (lro *LogicalReplicationOptions) Format(ctx *FmtCtx) {
ctx.FormatNode(&k)
}
}
if lro.FilterRangefeed != nil && *lro.FilterRangefeed {
maybeAddSep()
ctx.WriteString("IGNORE_CDC_IGNORED_TTL_DELETES")
}
}

func (o *LogicalReplicationOptions) CombineWith(other *LogicalReplicationOptions) error {
Expand Down Expand Up @@ -163,6 +168,14 @@ func (o *LogicalReplicationOptions) CombineWith(other *LogicalReplicationOptions
}
}

if o.FilterRangefeed != nil {
if other.FilterRangefeed != nil {
return errors.New("IGNORE_CDC_IGNORED_TTL_DELETES option specified multiple times")
}
} else {
o.FilterRangefeed = other.FilterRangefeed
}

return nil
}

@@ -172,5 +185,6 @@ func (o LogicalReplicationOptions) IsDefault() bool {
return o.Cursor == options.Cursor &&
o.Mode == options.Mode &&
o.DefaultFunction == options.DefaultFunction &&
o.UserFunctions == nil
o.UserFunctions == nil &&
o.FilterRangefeed == options.FilterRangefeed
}
