Skip to content

Commit

Permalink
src: split out callback queue implementation from Environment
Browse files Browse the repository at this point in the history
This isn’t conceptually tied to anything Node.js-specific at all.

PR-URL: #33272
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
  • Loading branch information
addaleax authored and codebytere committed Jun 7, 2020
1 parent 77caf92 commit a5e8c5c
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 120 deletions.
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@
'src/base_object.h',
'src/base_object-inl.h',
'src/base64.h',
'src/callback_queue.h',
'src/callback_queue-inl.h',
'src/connect_wrap.h',
'src/connection_wrap.h',
'src/debug_utils.h',
Expand Down
97 changes: 97 additions & 0 deletions src/callback_queue-inl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#ifndef SRC_CALLBACK_QUEUE_INL_H_
#define SRC_CALLBACK_QUEUE_INL_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "callback_queue.h"

namespace node {

template <typename R, typename... Args>
template <typename Fn>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::CreateCallback(Fn&& fn, bool refed) {
return std::make_unique<CallbackImpl<Fn>>(std::move(fn), refed);
}

template <typename R, typename... Args>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::Shift() {
std::unique_ptr<Callback> ret = std::move(head_);
if (ret) {
head_ = ret->get_next();
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::Push(std::unique_ptr<Callback> cb) {
Callback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(cb));
else
head_ = std::move(cb);
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::ConcatMove(CallbackQueue<R, Args...>&& other) {
size_ += other.size_;
if (tail_ != nullptr)
tail_->set_next(std::move(other.head_));
else
head_ = std::move(other.head_);
tail_ = other.tail_;
other.tail_ = nullptr;
other.size_ = 0;
}

template <typename R, typename... Args>
size_t CallbackQueue<R, Args...>::size() const {
return size_.load();
}

template <typename R, typename... Args>
CallbackQueue<R, Args...>::Callback::Callback(bool refed)
: refed_(refed) {}

template <typename R, typename... Args>
bool CallbackQueue<R, Args...>::Callback::is_refed() const {
return refed_;
}

template <typename R, typename... Args>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::Callback::get_next() {
return std::move(next_);
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::Callback::set_next(
std::unique_ptr<Callback> next) {
next_ = std::move(next);
}

template <typename R, typename... Args>
template <typename Fn>
CallbackQueue<R, Args...>::CallbackImpl<Fn>::CallbackImpl(
Fn&& callback, bool refed)
: Callback(refed),
callback_(std::move(callback)) {}

template <typename R, typename... Args>
template <typename Fn>
R CallbackQueue<R, Args...>::CallbackImpl<Fn>::Call(Args... args) {
return callback_(std::forward<Args>(args)...);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif // SRC_CALLBACK_QUEUE_INL_H_
70 changes: 70 additions & 0 deletions src/callback_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#ifndef SRC_CALLBACK_QUEUE_H_
#define SRC_CALLBACK_QUEUE_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include <atomic>

namespace node {

// A queue of C++ functions that take Args... as arguments and return R
// (this is similar to the signature of std::function).
// New entries are added using `CreateCallback()`/`Push()`, and removed using
// `Shift()`.
// The `refed` flag is left for easier use in situations in which some of these
// should be run even if nothing else is keeping the event loop alive.
template <typename R, typename... Args>
class CallbackQueue {
public:
class Callback {
public:
explicit inline Callback(bool refed);

virtual ~Callback() = default;
virtual R Call(Args... args) = 0;

inline bool is_refed() const;

private:
inline std::unique_ptr<Callback> get_next();
inline void set_next(std::unique_ptr<Callback> next);

bool refed_;
std::unique_ptr<Callback> next_;

friend class CallbackQueue;
};

template <typename Fn>
inline std::unique_ptr<Callback> CreateCallback(Fn&& fn, bool refed);

inline std::unique_ptr<Callback> Shift();
inline void Push(std::unique_ptr<Callback> cb);
// ConcatMove adds elements from 'other' to the end of this list, and clears
// 'other' afterwards.
inline void ConcatMove(CallbackQueue&& other);

// size() is atomic and may be called from any thread.
inline size_t size() const;

private:
template <typename Fn>
class CallbackImpl final : public Callback {
public:
CallbackImpl(Fn&& callback, bool refed);
R Call(Args... args) override;

private:
Fn callback_;
};

std::atomic<size_t> size_ {0};
std::unique_ptr<Callback> head_;
Callback* tail_ = nullptr;
};

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif // SRC_CALLBACK_QUEUE_H_
80 changes: 6 additions & 74 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "aliased_buffer.h"
#include "callback_queue-inl.h"
#include "env.h"
#include "node.h"
#include "util-inl.h"
Expand Down Expand Up @@ -730,50 +731,9 @@ inline void IsolateData::set_options(
options_ = std::move(options);
}

std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateQueue::Shift() {
std::unique_ptr<Environment::NativeImmediateCallback> ret = std::move(head_);
if (ret) {
head_ = ret->get_next();
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

void Environment::NativeImmediateQueue::Push(
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
NativeImmediateCallback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(cb));
else
head_ = std::move(cb);
}

void Environment::NativeImmediateQueue::ConcatMove(
NativeImmediateQueue&& other) {
size_ += other.size_;
if (tail_ != nullptr)
tail_->set_next(std::move(other.head_));
else
head_ = std::move(other.head_);
tail_ = other.tail_;
other.tail_ = nullptr;
other.size_ = 0;
}

size_t Environment::NativeImmediateQueue::size() const {
return size_.load();
}

template <typename Fn>
void Environment::CreateImmediate(Fn&& cb, bool ref) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), ref);
auto callback = native_immediates_.CreateCallback(std::move(cb), ref);
native_immediates_.Push(std::move(callback));
}

Expand All @@ -793,8 +753,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) {

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
auto callback =
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
Expand All @@ -804,8 +764,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {

template <typename Fn>
void Environment::RequestInterrupt(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
auto callback =
native_immediates_interrupts_.CreateCallback(std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
Expand All @@ -814,34 +774,6 @@ void Environment::RequestInterrupt(Fn&& cb) {
RequestInterruptFromV8();
}

Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}

bool Environment::NativeImmediateCallback::is_refed() const {
return refed_;
}

std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateCallback::get_next() {
return std::move(next_);
}

void Environment::NativeImmediateCallback::set_next(
std::unique_ptr<NativeImmediateCallback> next) {
next_ = std::move(next);
}

template <typename Fn>
Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
Fn&& callback, bool refed)
: NativeImmediateCallback(refed),
callback_(std::move(callback)) {}

template <typename Fn>
void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
callback_(env);
}

inline bool Environment::can_call_into_js() const {
return can_call_into_js_ && !is_stopping();
}
Expand Down
5 changes: 2 additions & 3 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ void Environment::RunAndClearInterrupts() {
}
DebugSealHandleScope seal_handle_scope(isolate());

while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
while (auto head = queue.Shift())
head->Call(this);
}
}
Expand All @@ -716,8 +716,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
auto drain_list = [&]() {
TryCatchScope try_catch(this);
DebugSealHandleScope seal_handle_scope(isolate());
while (std::unique_ptr<NativeImmediateCallback> head =
native_immediates_.Shift()) {
while (auto head = native_immediates_.Shift()) {
if (head->is_refed())
ref_count++;

Expand Down
45 changes: 2 additions & 43 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "inspector_agent.h"
#include "inspector_profiler.h"
#endif
#include "callback_queue.h"
#include "debug_utils.h"
#include "handle_wrap.h"
#include "node.h"
Expand Down Expand Up @@ -1395,49 +1396,7 @@ class Environment : public MemoryRetainer {

std::list<ExitCallback> at_exit_functions_;

class NativeImmediateCallback {
public:
explicit inline NativeImmediateCallback(bool refed);

virtual ~NativeImmediateCallback() = default;
virtual void Call(Environment* env) = 0;

inline bool is_refed() const;
inline std::unique_ptr<NativeImmediateCallback> get_next();
inline void set_next(std::unique_ptr<NativeImmediateCallback> next);

private:
bool refed_;
std::unique_ptr<NativeImmediateCallback> next_;
};

template <typename Fn>
class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
public:
NativeImmediateCallbackImpl(Fn&& callback, bool refed);
void Call(Environment* env) override;

private:
Fn callback_;
};

class NativeImmediateQueue {
public:
inline std::unique_ptr<NativeImmediateCallback> Shift();
inline void Push(std::unique_ptr<NativeImmediateCallback> cb);
// ConcatMove adds elements from 'other' to the end of this list, and clears
// 'other' afterwards.
inline void ConcatMove(NativeImmediateQueue&& other);

// size() is atomic and may be called from any thread.
inline size_t size() const;

private:
std::atomic<size_t> size_ {0};
std::unique_ptr<NativeImmediateCallback> head_;
NativeImmediateCallback* tail_ = nullptr;
};

typedef CallbackQueue<void, Environment*> NativeImmediateQueue;
NativeImmediateQueue native_immediates_;
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;
Expand Down

0 comments on commit a5e8c5c

Please sign in to comment.