Skip to content

Commit

Permalink
worker: allow transferring/cloning generic BaseObjects
Browse files Browse the repository at this point in the history
Extend support for transferring objects à la `MessagePort` to other
types of `BaseObject` subclasses, as well as implement cloning
support for cases in which destructive transferring is not needed
or optional.

PR-URL: #33772
Backport-PR-URL: #33965
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
addaleax committed Sep 27, 2020
1 parent 667d520 commit 8e1698a
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 56 deletions.
5 changes: 3 additions & 2 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1570,8 +1570,9 @@ is thrown if a required option is missing.
<a id="ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST"></a>
### `ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`

A `MessagePort` was found in the object passed to a `postMessage()` call,
but not provided in the `transferList` for that call.
An object that needs to be explicitly listed in the `transferList` argument
was found in the object passed to a `postMessage()` call, but not provided in
the `transferList` for that call. Usually, this is a `MessagePort`.

<a id="ERR_MISSING_PASSPHRASE"></a>
### `ERR_MISSING_PASSPHRASE`
Expand Down
38 changes: 37 additions & 1 deletion src/base_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class Environment;
template <typename T, bool kIsWeak>
class BaseObjectPtrImpl;

namespace worker {
class TransferData;
}

class BaseObject : public MemoryRetainer {
public:
enum InternalFields { kSlot, kInternalFieldCount };
Expand Down Expand Up @@ -101,7 +105,39 @@ class BaseObject : public MemoryRetainer {
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);

protected:
// Interface for transferring BaseObject instances using the .postMessage()
// method of MessagePorts (and, by extension, Workers).
// GetTransferMode() returns a transfer mode that indicates how to deal with
// the current object:
// - kUntransferable:
// No transfer is possible, either because this type of BaseObject does
// not know how to be transfered, or because it is not in a state in
// which it is possible to do so (e.g. because it has already been
// transfered).
// - kTransferable:
// This object can be transfered in a destructive fashion, i.e. will be
// rendered unusable on the sending side of the channel in the process
// of being transfered. (In C++ this would be referred to as movable but
// not copyable.) Objects of this type need to be listed in the
// `transferList` argument of the relevant postMessage() call in order to
// make sure that they are not accidentally destroyed on the sending side.
// TransferForMessaging() will be called to get a representation of the
// object that is used for subsequent deserialization.
// - kCloneable:
// This object can be cloned without being modified.
// CloneForMessaging() will be called to get a representation of the
// object that is used for subsequent deserialization, unless the
// object is listed in transferList, in which case TransferForMessaging()
// is attempted first.
enum class TransferMode {
kUntransferable,
kTransferable,
kCloneable
};
virtual TransferMode GetTransferMode() const;
virtual std::unique_ptr<worker::TransferData> TransferForMessaging();
virtual std::unique_ptr<worker::TransferData> CloneForMessaging() const;

virtual inline void OnGCCollect();

private:
Expand Down
164 changes: 117 additions & 47 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ using v8::ValueSerializer;
using v8::WasmModuleObject;

namespace node {

BaseObject::TransferMode BaseObject::GetTransferMode() const {
return BaseObject::TransferMode::kUntransferable;
}

std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
return CloneForMessaging();
}

std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
return {};
}

namespace worker {

Message::Message(MallocedBuffer<char>&& buffer)
Expand All @@ -54,21 +67,20 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
DeserializerDelegate(
Message* m,
Environment* env,
const std::vector<MessagePort*>& message_ports,
const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
: message_ports_(message_ports),
: host_objects_(host_objects),
shared_array_buffers_(shared_array_buffers),
wasm_modules_(wasm_modules) {}

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Currently, only MessagePort hosts objects are supported, so identifying
// by the index in the message's MessagePort array is sufficient.
// Identifying the index in the message's BaseObject array is sufficient.
uint32_t id;
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
return message_ports_[id]->object(isolate);
CHECK_LE(id, host_objects_.size());
return host_objects_[id]->object(isolate);
}

MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
Expand All @@ -87,7 +99,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
ValueDeserializer* deserializer = nullptr;

private:
const std::vector<MessagePort*>& message_ports_;
const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
};
Expand All @@ -101,22 +113,25 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);

