Skip to content

Commit

Permalink
docdb [#9877]: Fix xCluster Stream Deletion Race Condition
Browse files Browse the repository at this point in the history
Summary:
Fix a race condition in xCluster stream deletion where the CDC service upserts the checkpoint into cdc_state, which can re-insert entries for streams that have already been deleted. Fix this by making the update checkpoint operation a pure update and handling the initial setup insert into the cdc_state table as part of the master's CreateCDCStream RPC. Deletion is already handled in DeleteCDCStream, so no changes are needed on the delete path.

We still want to let BootstrapProducer modify cdc_state with a unique opid and bootstrap id, so we don't modify cdc_state in CreateCDCStream if the call comes from bootstrap.

Test Plan: ybd release --cxx-test integration-tests_cdc_service-int-test --gtest_filter *TestDeleteCDCStream/1

Reviewers: nicolas, jhe

Reviewed By: jhe

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D14028
  • Loading branch information
rahuldesirazu committed Jan 14, 2022
1 parent 5d10055 commit 98dab94
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 38 deletions.
17 changes: 11 additions & 6 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1349,20 +1349,25 @@ Status CDCServiceImpl::UpdateCheckpoint(const ProducerTabletInfo& producer_table
}

if (update_cdc_state) {
auto res = GetCdcStateTable();
RETURN_NOT_OK(res);
const auto op = (*res)->NewUpdateOp();
auto cdc_state = VERIFY_RESULT(GetCdcStateTable());
const auto op = cdc_state->NewUpdateOp();
auto* const req = op->mutable_request();
DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty());
QLAddStringHashValue(req, producer_tablet.tablet_id);
QLAddStringRangeValue(req, producer_tablet.stream_id);
(*res)->AddStringColumnValue(req, master::kCdcCheckpoint, commit_op_id.ToString());

cdc_state->AddStringColumnValue(req, master::kCdcCheckpoint, commit_op_id.ToString());
// If we have a last record hybrid time, use that for physical time. If not, it means we're
// caught up, so use the current time.
uint64_t last_replication_time_micros = last_record_hybrid_time != 0 ?
HybridTime(last_record_hybrid_time).GetPhysicalValueMicros() : GetCurrentTimeMicros();
(*res)->AddTimestampColumnValue(req, master::kCdcLastReplicationTime,
last_replication_time_micros);
cdc_state->AddTimestampColumnValue(req, master::kCdcLastReplicationTime,
last_replication_time_micros);
// Only perform the update if we have a row in cdc_state to prevent a race condition where
// a stream is deleted and then this logic inserts entries in cdc_state from that deleted
// stream.
auto* condition = req->mutable_if_expr()->mutable_condition();
condition->set_op(QL_OP_EXISTS);
RETURN_NOT_OK(RefreshCacheOnFail(session->ApplyAndFlush(op)));
}

Expand Down
29 changes: 22 additions & 7 deletions ent/src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,18 +511,20 @@ TEST_P(CDCServiceTest, TestDeleteCDCStream) {
std::vector<std::string> ranges;
ASSERT_OK(client_->GetTablets(table_.table()->name(), 0 /* max_tablets */, &tablet_ids, &ranges));

bool get_changes_error = false;
// Send GetChanges requests so an entry for each tablet can be added to the cdc_state table.
// Term and index don't matter.
for (const auto& tablet_id : tablet_ids) {
GetChanges(tablet_id, stream_id, 1, 1, &get_changes_error);
ASSERT_FALSE(get_changes_error);
VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 1, 1);
VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 0, 0);
}

{
const auto& tserver = cluster_->mini_tablet_server(0)->server();
std::string tablet_id;
GetTablet(&tablet_id, table_.name());
ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, tserver->proxy()));
}

ASSERT_OK(client_->DeleteCDCStream(stream_id));

// Check that the stream no longer exists.
// Check that the stream still no longer exists.
table_id.clear();
options.clear();
Status s = client_->GetCDCStream(stream_id, &table_id, &options);
Expand All @@ -531,6 +533,19 @@ TEST_P(CDCServiceTest, TestDeleteCDCStream) {
for (const auto& tablet_id : tablet_ids) {
VerifyStreamDeletedFromCdcState(client_.get(), stream_id, tablet_id);
}

// Once the CatalogManager has cleaned up cdc_state, make sure a subsequent call to GetChanges
// doesn't re-populate cdc_state with cleaned up entries. UpdateCheckpoint will be called since
// this is the first time we're calling GetChanges.
bool get_changes_error = false;
for (const auto& tablet_id : tablet_ids) {
GetChanges(tablet_id, stream_id, 0, 0, &get_changes_error);
ASSERT_FALSE(get_changes_error);
}

for (const auto& tablet_id : tablet_ids) {
VerifyStreamDeletedFromCdcState(client_.get(), stream_id, tablet_id);
}
}

