Skip to content

Commit

Permalink
when distributing deletes, always route local RPC
Browse files Browse the repository at this point in the history
Summary:
In this diff we change how delete distribution works:
* Distribution happens asynchronously (except spooling)
* Sync path is local region RPC
* In case of directed cross-region requests just return default reply

i.e., the Distribution Layer is responsible for:
* delivering async requests
* distributing cross-regional requests

Reviewed By: stuclar

Differential Revision: D57145745

fbshipit-source-id: b19cf6cce1b9a2cdb531df975030b9e8481fc517
  • Loading branch information
Lenar Fatikhov authored and facebook-github-bot committed Sep 3, 2024
1 parent d80da5d commit d06c77d
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 198 deletions.
113 changes: 63 additions & 50 deletions mcrouter/routes/DistributionRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,76 +156,89 @@ class DistributionRoute {
* 2. Cross-region rpc delete having routing prefix with another region
* 3. Broadcast delete with routing prefix = /(star)/(star)/
*
* If distribution is enabled, we write 2 and 3 to Axon.
* If write to Axon fails, we spool to Async log with the routing prefix.
* In the first case, we skip distribution.
* In the second case, we write to Axon synchronously and return the reply of
* the write success/failure. In the third case, we write to Axon
* asynchronously, also send an RPC request to the local region and return
* the reply of the RPC.
*
* If write to distribution layer fails, we spool to Async log with the
* routing prefix.
*
* If this logic is run in mcreplay, we attempt writing to distribution layer
* synchronously until it succeeds.
*/
McDeleteReply route(const McDeleteRequest& req) const {
auto& ctx = *fiber_local<RouterInfo>::getSharedCtx();
auto& proxy = ctx.proxy();
auto& proxy = fiber_local<RouterInfo>::getSharedCtx()->proxy();
// In mcreplay case we try to infer target region from request
auto distributionRegion = FOLLY_LIKELY(!replay_)
auto distributionRegionOpt = FOLLY_LIKELY(!replay_)
? fiber_local<RouterInfo>::getDistributionTargetRegion()
: inferDistributionRegionForReplay(req, proxy);

if (FOLLY_LIKELY(!distributionRegion.has_value())) {
if (FOLLY_LIKELY(!distributionRegionOpt.has_value())) {
return rh_->route(req);
}

auto& axonCtx = fiber_local<RouterInfo>::getAxonCtx();
auto bucketId = fiber_local<RouterInfo>::getBucketId();
assert(axonCtx && bucketId);

bool spoolSucceeded = false;
auto source = distributionRegion.value().empty()
auto source = distributionRegionOpt.value().empty()
? memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION
: memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION;
auto finalReq = addDeleteRequestSource(req, source);
finalReq.bucketId_ref() = fmt::to_string(*bucketId);
DestinationRequestCtx dctx(nowUs());
onBeforeDistribution(finalReq, ctx, *finalReq.bucketId_ref(), dctx);

auto axonLogRes = distributeDeleteRequest(
finalReq,
axonCtx,
*bucketId,
invalidation::DistributionType::Distribution,
std::move(
distributionRegion.value().empty() ? std::string(kBroadcast)
: *distributionRegion),
srcRegion_);

auto reply = axonLogRes ? createReply(DefaultReply, finalReq)
: McDeleteReply(carbon::Result::LOCAL_ERROR);

dctx.endTime = nowUs();
onAfterDistribution(finalReq, reply, ctx, *finalReq.bucketId_ref(), dctx);

if (axonLogRes) {
proxy.stats().increment(distribution_axon_write_success_stat);
auto distributionRegion = distributionRegionOpt.value().empty()
? std::string(kBroadcast)
: std::move(distributionRegionOpt.value());
// If it is replay or directed cross-region, we write to distribution
// synchronously:
if (FOLLY_UNLIKELY(replay_ || distributionRegion != kBroadcast)) {
return distributeWithLogging(
finalReq,
distributeDeleteRequest,
axonCtx,
bucketId.value(),
replay_ ? invalidation::DistributionType::Async
: invalidation::DistributionType::Distribution,
distributionRegion,
srcRegion_,
std::nullopt)
.first;
}
spoolSucceeded |= axonLogRes;
if (FOLLY_UNLIKELY(!axonLogRes)) {
proxy.stats().increment(distribution_axon_write_failed_stat);
const auto host =
std::make_shared<AccessPoint>(kAsynclogDistributionEndpoint);
spoolSucceeded |= spoolAsynclog(
&proxy,

folly::fibers::addTask([this,
bucketId,
ctx = fiber_local<RouterInfo>::getSharedCtx(),
axonCtx,
finalReq = std::move(finalReq),
distributionRegion =
std::move(distributionRegion)]() {
auto [_, spoolSucceeded] = distributeWithLogging(
finalReq,
host,
true,
fiber_local<RouterInfo>::getAsynclogName());
if (spoolSucceeded) {
// update reply if axon failed but spool succeeded
reply = createReply(DefaultReply, finalReq);
} else {
proxy.stats().increment(distribution_async_spool_failed_stat);
distributeDeleteRequest,
axonCtx,
bucketId.value(),
invalidation::DistributionType::Distribution,
distributionRegion,
srcRegion_,
std::nullopt);

if (FOLLY_UNLIKELY(!spoolSucceeded)) {
const auto host =
std::make_shared<AccessPoint>(kAsynclogDistributionEndpoint);
spoolSucceeded |= spoolAsynclog(
&ctx->proxy(),
finalReq,
host,
true,
fiber_local<RouterInfo>::getAsynclogName());
if (!spoolSucceeded) {
ctx->proxy().stats().increment(distribution_async_spool_failed_stat);
}
}
}
// if spool to Axon or Asynclog succeeded and rpc is disabled, we return
// default reply to the client:
if (!distributedDeleteRpcEnabled_) {
return reply;
}
});
// route to the local region:
return rh_->route(req);
}

Expand Down
Loading

0 comments on commit d06c77d

Please sign in to comment.