diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 38f3ba66f214e6..9729c3f42808ee 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -251,6 +251,10 @@ class Worker extends EventEmitter { debug(`[${threadId}] hears end event for Worker ${this.threadId}`); drainMessagePort(this[kPublicPort]); drainMessagePort(this[kPort]); + this.removeAllListeners('message'); + this.removeAllListeners('messageerrors'); + this[kPublicPort].unref(); + this[kPort].unref(); this[kDispose](); if (customErr) { debug(`[${threadId}] failing with custom error ${customErr} \ diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 78fb46ab6f2803..2699bd2792e544 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -565,7 +565,7 @@ MessagePort::MessagePort(Environment* env, auto onmessage = [](uv_async_t* handle) { // Called when data has been put into the queue. MessagePort* channel = ContainerOf(&MessagePort::async_, handle); - channel->OnMessage(); + channel->OnMessage(MessageProcessingMode::kNormalOperation); }; CHECK_EQ(uv_async_init(env->event_loop(), @@ -664,7 +664,7 @@ MessagePort* MessagePort::New( } MaybeLocal MessagePort::ReceiveMessage(Local context, - bool only_if_receiving) { + MessageProcessingMode mode) { std::shared_ptr received; { // Get the head of the message queue. @@ -672,7 +672,9 @@ MaybeLocal MessagePort::ReceiveMessage(Local context, Debug(this, "MessagePort has message"); - bool wants_message = receiving_messages_ || !only_if_receiving; + bool wants_message = + receiving_messages_ || + mode == MessageProcessingMode::kForceReadMessages; // We have nothing to do if: // - There are no pending messages // - We are not intending to receive messages, and the message we would @@ -697,16 +699,18 @@ MaybeLocal MessagePort::ReceiveMessage(Local context, return received->Deserialize(env(), context); } -void MessagePort::OnMessage() { +void MessagePort::OnMessage(MessageProcessingMode mode) { Debug(this, "Running MessagePort::OnMessage()"); HandleScope handle_scope(env()->isolate()); Local context = object(env()->isolate())->CreationContext(); size_t processing_limit; - { + if (mode == MessageProcessingMode::kNormalOperation) { Mutex::ScopedLock(data_->mutex_); processing_limit = std::max(data_->incoming_messages_.size(), static_cast(1000)); + } else { + processing_limit = std::numeric_limits::max(); } // data_ can only ever be modified by the owner thread, so no need to lock. @@ -738,7 +742,7 @@ void MessagePort::OnMessage() { // Catch any exceptions from parsing the message itself (not from // emitting it) as 'messageeror' events. TryCatchScope try_catch(env()); - if (!ReceiveMessage(context, true).ToLocal(&payload)) { + if (!ReceiveMessage(context, mode).ToLocal(&payload)) { if (try_catch.HasCaught() && !try_catch.HasTerminated()) message_error = try_catch.Exception(); goto reschedule; @@ -999,7 +1003,7 @@ void MessagePort::CheckType(const FunctionCallbackInfo& args) { void MessagePort::Drain(const FunctionCallbackInfo& args) { MessagePort* port; ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As()); - port->OnMessage(); + port->OnMessage(MessageProcessingMode::kForceReadMessages); } void MessagePort::ReceiveMessage(const FunctionCallbackInfo& args) { @@ -1018,7 +1022,8 @@ void MessagePort::ReceiveMessage(const FunctionCallbackInfo& args) { } MaybeLocal payload = - port->ReceiveMessage(port->object()->CreationContext(), false); + port->ReceiveMessage(port->object()->CreationContext(), + MessageProcessingMode::kForceReadMessages); if (!payload.IsEmpty()) args.GetReturnValue().Set(payload.ToLocalChecked()); } diff --git a/src/node_messaging.h b/src/node_messaging.h index ad06565977215c..2e63b22e4ceced 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -285,11 +285,16 @@ class MessagePort : public HandleWrap { SET_SELF_SIZE(MessagePort) private: + enum class MessageProcessingMode { + kNormalOperation, + kForceReadMessages + }; + void OnClose() override; - void OnMessage(); + void OnMessage(MessageProcessingMode mode); void TriggerAsync(); v8::MaybeLocal ReceiveMessage(v8::Local context, - bool only_if_receiving); + MessageProcessingMode mode); std::unique_ptr data_ = nullptr; bool receiving_messages_ = false; diff --git a/test/parallel/test-worker-terminate-ref-public-port.js b/test/parallel/test-worker-terminate-ref-public-port.js new file mode 100644 index 00000000000000..4a2de785a36220 --- /dev/null +++ b/test/parallel/test-worker-terminate-ref-public-port.js @@ -0,0 +1,12 @@ +'use strict'; +const common = require('../common'); +const { Worker } = require('worker_threads'); + +// The actual test here is that the Worker does not keep the main thread +// running after it has been .terminate()’ed. + +const w = new Worker(` +const p = require('worker_threads').parentPort; +while(true) p.postMessage({})`, { eval: true }); +w.once('message', () => w.terminate()); +w.once('exit', common.mustCall());