Skip to content

Commit

Permalink
[#23809] CDCSDK: Filter out records corresponding to index tables in …
Browse files Browse the repository at this point in the history
…colocated DBs

Summary:
If an index is created on a colocated table, the index table also resides on the same colocated tablet. If GetChanges is called on such a tablet, we may encounter intents corresponding to the index table. Currently, for the gRPC model of consumption, cdc_service sends such records for index tables and the connector filters them out. For logical replication, however, the GetChanges call fails when it encounters intents corresponding to index tables. This is because, for logical replication, we try to find the replica identity corresponding to the index table in the stream metadata. Since index tables are not part of CDC streams, this lookup fails, causing GetChanges to return a non-OK status.

In this diff we introduce a filtering mechanism that drops the intents corresponding to index tables and other non-eligible tables. A record from a colocated tablet is shipped from cdc_service only if it belongs to a table in the qualified tables list and does not belong to a table in the unqualified tables list.
Jira: DB-12710

Test Plan: ./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestColocationWithIndexes

Reviewers: skumar, siddharth.shah, asrinivasan

Reviewed By: skumar

Subscribers: steve.varnau, svc_phabricator, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D37929
  • Loading branch information
Sumukh-Phalgaonkar committed Sep 11, 2024
1 parent 33342b3 commit 361a99a
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,19 @@ Result<SchemaDetails> GetOrPopulateRequiredSchemaDetails(
return STATUS_FORMAT(InternalError, "Did not find schema for table: ", req_table_id);
}

// Decides whether a record belonging to `table_id`, found on a colocated tablet, should be
// streamed for the stream described by `metadata`.
//
// Returns true only when `table_id` is present in metadata.GetTableIds() and absent from
// metadata.GetUnqualifiedTableIds(). Callers skip the record when this returns false, which is
// how records of non-streamed tables sharing the tablet (e.g. index tables) get filtered out.
//
// This runs once per record, so the id lists are scanned in place rather than being copied into
// temporary hash sets for a single lookup each.
bool IsColocatedTableQualifiedForStreaming(
    const TableId& table_id, const StreamMetadata& metadata) {
  const auto contains_table = [&table_id](const auto& table_ids) {
    for (const auto& id : table_ids) {
      if (id == table_id) {
        return true;
      }
    }
    return false;
  };

  // Not part of the stream at all: no need to consult the unqualified list.
  if (!contains_table(metadata.GetTableIds())) {
    return false;
  }
  return !contains_table(metadata.GetUnqualifiedTableIds());
}

Result<CDCRecordType> GetRecordTypeForPopulatingBeforeImage(
const StreamMetadata& metadata, const TableId& table_id) {
if (FLAGS_ysql_yb_enable_replica_identity && IsReplicationSlotStream(metadata)) {
Expand Down Expand Up @@ -905,6 +918,10 @@ Status PopulateCDCSDKIntentRecord(
schema_version = schema_details.schema_version;
table_name = table_info->table_name;
table_id = table_info->table_id;
if (!IsColocatedTableQualifiedForStreaming(table_id, metadata)) {
continue;
}

record_type = VERIFY_RESULT(GetRecordTypeForPopulatingBeforeImage(metadata, table_id));
schema_packing_storage = SchemaPackingStorage(tablet->table_type());
schema_packing_storage.AddSchema(schema_version, schema);
Expand Down Expand Up @@ -1302,6 +1319,10 @@ Status PopulateCDCSDKWriteRecord(
schema_version = schema_details.schema_version;
table_name = table_info->table_name;
table_id = table_info->table_id;
if (!IsColocatedTableQualifiedForStreaming(table_id, metadata)) {
continue;
}

record_type = VERIFY_RESULT(GetRecordTypeForPopulatingBeforeImage(metadata, table_id));
schema_packing_storage = SchemaPackingStorage(tablet_ptr->table_type());
schema_packing_storage.AddSchema(schema_version, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3495,5 +3495,62 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestIntentGC) {
ASSERT_TRUE(received_gc_error);
}

// Verifies that, for a colocated table that has a secondary index, consumption through the
// virtual WAL returns only the DMLs of the base table: no records corresponding to the index
// table may show up, and the CDC checkpoint must still advance.
TEST_F(CDCSDKConsumptionConsistentChangesTest, TestColocationWithIndexes) {
  // Small batch limits so that draining the transactions below takes several GetChanges calls.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_max_consistent_records) = 20;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_vwal_getchanges_resp_max_size_bytes) = 10_KB;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0;

  ASSERT_OK(SetUpWithParams(
      1, 1, true /* colocated */, true /* cdc_populate_safepoint_record */,
      true /* set_pgsql_proxy_bind_address */));
  auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));

  // This array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, and COMMIT in
  // that order.
  const int expected_count[] = {0, 501, 0, 0, 0, 0, 2, 2};

  // Create a table.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0(key int primary key, value_1 int)", kTableName));
  auto table = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, kTableName));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));

  // Create an index.
  ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx ON $0(value_1 ASC)", kTableName));

  // Create a consistent snapshot stream.
  xrepl::StreamId stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

  // Perform 1 multi-shard txn and 1 single shard txn.
  ASSERT_OK(WriteRowsHelperWithConn(0, 500, &test_cluster_, true, &conn));
  ASSERT_OK(WriteRowsWithConn(500, 501, &test_cluster_, &conn));

  auto checkpoint_before = ASSERT_RESULT(GetCDCCheckpoint(stream_id, tablets));

  // We should receive DMLs corresponding to the above txns only. No records corresponding to the
  // index table should be seen. Since we have reduced the batch size, GetChanges will be called
  // multiple times.
  auto gcc_resp = ASSERT_RESULT(GetAllPendingTxnsFromVirtualWAL(
      stream_id, {table.table_id()}, 501, true /* init_virtual_wal */));

  // 505 = 501 INSERTs + 2 BEGINs + 2 COMMITs, i.e. the sum of expected_count.
  ASSERT_EQ(gcc_resp.records.size(), 505);
  for (int i = 0; i < 8; i++) {
    ASSERT_EQ(expected_count[i], gcc_resp.record_count[i]);
  }

  // Assert that the checkpoint has moved forward.
  auto checkpoint_after = ASSERT_RESULT(GetCDCCheckpoint(stream_id, tablets));
  ASSERT_EQ(checkpoint_before.size(), 1);
  ASSERT_EQ(checkpoint_after.size(), 1);
  ASSERT_GT(checkpoint_after[0].index, checkpoint_before[0].index);

  // This GetConsistentChanges call will fetch an empty batch. The response will only contain
  // safepoint records.
  auto change_resp = ASSERT_RESULT(GetConsistentChangesFromCDC(stream_id));
  // Bind by const reference: the records are only inspected, so copying each one is wasted work.
  for (const auto& record : change_resp.cdc_sdk_proto_records()) {
    ASSERT_EQ(record.row_message().op(), RowMessage_Op_SAFEPOINT);
  }
}

} // namespace cdc
} // namespace yb

0 comments on commit 361a99a

Please sign in to comment.