diff --git a/ent/src/yb/tserver/twodc_output_client.cc b/ent/src/yb/tserver/twodc_output_client.cc index 5fbb444c73e8..fba05e8516f6 100644 --- a/ent/src/yb/tserver/twodc_output_client.cc +++ b/ent/src/yb/tserver/twodc_output_client.cc @@ -76,6 +76,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient { ~TwoDCOutputClient() { std::lock_guard l(lock_); + shutdown_ = true; rpcs_->Abort({&write_handle_}); } @@ -101,7 +102,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient { const std::string partition_key_end, const Result>& tablets); - Status ProcessSplitOp(const cdc::CDCRecordPB& record); + Result ProcessSplitOp(const cdc::CDCRecordPB& record); // Processes the Record and sends the CDCWrite for it. Status ProcessRecord( @@ -138,6 +139,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient { Status error_status_ GUARDED_BY(lock_); OpIdPB op_id_ GUARDED_BY(lock_) = consensus::MinimumOpId(); bool done_processing_ GUARDED_BY(lock_) = false; + bool shutdown_ GUARDED_BY(lock_) = false; uint32_t processed_record_count_ GUARDED_BY(lock_) = 0; uint32_t record_count_ GUARDED_BY(lock_) = 0; @@ -225,7 +227,11 @@ Status TwoDCOutputClient::ProcessChangesStartingFromIndex(int start) { break; } // No other records to process, so we can process the SPLIT_OP. - RETURN_NOT_OK(ProcessSplitOp(record)); + bool done = VERIFY_RESULT(ProcessSplitOp(record)); + if (done) { + HandleResponse(); + return Status::OK(); + } continue; } @@ -317,7 +323,7 @@ Status TwoDCOutputClient::ProcessRecordForLocalTablet(const int record_idx) { return ProcessRecord({consumer_tablet_info_.tablet_id}, twodc_resp_copy_.records(record_idx)); } -Status TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) { +Result TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) { // Construct and send the update request. master::ProducerSplitTabletInfoPB split_info; split_info.set_tablet_id(record.split_tablet_request().tablet_id()); @@ -340,10 +346,7 @@ Status TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) { std::lock_guard l(lock_); done = IncProcessedRecordCount(); } - if (done) { - HandleResponse(); - } - return Status::OK(); + return done; } void TwoDCOutputClient::SendNextCDCWriteToTablet(std::unique_ptr write_request) { @@ -374,6 +377,10 @@ void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResp { std::lock_guard l(lock_); retained = rpcs_->Unregister(&write_handle_); + if (shutdown_) { + LOG(INFO) << "Aborting ApplyChanges since the client is shutting down."; + return; + } } if (!status.ok()) { HandleError(status, true /* done */); diff --git a/src/yb/integration-tests/xcluster-tablet-split-itest.cc b/src/yb/integration-tests/xcluster-tablet-split-itest.cc index c097b418050b..bd13b9498ca3 100644 --- a/src/yb/integration-tests/xcluster-tablet-split-itest.cc +++ b/src/yb/integration-tests/xcluster-tablet-split-itest.cc @@ -397,18 +397,24 @@ TEST_F(XClusterTabletSplitITest, SplittingOnProducerAndConsumer) { ASSERT_OK(producer_table.Open(table_->name(), client_.get())); auto producer_session = client_->NewSession(); producer_session->SetTimeout(60s); - int32_t key = kDefaultNumRows; + int32_t key = kDefaultNumRows + 1; while (!stop) { - key = (key + 1); - ASSERT_RESULT(client::kv_table_test::WriteRow( + auto res = client::kv_table_test::WriteRow( &producer_table, producer_session, key, key, - client::WriteOpType::INSERT, client::Flush::kTrue)); + client::WriteOpType::INSERT, client::Flush::kTrue); + if (!res.ok() && res.status().IsNotFound()) { + LOG(INFO) << "Encountered NotFound error on write : " << res; + } else { + ASSERT_OK(res); + key++; + } } }); // Perform tablet splits on both sides. ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1)); SwitchToConsumer(); + ASSERT_OK(FlushTestTable()); ASSERT_OK(SplitAllTablets( /* cur_num_tablets */ 1, /* parent_tablet_protected_from_deletion */ false)); SwitchToProducer();