// Create all necessary MessagePort handles.
std::vector<MessagePort*> ports(message_ports_.size());
for (uint32_t i = 0; i < message_ports_.size(); ++i) {
ports[i] = MessagePort::New(env,
context,
std::move(message_ports_[i]));
if (ports[i] == nullptr) {
for (MessagePort* port : ports) {
// This will eventually release the MessagePort object itself.
if (port != nullptr)
port->Close();
// Create all necessary objects for transferables, e.g. MessagePort handles.
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
for (uint32_t i = 0; i < transferables_.size(); ++i) {
TransferData* data = transferables_[i].get();
host_objects[i] = data->Deserialize(
env, context, std::move(transferables_[i]));
if (!host_objects[i]) {
for (BaseObjectPtr<BaseObject> object : host_objects) {
if (!object) continue;

// Since creating one of the objects failed, we don't want to have the
// other objects lying around in memory. We act as if the object has
// been garbage-collected.
object->Detach();
}
return MaybeLocal<Value>();
}
}
message_ports_.clear();
transferables_.clear();

std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
// Attach all transferred SharedArrayBuffers to their new Isolate.
Expand All @@ -130,7 +145,7 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
shared_array_buffers_.clear();

DeserializerDelegate delegate(
this, env, ports, shared_array_buffers, wasm_modules_);
this, env, host_objects, shared_array_buffers, wasm_modules_);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
Expand Down Expand Up @@ -175,8 +190,8 @@ void Message::AddSharedArrayBuffer(
shared_array_buffers_.push_back(reference);
}

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
transferables_.emplace_back(std::move(data));
}

uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
Expand Down Expand Up @@ -242,8 +257,8 @@ class SerializerDelegate : public ValueSerializer::Delegate {
}

Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
if (env_->message_port_constructor_template()->HasInstance(object)) {
return WriteMessagePort(Unwrap<MessagePort>(object));
if (env_->base_object_ctor_template()->HasInstance(object)) {
return WriteHostObject(Unwrap<BaseObject>(object));
}

ThrowDataCloneError(env_->clone_unsupported_type_str());
Expand Down Expand Up @@ -282,32 +297,61 @@ class SerializerDelegate : public ValueSerializer::Delegate {
void Finish() {
// Only close the MessagePort handles and actually transfer them
// once we know that serialization succeeded.
for (MessagePort* port : ports_) {
port->Close();
msg_->AddMessagePort(port->Detach());
for (uint32_t i = 0; i < host_objects_.size(); i++) {
BaseObject* host_object = host_objects_[i];
std::unique_ptr<TransferData> data;
if (i < first_cloned_object_index_)
data = host_object->TransferForMessaging();
if (!data)
data = host_object->CloneForMessaging();
CHECK(data);
msg_->AddTransferable(std::move(data));
}
}

inline void AddHostObject(BaseObject* host_object) {
// Make sure we have not started serializing the value itself yet.
CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
host_objects_.push_back(host_object);
}

ValueSerializer* serializer = nullptr;

private:
Maybe<bool> WriteMessagePort(MessagePort* port) {
for (uint32_t i = 0; i < ports_.size(); i++) {
if (ports_[i] == port) {
Maybe<bool> WriteHostObject(BaseObject* host_object) {
for (uint32_t i = 0; i < host_objects_.size(); i++) {
if (host_objects_[i] == host_object) {
serializer->WriteUint32(i);
return Just(true);
}
}

THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
return Nothing<bool>();
BaseObject::TransferMode mode = host_object->GetTransferMode();
if (mode == BaseObject::TransferMode::kUntransferable) {
ThrowDataCloneError(env_->clone_unsupported_type_str());
return Nothing<bool>();
} else if (mode == BaseObject::TransferMode::kTransferable) {
// TODO(addaleax): This message code is too specific. Fix that in a
// semver-major follow-up.
THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
return Nothing<bool>();
}

CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
uint32_t index = host_objects_.size();
if (first_cloned_object_index_ == SIZE_MAX)
first_cloned_object_index_ = index;
serializer->WriteUint32(index);
host_objects_.push_back(host_object);
return Just(true);
}

Environment* env_;
Local<Context> context_;
Message* msg_;
std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
std::vector<MessagePort*> ports_;
std::vector<BaseObject*> host_objects_;
size_t first_cloned_object_index_ = SIZE_MAX;

friend class worker::Message;
};
Expand Down Expand Up @@ -366,8 +410,7 @@ Maybe<bool> Message::Serialize(Environment* env,
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
} else if (env->base_object_ctor_template()->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
Expand All @@ -376,26 +419,34 @@ Maybe<bool> Message::Serialize(Environment* env,
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
BaseObject* host_object = Unwrap<BaseObject>(entry.As<Object>());
if (env->message_port_constructor_template()->HasInstance(entry) &&
(host_object == nullptr ||
static_cast<MessagePort*>(host_object)->IsDetached())) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
if (std::find(delegate.host_objects_.begin(),
delegate.host_objects_.end(),
host_object) != delegate.host_objects_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
String::Concat(env->isolate(),
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate "),
entry.As<Object>()->GetConstructorName()));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
continue;
if (host_object != nullptr && host_object->GetTransferMode() !=
BaseObject::TransferMode::kUntransferable) {
delegate.AddHostObject(host_object);
continue;
}
}

THROW_ERR_INVALID_TRANSFER_OBJECT(env);
Expand Down Expand Up @@ -436,7 +487,7 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("array_buffer_contents", array_buffer_contents_);
tracker->TrackFieldWithSize("shared_array_buffers",
shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
tracker->TrackField("message_ports", message_ports_);
tracker->TrackField("transferables", transferables_);
}

MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
Expand Down Expand Up @@ -702,6 +753,25 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
return std::move(data_);
}

BaseObject::TransferMode MessagePort::GetTransferMode() const {
if (IsDetached())
return BaseObject::TransferMode::kUntransferable;
return BaseObject::TransferMode::kTransferable;
}

std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
Close();
return Detach();
}

BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
Environment* env,
Local<Context> context,
std::unique_ptr<TransferData> self) {
return BaseObjectPtr<MessagePort> { MessagePort::New(
env, context,
static_unique_pointer_cast<MessagePortData>(std::move(self))) };
}

Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Value> message_v,
Expand Down Expand Up @@ -729,8 +799,8 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,

// Check if the target port is posted to itself.
if (data_->sibling_ != nullptr) {
for (const auto& port_data : msg.message_ports()) {
if (data_->sibling_ == port_data.get()) {
for (const auto& transferable : msg.transferables()) {
if (data_->sibling_ == transferable.get()) {
doomed = true;
ProcessEmitWarning(env, "The target port was posted to itself, and "
"the communication channel was lost");
Expand Down
Loading

0 comments on commit 8e1698a

Please sign in to comment.