Skip to content

Commit

Permalink
refactor: Use KeepAlive instead of Executor*
Browse files Browse the repository at this point in the history
Summary:
folly::Executor::KeepAlive is the recommended way of holding
references to executors as they ensure they are actually kept alive (block the
executor destructor). A shared_ptr or a naked pointer won't ensure the executor
is still available. Other functions from folly (like global pools) only provide
KeepAlive APIs.

Differential Revision: D66724539
  • Loading branch information
pedroerp authored and facebook-github-bot committed Dec 3, 2024
1 parent a0bbea2 commit 434b95b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 30 deletions.
5 changes: 2 additions & 3 deletions velox/dwio/common/ExecutorBarrier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "velox/dwio/common/ExecutorBarrier.h"

namespace facebook::velox::dwio::common {

namespace {

class BarrierElement {
Expand Down Expand Up @@ -72,11 +71,11 @@ auto ExecutorBarrier::wrapMethod(folly::Func f) {
}

void ExecutorBarrier::add(folly::Func f) {
executor_.add(wrapMethod(std::move(f)));
executor_->add(wrapMethod(std::move(f)));
}

void ExecutorBarrier::addWithPriority(folly::Func f, int8_t priority) {
executor_.addWithPriority(wrapMethod(std::move(f)), priority);
executor_->addWithPriority(wrapMethod(std::move(f)), priority);
}

} // namespace facebook::velox::dwio::common
14 changes: 9 additions & 5 deletions velox/dwio/common/ExecutorBarrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ namespace facebook::velox::dwio::common {

class ExecutorBarrier : public folly::Executor {
public:
explicit ExecutorBarrier(folly::Executor& executor)
: executor_{executor}, count_{0} {}
explicit ExecutorBarrier(folly::Executor::KeepAlive<> executor)
: executor_{std::move(executor)}, count_{0} {}

// Constructor version that holds ownership over the executor (holds a
// shared_ptr copy).
explicit ExecutorBarrier(std::shared_ptr<folly::Executor> executor)
: owned_{std::move(executor)}, executor_{*owned_}, count_{0} {}
: owned_{std::move(executor)},
executor_{folly::getKeepAliveToken(*owned_)},
count_{0} {}

~ExecutorBarrier() override {
// If this object gets destroyed while there are still tasks pending, those
Expand All @@ -50,7 +54,7 @@ class ExecutorBarrier : public folly::Executor {
void addWithPriority(folly::Func, int8_t priority) override;

uint8_t getNumPriorities() const override {
return executor_.getNumPriorities();
return executor_->getNumPriorities();
}

void waitAll() {
Expand All @@ -68,7 +72,7 @@ class ExecutorBarrier : public folly::Executor {
auto wrapMethod(folly::Func f);

std::shared_ptr<folly::Executor> owned_;
folly::Executor& executor_;
folly::Executor::KeepAlive<> executor_;
size_t count_;
std::mutex mutex_;
std::condition_variable cv_;
Expand Down
5 changes: 2 additions & 3 deletions velox/dwio/common/ParallelFor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/dwio/common/ExecutorBarrier.h"

namespace facebook::velox::dwio::common {

namespace {

std::vector<std::pair<size_t, size_t>>
Expand Down Expand Up @@ -60,7 +59,7 @@ splitRange(size_t from, size_t to, size_t factor) {
} // namespace

ParallelFor::ParallelFor(
folly::Executor* executor,
folly::Executor::KeepAlive<> executor,
size_t from,
size_t to,
size_t parallelismFactor)
Expand Down Expand Up @@ -89,7 +88,7 @@ void ParallelFor::execute(std::function<void(size_t)> func) {
VELOX_CHECK(
executor_,
"Executor wasn't provided so we shouldn't have more than 1 range");
ExecutorBarrier barrier(*executor_);
ExecutorBarrier barrier(executor_);
const size_t last = ranges_.size() - 1;
// First N-1 ranges in executor threads
for (size_t r = 0; r < last; ++r) {
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/ParallelFor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace facebook::velox::dwio::common {
class ParallelFor {
public:
ParallelFor(
folly::Executor* executor,
folly::Executor::KeepAlive<> executor,
size_t from, // start index
size_t to, // past end index
// number of threads.
Expand All @@ -53,7 +53,7 @@ class ParallelFor {

private:
std::shared_ptr<folly::Executor> owned_;
folly::Executor* executor_;
folly::Executor::KeepAlive<> executor_;
std::vector<std::pair<size_t, size_t>> ranges_;
};

Expand Down
34 changes: 17 additions & 17 deletions velox/dwio/common/tests/ExecutorBarrierTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ TEST(ExecutorBarrierTest, GetNumPriorities) {
const uint8_t kNumPriorities = 5;
auto executor =
std::make_shared<folly::CPUThreadPoolExecutor>(10, kNumPriorities);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
EXPECT_EQ(barrier->getNumPriorities(), kNumPriorities);
}

Expand All @@ -41,15 +41,15 @@ TEST(ExecutorBarrierTest, CanOwn) {

TEST(ExecutorBarrierTest, CanAwaitMultipleTimes) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
for (int time = 0, multipleTimes = 10; time < multipleTimes; ++time) {
barrier->waitAll();
}
}

TEST(ExecutorBarrierTest, AddCanBeReused) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -68,7 +68,7 @@ TEST(ExecutorBarrierTest, AddCanBeReused) {

TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -88,7 +88,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) {

TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -110,7 +110,7 @@ TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) {

TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -135,7 +135,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) {

TEST(ExecutorBarrierTest, Add) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -148,7 +148,7 @@ TEST(ExecutorBarrierTest, Add) {

TEST(ExecutorBarrierTest, AddWithPriority) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -162,7 +162,7 @@ TEST(ExecutorBarrierTest, AddWithPriority) {

TEST(ExecutorBarrierTest, AddCanIgnore) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
for (int i = 0; i < kCalls; ++i) {
Expand All @@ -173,7 +173,7 @@ TEST(ExecutorBarrierTest, AddCanIgnore) {

TEST(ExecutorBarrierTest, AddWithPriorityCanIgnore) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
for (int i = 0; i < kCalls; ++i) {
Expand All @@ -187,7 +187,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) {
std::atomic<int> count{0};
{
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

for (int i = 0; i < kCalls; ++i) {
barrier->add([shouldThrow = (i == 0), &count]() {
Expand All @@ -203,7 +203,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) {

TEST(ExecutorBarrierTest, AddException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -221,7 +221,7 @@ TEST(ExecutorBarrierTest, AddException) {

TEST(ExecutorBarrierTest, AddWithPriorityException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -242,7 +242,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityException) {

TEST(ExecutorBarrierTest, AddNonStdException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -261,7 +261,7 @@ TEST(ExecutorBarrierTest, AddNonStdException) {

TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -283,7 +283,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) {

TEST(ExecutorBarrierTest, AddExceptions) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -299,7 +299,7 @@ TEST(ExecutorBarrierTest, AddExceptions) {

TEST(ExecutorBarrierTest, AddWithPriorityExceptions) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
auto barrier = std::make_shared<ExecutorBarrier>(executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand Down

0 comments on commit 434b95b

Please sign in to comment.