Skip to content

Commit

Permalink
[#22479] docdb: Pass epoch through DB cloning calls
Browse files Browse the repository at this point in the history
Summary:
Previously, the `CloneStateManager`'s `Upsert` function just used the catalog manager's leader term. This handled master failover and failback incorrectly (e.g. we might "unabort" a clone). This diff attaches the initial `LeaderEpoch` from the request to the clone state and uses it for all upserts. On failover, however, we set the clone state's epoch to the current epoch so that we can abort the task.

Fixes #22479.

Jira: DB-11395

Test Plan: Existing tests.

Reviewers: zdrudi

Reviewed By: zdrudi

Subscribers: mhaddad, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D36487
  • Loading branch information
SrivastavaAnubhav committed Jul 15, 2024
1 parent 19ab966 commit 835e30d
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 88 deletions.
2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ Status CatalogManager::RunLoaders(SysCatalogLoadingState* state) {
RETURN_NOT_OK(LoadUniverseReplicationBootstrap());

RETURN_NOT_OK(xcluster_manager_->RunLoaders(hidden_tablets_));
RETURN_NOT_OK(clone_state_manager_->ClearAndRunLoaders());
RETURN_NOT_OK(clone_state_manager_->ClearAndRunLoaders(state->epoch));

return Status::OK();
}
Expand Down
10 changes: 10 additions & 0 deletions src/yb/master/clone/clone_state_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ void CloneStateInfo::AddTabletData(TabletData tablet_data) {
tablet_data_.push_back(std::move(tablet_data));
}

// Returns a copy of the leader epoch stored on this clone state.
// The read is performed while holding mutex_.
LeaderEpoch CloneStateInfo::Epoch() {
  std::lock_guard lock(mutex_);
  return epoch_;
}

// Overwrites the leader epoch stored on this clone state with `epoch`.
// The write is performed while holding mutex_.
void CloneStateInfo::SetEpoch(const LeaderEpoch& epoch) {
  std::lock_guard lock(mutex_);
  epoch_ = epoch;
}

YQLDatabase CloneStateInfo::DatabaseType() {
std::lock_guard l(mutex_);
return database_type_;
Expand Down
12 changes: 9 additions & 3 deletions src/yb/master/clone/clone_state_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

namespace yb::master {

struct PersistentCloneStateInfo :
public Persistent<SysCloneStatePB, SysRowEntryType::CLONE_STATE> {
bool IsDone() const {
struct PersistentCloneStateInfo : public Persistent<SysCloneStatePB, SysRowEntryType::CLONE_STATE>
{};

struct CloneStateInfoHelpers {
static bool IsDone(const SysCloneStatePB& pb) {
return pb.aggregate_state() == SysCloneStatePB::COMPLETE ||
pb.aggregate_state() == SysCloneStatePB::ABORTED;
}
Expand All @@ -48,6 +50,9 @@ class CloneStateInfo : public MetadataCowWrapper<PersistentCloneStateInfo> {
YQLDatabase DatabaseType();
void SetDatabaseType(YQLDatabase database_type);

LeaderEpoch Epoch();
void SetEpoch(const LeaderEpoch& epoch);

const TxnSnapshotId& SourceSnapshotId();
void SetSourceSnapshotId(const TxnSnapshotId& source_snapshot_id);

Expand All @@ -61,6 +66,7 @@ class CloneStateInfo : public MetadataCowWrapper<PersistentCloneStateInfo> {
// The ID field is used in the sys_catalog table.
const std::string clone_request_id_;

LeaderEpoch epoch_ GUARDED_BY(mutex_);
YQLDatabase database_type_ GUARDED_BY(mutex_);

// These fields are set before the clone state is set to CREATING.
Expand Down
56 changes: 28 additions & 28 deletions src/yb/master/clone/clone_state_manager-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class CloneStateManagerTest : public YBTest {
AsyncEnableDbConns::EnableDbConnsCallbackType callback), (override));

MOCK_METHOD(
Status, Upsert, (const CloneStateInfoPtr& clone_state), (override));
Status, Upsert, (int64_t leader_term, const CloneStateInfoPtr& clone_state), (override));
MOCK_METHOD(
Status, Load,
(const std::string& type,
Expand Down Expand Up @@ -220,7 +220,7 @@ class CloneStateManagerTest : public YBTest {
Result<CloneStateInfoPtr> CreateCloneState(
uint32_t seq_no, const ExternalTableSnapshotDataMap& table_snapshot_data) {
auto clone_state = VERIFY_RESULT(clone_state_manager_->CreateCloneState(
seq_no, kSourceNamespaceId, GetDatabaseType(), kTargetNamespaceName, kRestoreTime));
kEpoch, seq_no, kSourceNamespaceId, GetDatabaseType(), kTargetNamespaceName, kRestoreTime));

RETURN_NOT_OK(clone_state_manager_->UpdateCloneStateWithSnapshotInfo(
clone_state, kSourceSnapshotId, kTargetSnapshotId, table_snapshot_data));
Expand All @@ -236,7 +236,7 @@ class CloneStateManagerTest : public YBTest {
tablet_ids.set_new_id("test_target_id");
*table_data.table_meta->add_tablets_ids() = tablet_ids;

EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
return CreateCloneState(kSeqNo + 1, table_snapshot_data);
}

Expand All @@ -255,7 +255,7 @@ class CloneStateManagerTest : public YBTest {

// Creates a clone state and schedules clone ops to move it into the CREATING state.
Result<CloneStateInfoPtr> CreateCloneStateAndStartCloning() {
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
auto clone_state = VERIFY_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));

for (int i = 0; i < kNumTablets; ++i) {
Expand All @@ -265,8 +265,8 @@ class CloneStateManagerTest : public YBTest {
.WillOnce(Return(target_tablets_[i]));
EXPECT_CALL(MockFuncs(), ScheduleCloneTabletCall(source_tablets_[i], kEpoch, _));
}
EXPECT_CALL(MockFuncs(), Upsert(_));
RETURN_NOT_OK(ScheduleCloneOps(clone_state, kEpoch, {} /* not_snapshotted_tablets */));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
RETURN_NOT_OK(ScheduleCloneOps(clone_state, {} /* not_snapshotted_tablets */));
return clone_state;
}

Expand All @@ -290,9 +290,9 @@ class CloneStateManagerTest : public YBTest {
}

Status ScheduleCloneOps(
const CloneStateInfoPtr& clone_state, const LeaderEpoch& epoch,
const CloneStateInfoPtr& clone_state,
const std::unordered_set<TabletId>& not_snapshotted_tablets) {
return clone_state_manager_->ScheduleCloneOps(clone_state, epoch, not_snapshotted_tablets);
return clone_state_manager_->ScheduleCloneOps(clone_state, not_snapshotted_tablets);
}
Result<std::pair<NamespaceId, uint32_t>> CloneNamespace(
const NamespaceIdentifierPB& source_namespace_identifier,
Expand All @@ -310,7 +310,7 @@ class CloneStateManagerTest : public YBTest {
const std::string& target_namespace_name,
CoarseTimePoint deadline, const LeaderEpoch& epoch) {
return clone_state_manager_->MakeDoneClonePgSchemaCallback(
clone_state, snapshot_schedule_id, target_namespace_name, deadline, epoch);
clone_state, snapshot_schedule_id, target_namespace_name, deadline);
}

void AssertCloneIsAborted() {
Expand Down Expand Up @@ -356,7 +356,7 @@ class CloneStateManagerPgTest : public CloneStateManagerTest {
};

TEST_F(CloneStateManagerTest, CreateCloneState) {
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
auto clone_state = ASSERT_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));

// Check clone state persisted fields.
Expand Down Expand Up @@ -388,8 +388,8 @@ TEST_F(CloneStateManagerTest, CreateSecondCloneState) {
l.mutable_data()->pb.set_aggregate_state(state);
l.Commit();

if (current_clone_state->LockForRead()->IsDone()) {
EXPECT_CALL(MockFuncs(), Upsert(_));
if (CloneStateInfoHelpers::IsDone(current_clone_state->LockForRead()->pb)) {
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
ASSERT_OK(CreateCloneState(i + 1, DefaultTableSnapshotData()));
} else {
auto s = CreateCloneState(i + 1, DefaultTableSnapshotData());
Expand All @@ -400,7 +400,7 @@ TEST_F(CloneStateManagerTest, CreateSecondCloneState) {
}

TEST_F(CloneStateManagerTest, ScheduleCloneOps) {
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
auto clone_state = ASSERT_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));

for (int i = 0; i < kNumTablets; ++i) {
Expand All @@ -424,8 +424,8 @@ TEST_F(CloneStateManagerTest, ScheduleCloneOps) {
EXPECT_CALL(MockFuncs(), ScheduleCloneTabletCall(
source_tablets_[i], kEpoch, CloneTabletRequestPBMatcher(expected_req)));
}
EXPECT_CALL(MockFuncs(), Upsert(_));
ASSERT_OK(ScheduleCloneOps(clone_state, kEpoch, {} /* not_snapshotted_tablets */));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
ASSERT_OK(ScheduleCloneOps(clone_state, {} /* not_snapshotted_tablets */));
}

TEST_F(CloneStateManagerTest, HandleCreatingStateAllTabletsCreating) {
Expand Down Expand Up @@ -481,7 +481,7 @@ TEST_F(CloneStateManagerTest, HandleCreatingStateAllTabletsRunning) {

// HandleCreatingState should transition aggregate state to RESTORING and should also trigger a
// restore.
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
EXPECT_CALL(MockFuncs(), Restore(kTargetSnapshotId, kRestoreTime))
.WillOnce(Return(kRestorationId));
ASSERT_OK(HandleCreatingState(clone_state));
Expand Down Expand Up @@ -539,7 +539,7 @@ TEST_F(CloneStateManagerTest, AbortInStartTabletsCloning) {
EXPECT_CALL(MockFuncs(), FindNamespace).WillOnce(Return(source_ns_));
EXPECT_CALL(MockFuncs(), ListSnapshotSchedules)
.WillOnce(DoAll(SetArgPointee<0>(DefaultListSnapshotSchedules()), Return(Status::OK())));
EXPECT_CALL(MockFuncs(), Upsert(_)).WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _)).WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(MockFuncs(), GenerateSnapshotInfoFromScheduleForClone).WillOnce(Return(
STATUS_FORMAT(IllegalState, "Fail GenerateSnapshotInfoFromScheduleForClone for test")));

Expand All @@ -556,7 +556,7 @@ TEST_F_EX(CloneStateManagerTest, AbortIfFailToSchedulePgCloneSchema, CloneStateM
.WillOnce(DoAll(SetArgPointee<0>(DefaultListSnapshotSchedules()), Return(Status::OK())));
TSDescriptorPtr dummy_ts_desc = std::make_shared<TSDescriptor>("ts0" /* perm_id*/);
EXPECT_CALL(MockFuncs(), PickTserver).WillOnce(Return(dummy_ts_desc));
EXPECT_CALL(MockFuncs(), Upsert(_)).WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _)).WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(MockFuncs(), ScheduleClonePgSchemaTask).WillOnce(Return(
STATUS_FORMAT(IllegalState, "Fail ScheduleClonePgSchemaTask for test")));

Expand All @@ -568,21 +568,21 @@ TEST_F_EX(CloneStateManagerTest, AbortIfFailToSchedulePgCloneSchema, CloneStateM
}

TEST_F_EX(CloneStateManagerTest, AbortInPgSchemaClone, CloneStateManagerPgTest) {
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
auto clone_state = ASSERT_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));
auto callback = MakeDoneClonePgSchemaCallback(
clone_state, kSnapshotScheduleId, kTargetNamespaceName,
CoarseMonoClock::Now() + 10s /* deadline */, kEpoch);

// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
ASSERT_OK(callback(STATUS_FORMAT(IllegalState, "Fail pg schema clone for test")));

AssertCloneIsAborted();
}

TEST_F_EX(CloneStateManagerTest, AbortInStartTabletsCloningPg, CloneStateManagerPgTest) {
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
auto clone_state = ASSERT_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));
auto callback = MakeDoneClonePgSchemaCallback(
clone_state, kSnapshotScheduleId, kTargetNamespaceName,
Expand All @@ -591,7 +591,7 @@ TEST_F_EX(CloneStateManagerTest, AbortInStartTabletsCloningPg, CloneStateManager
// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), GenerateSnapshotInfoFromScheduleForClone).WillOnce(Return(
STATUS_FORMAT(IllegalState, "Fail GenerateSnapshotInfoFromScheduleForClone for test")));
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
ASSERT_OK(callback(Status::OK() /* pg_schema_cloning_status */));

AssertCloneIsAborted();
Expand All @@ -603,7 +603,7 @@ TEST_F(CloneStateManagerTest, AbortInCreatingState) {
// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), GetTabletInfo(_))
.WillOnce(Return(STATUS_FORMAT(IllegalState, "Fail GetTabletInfo for test")));
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
ASSERT_OK(clone_state_manager_->Run());

AssertCloneIsAborted();
Expand All @@ -620,7 +620,7 @@ TEST_F(CloneStateManagerTest, AbortInRestoringState) {
// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), ListRestorations(_, _))
.WillOnce(Return(STATUS_FORMAT(IllegalState, "Fail ListRestorations for test")));
EXPECT_CALL(MockFuncs(), Upsert(_));
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
ASSERT_OK(clone_state_manager_->Run());

AssertCloneIsAborted();
Expand Down Expand Up @@ -648,7 +648,7 @@ TEST_F(CloneStateManagerTest, Load) {
std::function<Status(const std::string&, const SysCloneStatePB&)> inserter;
EXPECT_CALL(MockFuncs(), Load)
.WillRepeatedly(DoAll(SaveArg<1>(&inserter), Return(Status::OK())));
ASSERT_OK(clone_state_manager_->ClearAndRunLoaders());
ASSERT_OK(clone_state_manager_->ClearAndRunLoaders(kEpoch));

// Run the inserter to actually load the data. Load them in reverse order to test that the
// clone state map ordering works.
Expand Down Expand Up @@ -695,11 +695,11 @@ TEST_F(CloneStateManagerTest, AbortIncompleteCloneOnLoad) {
std::function<Status(const std::string&, const SysCloneStatePB&)> inserter;
EXPECT_CALL(MockFuncs(), Load)
.WillOnce(DoAll(SaveArg<1>(&inserter), Return(Status::OK())));
ASSERT_OK(clone_state_manager_->ClearAndRunLoaders());
ASSERT_OK(clone_state_manager_->ClearAndRunLoaders(kEpoch));

// Run the inserter to load the clone state.
if (!orig_lock->IsDone()) {
EXPECT_CALL(MockFuncs(), Upsert(_));
if (!CloneStateInfoHelpers::IsDone(orig_lock->pb)) {
EXPECT_CALL(MockFuncs(), Upsert(kEpoch.leader_term, _));
}
ASSERT_OK(inserter(clone_state->id(), orig_lock->pb));
auto loaded_clone_state = GetLatestCloneState();
Expand Down
Loading

0 comments on commit 835e30d

Please sign in to comment.