Skip to content

Commit

Permalink
Bugfix: fix kvstore stuck issue during destruction sequence
Browse files Browse the repository at this point in the history
Summary:
As titled, several different approaches have been added to address original issue of crash when eventbase is destructed. See P135204939

```
(gdb) bt
#0  0x000000000187ea8e in re2::RE2::Set::Match (this=0x2, text=..., v=0x7fffd8c0d500, error_info=0x0) at re2/set.cc:110
#1  0x000000000072d304 in openr::KeyPrefix::keyMatch (this=<optimized out>, key=...) at openr/common/Util.cpp:50
#2  0x00000000006d3649 in openr::KvStoreFilters::keyMatch (this=0x7fb22c613660, key=..., value=...) at openr/kvstore/KvStore.cpp:69
#3  0x00000000006d60e4 in openr::KvStore::mergeKeyValues (kvStore=..., keyVals=..., filters=...) at openr/kvstore/KvStore.cpp:246
#4  0x00000000006e3813 in openr::KvStoreDb::mergePublication (this=0x7fb232fbad28, rcvdPublication=..., senderId=...)
    at openr/kvstore/KvStore.cpp:2835
#5  0x00000000006e32d7 in openr::KvStoreDb::processThriftSuccess (this=0x7fb232fbad28, peerName=..., pub=..., timeDelta=...)
    at openr/kvstore/KvStore.cpp:1395
#6  0x000000000070487b in openr::KvStoreDb::requestThriftPeerSync()::$_19::operator()(openr::thrift::Publication&&) const (
    this=<optimized out>, pub=...) at openr/kvstore/KvStore.cpp:1338
#7  folly::futures::detail::wrapInvoke<openr::thrift::Publication, openr::KvStoreDb::requestThriftPeerSync()::$_19>(folly::Try<openr::thrift::Publication>&&, openr::KvStoreDb::requestThriftPeerSync()::$_19&&)::{lambda()#1}::operator()() const (this=<optimized out>)
    at folly/futures/Future-inl.h:99
#8  folly::futures::detail::InvokeResultWrapper<void>::wrapResult<folly::futures::detail::wrapInvoke<openr::thrift::Publication, openr::KvStoreDb::requestThriftPeerSync()::$_19>(folly::Try<openr::thrift::Publication>&&, openr::KvStoreDb::requestThriftPeerSync()::$_19&&)::{lambda()#1}>(folly::futures::detail::wrapInvoke<openr::thrift::Publication, openr::KvStoreDb::requestThriftPeerSync()::$_19>(folly::Try<openr::thrift::Publication>&&, openr::KvStoreDb::requestThriftPeerSync()::$_19&&)::{lambda()#1}) (fn=...) at folly/futures/Future-inl.h:91
#9  folly::futures::detail::wrapInvoke<openr::thrift::Publication, openr::KvStoreDb::requestThriftPeerSync()::$_19>(folly::Try<openr::thrift::Publication>&&, openr::KvStoreDb::requestThriftPeerSync()::$_19&&) (t=..., f=...) at folly/futures/Future-inl.h:109
#10 folly::Future<openr::thrift::Publication>::thenValue<openr::KvStoreDb::requestThriftPeerSync()::$_19>(openr::KvStoreDb::requestThriftPeerSync()::$_19&&) &&::{lambda(folly::Executor::KeepAlive<folly::Executor>&&, folly::Try<openr::thrift::Publication>&&)#1}::operator()(folly::Executor::KeepAlive<folly::Executor>&&, folly::Try<openr::thrift::Publication>&&) (this=<optimized out>, t=...)
    at folly/futures/Future-inl.h:1033
#11 folly::futures::detail::CoreCallbackState<folly::Unit, folly::Future<openr::thrift::Publication>::thenValue<openr::KvStoreDb::requestThriftPeerSync()::$_19>(openr::KvStoreDb::requestThriftPeerSync()::$_19&&) &&::{lambda(folly::Executor::KeepAlive<folly::Executor>&&, folly::Try<openr::thrift::Publication>&&)#1}>::invoke<folly::Executor::KeepAlive<folly::Executor>, folly::Try<openr::thrift::Publication> >(folly::Executor::KeepAlive<folly::Executor>&&, folly::Try<openr::thrift::Publication>&&) (this=<optimized out>, args=..., args=...)
    at folly/futures/Future-inl.h:145
...
#19 0x00000000015d00ee in folly::detail::function::FunctionTraits<void ()>::operator()() (this=0x7fb22c63b1f0) at folly/Function.h:416
#20 folly::EventBase::FunctionLoopCallback::runLoopCallback (this=0x7fb22c63b1c0) at folly/io/async/EventBase.h:188
#21 folly::EventBase::runLoopCallbacks (this=<optimized out>) at folly/io/async/EventBase.cpp:703
#22 folly::EventBase::loopBody (this=0x7fb23ecb9410, flags=1, ignoreKeepAlive=false) at folly/io/async/EventBase.cpp:402
#23 0x00000000015cdc60 in folly::EventBase::loopOnce (this=0x7fb23ecb9410, flags=0) at folly/io/async/EventBase.cpp:330
#24 folly::EventBase::~EventBase (this=0x7fb23ecb9410, vtt=<optimized out>) at folly/io/async/EventBase.cpp:211
#25 0x00000000006f284e in openr::KvStore::~KvStore (this=0x7fb23ecb9400) at openr/kvstore/KvStore.h:532
```

