diff --git a/src/env-inl.h b/src/env-inl.h index cf6de689c4558f..ae4c02b6b2d081 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -748,6 +748,7 @@ Environment::NativeImmediateQueue::Shift() { if (!head_) tail_ = nullptr; // The queue is now empty. } + size_--; return ret; } @@ -755,6 +756,7 @@ void Environment::NativeImmediateQueue::Push( std::unique_ptr cb) { NativeImmediateCallback* prev_tail = tail_; + size_++; tail_ = cb.get(); if (prev_tail != nullptr) prev_tail->set_next(std::move(cb)); @@ -774,6 +776,10 @@ void Environment::NativeImmediateQueue::ConcatMove( other.size_ = 0; } +size_t Environment::NativeImmediateQueue::size() const { + return size_.load(); +} + template void Environment::CreateImmediate(Fn&& cb, bool ref) { auto callback = std::make_unique>( @@ -795,6 +801,17 @@ void Environment::SetUnrefImmediate(Fn&& cb) { CreateImmediate(std::move(cb), false); } +template +void Environment::SetImmediateThreadsafe(Fn&& cb) { + auto callback = std::make_unique>( + std::move(cb), false); + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_threadsafe_.Push(std::move(callback)); + } + uv_async_send(&task_queues_async_); +} + Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) : refed_(refed) {} @@ -1164,7 +1181,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) { inline void Environment::RegisterFinalizationGroupForCleanup( v8::Local group) { cleanup_finalization_groups_.emplace_back(isolate(), group); - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); } size_t CleanupHookCallback::Hash::operator()( diff --git a/src/env.cc b/src/env.cc index 48ba78fff82d7c..6d6d77abd02da3 100644 --- a/src/env.cc +++ b/src/env.cc @@ -460,15 +460,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_check_init(event_loop(), &idle_check_handle_); uv_async_init( event_loop(), - &cleanup_finalization_groups_async_, + &task_queues_async_, [](uv_async_t* async) { Environment* env = ContainerOf( - &Environment::cleanup_finalization_groups_async_, async); + &Environment::task_queues_async_, async); env->CleanupFinalizationGroups(); + env->RunAndClearNativeImmediates(); }); uv_unref(reinterpret_cast(&idle_prepare_handle_)); uv_unref(reinterpret_cast(&idle_check_handle_)); - uv_unref(reinterpret_cast(&cleanup_finalization_groups_async_)); + uv_unref(reinterpret_cast(&task_queues_async_)); thread_stopper()->Install( this, static_cast(this), [](uv_async_t* handle) { @@ -532,7 +533,7 @@ void Environment::RegisterHandleCleanups() { close_and_finish, nullptr); RegisterHandleCleanup( - reinterpret_cast(&cleanup_finalization_groups_async_), + reinterpret_cast(&task_queues_async_), close_and_finish, nullptr); } @@ -663,6 +664,15 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { "RunAndClearNativeImmediates", this); size_t ref_count = 0; + // It is safe to check .size() first, because there is a causal relationship + // between pushes to the threadsafe and this function being called. + // For the common case, it's worth checking the size first before establishing + // a mutex lock. + if (native_immediates_threadsafe_.size() > 0) { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_)); + } + NativeImmediateQueue queue; queue.ConcatMove(std::move(native_immediates_)); @@ -1084,7 +1094,7 @@ void Environment::CleanupFinalizationGroups() { if (try_catch.HasCaught() && !try_catch.HasTerminated()) errors::TriggerUncaughtException(isolate(), try_catch); // Re-schedule the execution of the remainder of the queue. - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); return; } } diff --git a/src/env.h b/src/env.h index f5fee470308a3c..5a8c870ebd2caf 100644 --- a/src/env.h +++ b/src/env.h @@ -1199,6 +1199,9 @@ class Environment : public MemoryRetainer { inline void SetImmediate(Fn&& cb); template inline void SetUnrefImmediate(Fn&& cb); + template + // This behaves like SetImmediate() but can be called from any thread. + inline void SetImmediateThreadsafe(Fn&& cb); // This needs to be available for the JS-land setImmediate(). void ToggleImmediateRef(bool ref); @@ -1284,7 +1287,7 @@ class Environment : public MemoryRetainer { uv_idle_t immediate_idle_handle_; uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; - uv_async_t cleanup_finalization_groups_async_; + uv_async_t task_queues_async_; bool profiler_idle_notifier_started_ = false; AsyncHooks async_hooks_; @@ -1436,12 +1439,18 @@ class Environment : public MemoryRetainer { // 'other' afterwards. inline void ConcatMove(NativeImmediateQueue&& other); + // size() is atomic and may be called from any thread. + inline size_t size() const; + private: + std::atomic size_ {0}; std::unique_ptr head_; NativeImmediateCallback* tail_ = nullptr; }; NativeImmediateQueue native_immediates_; + Mutex native_immediates_threadsafe_mutex_; + NativeImmediateQueue native_immediates_threadsafe_; void RunAndClearNativeImmediates(bool only_refed = false); static void CheckImmediate(uv_check_t* handle);