Skip to content

Commit

Permalink
[yugabyte#12386] xCluster: Fix sanitizer errors in twodc_output_clien…
Browse files Browse the repository at this point in the history
…t.cc

Summary:
Addresses the following errors:

ASAN error:
```
==47493==ERROR: AddressSanitizer: heap-use-after-free on address 0x6150014911e8 at pc 0x00000046ce8e bp 0x7f84d2a01ee0 sp 0x7f84d2a01ed8
READ of size 4 at 0x6150014911e8 thread T561 (rpc_tp_CDCConsu)
    #0 0x46ce8d in google::protobuf::internal::RepeatedPtrFieldBase::size() const /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20220322021123-e488f7fa5b-almalinux8-x86_64-clang12/installed/asan/include/google/protobuf/repeated_field.h:1515:10
    #1 0x7f864b3e0118 in google::protobuf::RepeatedPtrField<yb::cdc::CDCRecordPB>::size() const /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20220322021123-e488f7fa5b-almalinux8-x86_64-clang12/installed/asan/include/google/protobuf/repeated_field.h:1984:32
    #2 0x7f864b3dd9fc in yb::cdc::GetChangesResponsePB::records_size() const $YB_SRC_ROOT/build/asan-clang12-dynamic-ninja/src/yb/cdc/cdc_service.pb.h:9334:19
    #3 0x7f864b3da477 in
    yb::tserver::enterprise::TwoDCOutputClient::ProcessChangesStartingFromIndex(int)
    $YB_SRC_ROOT/build/asan-clang12-dynaamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:213:44
    #4 0x7f864b3dd131 in yb::tserver::enterprise::TwoDCOutputClient::WriteCDCRecordDone(yb::Status const&, yb::tserver::WriteResponsePB const&) $YB_SRC_ROOT/build/asan-clang12-dynamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:408:18
```
```
0x6150014911e8 is located 360 bytes inside of 512-byte region [0x615001491080,0x615001491280)
freed by thread T772 (CDCConsumerHand) here:
    #0 0x7f8657e7465d in operator delete(void*) /opt/yb-build/llvm/yb-llvm-v12.0.1-yb-1-1633143152-bdb147e6-almalinux8-x86_64-build/src/llvm-project/compiler-rt/lib/asan/asan_new_delete.cpp:160:3
    #1 0x7f864b3e00f5 in yb::tserver::enterprise::TwoDCOutputClient::~TwoDCOutputClient() $YB_SRC_ROOT/build/asan-clang12-dynamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:77:24
```
Fixed by adding in `shutdown_` variable to stop processing future records if the client is being
shutdown.

---
TSAN error:
```
WARNING: ThreadSanitizer: data race (pid=430)
  Write of size 4 at 0x7b5000b81168 by thread T306 (mutexes: write M1014289907445001104):
    #0 void google::protobuf::internal::RepeatedPtrFieldBase::Clear<google::protobuf::RepeatedPtrField<yb::cdc::CDCRecordPB>::TypeHandler>() /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20220420172553-91c632476c-centos7-x86_64-clang12/installed/tsan/include/google/protobuf/repeated_field.h:1593:19 (libcdc_service_proto.so+0xf1df8)
    #1 google::protobuf::RepeatedPtrField<yb::cdc::CDCRecordPB>::Clear() /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20220420172553-91c632476c-centos7-x86_64-clang12/installed/tsan/include/google/protobuf/repeated_field.h:2102:25 (libcdc_service_proto.so+0xe5ce9)
    #2 yb::cdc::GetChangesResponsePB::Clear() /nfusr/centos-gcp-cloud/jenkins-worker-2lx048/jenkins/jenkins-github-yugabyte-db-centos-master-clang12-tsan-175/build/tsan-clang12-dynamic-ninja/src/yb/cdc/cdc_service.pb.cc:10118:12 (libcdc_service_proto.so+0xcdaa9)
```
```
 Previous read of size 4 at 0x7b5000b81168 by thread T352:
    #0 google::protobuf::internal::RepeatedPtrFieldBase::size() const /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20220420172553-91c632476c-centos7-x86_64-clang12/installed/tsan/include/google/protobuf/repeated_field.h:1515:10 (xcluster-tablet-split-itest+0x35072a)
    #1 google::protobuf::RepeatedPtrField<yb::cdc::CDCRecordPB>::size() const /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20220420172553-91c632476c-centos7-x86_64-clang12/installed/tsan/include/google/protobuf/repeated_field.h:1984:32 (libtserver.so+0x591ff9)
    #2 yb::cdc::GetChangesResponsePB::records_size() const /nfusr/centos-gcp-cloud/jenkins-worker-2lx048/jenkins/jenkins-github-yugabyte-db-centos-master-clang12-tsan-175/build/tsan-clang12-dynamic-ninja/src/yb/cdc/cdc_service.pb.h:9367:19 (libtserver.so+0x59028d)
    #3 yb::tserver::enterprise::TwoDCOutputClient::ProcessChangesStartingFromIndex(int) /nfusr/centos-gcp-cloud/jenkins-worker-2lx048/jenkins/jenkins-github-yugabyte-db-centos-master-clang12-tsan-175/build/tsan-clang12-dynamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:213:44 (libtserver.so+0x58e8ee)
    #4 yb::tserver::enterprise::TwoDCOutputClient::WriteCDCRecordDone(yb::Status const&, yb::tserver::WriteResponsePB const&) /nfusr/centos-gcp-cloud/jenkins-worker-2lx048/jenkins/jenkins-github-yugabyte-db-centos-master-clang12-tsan-175/build/tsan-clang12-dynamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:408:18 (libtserver.so+0x58fa0e)
```
Fixed by returning immediately after completing ProcessSplitOp and there are no more records to
process. Previously we would only continue the loop, which would still do an access on the
`twodc_resp_copy_` object.

Test Plan:
```
ybd tsan --cxx-test integration-tests_xcluster-tablet-split-itest --gtest_filter XClusterTabletSplitITest.SplittingOnProducerAndConsumer
ybd asan --cxx-test integration-tests_xcluster-tablet-split-itest --gtest_filter XClusterTabletSplitITest.SplittingOnProducerAndConsumer
```

Reviewers: rahuldesirazu, nicolas

Reviewed By: nicolas

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D16955
  • Loading branch information
hulien22 committed May 23, 2022
1 parent f2b4b04 commit 783e701
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
21 changes: 14 additions & 7 deletions ent/src/yb/tserver/twodc_output_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient {

~TwoDCOutputClient() {
std::lock_guard<decltype(lock_)> l(lock_);
shutdown_ = true;
rpcs_->Abort({&write_handle_});
}

Expand All @@ -101,7 +102,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient {
const std::string partition_key_end,
const Result<std::vector<client::internal::RemoteTabletPtr>>& tablets);

Status ProcessSplitOp(const cdc::CDCRecordPB& record);
Result<bool> ProcessSplitOp(const cdc::CDCRecordPB& record);

// Processes the Record and sends the CDCWrite for it.
Status ProcessRecord(
Expand Down Expand Up @@ -138,6 +139,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient {
Status error_status_ GUARDED_BY(lock_);
OpIdPB op_id_ GUARDED_BY(lock_) = consensus::MinimumOpId();
bool done_processing_ GUARDED_BY(lock_) = false;
bool shutdown_ GUARDED_BY(lock_) = false;

uint32_t processed_record_count_ GUARDED_BY(lock_) = 0;
uint32_t record_count_ GUARDED_BY(lock_) = 0;
Expand Down Expand Up @@ -225,7 +227,11 @@ Status TwoDCOutputClient::ProcessChangesStartingFromIndex(int start) {
break;
}
// No other records to process, so we can process the SPLIT_OP.
RETURN_NOT_OK(ProcessSplitOp(record));
bool done = VERIFY_RESULT(ProcessSplitOp(record));
if (done) {
HandleResponse();
return Status::OK();
}
continue;
}

Expand Down Expand Up @@ -317,7 +323,7 @@ Status TwoDCOutputClient::ProcessRecordForLocalTablet(const int record_idx) {
return ProcessRecord({consumer_tablet_info_.tablet_id}, twodc_resp_copy_.records(record_idx));
}

Status TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) {
Result<bool> TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) {
// Construct and send the update request.
master::ProducerSplitTabletInfoPB split_info;
split_info.set_tablet_id(record.split_tablet_request().tablet_id());
Expand All @@ -340,10 +346,7 @@ Status TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) {
std::lock_guard<decltype(lock_)> l(lock_);
done = IncProcessedRecordCount();
}
if (done) {
HandleResponse();
}
return Status::OK();
return done;
}

void TwoDCOutputClient::SendNextCDCWriteToTablet(std::unique_ptr<WriteRequestPB> write_request) {
Expand Down Expand Up @@ -374,6 +377,10 @@ void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResp
{
std::lock_guard<decltype(lock_)> l(lock_);
retained = rpcs_->Unregister(&write_handle_);
if (shutdown_) {
LOG(INFO) << "Aborting ApplyChanges since the client is shutting down.";
return;
}
}
if (!status.ok()) {
HandleError(status, true /* done */);
Expand Down
14 changes: 10 additions & 4 deletions src/yb/integration-tests/xcluster-tablet-split-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,18 +397,24 @@ TEST_F(XClusterTabletSplitITest, SplittingOnProducerAndConsumer) {
ASSERT_OK(producer_table.Open(table_->name(), client_.get()));
auto producer_session = client_->NewSession();
producer_session->SetTimeout(60s);
int32_t key = kDefaultNumRows;
int32_t key = kDefaultNumRows + 1;
while (!stop) {
key = (key + 1);
ASSERT_RESULT(client::kv_table_test::WriteRow(
auto res = client::kv_table_test::WriteRow(
&producer_table, producer_session, key, key,
client::WriteOpType::INSERT, client::Flush::kTrue));
client::WriteOpType::INSERT, client::Flush::kTrue);
if (!res.ok() && res.status().IsNotFound()) {
LOG(INFO) << "Encountered NotFound error on write : " << res;
} else {
ASSERT_OK(res);
key++;
}
}
});

// Perform tablet splits on both sides.
ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
SwitchToConsumer();
ASSERT_OK(FlushTestTable());
ASSERT_OK(SplitAllTablets(
/* cur_num_tablets */ 1, /* parent_tablet_protected_from_deletion */ false));
SwitchToProducer();
Expand Down

0 comments on commit 783e701

Please sign in to comment.