To fix this, we introduced a map to hold every individual future from the thrift client. However, this occasionally makes KvStore destruction get stuck while waiting for all futures to be fulfilled.

We should NOT track every individual future, which is NOT necessary at all.

From the crash trace, clearly, we are doing `mergePublication()` inside the callback `processThriftSuccess()` before checking if the peer is valid or NOT.

Fix:
Skip the rest of the logic in the callback `processThriftSuccess()` if peerName is NOT valid.

Reviewed By: saifhhasan

Differential Revision: D24262710

fbshipit-source-id: fa69aaa5c6e43cfc861de7431b9c1e26195684a0
  • Loading branch information
xiangxu1121 authored and facebook-github-bot committed Oct 12, 2020
1 parent da1c3f8 commit 04ea157
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 133 deletions.
204 changes: 81 additions & 123 deletions openr/kvstore/KvStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1224,22 +1224,12 @@ KvStoreDb::~KvStoreDb() {
// Destroy thrift clients associated with peers, which will
// fulfill promises with exceptions if any.
thriftPeers_.clear();

// Waiting for all pending thrift call to finish
std::vector<folly::Future<folly::Unit>> fs;
for (auto& [_, future] : taskFutures_) {
// Raises a FutureCancellation interrupt
future.cancel();
fs.emplace_back(std::move(future));
}
taskFutures_.clear();
folly::collectAll(std::move(fs)).get();

LOG(INFO) << "Done processing all pending thrift reqs in area: " << area_;
});

// remove ZMQ socket
evb_->removeSocket(fbzmq::RawZmqSocketPtr{*peerSyncSock_});

LOG(INFO) << "Successfully destructed KvStoreDb in area: " << area_;
}

void
Expand Down Expand Up @@ -1493,39 +1483,29 @@ KvStoreDb::requestThriftPeerSync() {
auto startTime = std::chrono::steady_clock::now();
auto sf = thriftPeer.client->semifuture_getKvStoreKeyValsFilteredArea(
params, area_);
auto reqId = ++reqId_;
taskFutures_.emplace(
reqId,
std::move(sf)
.via(evb_->getEvb())
.thenValue(
[this, peerName, startTime, reqId](thrift::Publication&& pub) {
// state transition to INITIALIZED
auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);
processThriftSuccess(peerName, std::move(pub), timeDelta);

// cleanup pending thrift request
taskFutures_.erase(reqId);
})
.thenError([this, peerName, startTime, reqId](
const folly::exception_wrapper& ew) {
std::move(sf)
.via(evb_->getEvb())
.thenValue([this, peerName, startTime](thrift::Publication&& pub) {
// state transition to INITIALIZED
auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);
processThriftSuccess(peerName, std::move(pub), timeDelta);
})
.thenError(
[this, peerName, startTime](const folly::exception_wrapper& ew) {
// state transition to IDLE
auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);
processThriftFailure(peerName, ew.what(), timeDelta);

// cleanup pending thrift request
taskFutures_.erase(reqId);

// record telemetry for thrift calls
fb303::fbData->addStatValue(
"kvstore.thrift.num_full_sync_failure", 1, fb303::COUNT);
}));
});