TEST_P(CDCServiceTest, TestMetricsOnDeletedReplication) {
Expand Down
14 changes: 9 additions & 5 deletions ent/src/yb/integration-tests/twodc_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ Status TwoDCTestBase::SetupUniverseReplication(

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
RETURN_NOT_OK(master_proxy->SetupUniverseReplication(req, &resp, &rpc));
if (resp.has_error()) {
return STATUS(IllegalState, "Failed setting up universe replication");
}
return Status::OK();
return WaitFor([&] () -> Result<bool> {
if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) {
return false;
}
if (resp.has_error()) {
return false;
}
return true;
}, MonoDelta::FromSeconds(30), "Setup universe replication");
}

Status TwoDCTestBase::VerifyUniverseReplication(
Expand Down
33 changes: 32 additions & 1 deletion ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ TAG_FLAG(enable_transaction_snapshots, hidden);
TAG_FLAG(enable_transaction_snapshots, advanced);
TAG_FLAG(enable_transaction_snapshots, runtime);

DEFINE_test_flag(bool, disable_cdc_state_insert_on_setup, false,
"Disable inserting new entries into cdc state as part of the setup flow.");

namespace yb {

using rpc::RpcContext;
Expand Down Expand Up @@ -2549,6 +2552,34 @@ Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req,
LOG(INFO) << "Created CDC stream " << stream->ToString();

RETURN_NOT_OK(CreateCdcStateTableIfNeeded(rpc));
if (!PREDICT_FALSE(FLAGS_TEST_disable_cdc_state_insert_on_setup) &&
(!req->has_initial_state() || (req->initial_state() == master::SysCDCStreamEntryPB::ACTIVE))) {
// Create the cdc state entries for the tablets in this table from scratch since we have no
// data to bootstrap. If we have data to bootstrap, let the BootstrapProducer logic take care of
// populating entries in cdc_state.
auto ybclient = master_->cdc_state_client_initializer().client();
if (!ybclient) {
return STATUS(IllegalState, "Client not initialized or shutting down");
}
client::TableHandle cdc_table;
const client::YBTableName cdc_state_table_name(
YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
RETURN_NOT_OK(ybclient->WaitForCreateTableToFinish(cdc_state_table_name));
RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient));
std::shared_ptr<client::YBSession> session = ybclient->NewSession();
auto tablets = table->GetTablets();
for (const auto& tablet : tablets) {
const auto op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
auto* const req = op->mutable_request();
QLAddStringHashValue(req, tablet->id());
QLAddStringRangeValue(req, stream->id());
cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString());
cdc_table.AddTimestampColumnValue(
req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());
session->Apply(op);
}
RETURN_NOT_OK(session->Flush());
}
return Status::OK();
}

