diff --git a/src/node_messaging.cc b/src/node_messaging.cc index b9212ba272d0fe..98ef42df758fd9 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -40,6 +40,10 @@ namespace worker { Message::Message(MallocedBuffer&& buffer) : main_message_buf_(std::move(buffer)) {} +bool Message::IsCloseMessage() const { + return main_message_buf_.data == nullptr; +} + namespace { // This is used to tell V8 how to read transferred host objects, like other @@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { MaybeLocal Message::Deserialize(Environment* env, Local context) { + CHECK(!IsCloseMessage()); + EscapableHandleScope handle_scope(env->isolate()); Context::Scope context_scope(context); @@ -395,6 +401,7 @@ Maybe Message::Serialize(Environment* env, // The serializer gave us a buffer allocated using `malloc()`. std::pair data = serializer.Release(); + CHECK_NOT_NULL(data.first); main_message_buf_ = MallocedBuffer(reinterpret_cast(data.first), data.second); return Just(true); @@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) { } } -bool MessagePortData::IsSiblingClosed() const { - Mutex::ScopedLock lock(*sibling_mutex_); - return sibling_ == nullptr; -} - void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { CHECK_NULL(a->sibling_); CHECK_NULL(b->sibling_); @@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { a->sibling_mutex_ = b->sibling_mutex_; } -void MessagePortData::PingOwnerAfterDisentanglement() { - Mutex::ScopedLock lock(mutex_); - if (owner_ != nullptr) - owner_->TriggerAsync(); -} - void MessagePortData::Disentangle() { // Grab a copy of the sibling mutex, then replace it so that each sibling // has its own sibling_mutex_ now. @@ -462,11 +458,12 @@ void MessagePortData::Disentangle() { sibling_ = nullptr; } - // We close MessagePorts after disentanglement, so we trigger the - // corresponding uv_async_t to let them know that this happened. - PingOwnerAfterDisentanglement(); + // We close MessagePorts after disentanglement, so we enqueue a corresponding + // message and trigger the corresponding uv_async_t to let them know that + // this happened. + AddToIncomingQueue(Message()); if (sibling != nullptr) { - sibling->PingOwnerAfterDisentanglement(); + sibling->AddToIncomingQueue(Message()); } } @@ -590,14 +587,25 @@ void MessagePort::OnMessage() { Debug(this, "MessagePort has message, receiving = %d", static_cast(receiving_messages_)); - if (!receiving_messages_) - break; - if (data_->incoming_messages_.empty()) + // We have nothing to do if: + // - There are no pending messages + // - We are not intending to receive messages, and the message we would + // receive is not the final "close" message. + if (data_->incoming_messages_.empty() || + (!receiving_messages_ && + !data_->incoming_messages_.front().IsCloseMessage())) { break; + } + received = std::move(data_->incoming_messages_.front()); data_->incoming_messages_.pop_front(); } + if (received.IsCloseMessage()) { + Close(); + return; + } + if (!env()->can_call_into_js()) { Debug(this, "MessagePort drains queue because !can_call_into_js()"); // In this case there is nothing to do but to drain the current queue. @@ -628,15 +636,6 @@ void MessagePort::OnMessage() { } } } - - if (data_ && data_->IsSiblingClosed()) { - Close(); - } -} - -bool MessagePort::IsSiblingClosed() const { - CHECK(data_); - return data_->IsSiblingClosed(); } void MessagePort::OnClose() { diff --git a/src/node_messaging.h b/src/node_messaging.h index 0a729c141088cb..08a6798e3cd3c1 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -17,6 +17,9 @@ class MessagePort; // Represents a single communication message. class Message : public MemoryRetainer { public: + // Create a Message with a specific underlying payload, in the format of the + // V8 ValueSerializer API. If `payload` is empty, this message indicates + // that the receiving message port should close itself. explicit Message(MallocedBuffer&& payload = MallocedBuffer()); Message(Message&& other) = default; @@ -24,6 +27,10 @@ class Message : public MemoryRetainer { Message& operator=(const Message&) = delete; Message(const Message&) = delete; + // Whether this is a message indicating that the port is to be closed. + // This is the last message to be received by a MessagePort. + bool IsCloseMessage() const; + // Deserialize the contained JS value. May only be called once, and only // after Serialize() has been called (e.g. by another thread). v8::MaybeLocal Deserialize(Environment* env, @@ -89,10 +96,6 @@ class MessagePortData : public MemoryRetainer { // This may be called from any thread. void AddToIncomingQueue(Message&& message); - // Returns true if and only this MessagePort is currently not entangled - // with another message port. - bool IsSiblingClosed() const; - // Turns `a` and `b` into siblings, i.e. connects the sending side of one // to the receiving side of the other. This is not thread-safe. static void Entangle(MessagePortData* a, MessagePortData* b); @@ -109,10 +112,6 @@ class MessagePortData : public MemoryRetainer { SET_SELF_SIZE(MessagePortData) private: - // After disentangling this message port, the owner handle (if any) - // is asynchronously triggered, so that it can close down naturally. - void PingOwnerAfterDisentanglement(); - // This mutex protects all fields below it, with the exception of // sibling_. mutable Mutex mutex_; @@ -178,7 +177,6 @@ class MessagePort : public HandleWrap { // messages. std::unique_ptr Detach(); - bool IsSiblingClosed() const; void Close( v8::Local close_callback = v8::Local()) override; diff --git a/test/parallel/test-worker-message-port-message-before-close.js b/test/parallel/test-worker-message-port-message-before-close.js new file mode 100644 index 00000000000000..ecaad9c8767a93 --- /dev/null +++ b/test/parallel/test-worker-message-port-message-before-close.js @@ -0,0 +1,38 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { once } = require('events'); +const { Worker, MessageChannel } = require('worker_threads'); + +// This is a regression test for the race condition underlying +// https://github.com/nodejs/node/issues/22762. +// It ensures that all messages send before a MessagePort#close() call are +// received. Previously, what could happen was a race condition like this: +// - Thread 1 sends message A +// - Thread 2 begins receiving/emitting message A +// - Thread 1 sends message B +// - Thread 1 closes its side of the channel +// - Thread 2 finishes receiving/emitting message A +// - Thread 2 sees that the port should be closed +// - Thread 2 closes the port, discarding message B in the process. + +async function test() { + const worker = new Worker(` + require('worker_threads').parentPort.on('message', ({ port }) => { + port.postMessage('firstMessage'); + port.postMessage('lastMessage'); + port.close(); + }); + `, { eval: true }); + + for (let i = 0; i < 10000; i++) { + const { port1, port2 } = new MessageChannel(); + worker.postMessage({ port: port2 }, [ port2 ]); + await once(port1, 'message'); // 'complexObject' + assert.deepStrictEqual(await once(port1, 'message'), ['lastMessage']); + } + + worker.terminate(); +} + +test().then(common.mustCall());