diff --git a/openr/kvstore/KvStore.cpp b/openr/kvstore/KvStore.cpp index 0a2b7f0fd38..fb4f6be0f4e 100644 --- a/openr/kvstore/KvStore.cpp +++ b/openr/kvstore/KvStore.cpp @@ -1224,22 +1224,12 @@ KvStoreDb::~KvStoreDb() { // Destroy thrift clients associated with peers, which will // fulfill promises with exceptions if any. thriftPeers_.clear(); - - // Waiting for all pending thrift call to finish - std::vector> fs; - for (auto& [_, future] : taskFutures_) { - // Raises a FutureCancellation interrupt - future.cancel(); - fs.emplace_back(std::move(future)); - } - taskFutures_.clear(); - folly::collectAll(std::move(fs)).get(); - - LOG(INFO) << "Done processing all pending thrift reqs in area: " << area_; }); // remove ZMQ socket evb_->removeSocket(fbzmq::RawZmqSocketPtr{*peerSyncSock_}); + + LOG(INFO) << "Successfully destructed KvStoreDb in area: " << area_; } void @@ -1493,25 +1483,18 @@ KvStoreDb::requestThriftPeerSync() { auto startTime = std::chrono::steady_clock::now(); auto sf = thriftPeer.client->semifuture_getKvStoreKeyValsFilteredArea( params, area_); - auto reqId = ++reqId_; - taskFutures_.emplace( - reqId, - std::move(sf) - .via(evb_->getEvb()) - .thenValue( - [this, peerName, startTime, reqId](thrift::Publication&& pub) { - // state transition to INITIALIZED - auto endTime = std::chrono::steady_clock::now(); - auto timeDelta = - std::chrono::duration_cast( - endTime - startTime); - processThriftSuccess(peerName, std::move(pub), timeDelta); - - // cleanup pending thrift request - taskFutures_.erase(reqId); - }) - .thenError([this, peerName, startTime, reqId]( - const folly::exception_wrapper& ew) { + std::move(sf) + .via(evb_->getEvb()) + .thenValue([this, peerName, startTime](thrift::Publication&& pub) { + // state transition to INITIALIZED + auto endTime = std::chrono::steady_clock::now(); + auto timeDelta = + std::chrono::duration_cast( + endTime - startTime); + processThriftSuccess(peerName, std::move(pub), timeDelta); + }) + .thenError( + [this, peerName, startTime](const folly::exception_wrapper& ew) { // state transition to IDLE auto endTime = std::chrono::steady_clock::now(); auto timeDelta = @@ -1519,13 +1502,10 @@ KvStoreDb::requestThriftPeerSync() { endTime - startTime); processThriftFailure(peerName, ew.what(), timeDelta); - // cleanup pending thrift request - taskFutures_.erase(reqId); - // record telemetry for thrift calls fb303::fbData->addStatValue( "kvstore.thrift.num_full_sync_failure", 1, fb303::COUNT); - })); + }); // in case pending peer size is over parallelSyncLimit, // wait until kMaxBackoff before sending next round of sync @@ -1561,6 +1541,26 @@ KvStoreDb::processThriftSuccess( std::string const& peerName, thrift::Publication&& pub, std::chrono::milliseconds timeDelta) { + // check if it is valid peer(i.e. peer removed in process of syncing) + auto peerIt = thriftPeers_.find(peerName); + if (peerIt == thriftPeers_.end()) { + LOG(WARNING) << "Received async full-sync response from invalid peer: " + << peerName << ". Ignore state transition."; + return; + } + + // ATTN: In parallel link case, peer state can be set to IDLE when + // parallel adj comes up before the previous full-sync response + // being received. KvStoreDb will ignore the old full-sync + // response and will rely on the new full-sync response to + // promote the state. + auto& peer = thriftPeers_.at(peerName); + if (peer.state == KvStorePeerState::IDLE) { + LOG(WARNING) << "Ignore response from: " << peerName + << " as it is in IDLE state"; + return; + } + // ATTN: `peerName` is MANDATORY to fulfill the finialized // full-sync with peers. const auto kvUpdateCnt = mergePublication(pub, peerName); @@ -1585,26 +1585,6 @@ KvStoreDb::processThriftSuccess( << " key-value updates." << " Processing time: " << timeDelta.count() << "ms."; - // check if it is valid peer(i.e. peer removed in process of syncing) - auto peerIt = thriftPeers_.find(peerName); - if (peerIt == thriftPeers_.end()) { - LOG(WARNING) << "Received async full-sync response from invalid peer: " - << peerName << ". Ignore state transition."; - return; - } - - // ATTN: In parallel link case, peer state can be set to IDLE when - // parallel adj comes up before the previous full-sync response - // being received. KvStoreDb will ignore the old full-sync - // response and will rely on the new full-sync response to - // promote the state. - auto& peer = thriftPeers_.at(peerName); - if (peer.state == KvStorePeerState::IDLE) { - LOG(WARNING) << "Ignore response from: " << peerName - << " as it is in IDLE state"; - return; - } - // State transition KvStorePeerState oldState = peer.state; peer.state = getNextState(oldState, KvStorePeerEvent::SYNC_RESP_RCVD); @@ -1640,17 +1620,16 @@ KvStoreDb::processThriftFailure( std::string const& peerName, folly::fbstring const& exceptionStr, std::chrono::milliseconds timeDelta) { - LOG(INFO) << "[Thrift Sync] Exception: " << exceptionStr - << ". Peer name: " << peerName - << ". Processing time: " << timeDelta.count() << "ms."; - + // check if it is valid peer(i.e. peer removed in process of syncing) auto peerIt = thriftPeers_.find(peerName); if (peerIt == thriftPeers_.end()) { - LOG(ERROR) << "Exception happened against invalid peer: " << peerName - << ". Ignore state transition."; return; } + LOG(INFO) << "[Thrift Sync] Exception: " << exceptionStr + << ". Peer name: " << peerName + << ". Processing time: " << timeDelta.count() << "ms."; + // reset client to reconnect later in next batch of thriftSyncTimer_ scanning auto& peer = thriftPeers_.at(peerName); peer.keepAliveTimer->cancelTimeout(); @@ -1881,8 +1860,6 @@ KvStoreDb::getCounters() const { // Add up pending and in-flight full sync counters["kvstore.pending_full_sync"] = peersToSyncWith_.size() + latestSentPeerSync_.size(); - // Record pending unfulfilled thrift request - counters["kvstore.pending_thrift_request"] = taskFutures_.size(); return counters; } @@ -2802,32 +2779,25 @@ KvStoreDb::finalizeFullSync( auto startTime = std::chrono::steady_clock::now(); auto sf = thriftPeer.client->semifuture_setKvStoreKeyVals(params, area_); - auto reqId = ++reqId_; - taskFutures_.emplace( - reqId, - std::move(sf) - .via(evb_->getEvb()) - .thenValue([this, senderId, startTime, reqId](folly::Unit&&) { - VLOG(4) << "Finalize full-sync ack received from peer: " - << senderId; - auto endTime = std::chrono::steady_clock::now(); - auto timeDelta = - std::chrono::duration_cast( - endTime - startTime); - - // cleanup pending thrift request - taskFutures_.erase(reqId); - - // record telemetry for thrift calls - fb303::fbData->addStatValue( - "kvstore.thrift.num_finalized_sync_success", 1, fb303::COUNT); - fb303::fbData->addStatValue( - "kvstore.thrift.finalized_sync_duration_ms", - timeDelta.count(), - fb303::AVG); - }) - .thenError([this, senderId, startTime, reqId]( - const folly::exception_wrapper& ew) { + std::move(sf) + .via(evb_->getEvb()) + .thenValue([senderId, startTime](folly::Unit&&) { + VLOG(4) << "Finalize full-sync ack received from peer: " << senderId; + auto endTime = std::chrono::steady_clock::now(); + auto timeDelta = + std::chrono::duration_cast( + endTime - startTime); + + // record telemetry for thrift calls + fb303::fbData->addStatValue( + "kvstore.thrift.num_finalized_sync_success", 1, fb303::COUNT); + fb303::fbData->addStatValue( + "kvstore.thrift.finalized_sync_duration_ms", + timeDelta.count(), + fb303::AVG); + }) + .thenError( + [this, senderId, startTime](const folly::exception_wrapper& ew) { // state transition to IDLE auto endTime = std::chrono::steady_clock::now(); auto timeDelta = @@ -2835,13 +2805,10 @@ KvStoreDb::finalizeFullSync( endTime - startTime); processThriftFailure(senderId, ew.what(), timeDelta); - // cleanup pending thrift request - taskFutures_.erase(reqId); - // record telemetry for thrift calls fb303::fbData->addStatValue( "kvstore.thrift.num_finalized_sync_failure", 1, fb303::COUNT); - })); + }); } else { VLOG(1) << "finalizeFullSync back to: " << senderId << " with keys: " << folly::join(",", keys); @@ -3002,32 +2969,26 @@ KvStoreDb::floodPublication( auto startTime = std::chrono::steady_clock::now(); auto sf = thriftPeer.client->semifuture_setKvStoreKeyVals(params, area_); - auto reqId = ++reqId_; - taskFutures_.emplace( - reqId, - std::move(sf) - .via(evb_->getEvb()) - .thenValue([this, peerName, startTime, reqId](folly::Unit&&) { - VLOG(4) << "Flooding ack received from peer: " << peerName; - - auto endTime = std::chrono::steady_clock::now(); - auto timeDelta = - std::chrono::duration_cast( - endTime - startTime); - - // cleanup pending thrift request - taskFutures_.erase(reqId); - - // record telemetry for thrift calls - fb303::fbData->addStatValue( - "kvstore.thrift.num_flood_pub_success", 1, fb303::COUNT); - fb303::fbData->addStatValue( - "kvstore.thrift.flood_pub_duration_ms", - timeDelta.count(), - fb303::AVG); - }) - .thenError([this, peerName, startTime, reqId]( - const folly::exception_wrapper& ew) { + std::move(sf) + .via(evb_->getEvb()) + .thenValue([peerName, startTime](folly::Unit&&) { + VLOG(4) << "Flooding ack received from peer: " << peerName; + + auto endTime = std::chrono::steady_clock::now(); + auto timeDelta = + std::chrono::duration_cast( + endTime - startTime); + + // record telemetry for thrift calls + fb303::fbData->addStatValue( + "kvstore.thrift.num_flood_pub_success", 1, fb303::COUNT); + fb303::fbData->addStatValue( + "kvstore.thrift.flood_pub_duration_ms", + timeDelta.count(), + fb303::AVG); + }) + .thenError( + [this, peerName, startTime](const folly::exception_wrapper& ew) { // state transition to IDLE auto endTime = std::chrono::steady_clock::now(); auto timeDelta = @@ -3035,13 +2996,10 @@ KvStoreDb::floodPublication( endTime - startTime); processThriftFailure(peerName, ew.what(), timeDelta); - // cleanup pending thrift request - taskFutures_.erase(reqId); - // record telemetry for thrift calls fb303::fbData->addStatValue( "kvstore.thrift.num_flood_pub_failure", 1, fb303::COUNT); - })); + }); } } else { for (const auto& peer : floodPeers) { diff --git a/openr/kvstore/KvStore.h b/openr/kvstore/KvStore.h index b6cc195e580..2f9dfc8e285 100644 --- a/openr/kvstore/KvStore.h +++ b/openr/kvstore/KvStore.h @@ -465,16 +465,6 @@ class KvStoreDb : public DualNode { // set of peers with all info over thrift channel std::unordered_map thriftPeers_{}; - // NOTE: KvStoreDb processes full-sync/flooding/etc. in ASYNC fashion. - // `taskFutures_` make sure all pending request finish processing before - // shutting down. - // - // monotonically increasing requestId for pending thrift request. - uint64_t reqId_{0}; - - // collection of futures for pending thrift requests - std::unordered_map> taskFutures_{}; - // [TO BE DEPRECATED] // The peers we will be talking to: both PUB and CMD URLs for each. We use // peerAddCounter_ to uniquely identify a peering session's socket-id.