Skip to content

Commit

Permalink
Propagate request context in QueuedImmediateExecutor
Browse files Browse the repository at this point in the history
Summary: See added unit test.

Reviewed By: Gownta

Differential Revision: D58645760

fbshipit-source-id: 6371e61f8bd88bf4b4ed51273b869a38c2eac353
  • Loading branch information
ot authored and facebook-github-bot committed Jun 19, 2024
1 parent c9cf0d5 commit 113a478
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 10 deletions.
2 changes: 2 additions & 0 deletions folly/executors/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,13 @@ cpp_library(
headers = ["QueuedImmediateExecutor.h"],
deps = [
"//folly:indestructible",
"//folly:scope_guard",
],
exported_deps = [
":inline_executor",
"//folly:executor",
"//folly:thread_local",
"//folly/io/async:request_context",
],
)

Expand Down
34 changes: 26 additions & 8 deletions folly/executors/QueuedImmediateExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

#include <folly/executors/QueuedImmediateExecutor.h>

#include <functional>

#include <folly/Indestructible.h>
#include <folly/ScopeGuard.h>

namespace folly {

Expand All @@ -25,14 +28,29 @@ QueuedImmediateExecutor& QueuedImmediateExecutor::instance() {
return *instance;
}

void QueuedImmediateExecutor::add(Func callback) {
auto& q = *q_;
q.push(std::move(callback));
if (q.size() == 1) {
while (!q.empty()) {
q.front()();
q.pop();
}
void QueuedImmediateExecutor::add(Func func) {
auto& [running, queue] = *q_;
if (running) {
queue.push(Task{std::move(func), RequestContext::saveContext()});
return;
}

running = true;
auto cleanup = makeGuard([&r = running] { r = false; });

// No need to save/restore request context if this is the first call.
invokeCatchingExns("QueuedImmediateExecutor", std::exchange(func, {}));

if (queue.empty()) {
return;
}

RequestContextSaverScopeGuard guard;
while (!queue.empty()) {
auto& task = queue.front();
RequestContext::setContext(std::move(task.ctx));
invokeCatchingExns("QueuedImmediateExecutor", std::ref(task.func));
queue.pop();
}
}

Expand Down
12 changes: 10 additions & 2 deletions folly/executors/QueuedImmediateExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
#pragma once

#include <queue>
#include <utility>

#include <folly/Executor.h>
#include <folly/ThreadLocal.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/io/async/Request.h>

namespace folly {

Expand All @@ -33,10 +35,16 @@ class QueuedImmediateExecutor : public InlineLikeExecutor {
public:
static QueuedImmediateExecutor& instance();

void add(Func callback) override;
void add(Func func) override;

private:
folly::ThreadLocal<std::queue<Func>> q_;
struct Task {
Func func;
std::shared_ptr<RequestContext> ctx;
};

using Queue = std::queue<Task>;
folly::ThreadLocal<std::pair</* running */ bool, Queue>> q_;
};

} // namespace folly
1 change: 1 addition & 0 deletions folly/executors/test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ cpp_unittest(
"//folly/executors:manual_executor",
"//folly/executors:queued_immediate_executor",
"//folly/futures:core",
"//folly/io/async:request_context",
"//folly/portability:gtest",
"//folly/synchronization:baton",
],
Expand Down
15 changes: 15 additions & 0 deletions folly/executors/test/ExecutorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <folly/executors/ManualExecutor.h>
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/futures/Future.h>
#include <folly/io/async/Request.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>

Expand Down Expand Up @@ -247,6 +248,20 @@ TEST(Executor, QueuedImmediateExecutor) {
EXPECT_EQ(2, counter);
}

TEST(Executor, QueuedImmediateExecutorRequestContext) {
QueuedImmediateExecutor x;
RequestContextScopeGuard guard1;
auto* ctx1 = RequestContext::try_get();
x.add([&] {
EXPECT_EQ(ctx1, RequestContext::try_get());
RequestContextScopeGuard guard2;
x.add([&, ctx2 = RequestContext::try_get()] {
EXPECT_EQ(ctx2, RequestContext::try_get());
});
});
EXPECT_EQ(ctx1, RequestContext::try_get());
}

TEST(Executor, Runnable) {
InlineExecutor x;
size_t counter = 0;
Expand Down

0 comments on commit 113a478

Please sign in to comment.