// in case pending peer size is over parallelSyncLimit,
// wait until kMaxBackoff before sending next round of sync
Expand Down Expand Up @@ -1561,6 +1541,26 @@ KvStoreDb::processThriftSuccess(
std::string const& peerName,
thrift::Publication&& pub,
std::chrono::milliseconds timeDelta) {
// check if it is valid peer(i.e. peer removed in process of syncing)
auto peerIt = thriftPeers_.find(peerName);
if (peerIt == thriftPeers_.end()) {
LOG(WARNING) << "Received async full-sync response from invalid peer: "
<< peerName << ". Ignore state transition.";
return;
}

// ATTN: In parallel link case, peer state can be set to IDLE when
// parallel adj comes up before the previous full-sync response
// being received. KvStoreDb will ignore the old full-sync
// response and will rely on the new full-sync response to
// promote the state.
auto& peer = thriftPeers_.at(peerName);
if (peer.state == KvStorePeerState::IDLE) {
LOG(WARNING) << "Ignore response from: " << peerName
<< " as it is in IDLE state";
return;
}

// ATTN: `peerName` is MANDATORY to fulfill the finialized
// full-sync with peers.
const auto kvUpdateCnt = mergePublication(pub, peerName);
Expand All @@ -1585,26 +1585,6 @@ KvStoreDb::processThriftSuccess(
<< " key-value updates."
<< " Processing time: " << timeDelta.count() << "ms.";

// check if it is valid peer(i.e. peer removed in process of syncing)
auto peerIt = thriftPeers_.find(peerName);
if (peerIt == thriftPeers_.end()) {
LOG(WARNING) << "Received async full-sync response from invalid peer: "
<< peerName << ". Ignore state transition.";
return;
}

// ATTN: In parallel link case, peer state can be set to IDLE when
// parallel adj comes up before the previous full-sync response
// being received. KvStoreDb will ignore the old full-sync
// response and will rely on the new full-sync response to
// promote the state.
auto& peer = thriftPeers_.at(peerName);
if (peer.state == KvStorePeerState::IDLE) {
LOG(WARNING) << "Ignore response from: " << peerName
<< " as it is in IDLE state";
return;
}

// State transition
KvStorePeerState oldState = peer.state;
peer.state = getNextState(oldState, KvStorePeerEvent::SYNC_RESP_RCVD);
Expand Down Expand Up @@ -1640,17 +1620,16 @@ KvStoreDb::processThriftFailure(
std::string const& peerName,
folly::fbstring const& exceptionStr,
std::chrono::milliseconds timeDelta) {
LOG(INFO) << "[Thrift Sync] Exception: " << exceptionStr
<< ". Peer name: " << peerName
<< ". Processing time: " << timeDelta.count() << "ms.";

// check if it is valid peer(i.e. peer removed in process of syncing)
auto peerIt = thriftPeers_.find(peerName);
if (peerIt == thriftPeers_.end()) {
LOG(ERROR) << "Exception happened against invalid peer: " << peerName
<< ". Ignore state transition.";
return;
}

LOG(INFO) << "[Thrift Sync] Exception: " << exceptionStr
<< ". Peer name: " << peerName
<< ". Processing time: " << timeDelta.count() << "ms.";

// reset client to reconnect later in next batch of thriftSyncTimer_ scanning
auto& peer = thriftPeers_.at(peerName);
peer.keepAliveTimer->cancelTimeout();
Expand Down Expand Up @@ -1881,8 +1860,6 @@ KvStoreDb::getCounters() const {
// Add up pending and in-flight full sync
counters["kvstore.pending_full_sync"] =
peersToSyncWith_.size() + latestSentPeerSync_.size();
// Record pending unfulfilled thrift request
counters["kvstore.pending_thrift_request"] = taskFutures_.size();
return counters;
}

Expand Down Expand Up @@ -2802,46 +2779,36 @@ KvStoreDb::finalizeFullSync(

auto startTime = std::chrono::steady_clock::now();
auto sf = thriftPeer.client->semifuture_setKvStoreKeyVals(params, area_);
auto reqId = ++reqId_;
taskFutures_.emplace(
reqId,
std::move(sf)
.via(evb_->getEvb())
.thenValue([this, senderId, startTime, reqId](folly::Unit&&) {
VLOG(4) << "Finalize full-sync ack received from peer: "
<< senderId;
auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);

// cleanup pending thrift request
taskFutures_.erase(reqId);

// record telemetry for thrift calls
fb303::fbData->addStatValue(
"kvstore.thrift.num_finalized_sync_success", 1, fb303::COUNT);
fb303::fbData->addStatValue(
"kvstore.thrift.finalized_sync_duration_ms",
timeDelta.count(),
fb303::AVG);
})
.thenError([this, senderId, startTime, reqId](
const folly::exception_wrapper& ew) {
std::move(sf)
.via(evb_->getEvb())
.thenValue([senderId, startTime](folly::Unit&&) {
VLOG(4) << "Finalize full-sync ack received from peer: " << senderId;
auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);

// record telemetry for thrift calls
fb303::fbData->addStatValue(
"kvstore.thrift.num_finalized_sync_success", 1, fb303::COUNT);
fb303::fbData->addStatValue(
"kvstore.thrift.finalized_sync_duration_ms",
timeDelta.count(),
fb303::AVG);
})
.thenError(
[this, senderId, startTime](const folly::exception_wrapper& ew) {
// state transition to IDLE
auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);
processThriftFailure(senderId, ew.what(), timeDelta);

// cleanup pending thrift request
taskFutures_.erase(reqId);

// record telemetry for thrift calls
fb303::fbData->addStatValue(
"kvstore.thrift.num_finalized_sync_failure", 1, fb303::COUNT);
}));
});
} else {
VLOG(1) << "finalizeFullSync back to: " << senderId
<< " with keys: " << folly::join(",", keys);
Expand Down Expand Up @@ -3002,46 +2969,37 @@ KvStoreDb::floodPublication(

auto startTime = std::chrono::steady_clock::now();
auto sf = thriftPeer.client->semifuture_setKvStoreKeyVals(params, area_);
auto reqId = ++reqId_;
taskFutures_.emplace(
reqId,
std::move(sf)
.via(evb_->getEvb())
.thenValue([this, peerName, startTime, reqId](folly::Unit&&) {
VLOG(4) << "Flooding ack received from peer: " << peerName;

auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);

// cleanup pending thrift request
taskFutures_.erase(reqId);

// record telemetry for thrift calls
fb303::fbData->addStatValue(
"kvstore.thrift.num_flood_pub_success", 1, fb303::COUNT);
fb303::fbData->addStatValue(
"kvstore.thrift.flood_pub_duration_ms",
timeDelta.count(),
fb303::AVG);
})
.thenError([this, peerName, startTime, reqId](
const folly::exception_wrapper& ew) {
std::move(sf)
.via(evb_->getEvb())
.thenValue([peerName, startTime](folly::Unit&&) {
VLOG(4) << "Flooding ack received from peer: " << peerName;

auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);

// record telemetry for thrift calls
fb303::fbData->addStatValue(
"kvstore.thrift.num_flood_pub_success", 1, fb303::COUNT);
fb303::fbData->addStatValue(
"kvstore.thrift.flood_pub_duration_ms",
timeDelta.count(),
fb303::AVG);
})
.thenError(
[this, peerName, startTime](const folly::exception_wrapper& ew) {
// state transition to IDLE
auto endTime = std::chrono::steady_clock::now();
auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - startTime);
processThriftFailure(peerName, ew.what(), timeDelta);

// cleanup pending thrift request
taskFutures_.erase(reqId);

// record telemetry for thrift calls
fb303::fbData->addStatValue(
"kvstore.thrift.num_flood_pub_failure", 1, fb303::COUNT);
}));
});
}
} else {
for (const auto& peer : floodPeers) {
Expand Down
10 changes: 0 additions & 10 deletions openr/kvstore/KvStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,16 +465,6 @@ class KvStoreDb : public DualNode {
// set of peers with all info over thrift channel
std::unordered_map<std::string, KvStorePeer> thriftPeers_{};

// NOTE: KvStoreDb processes full-sync/flooding/etc. in ASYNC fashion.
// `taskFutures_` make sure all pending request finish processing before
// shutting down.
//
// monotonically increasing requestId for pending thrift request.
uint64_t reqId_{0};

// collection of futures for pending thrift requests
std::unordered_map<uint64_t, folly::Future<folly::Unit>> taskFutures_{};

// [TO BE DEPRECATED]
// The peers we will be talking to: both PUB and CMD URLs for each. We use
// peerAddCounter_ to uniquely identify a peering session's socket-id.
Expand Down

0 comments on commit 04ea157

Please sign in to comment.