Skip to content

Commit

Permalink
Fix stop()/join() implementation for MuxIOThreadPoolExecutor
Browse files Browse the repository at this point in the history
Summary:
Currently the correct shutdown sequence is only implemented in `MuxIOThreadPoolExecutor`'s destructor, but `stop()` and `join()` should behave in the same way: call the `IOObserver`s to unregister the `EventBase`s before stopping the threads that run them. Otherwise, calling `join()` or `stop()` results in the observers being called too late.

To fix this, move the shutdown sequence into `join()` and have the other shutdown methods (the destructor and `stop()`) delegate to it, so the observers are always invoked before the threads stop.

Reviewed By: dmm-fb

Differential Revision: D52521695

fbshipit-source-id: b10a9aa487efae72c85ea0be9df808a7fe885424
  • Loading branch information
ot authored and facebook-github-bot committed Jan 4, 2024
1 parent 13f91cd commit 58bd6dd
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 41 deletions.
24 changes: 8 additions & 16 deletions folly/executors/ThreadPoolExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,14 @@ void ThreadPoolExecutor::joinStoppedThreads(size_t n) {
}
}

void ThreadPoolExecutor::stop() {
joinKeepAliveOnce();
void ThreadPoolExecutor::stopAndJoinAllThreads(bool isJoin) {
size_t n = 0;
{
std::unique_lock w{threadListLock_};
maxThreads_.store(0, std::memory_order_release);
activeThreads_.store(0, std::memory_order_release);
n = threadList_.get().size();
removeThreads(n, false);
removeThreads(n, isJoin);
n += threadsToJoin_.load(std::memory_order_relaxed);
threadsToJoin_.store(0, std::memory_order_relaxed);
}
Expand All @@ -258,21 +257,14 @@ void ThreadPoolExecutor::stop() {
CHECK_EQ(0, stoppedThreads_.size());
}

// Stops the pool without draining queued tasks: joins the keep-alive once,
// then tears down all threads with isJoin = false (threads are removed
// without waiting for pending work, per stopAndJoinAllThreads's isJoin flag).
void ThreadPoolExecutor::stop() {
joinKeepAliveOnce();
stopAndJoinAllThreads(/* isJoin */ false);
}

void ThreadPoolExecutor::join() {
joinKeepAliveOnce();
size_t n = 0;
{
std::unique_lock w{threadListLock_};
maxThreads_.store(0, std::memory_order_release);
activeThreads_.store(0, std::memory_order_release);
n = threadList_.get().size();
removeThreads(n, true);
n += threadsToJoin_.load(std::memory_order_relaxed);
threadsToJoin_.store(0, std::memory_order_relaxed);
}
joinStoppedThreads(n);
CHECK_EQ(0, threadList_.get().size());
CHECK_EQ(0, stoppedThreads_.size());
stopAndJoinAllThreads(/* isJoin */ true);
}

void ThreadPoolExecutor::withAll(FunctionRef<void(ThreadPoolExecutor&)> f) {
Expand Down
11 changes: 8 additions & 3 deletions folly/executors/ThreadPoolExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
* be executed before it returns. Specifically, IOThreadPoolExecutor's stop()
* behaves like join().
*/
void stop();
void join();
virtual void stop();
virtual void join();

/**
* Execute f against all ThreadPoolExecutors, primarily for retrieving and
Expand Down Expand Up @@ -255,6 +255,9 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
// require a lock on ThreadPoolExecutor.
void joinStoppedThreads(size_t n);

// To implement shutdown.
void stopAndJoinAllThreads(bool isJoin);

// Create a suitable Thread struct
virtual ThreadPtr makeThread() { return std::make_shared<Thread>(this); }

Expand Down Expand Up @@ -354,10 +357,12 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
std::atomic<size_t> threadsToJoin_{0};
std::atomic<std::chrono::milliseconds> threadTimeout_;

void joinKeepAliveOnce() {
bool joinKeepAliveOnce() {
if (!std::exchange(keepAliveJoined_, true)) {
joinKeepAlive();
return true;
}
return false;
}

bool keepAliveJoined_{false};
Expand Down
21 changes: 14 additions & 7 deletions folly/executors/test/IOThreadPoolExecutorBaseTestLib.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,32 @@ TYPED_TEST_SUITE_P(IOThreadPoolExecutorBaseTest);

TYPED_TEST_P(IOThreadPoolExecutorBaseTest, IOObserver) {
struct EventBaseAccumulator : IOThreadPoolExecutorBase::IOObserver {
void registerEventBase(EventBase& evb) override { evbs.insert(&evb); }
void unregisterEventBase(EventBase& evb) override { evbs.erase(&evb); }
void registerEventBase(EventBase& evb) override {
// Observers should be called while the evbs are running, so this
// operation should complete.
evb.runInEventBaseThreadAndWait([&] { evbs.insert(&evb); });
}
void unregisterEventBase(EventBase& evb) override {
// Same as registerEventBase().
evb.runInEventBaseThreadAndWait([&] { evbs.erase(&evb); });
}

F14FastSet<EventBase*> evbs;
};

static constexpr size_t kNumThreads = 16;

std::optional<TypeParam> ex{std::in_place, kNumThreads};
TypeParam ex{kNumThreads};
auto observer1 = std::make_shared<EventBaseAccumulator>();
auto observer2 = std::make_shared<EventBaseAccumulator>();
ex->addObserver(observer1);
ex->addObserver(observer2);
ex.addObserver(observer1);
ex.addObserver(observer2);
EXPECT_EQ(observer1->evbs.size(), kNumThreads);
EXPECT_EQ(observer2->evbs.size(), kNumThreads);
ex->removeObserver(observer1);
ex.removeObserver(observer1);
EXPECT_EQ(observer1->evbs.size(), 0);
EXPECT_EQ(observer2->evbs.size(), kNumThreads);
ex.reset();
ex.join();
EXPECT_EQ(observer1->evbs.size(), 0);
EXPECT_EQ(observer2->evbs.size(), 0);
}
Expand Down
41 changes: 26 additions & 15 deletions folly/experimental/io/MuxIOThreadPoolExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,7 @@ MuxIOThreadPoolExecutor::MuxIOThreadPoolExecutor(

MuxIOThreadPoolExecutor::~MuxIOThreadPoolExecutor() {
deregisterThreadPoolExecutor(this);

{
std::shared_lock<folly::SharedMutex> lock{threadListLock_};
for (const auto& o : observers_) {
maybeUnregisterEventBases(o.get());
}
}

keepAlives_.clear();

stop_ = true;
returnEvfd_.notifyFd();
mainThread_->join();

stop();
::close(epFd_);
}

void MuxIOThreadPoolExecutor::mainThreadFunc() {
Expand Down Expand Up @@ -371,6 +356,32 @@ void MuxIOThreadPoolExecutor::stopThreads(size_t n) {
}
}

// For IO pools stop() behaves like join() (see the contract documented on
// ThreadPoolExecutor::stop()/join()): pending tasks on the EventBases must
// run before shutdown, so delegate the whole shutdown sequence to join().
void MuxIOThreadPoolExecutor::stop() {
join();
}

// Full shutdown sequence for the pool. Idempotent: the first caller (join(),
// stop(), or the destructor) performs the teardown; later callers return
// immediately. The order below matters: observers must be told to
// unregister the EventBases while those EventBases are still running.
void MuxIOThreadPoolExecutor::join() {
if (!joinKeepAliveOnce()) {
return; // Already called.
}

{
// Notify all registered observers so they can unregister the EventBases
// while the event loops are still being driven (shared lock: we only
// read the observer list).
std::shared_lock<folly::SharedMutex> lock{threadListLock_};
for (const auto& o : observers_) {
maybeUnregisterEventBases(o.get());
}
}

// Drop the EventBase keep-alives so the loops are allowed to terminate.
keepAlives_.clear();

// Signal the main thread to exit and wait for it before joining the pool
// threads it coordinates.
stop_ = true;
returnEvfd_.notifyFd();
mainThread_->join();

// Now tear down the worker threads, draining queued work (isJoin = true).
stopAndJoinAllThreads(/* isJoin */ true);
// Finally release the polling fd (presumably the epoll fd, given the
// EpollBackend used by makeEventBase() — safe only after all loops exited).
::close(epFd_);
}

std::unique_ptr<folly::EventBase> MuxIOThreadPoolExecutor::makeEventBase() {
auto factory = [] {
folly::EpollBackend::Options options;
Expand Down
3 changes: 3 additions & 0 deletions folly/experimental/io/MuxIOThreadPoolExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ class MuxIOThreadPoolExecutor : public IOThreadPoolExecutorBase {
void addObserver(std::shared_ptr<Observer> o) override;
void removeObserver(std::shared_ptr<Observer> o) override;

void stop() override;
void join() override;

private:
struct alignas(Thread) IOThread : public Thread {
explicit IOThread(MuxIOThreadPoolExecutor* pool) : Thread(pool) {}
Expand Down

0 comments on commit 58bd6dd

Please sign in to comment.