Skip to content

Commit

Permalink
[2DC][#2367] yb-admin command to pause/resume replication
Browse files Browse the repository at this point in the history
Summary:
Added set_universe_replication_enabled, which allows the user to toggle whether a
replication instance is running or not.  This command will keep the in-memory state but tell the
TServers not to setup any CDC Consumers for the disabled universes.

Test Plan: TBD (update of diff forthcoming)

Reviewers: hector, rahuldesirazu, neha

Reviewed By: neha

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7273
  • Loading branch information
nspiegelberg committed Oct 4, 2019
1 parent d12d7d6 commit 6607435
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 0 deletions.
52 changes: 52 additions & 0 deletions ent/src/yb/integration-tests/twodc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,28 @@ class TwoDCTest : public YBTest {
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication");
}

Status ToggleUniverseReplication(
MiniCluster* consumer_cluster, YBClient* consumer_client,
const std::string& universe_id, bool is_enabled) {
master::SetUniverseReplicationEnabledRequestPB req;
master::SetUniverseReplicationEnabledResponsePB resp;

req.set_producer_id(universe_id);
req.set_is_enabled(is_enabled);

auto master_proxy = std::make_shared<master::MasterServiceProxy>(
&consumer_client->proxy_cache(),
consumer_cluster->leader_mini_master()->bound_rpc_addr());

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
RETURN_NOT_OK(master_proxy->SetUniverseReplicationEnabled(req, &resp, &rpc));
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
return Status::OK();
}

Status VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster, YBClient* consumer_client,
const std::string& universe_id, int timeout) {
return LoggedWaitFor([=]() -> Result<bool> {
Expand Down Expand Up @@ -706,6 +728,36 @@ TEST_F(TwoDCTest, BiDirectionalWrites) {
Destroy();
}

TEST_F(TwoDCTest, ToggleReplicationEnabled) {
uint32_t replication_factor = NonTsanVsTsan(3, 1);
auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor));

std::vector<std::shared_ptr<client::YBTable>> producer_tables;
// tables contains both producer and consumer universe tables (alternately).
// Pick out just the producer table from the list.
producer_tables.reserve(1);
producer_tables.push_back(tables[0]);
ASSERT_OK(SetupUniverseReplication(
producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

// Verify that universe is now ACTIVE
master::GetUniverseReplicationResponsePB resp;
ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));

// After we know the universe is ACTIVE, make sure all tablets are getting polled.
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));

// Disable the replication and ensure no tablets are being polled
ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, false));
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0));

// Enable replication and ensure that all the tablets start being polled again
ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, true));
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));

Destroy();
}

TEST_F(TwoDCTest, TestDeleteUniverse) {
FLAGS_cdc_rpc_timeout_ms = 5000;
FLAGS_mock_get_changes_response_for_consumer_testing = true;
Expand Down
5 changes: 5 additions & 0 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ class CatalogManager : public yb::master::CatalogManager {
DeleteUniverseReplicationResponsePB* resp,
rpc::RpcContext* rpc);

// Enable/Disable an Existing Universe Replication.
CHECKED_STATUS SetUniverseReplicationEnabled(const SetUniverseReplicationEnabledRequestPB* req,
SetUniverseReplicationEnabledResponsePB* resp,
rpc::RpcContext* rpc);

// Get Universe Replication.
CHECKED_STATUS GetUniverseReplication(const GetUniverseReplicationRequestPB* req,
GetUniverseReplicationResponsePB* resp,
Expand Down
69 changes: 69 additions & 0 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,75 @@ void CatalogManager::DeleteUniverseReplicationUnlocked(
}
}

Status CatalogManager::SetUniverseReplicationEnabled(
const SetUniverseReplicationEnabledRequestPB* req,
SetUniverseReplicationEnabledResponsePB* resp,
rpc::RpcContext* rpc) {
LOG(INFO) << "Servicing SetUniverseReplicationEnabled request from " << RequestorString(rpc)
<< ": " << req->ShortDebugString();

// Sanity Checking Cluster State and Input.
RETURN_NOT_OK(CheckOnline());

if (!req->has_producer_id()) {
Status s = STATUS(InvalidArgument, "Producer universe ID must be provided", req->DebugString());
return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
}
if (!req->has_is_enabled()) {
Status s = STATUS(InvalidArgument, "Must explicitly set whether to enable", req->DebugString());
return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
}

scoped_refptr<UniverseReplicationInfo> universe;
{
std::shared_lock<LockType> l(lock_);

universe = FindPtrOrNull(universe_replication_map_, req->producer_id());
if (universe == nullptr) {
Status s = STATUS(NotFound, "Could not find CDC producer universe", req->DebugString());
return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s);
}
}

// Update the Master's Universe Config with the new state.
{
auto l = universe->LockForWrite();
if (l->data().pb.state() != SysUniverseReplicationEntryPB::DISABLED &&
l->data().pb.state() != SysUniverseReplicationEntryPB::ACTIVE) {
Status s = STATUS(InvalidArgument,
Format("Universe Replication in invalid state: $0. Retry or Delete.",
SysUniverseReplicationEntryPB::State_Name(l->data().pb.state())),
req->DebugString());
return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
}
if (req->is_enabled()) {
l->mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE);
} else { // DISABLE.
l->mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DISABLED);
}
RETURN_NOT_OK(sys_catalog_->UpdateItem(universe.get(), leader_ready_term_));
l->Commit();
}

