[#23188] DocDB: Persist new colocated_id mapping discovered as part of processing CHANGE_METADATA_OP in xCluster ClusterConfig.

Summary:
Problem:

When a new table is added to a colocated database, the table needs to be created on both xCluster source and target with the same colocation_id.
As part of processing the ChangeMetadataOp for the AddTable operation received from the source, the target creates a mapping of source->target schema versions for the newly created table. However, this mapping is not persisted in the ClusterConfig if the source/target schema versions are default values. As a result, while replication may work immediately after setup, the source-target schema version mapping may be lost on a TServer restart, and replication may stall until another schema change happens on the source.
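
To illustrate the pitfall: looking up a colocation_id with operator[] on a map default-constructs the entry, so a freshly discovered 0:0 mapping looks identical to one that was already present and unchanged. Below is a minimal standalone C++ sketch of that behavior, using std::map and a simplified SchemaVersions struct (hypothetical names) in place of the real protobuf types.

#include <cstdint>
#include <iostream>
#include <map>

// Simplified stand-in for the real protobuf type (names are hypothetical).
struct SchemaVersions {
  uint32_t current_producer_schema_version = 0;
  uint32_t current_consumer_schema_version = 0;
};

int main() {
  std::map<uint32_t, SchemaVersions> colocated_schema_versions;
  const uint32_t colocation_id = 20001;  // hypothetical colocation id

  // operator[] silently inserts a default 0:0 entry when the key is absent,
  // so a value-only check cannot distinguish "newly discovered mapping"
  // from "existing mapping with no changes", and nothing marks the
  // ClusterConfig as needing to be persisted.
  auto& versions = colocated_schema_versions[colocation_id];
  const bool looks_unchanged =
      versions.current_producer_schema_version == 0 &&
      versions.current_consumer_schema_version == 0;
  std::cout << "looks unchanged: " << std::boolalpha << looks_unchanged << "\n";
  return 0;
}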

Fix:
The fix is to correctly detect the non-existent colocation_id and persist the ClusterConfig.
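
A minimal sketch of the resulting pattern, under the same simplified types as above: check for the key's presence before inserting, and mark the mapping as updated (so the ClusterConfig gets persisted) whenever the entry did not exist. The FindEntry helper here is a hypothetical stand-in for the FindOrNull call seen in the diff below.

#include <cstdint>
#include <iostream>
#include <map>

// Same simplified stand-in type as in the sketch above (hypothetical).
struct SchemaVersions {
  uint32_t current_producer_schema_version = 0;
  uint32_t current_consumer_schema_version = 0;
};

// Look up without inserting; return nullptr when the key is absent
// (analogous to the FindOrNull helper used in the actual change).
SchemaVersions* FindEntry(std::map<uint32_t, SchemaVersions>& m, uint32_t key) {
  auto it = m.find(key);
  return it == m.end() ? nullptr : &it->second;
}

int main() {
  std::map<uint32_t, SchemaVersions> colocated_schema_versions;
  const uint32_t colocation_id = 20001;  // hypothetical colocation id

  bool schema_versions_updated = false;
  SchemaVersions* versions = FindEntry(colocated_schema_versions, colocation_id);
  if (versions == nullptr) {
    // The colocation_id was unknown: record it and force the ClusterConfig
    // to be persisted, even when the mapping is the default 0:0.
    schema_versions_updated = true;
    versions = &colocated_schema_versions[colocation_id];
  }
  std::cout << "persist cluster config: " << std::boolalpha
            << schema_versions_updated << "\n";
  return 0;
}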

Test Plan:
ybt xcluster_ysql_colocated-test XClusterYsqlColocatedTest.DatabaseReplication

The test was failing without the fix and is passing with the fix.

Reviewers: hsunder, xCluster

Reviewed By: hsunder

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D36552
lingamsandeep committed Jul 18, 2024
1 parent a64fbfc commit 9f82c01
Showing 2 changed files with 46 additions and 8 deletions.
44 changes: 38 additions & 6 deletions src/yb/integration-tests/xcluster/xcluster_ysql_colocated-test.cc
@@ -1,4 +1,4 @@
// Copyright (c) YugabyteDB, Inc.
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
@@ -23,6 +23,7 @@
#include "yb/master/master_ddl.proxy.h"
#include "yb/master/master_replication.proxy.h"
#include "yb/master/mini_master.h"
#include "yb/tserver/mini_tablet_server.h"
#include "yb/util/backoff_waiter.h"

DECLARE_bool(xcluster_wait_on_ddl_alter);
@@ -118,6 +119,7 @@ class XClusterYsqlColocatedTest : public XClusterYsqlTestBase {
auto& tables = onlyColocated ? colocated_consumer_tables : consumer_tables_;
for (const auto& consumer_table : tables) {
LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
RETURN_NOT_OK(WaitForRowCount(consumer_table->name(), num_results, &consumer_cluster_));
RETURN_NOT_OK(ValidateRows(consumer_table->name(), num_results, &consumer_cluster_));
}
return true;
@@ -196,28 +198,58 @@ class XClusterYsqlColocatedTest : public XClusterYsqlTestBase {
[&]() -> Result<bool> {
LOG(INFO) << "Checking records for table "
<< new_colocated_consumer_table->name().ToString();
RETURN_NOT_OK(
ValidateRows(new_colocated_consumer_table->name(), kRecordBatch, &consumer_cluster_));
RETURN_NOT_OK(WaitForRowCount(
new_colocated_consumer_table->name(), kRecordBatch, &consumer_cluster_));
RETURN_NOT_OK(ValidateRows(
new_colocated_consumer_table->name(), kRecordBatch, &consumer_cluster_));
return true;
},
MonoDelta::FromSeconds(20 * kTimeMultiplier),
"IsDataReplicatedCorrectly new colocated table"));

// 6. Drop the new table and ensure that data is getting replicated correctly for
// 6. Shutdown the colocated tablet leader and verify that replication is still happening.
{
auto tablet_ids = ListTabletIdsForTable(consumer_cluster(), colocated_parent_table_id);
auto old_ts = FindTabletLeader(consumer_cluster(), *tablet_ids.begin());
old_ts->Shutdown();
const auto deadline = CoarseMonoClock::Now() + 10s * kTimeMultiplier;
RETURN_NOT_OK(WaitUntilTabletHasLeader(consumer_cluster(), *tablet_ids.begin(), deadline));
RETURN_NOT_OK(old_ts->RestartStoppedServer());
RETURN_NOT_OK(old_ts->WaitStarted());

RETURN_NOT_OK(InsertRowsInProducer(
kRecordBatch, 2 * kRecordBatch, new_colocated_producer_table,
use_transaction));

RETURN_NOT_OK(WaitFor(
[&]() -> Result<bool> {
LOG(INFO) << "Checking records for table "
<< new_colocated_consumer_table->name().ToString();
RETURN_NOT_OK(WaitForRowCount(
new_colocated_consumer_table->name(), 2 * kRecordBatch, &consumer_cluster_));
RETURN_NOT_OK(ValidateRows(
new_colocated_consumer_table->name(), 2 * kRecordBatch, &consumer_cluster_));
return true;
},
MonoDelta::FromSeconds(20 * kTimeMultiplier),
"IsDataReplicatedCorrectly new colocated table"));
}

// 7. Drop the new table and ensure that data is getting replicated correctly for
// the other tables
RETURN_NOT_OK(
DropYsqlTable(&producer_cluster_, namespace_name, "", Format("test_table_$0", idx)));
LOG(INFO) << Format("Dropped test_table_$0 on Producer side", idx);

// 7. Add additional data to the original tables.
// 8. Add additional data to the original tables.
for (const auto& producer_table : producer_tables_) {
LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
RETURN_NOT_OK(
InsertRowsInProducer(count, count + kRecordBatch, producer_table, use_transaction));
}
count += kRecordBatch;

// 8. Verify all tables are properly replicated.
// 9. Verify all tables are properly replicated.
RETURN_NOT_OK(WaitFor(
[&]() -> Result<bool> { return data_replicated_correctly(count, false); },
MonoDelta::FromSeconds(20 * kTimeMultiplier),
10 changes: 8 additions & 2 deletions src/yb/master/xrepl_catalog_manager.cc
@@ -5670,16 +5670,22 @@ Status CatalogManager::UpdateConsumerOnProducerMetadata(
schema_cached->Clear();

cdc::SchemaVersionsPB* schema_versions_pb = nullptr;
bool schema_versions_updated = false;

// TODO (#16557): Support remove_table_id() for colocated tables / tablegroups.
if (IsColocationParentTableId(consumer_table_id) && req->colocation_id() != kColocationIdNotSet) {
auto map = stream_entry->mutable_colocated_schema_versions();
schema_versions_pb = &((*map)[req->colocation_id()]);
schema_versions_pb = FindOrNull(*map, req->colocation_id());
if (nullptr == schema_versions_pb) {
// If the colocation_id itself does not exist, it needs to be recorded in clusterconfig.
// This is to handle the case where source-target schema version mapping is 0:0.
schema_versions_updated = true;
schema_versions_pb = &((*map)[req->colocation_id()]);
}
} else {
schema_versions_pb = stream_entry->mutable_schema_versions();
}

bool schema_versions_updated = false;
SchemaVersion current_producer_schema_version =
schema_versions_pb->current_producer_schema_version();
SchemaVersion current_consumer_schema_version =