Expand Down Expand Up @@ -2647,7 +2678,7 @@ Status CatalogManager::FindCDCStreamsMarkedAsDeleting(

Status CatalogManager::CleanUpDeletedCDCStreams(
const std::vector<scoped_refptr<CDCStreamInfo>>& streams) {
auto ybclient = master_->async_client_initializer().client();
auto ybclient = master_->cdc_state_client_initializer().client();
if (!ybclient) {
return STATUS(IllegalState, "Client not initialized or shutting down");
}
Expand Down
22 changes: 22 additions & 0 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,28 @@ Status CatalogManager::PrepareDefaultClusterConfig(int64_t term) {
return Status::OK();
}

// Returns the known addresses of every peer in the current consensus config.
//
// Each element of the result describes one peer: a comma-separated list of that peer's last known
// private addresses followed by its last known broadcast addresses. Peers with no known address
// are omitted. If the current config cannot be read, a warning is logged and an empty vector is
// returned.
std::vector<std::string> CatalogManager::GetMasterAddresses() {
  std::vector<std::string> result;
  consensus::ConsensusStatePB state;
  auto status = GetCurrentConfig(&state);
  if (!status.ok()) {
    LOG(WARNING) << "Failed to get current config: " << status;
    return result;
  }
  for (const auto& peer : state.config().peers()) {
    std::vector<std::string> peer_addresses;
    // Iterate over pointers to the two address lists. Putting the lists themselves into the
    // braced list would copy-construct both repeated fields for every peer, because
    // std::initializer_list elements are copy-initialized.
    for (const auto* list : {&peer.last_known_private_addr(),
                             &peer.last_known_broadcast_addr()}) {
      for (const auto& entry : *list) {
        peer_addresses.push_back(HostPort::FromPB(entry).ToString());
      }
    }
    if (!peer_addresses.empty()) {
      result.push_back(JoinStrings(peer_addresses, ","));
    }
  }
  return result;
}

Status CatalogManager::PrepareDefaultSysConfig(int64_t term) {
{
LockGuard lock(permissions_manager()->mutex());
Expand Down
5 changes: 5 additions & 0 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,8 @@ class CatalogManager :

BlacklistSet BlacklistSetFromPB() const override;

std::vector<std::string> GetMasterAddresses();

protected:
// TODO Get rid of these friend classes and introduce formal interface.
friend class TableLoader;
Expand Down Expand Up @@ -1498,6 +1500,9 @@ class CatalogManager :

MonoTime time_elected_leader_;

std::unique_ptr<client::YBClient> cdc_state_client_;


void StartElectionIfReady(
const consensus::ConsensusStatePB& cstate, TabletInfo* tablet);

Expand Down
3 changes: 3 additions & 0 deletions src/yb/master/master-test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "yb/util/test_util.h"

DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
DECLARE_bool(TEST_disable_cdc_state_insert_on_setup);

namespace yb {
namespace master {
Expand All @@ -76,6 +77,8 @@ void MasterTestBase::SetUp() {
// In this test, we create tables to test catalog manager behavior,
// but we have no tablet servers. Typically this would be disallowed.
FLAGS_catalog_manager_check_ts_count_for_create_table = false;
// Since this is a master-only test, don't do any operations on cdc state for xCluster tests.
FLAGS_TEST_disable_cdc_state_insert_on_setup = true;

// Start master with the create flag on.
mini_master_.reset(new MiniMaster(Env::Default(), GetTestPath("Master"),
Expand Down
39 changes: 20 additions & 19 deletions src/yb/master/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ DEFINE_test_flag(string, master_extra_list_host_port, "",

DECLARE_int64(inbound_rpc_memory_limit);

DECLARE_int32(master_ts_rpc_timeout_ms);
namespace yb {
namespace master {

Expand Down Expand Up @@ -194,28 +195,27 @@ Status Master::Init() {
.set_master_address_flag_name("master_addresses")
.default_admin_operation_timeout(MonoDelta::FromMilliseconds(FLAGS_master_rpc_timeout_ms))
.AddMasterAddressSource([this] {
std::vector<std::string> result;
consensus::ConsensusStatePB state;
auto status = catalog_manager_->GetCurrentConfig(&state);
if (!status.ok()) {
LOG(WARNING) << "Failed to get current config: " << status;
return result;
}
for (const auto& peer : state.config().peers()) {
std::vector<std::string> peer_addresses;
for (const auto& list : {peer.last_known_private_addr(), peer.last_known_broadcast_addr()}) {
for (const auto& entry : list) {
peer_addresses.push_back(HostPort::FromPB(entry).ToString());
}
}
if (!peer_addresses.empty()) {
result.push_back(JoinStrings(peer_addresses, ","));
}
}
return result;
return catalog_manager_->GetMasterAddresses();
});
async_client_init_->Start();

cdc_state_client_init_ = std::make_unique<client::AsyncClientInitialiser>(
"cdc_state_client", 0 /* num_reactors */,
// TODO: use the correct flag
60, // FLAGS_tserver_yb_client_default_timeout_ms / 1000,
"" /* tserver_uuid */,
&options(),
metric_entity(),
mem_tracker(),
messenger());
cdc_state_client_init_->builder()
.set_master_address_flag_name("master_addresses")
.default_admin_operation_timeout(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms))
.AddMasterAddressSource([this] {
return catalog_manager_->GetMasterAddresses();
});
cdc_state_client_init_->Start();

state_ = kInitialized;
return Status::OK();
}
Expand Down Expand Up @@ -345,6 +345,7 @@ void Master::Shutdown() {
auto started = catalog_manager_->StartShutdown();
LOG_IF(DFATAL, !started) << name << " catalog manager shutdown already in progress";
async_client_init_->Shutdown();
cdc_state_client_init_->Shutdown();
RpcAndWebServerBase::Shutdown();
catalog_manager_->CompleteShutdown();
LOG(INFO) << name << " shutdown complete.";
Expand Down
5 changes: 5 additions & 0 deletions src/yb/master/master.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class Master : public tserver::DbServerBase {
return *async_client_init_;
}

// Returns the AsyncClientInitialiser held in cdc_state_client_init_. The pointer is dereferenced
// unconditionally, so it must already be set when this accessor is called.
yb::client::AsyncClientInitialiser& cdc_state_client_initializer() {
return *cdc_state_client_init_;
}

enum MasterMetricType {
TaskMetric,
AttemptMetric,
Expand Down Expand Up @@ -222,6 +226,7 @@ class Master : public tserver::DbServerBase {
std::unique_ptr<MasterTabletServer> master_tablet_server_;

std::unique_ptr<yb::client::AsyncClientInitialiser> async_client_init_;
std::unique_ptr<yb::client::AsyncClientInitialiser> cdc_state_client_init_;
std::mutex master_metrics_mutex_;
std::map<std::string, scoped_refptr<Histogram>> master_metrics_ GUARDED_BY(master_metrics_mutex_);

Expand Down

0 comments on commit 98dab94

Please sign in to comment.