From 98dab943855c6642a4cd6c1fb3a52a6695f53914 Mon Sep 17 00:00:00 2001
From: Rahul Desirazu
Date: Thu, 18 Nov 2021 12:36:30 -0800
Subject: [PATCH] docdb [#9877]: Fix xCluster Stream Deletion Race Condition

Summary:
Fix a race condition in xCluster stream deletion where the CDC service could
upsert a checkpoint into cdc_state after the stream was deleted, re-inserting
entries for the deleted stream. Fix this by making the checkpoint update a
pure update and handling the initial insert into the cdc_state table as part
of the master's CreateCDCStream RPC. Deletion is already handled in
DeleteCDCStream, so no changes are needed on the delete path.

We still want to let BootstrapProducer populate cdc_state with a unique op id
and bootstrap id, so CreateCDCStream skips the cdc_state insert when the call
comes from bootstrap.

Test Plan:
ybd release --cxx-test integration-tests_cdc_service-int-test --gtest_filter *TestDeleteCDCStream/1

Reviewers: nicolas, jhe

Reviewed By: jhe

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D14028
---
 ent/src/yb/cdc/cdc_service.cc                 | 17 +++++---
 .../integration-tests/cdc_service-int-test.cc | 29 ++++++++++----
 .../yb/integration-tests/twodc_test_base.cc   | 14 ++++---
 ent/src/yb/master/catalog_manager_ent.cc      | 33 +++++++++++++++-
 src/yb/master/catalog_manager.cc              | 22 +++++++++++
 src/yb/master/catalog_manager.h               |  5 +++
 src/yb/master/master-test_base.cc             |  3 ++
 src/yb/master/master.cc                       | 39 ++++++++++---------
 src/yb/master/master.h                        |  5 +++
 9 files changed, 129 insertions(+), 38 deletions(-)

diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc
index e864482363da..13a7bee6ecc9 100644
--- a/ent/src/yb/cdc/cdc_service.cc
+++ b/ent/src/yb/cdc/cdc_service.cc
@@ -1349,20 +1349,25 @@ Status CDCServiceImpl::UpdateCheckpoint(const ProducerTabletInfo& producer_table
   }
 
   if (update_cdc_state) {
-    auto res = GetCdcStateTable();
-    RETURN_NOT_OK(res);
-    const auto op = (*res)->NewUpdateOp();
+    auto cdc_state = VERIFY_RESULT(GetCdcStateTable());
+    const auto op = cdc_state->NewUpdateOp();
     auto* const req = op->mutable_request();
     DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty());
     QLAddStringHashValue(req, producer_tablet.tablet_id);
     QLAddStringRangeValue(req, producer_tablet.stream_id);
-    (*res)->AddStringColumnValue(req, master::kCdcCheckpoint, commit_op_id.ToString());
+
+    cdc_state->AddStringColumnValue(req, master::kCdcCheckpoint, commit_op_id.ToString());
     // If we have a last record hybrid time, use that for the physical time. If not, it means
     // we're caught up, so use the current time.
     uint64_t last_replication_time_micros = last_record_hybrid_time != 0 ?
        HybridTime(last_record_hybrid_time).GetPhysicalValueMicros() : GetCurrentTimeMicros();
-    (*res)->AddTimestampColumnValue(req, master::kCdcLastReplicationTime,
-                                    last_replication_time_micros);
+    cdc_state->AddTimestampColumnValue(req, master::kCdcLastReplicationTime,
+                                       last_replication_time_micros);
+    // Only perform the update if the row already exists in cdc_state. This prevents a race
+    // where a stream is deleted and this logic then re-inserts entries for the deleted stream.
+    auto* condition = req->mutable_if_expr()->mutable_condition();
+    condition->set_op(QL_OP_EXISTS);
 
     RETURN_NOT_OK(RefreshCacheOnFail(session->ApplyAndFlush(op)));
   }
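Note: the guarded write above is the heart of the fix. In YCQL a plain UPDATE
behaves as an upsert, so the unguarded op could resurrect rows that
DeleteCDCStream had already removed; attaching an IF EXISTS condition turns it
into a pure update. A minimal sketch of the pattern, assuming the includes and
namespaces of cdc_service.cc; the helper name and parameters here are
illustrative, not an actual function in the patch:

    // Sketch: write a cdc_state checkpoint only when the row still exists.
    // Assumes cdc_state is an open client::TableHandle on the cdc_state table
    // and session is a client::YBSession, as in UpdateCheckpoint above.
    Status UpdateCheckpointIfRowExists(client::TableHandle* cdc_state,
                                       client::YBSession* session,
                                       const std::string& tablet_id,
                                       const std::string& stream_id,
                                       const std::string& checkpoint) {
      const auto op = cdc_state->NewUpdateOp();
      auto* const req = op->mutable_request();
      QLAddStringHashValue(req, tablet_id);    // hash key: tablet_id
      QLAddStringRangeValue(req, stream_id);   // range key: stream_id
      cdc_state->AddStringColumnValue(req, master::kCdcCheckpoint, checkpoint);
      // IF EXISTS: without this, the YCQL UPDATE is an upsert and would
      // re-create a row that DeleteCDCStream already cleaned up.
      req->mutable_if_expr()->mutable_condition()->set_op(QL_OP_EXISTS);
      return session->ApplyAndFlush(op);
    }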
diff --git a/ent/src/yb/integration-tests/cdc_service-int-test.cc b/ent/src/yb/integration-tests/cdc_service-int-test.cc
index 2c8f3bc52ccb..779991241d73 100644
--- a/ent/src/yb/integration-tests/cdc_service-int-test.cc
+++ b/ent/src/yb/integration-tests/cdc_service-int-test.cc
@@ -511,18 +511,20 @@ TEST_P(CDCServiceTest, TestDeleteCDCStream) {
   std::vector<std::string> ranges;
   ASSERT_OK(client_->GetTablets(table_.table()->name(), 0 /* max_tablets */, &tablet_ids, &ranges));
 
-  bool get_changes_error = false;
-  // Send GetChanges requests so an entry for each tablet can be added to the cdc_state table.
-  // Term and index don't matter.
   for (const auto& tablet_id : tablet_ids) {
-    GetChanges(tablet_id, stream_id, 1, 1, &get_changes_error);
-    ASSERT_FALSE(get_changes_error);
-    VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 1, 1);
+    VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 0, 0);
+  }
+
+  {
+    const auto& tserver = cluster_->mini_tablet_server(0)->server();
+    std::string tablet_id;
+    GetTablet(&tablet_id, table_.name());
+    ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, tserver->proxy()));
   }
 
   ASSERT_OK(client_->DeleteCDCStream(stream_id));
 
   // Check that the stream no longer exists.
   table_id.clear();
   options.clear();
   Status s = client_->GetCDCStream(stream_id, &table_id, &options);
@@ -531,6 +533,19 @@
   for (const auto& tablet_id : tablet_ids) {
     VerifyStreamDeletedFromCdcState(client_.get(), stream_id, tablet_id);
   }
+
+  // Once the CatalogManager has cleaned up cdc_state, make sure a subsequent call to GetChanges
+  // doesn't re-populate cdc_state with the cleaned-up entries. UpdateCheckpoint will be called,
+  // since this is the first time we're calling GetChanges.
+  bool get_changes_error = false;
+  for (const auto& tablet_id : tablet_ids) {
+    GetChanges(tablet_id, stream_id, 0, 0, &get_changes_error);
+    ASSERT_FALSE(get_changes_error);
+  }
+
+  for (const auto& tablet_id : tablet_ids) {
+    VerifyStreamDeletedFromCdcState(client_.get(), stream_id, tablet_id);
+  }
 }
 
 TEST_P(CDCServiceTest, TestMetricsOnDeletedReplication) {
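Note: the reworked test exercises exactly the upsert-vs-update distinction the
fix depends on: GetChanges after deletion triggers UpdateCheckpoint, and the
guarded write must not re-create the rows. In rough CQL terms (the cdc_state
key layout is real; the checkpoint and key values are made up):

    // -- Upsert (old behavior): silently re-creates the row even if
    // -- DeleteCDCStream already removed it.
    // UPDATE system.cdc_state SET checkpoint = '1.10'
    //     WHERE tablet_id = 'tablet-1' AND stream_id = 'stream-1';
    //
    // -- Pure update (new behavior): a no-op once the row is gone.
    // UPDATE system.cdc_state SET checkpoint = '1.10'
    //     WHERE tablet_id = 'tablet-1' AND stream_id = 'stream-1' IF EXISTS;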
diff --git a/ent/src/yb/integration-tests/twodc_test_base.cc b/ent/src/yb/integration-tests/twodc_test_base.cc
index 53173178729e..90db3142db3f 100644
--- a/ent/src/yb/integration-tests/twodc_test_base.cc
+++ b/ent/src/yb/integration-tests/twodc_test_base.cc
@@ -92,11 +92,15 @@ Status TwoDCTestBase::SetupUniverseReplication(
   rpc::RpcController rpc;
   rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
 
-  RETURN_NOT_OK(master_proxy->SetupUniverseReplication(req, &resp, &rpc));
-  if (resp.has_error()) {
-    return STATUS(IllegalState, "Failed setting up universe replication");
-  }
-  return Status::OK();
+  return WaitFor([&] () -> Result<bool> {
+    if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) {
+      return false;
+    }
+    if (resp.has_error()) {
+      return false;
+    }
+    return true;
+  }, MonoDelta::FromSeconds(30), "Setup universe replication");
 }
 
 Status TwoDCTestBase::VerifyUniverseReplication(
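Note: one subtlety in the retry loop above is that the rpc::RpcController is
created once and captured by reference, but a controller generally has to be
Reset() before it can be reused for another call. A variant that is safe
across attempts might look like this (a sketch against the same helpers, with
kRpcTimeout, req, resp, and master_proxy as in twodc_test_base.cc):

    // Sketch: retry SetupUniverseReplication with a fresh controller per attempt.
    return WaitFor([&]() -> Result<bool> {
      rpc::RpcController rpc;  // new controller each retry; no stale call state
      rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
      if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) {
        return false;  // RPC-layer failure; WaitFor retries until its deadline
      }
      return !resp.has_error();  // master rejected the request; keep retrying
    }, MonoDelta::FromSeconds(30), "Setup universe replication");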
diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc
index d7209d1f7229..c2b75ab0c783 100644
--- a/ent/src/yb/master/catalog_manager_ent.cc
+++ b/ent/src/yb/master/catalog_manager_ent.cc
@@ -118,6 +118,9 @@ TAG_FLAG(enable_transaction_snapshots, hidden);
 TAG_FLAG(enable_transaction_snapshots, advanced);
 TAG_FLAG(enable_transaction_snapshots, runtime);
 
+DEFINE_test_flag(bool, disable_cdc_state_insert_on_setup, false,
+    "Disable inserting new entries into cdc state as part of the setup flow.");
+
 namespace yb {
 
 using rpc::RpcContext;
@@ -2549,6 +2552,34 @@ Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req,
   LOG(INFO) << "Created CDC stream " << stream->ToString();
 
   RETURN_NOT_OK(CreateCdcStateTableIfNeeded(rpc));
+  if (!PREDICT_FALSE(FLAGS_TEST_disable_cdc_state_insert_on_setup) &&
+      (!req->has_initial_state() ||
+       (req->initial_state() == master::SysCDCStreamEntryPB::ACTIVE))) {
+    // Create the cdc_state entries for this table's tablets from scratch, since there is no
+    // data to bootstrap. If there is data to bootstrap, the BootstrapProducer logic takes
+    // care of populating the entries in cdc_state instead.
+    auto ybclient = master_->cdc_state_client_initializer().client();
+    if (!ybclient) {
+      return STATUS(IllegalState, "Client not initialized or shutting down");
+    }
+    client::TableHandle cdc_table;
+    const client::YBTableName cdc_state_table_name(
+        YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
+    RETURN_NOT_OK(ybclient->WaitForCreateTableToFinish(cdc_state_table_name));
+    RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient));
+    std::shared_ptr<client::YBSession> session = ybclient->NewSession();
+    auto tablets = table->GetTablets();
+    for (const auto& tablet : tablets) {
+      const auto op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
+      auto* const req = op->mutable_request();
+      QLAddStringHashValue(req, tablet->id());
+      QLAddStringRangeValue(req, stream->id());
+      cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString());
+      cdc_table.AddTimestampColumnValue(
+          req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());
+      session->Apply(op);
+    }
+    RETURN_NOT_OK(session->Flush());
+  }
 
   return Status::OK();
 }
@@ -2647,7 +2678,7 @@ Status CatalogManager::FindCDCStreamsMarkedAsDeleting(
 
 Status CatalogManager::CleanUpDeletedCDCStreams(
     const std::vector<scoped_refptr<CDCStreamInfo>>& streams) {
-  auto ybclient = master_->async_client_initializer().client();
+  auto ybclient = master_->cdc_state_client_initializer().client();
   if (!ybclient) {
     return STATUS(IllegalState, "Client not initialized or shutting down");
  }
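Note on the seeded checkpoint (assumed semantics, inferred from the code
above and the updated test):

    // A default-constructed OpId is {term = 0, index = 0}, so the checkpoint
    // string stored in cdc_state by this setup path is "0.0", meaning "stream
    // from the beginning of the WAL". This is why the reworked test asserts
    //   VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 0, 0);
    // rather than the 1, 1 checkpoints the old GetChanges-driven setup left
    // behind.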
diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc
index 1fcd64803bf0..44ff003d21af 100644
--- a/src/yb/master/catalog_manager.cc
+++ b/src/yb/master/catalog_manager.cc
@@ -1236,6 +1236,28 @@ Status CatalogManager::PrepareDefaultClusterConfig(int64_t term) {
   return Status::OK();
 }
 
+std::vector<std::string> CatalogManager::GetMasterAddresses() {
+  std::vector<std::string> result;
+  consensus::ConsensusStatePB state;
+  auto status = GetCurrentConfig(&state);
+  if (!status.ok()) {
+    LOG(WARNING) << "Failed to get current config: " << status;
+    return result;
+  }
+  for (const auto& peer : state.config().peers()) {
+    std::vector<std::string> peer_addresses;
+    for (const auto& list : {peer.last_known_private_addr(), peer.last_known_broadcast_addr()}) {
+      for (const auto& entry : list) {
+        peer_addresses.push_back(HostPort::FromPB(entry).ToString());
+      }
+    }
+    if (!peer_addresses.empty()) {
+      result.push_back(JoinStrings(peer_addresses, ","));
+    }
+  }
+  return result;
+}
+
 Status CatalogManager::PrepareDefaultSysConfig(int64_t term) {
   {
     LockGuard lock(permissions_manager()->mutex());
diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h
index de9e733dd14a..c61c95347d5f 100644
--- a/src/yb/master/catalog_manager.h
+++ b/src/yb/master/catalog_manager.h
@@ -825,6 +825,8 @@ class CatalogManager :
   BlacklistSet BlacklistSetFromPB() const override;
 
+  std::vector<std::string> GetMasterAddresses();
+
  protected:
   // TODO Get rid of these friend classes and introduce formal interface.
   friend class TableLoader;
@@ -1498,6 +1500,9 @@ class CatalogManager :
   MonoTime time_elected_leader_;
 
+  std::unique_ptr<client::AsyncClientInitialiser> cdc_state_client_;
+
+
   void StartElectionIfReady(
       const consensus::ConsensusStatePB& cstate, TabletInfo* tablet);
diff --git a/src/yb/master/master-test_base.cc b/src/yb/master/master-test_base.cc
index 2a2be5511c54..df30839270b5 100644
--- a/src/yb/master/master-test_base.cc
+++ b/src/yb/master/master-test_base.cc
@@ -59,6 +59,7 @@
 #include "yb/util/test_util.h"
 
 DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
+DECLARE_bool(TEST_disable_cdc_state_insert_on_setup);
 
 namespace yb {
 namespace master {
@@ -76,6 +77,8 @@ void MasterTestBase::SetUp() {
   // In this test, we create tables to test catalog manager behavior,
   // but we have no tablet servers. Typically this would be disallowed.
   FLAGS_catalog_manager_check_ts_count_for_create_table = false;
+  // Since this is a master-only test, don't do any operations on cdc_state for xCluster tests.
+  FLAGS_TEST_disable_cdc_state_insert_on_setup = true;
 
   // Start master with the create flag on.
   mini_master_.reset(new MiniMaster(Env::Default(), GetTestPath("Master"),
diff --git a/src/yb/master/master.cc b/src/yb/master/master.cc
index 9c1502aa5695..7995766839af 100644
--- a/src/yb/master/master.cc
+++ b/src/yb/master/master.cc
@@ -131,6 +131,7 @@ DEFINE_test_flag(string, master_extra_list_host_port, "",
 
 DECLARE_int64(inbound_rpc_memory_limit);
 
+DECLARE_int32(master_ts_rpc_timeout_ms);
 
 namespace yb {
 namespace master {
@@ -194,28 +195,27 @@ Status Master::Init() {
       .set_master_address_flag_name("master_addresses")
       .default_admin_operation_timeout(MonoDelta::FromMilliseconds(FLAGS_master_rpc_timeout_ms))
       .AddMasterAddressSource([this] {
-    std::vector<std::string> result;
-    consensus::ConsensusStatePB state;
-    auto status = catalog_manager_->GetCurrentConfig(&state);
-    if (!status.ok()) {
-      LOG(WARNING) << "Failed to get current config: " << status;
-      return result;
-    }
-    for (const auto& peer : state.config().peers()) {
-      std::vector<std::string> peer_addresses;
-      for (const auto& list : {peer.last_known_private_addr(), peer.last_known_broadcast_addr()}) {
-        for (const auto& entry : list) {
-          peer_addresses.push_back(HostPort::FromPB(entry).ToString());
-        }
-      }
-      if (!peer_addresses.empty()) {
-        result.push_back(JoinStrings(peer_addresses, ","));
-      }
-    }
-    return result;
+    return catalog_manager_->GetMasterAddresses();
   });
   async_client_init_->Start();
 
+  cdc_state_client_init_ = std::make_unique<yb::client::AsyncClientInitialiser>(
+      "cdc_state_client", 0 /* num_reactors */,
+      // TODO: use the correct flag
+      60, // FLAGS_tserver_yb_client_default_timeout_ms / 1000,
+      "" /* tserver_uuid */,
+      &options(),
+      metric_entity(),
+      mem_tracker(),
+      messenger());
+  cdc_state_client_init_->builder()
+      .set_master_address_flag_name("master_addresses")
+      .default_admin_operation_timeout(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms))
+      .AddMasterAddressSource([this] {
+    return catalog_manager_->GetMasterAddresses();
+  });
+  cdc_state_client_init_->Start();
+
   state_ = kInitialized;
   return Status::OK();
 }
@@ -345,6 +345,7 @@ void Master::Shutdown() {
     auto started = catalog_manager_->StartShutdown();
     LOG_IF(DFATAL, !started) << name << " catalog manager shutdown already in progress";
     async_client_init_->Shutdown();
+    cdc_state_client_init_->Shutdown();
     RpcAndWebServerBase::Shutdown();
     catalog_manager_->CompleteShutdown();
     LOG(INFO) << name << " shutdown complete.";
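Note: the cdc_state traffic gets a dedicated AsyncClientInitialiser rather
than reusing async_client_init_, and it is shut down alongside the general
client before the RPC layer goes away. The hardcoded 60 is a timeout in
seconds, as the adjacent commented-out FLAGS_tserver_yb_client_default_timeout_ms / 1000
suggests. Both clients now share CatalogManager::GetMasterAddresses() as their
address source, whose output has this shape (hypothetical addresses):

    // One entry per Raft peer; each entry is a comma-joined list of that
    // peer's known endpoints (private address first, then broadcast address).
    //   { "10.0.0.1:7100,master-1.example.com:7100", "10.0.0.2:7100" }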
diff --git a/src/yb/master/master.h b/src/yb/master/master.h
index cd962d0d3bc0..660a00df48cc 100644
--- a/src/yb/master/master.h
+++ b/src/yb/master/master.h
@@ -152,6 +152,10 @@ class Master : public tserver::DbServerBase {
     return *async_client_init_;
   }
 
+  yb::client::AsyncClientInitialiser& cdc_state_client_initializer() {
+    return *cdc_state_client_init_;
+  }
+
   enum MasterMetricType {
     TaskMetric,
     AttemptMetric,
@@ -222,6 +226,7 @@ class Master : public tserver::DbServerBase {
   std::unique_ptr<MasterTabletServer> master_tablet_server_;
 
   std::unique_ptr<yb::client::AsyncClientInitialiser> async_client_init_;
+  std::unique_ptr<yb::client::AsyncClientInitialiser> cdc_state_client_init_;
 
   std::mutex master_metrics_mutex_;
   std::map<std::string, scoped_refptr<Counter>> master_metrics_ GUARDED_BY(master_metrics_mutex_);