// Modify the Consumer Registry, which will fan out this info to all TServers on heartbeat.
{
auto l = cluster_config_->LockForWrite();
auto producer_map = l->mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto it = producer_map->find(req->producer_id());
if (it == producer_map->end()) {
LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id();
Status s = STATUS(NotFound, "Could not find CDC producer universe", req->DebugString());
return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s);
}
(*it).second.set_disable_stream(!req->is_enabled());
l->mutable_data()->pb.set_version(l->mutable_data()->pb.version() + 1);
RETURN_NOT_OK(sys_catalog_->UpdateItem(cluster_config_.get(), leader_ready_term_));
l->Commit();
}

return Status::OK();
}

Status CatalogManager::GetUniverseReplication(const GetUniverseReplicationRequestPB* req,
GetUniverseReplicationResponsePB* resp,
rpc::RpcContext* rpc) {
Expand Down
15 changes: 15 additions & 0 deletions ent/src/yb/tools/yb-admin_cli_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,21 @@ void ClusterAdminCli::RegisterCommandHandlers(ClusterAdminClientClass* client) {
producer_id));
return Status::OK();
});

Register(
"set_universe_replication_enabled", " <producer_universe_uuid> <0|1>",
[client](const CLIArguments& args) -> Status {
if (args.size() < 4) {
return ClusterAdminCli::kInvalidArguments;
}
const string producer_id = args[2];
const bool is_enabled = atoi(args[3].c_str()) != 0;
RETURN_NOT_OK_PREPEND(client->SetUniverseReplicationEnabled(producer_id, is_enabled),
Substitute("Unable to $0 replication for universe $1",
is_enabled ? "enable" : "disable",
producer_id));
return Status::OK();
});
}

} // namespace enterprise
Expand Down
3 changes: 3 additions & 0 deletions ent/src/yb/tools/yb-admin_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class ClusterAdminClient : public yb::tools::ClusterAdminClient {

CHECKED_STATUS DeleteUniverseReplication(const std::string& producer_id);

CHECKED_STATUS SetUniverseReplicationEnabled(const std::string& producer_id,
bool is_enabled);

private:

CHECKED_STATUS SendEncryptionRequest(const std::string& key_path, bool enable_encryption);
Expand Down
23 changes: 23 additions & 0 deletions ent/src/yb/tools/yb-admin_client_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,29 @@ Status ClusterAdminClient::DeleteUniverseReplication(const std::string& producer
return Status::OK();
}

CHECKED_STATUS ClusterAdminClient::SetUniverseReplicationEnabled(const std::string& producer_id,
bool is_enabled) {
master::SetUniverseReplicationEnabledRequestPB req;
master::SetUniverseReplicationEnabledResponsePB resp;
req.set_producer_id(producer_id);
req.set_is_enabled(is_enabled);
const string toggle = (is_enabled ? "enabl" : "disabl");

RpcController rpc;
rpc.set_timeout(timeout_);
master_proxy_->SetUniverseReplicationEnabled(req, &resp, &rpc);

if (resp.has_error()) {
cout << "Error " << toggle << "ing "
<< "universe replication: " << resp.error().status().message() << endl;
return StatusFromPB(resp.error().status());
}

cout << "Replication " << toggle << "ed successfully" << endl;
return Status::OK();
}


} // namespace enterprise
} // namespace tools
} // namespace yb
3 changes: 3 additions & 0 deletions ent/src/yb/tserver/cdc_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ void CDCConsumer::UpdateInMemoryState(const cdc::ConsumerRegistryPB* consumer_re
for (const auto& producer_map : DCHECK_NOTNULL(consumer_registry)->producer_map()) {
const auto& producer_entry_pb = producer_map.second;
proxy_manager_->UpdateProxies(producer_entry_pb);
if (producer_entry_pb.disable_stream()) {
continue;
}
for (const auto& stream_entry : producer_entry_pb.stream_map()) {
const auto& stream_entry_pb = stream_entry.second;
for (const auto& tablet_entry : stream_entry_pb.consumer_producer_tablet_map()) {
Expand Down
1 change: 1 addition & 0 deletions src/yb/cdc/cdc_consumer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ message ProducerEntryPB {
map<string, StreamEntryPB> stream_map = 1;
repeated HostPortPB master_addrs = 2;
repeated HostPortPB tserver_addrs = 3;
bool disable_stream = 4; // [default = false] implicit in proto3
}

message ConsumerRegistryPB {
Expand Down
14 changes: 14 additions & 0 deletions src/yb/master/master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ message SysUniverseReplicationEntryPB {
// creating CDC streams, starting subscribers. If any of these fail, we set the universe
// replication state to FAILED.
FAILED = 3;
// Disabled.
DISABLED = 6;
// Deleted.
DELETED = 4;
// Error while cleaning up state of deleted entry. This indicates that universe replication has
Expand Down Expand Up @@ -1556,6 +1558,16 @@ message DeleteUniverseReplicationResponsePB {
optional MasterErrorPB error = 1;
}


message SetUniverseReplicationEnabledRequestPB {
optional string producer_id = 1;
optional bool is_enabled = 2;
}

message SetUniverseReplicationEnabledResponsePB {
optional MasterErrorPB error = 1;
}

message GetUniverseReplicationRequestPB {
optional string producer_id = 1;
}
Expand Down Expand Up @@ -1669,6 +1681,8 @@ service MasterService {
returns (SetupUniverseReplicationResponsePB);
rpc DeleteUniverseReplication(DeleteUniverseReplicationRequestPB)
returns (DeleteUniverseReplicationResponsePB);
rpc SetUniverseReplicationEnabled(SetUniverseReplicationEnabledRequestPB)
returns (SetUniverseReplicationEnabledResponsePB);
rpc GetUniverseReplication(GetUniverseReplicationRequestPB)
returns (GetUniverseReplicationResponsePB);
}
7 changes: 7 additions & 0 deletions src/yb/master/master_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,13 @@ void MasterServiceImpl::DeleteUniverseReplication(const DeleteUniverseReplicatio
HandleIn(req, resp, &rpc, &enterprise::CatalogManager::DeleteUniverseReplication);
}

void MasterServiceImpl::SetUniverseReplicationEnabled(
const SetUniverseReplicationEnabledRequestPB* req,
SetUniverseReplicationEnabledResponsePB* resp,
rpc::RpcContext rpc) {
HandleIn(req, resp, &rpc, &enterprise::CatalogManager::SetUniverseReplicationEnabled);
}

void MasterServiceImpl::GetUniverseReplication(const GetUniverseReplicationRequestPB* req,
GetUniverseReplicationResponsePB* resp,
rpc::RpcContext rpc) {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/master/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ class MasterServiceImpl : public MasterServiceIf,
DeleteUniverseReplicationResponsePB* resp,
rpc::RpcContext rpc) override;

void SetUniverseReplicationEnabled(const SetUniverseReplicationEnabledRequestPB* req,
SetUniverseReplicationEnabledResponsePB* resp,
rpc::RpcContext rpc) override;

void GetUniverseReplication(const GetUniverseReplicationRequestPB* req,
GetUniverseReplicationResponsePB* resp,
rpc::RpcContext rpc) override;
Expand Down

0 comments on commit 6607435

Please sign in to comment.