Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

Commit

Permalink
worker: add SharedArrayBuffer sharing
Browse files Browse the repository at this point in the history
Logic is added to the `MessagePort` mechanism that
attaches hidden objects to those instances when they are transferred
that track their lifetime and maintain a reference count, to make
sure that memory is freed at the appropriate times.

PR-URL: #106
Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
  • Loading branch information
addaleax committed Oct 16, 2017
1 parent 734866f commit b8ff855
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 2 deletions.
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
'src/node_i18n.cc',
'src/pipe_wrap.cc',
'src/process_wrap.cc',
'src/sharedarraybuffer-metadata.cc',
'src/signal_wrap.cc',
'src/spawn_sync.cc',
'src/string_bytes.cc',
Expand Down Expand Up @@ -279,6 +280,7 @@
'src/udp_wrap.h',
'src/req-wrap.h',
'src/req-wrap-inl.h',
'src/sharedarraybuffer-metadata.h',
'src/string_bytes.h',
'src/stream_base.h',
'src/stream_base-inl.h',
Expand Down
2 changes: 2 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class ModuleWrap;
V(decorated_private_symbol, "node:decorated") \
V(npn_buffer_private_symbol, "node:npnBuffer") \
V(processed_private_symbol, "node:processed") \
V(sab_lifetimepartner_symbol, "node:sharedArrayBufferLifetimePartner") \
V(selected_npn_buffer_private_symbol, "node:selectedNpnBuffer") \
V(domain_private_symbol, "node:domain") \

Expand Down Expand Up @@ -324,6 +325,7 @@ class ModuleWrap;
V(promise_wrap_template, v8::ObjectTemplate) \
V(push_values_to_array_function, v8::Function) \
V(randombytes_constructor_template, v8::ObjectTemplate) \
V(sab_lifetimepartner_constructor_template, v8::FunctionTemplate) \
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
Expand Down
38 changes: 38 additions & 0 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Value;
using v8::ValueDeserializer;
Expand Down Expand Up @@ -55,6 +56,7 @@ Message& Message::operator=(Message&& other) {
main_message_buf_ = other.main_message_buf_;
other.main_message_buf_ = uv_buf_init(nullptr, 0);
array_buffer_contents_ = std::move(other.array_buffer_contents_);
shared_array_buffers_ = std::move(other.shared_array_buffers_);
message_ports_ = std::move(other.message_ports_);
return *this;
}
Expand Down Expand Up @@ -98,6 +100,7 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
// This is for messages generated in C++ with the expectation that they
// are handled in JS, e.g. serialized error messages from workers.
CHECK(array_buffer_contents_.empty());
CHECK(shared_array_buffers_.empty());
CHECK(message_ports_.empty());
char* buf = main_message_buf_.base;
main_message_buf_.base = nullptr;
Expand Down Expand Up @@ -137,12 +140,26 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
}
array_buffer_contents_.clear();

for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
Local<SharedArrayBuffer> sab;
if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
.ToLocal(&sab))
return MaybeLocal<Value>();
deserializer.TransferSharedArrayBuffer(i, sab);
}
shared_array_buffers_.clear();

if (deserializer.ReadHeader(context).IsNothing())
return MaybeLocal<Value>();
return handle_scope.Escape(
deserializer.ReadValue(context).FromMaybe(Local<Value>()));
}

void Message::AddSharedArrayBuffer(
SharedArrayBufferMetadataReference reference) {
shared_array_buffers_.push_back(reference);
}

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
}
Expand All @@ -167,6 +184,26 @@ class SerializerDelegate : public ValueSerializer::Delegate {
return Nothing<bool>();
}

Maybe<uint32_t> GetSharedArrayBufferId(
Isolate* isolate, Local<SharedArrayBuffer> shared_array_buffer) override {
uint32_t i;
for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
if (seen_shared_array_buffers_[i] == shared_array_buffer)
return Just(i);
}

SharedArrayBufferMetadataReference reference(
SharedArrayBufferMetadata::ForIncomingSharedArrayBuffer(env_,
context_,
shared_array_buffer));
if (!reference) {
return Nothing<uint32_t>();
}
seen_shared_array_buffers_.push_back(shared_array_buffer);
msg_->AddSharedArrayBuffer(reference);
return Just(i);
}

void Finish() {
for (MessagePort* port : ports_) {
port->Close();
Expand All @@ -192,6 +229,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
Environment* env_;
Local<Context> context_;
Message* msg_;
std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
std::vector<MessagePort*> ports_;

friend class worker::Message;
Expand Down
7 changes: 5 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

#include "env.h"
#include "node_mutex.h"
#include "sharedarraybuffer-metadata.h"
#include <list>
#include <memory>

namespace node {
namespace worker {
Expand All @@ -27,7 +27,6 @@ class MessagePort;

// Any further flagged message codes are defined by the modules that use them.


// Represents a single communication message. The only non-standard extension
// here is passing of a separate flag that the Workers implementation uses
// for internal cross-thread information passing.
Expand All @@ -54,6 +53,9 @@ class Message {
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list);

// Internal method of Message that is called when a new SharedArrayBuffer
// object is encountered in the incoming value's structure.
void AddSharedArrayBuffer(SharedArrayBufferMetadataReference ref);
// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);
Expand All @@ -62,6 +64,7 @@ class Message {
int32_t flag_ = MESSAGE_FLAG_NONE;
uv_buf_t main_message_buf_;
std::vector<uv_buf_t> array_buffer_contents_;
std::vector<SharedArrayBufferMetadataReference> shared_array_buffers_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;

friend class MessagePort;
Expand Down
137 changes: 137 additions & 0 deletions src/sharedarraybuffer-metadata.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#include "sharedarraybuffer-metadata.h"
#include "base-object.h"
#include "base-object-inl.h"

using v8::Context;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Local;
using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::SharedArrayBuffer;
using v8::Value;

namespace node {
namespace worker {

namespace {

// Yield a JS constructor for SABLifetimePartner objects in the form of a
// standard API object, that has a single field for containing the raw
// SABLiftimePartner* pointer.
Local<Function> GetSABLifetimePartnerConstructor(
Environment* env, Local<Context> context) {
Local<FunctionTemplate> templ;
templ = env->sab_lifetimepartner_constructor_template();
if (!templ.IsEmpty())
return templ->GetFunction(context).ToLocalChecked();

{
Local<FunctionTemplate> m = env->NewFunctionTemplate(
[](const FunctionCallbackInfo<Value>& info) {
CHECK(info.IsConstructCall());
});
m->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(),
"SABLifetimePartner"));
m->InstanceTemplate()->SetInternalFieldCount(1);

env->set_sab_lifetimepartner_constructor_template(m);
}

return GetSABLifetimePartnerConstructor(env, context);
}

class SABLifetimePartner : public BaseObject {
public:
SABLifetimePartner(Environment* env,
Local<Object> obj,
SharedArrayBufferMetadataReference r)
: BaseObject(env, obj),
reference(r) {
MakeWeak<SABLifetimePartner>(this);
}

SharedArrayBufferMetadataReference reference;
};

} // anonymous namespace

SharedArrayBufferMetadataReference
SharedArrayBufferMetadata::ForIncomingSharedArrayBuffer(
Environment* env, Local<Context> context, Local<SharedArrayBuffer> source) {
Local<Value> lifetime_partner;

if (!source->GetPrivate(context,
env->sab_lifetimepartner_symbol())
.ToLocal(&lifetime_partner)) {
return nullptr;
}

if (lifetime_partner->IsObject() &&
env->sab_lifetimepartner_constructor_template()
->HasInstance(lifetime_partner)) {
if (!source->IsExternal()) {
env->ThrowError("Found internalized SharedArrayBuffer with "
"lifetime partner object");
return nullptr;
}

SABLifetimePartner* partner =
Unwrap<SABLifetimePartner>(lifetime_partner.As<Object>());
CHECK_NE(partner, nullptr);
return partner->reference;
}

if (source->IsExternal()) {
// If this is an external SharedArrayBuffer but we do not see a lifetime
// partner object, we did not externalize it. In that case, there is no
// way to serialize it.
env->ThrowError("Cannot serialize externalized SharedArrayBuffer");
return nullptr;
}

SharedArrayBuffer::Contents contents = source->Externalize();
SharedArrayBufferMetadataReference r(new SharedArrayBufferMetadata(
contents.Data(), contents.ByteLength()));
if (r->AssignToSharedArrayBuffer(env, context, source).IsNothing())
return nullptr;
return r;
}

Maybe<bool> SharedArrayBufferMetadata::AssignToSharedArrayBuffer(
Environment* env, Local<Context> context,
Local<SharedArrayBuffer> target) {
Local<Function> ctor = GetSABLifetimePartnerConstructor(env, context);
Local<Object> obj;
if (!ctor->NewInstance(context).ToLocal(&obj))
return Nothing<bool>();

new SABLifetimePartner(env, obj, shared_from_this());
return target->SetPrivate(context,
env->sab_lifetimepartner_symbol(),
obj);
}

SharedArrayBufferMetadata::SharedArrayBufferMetadata(void* data, size_t size)
: data(data), size(size) { }

SharedArrayBufferMetadata::~SharedArrayBufferMetadata() {
free(data);
}

MaybeLocal<SharedArrayBuffer> SharedArrayBufferMetadata::GetSharedArrayBuffer(
Environment* env, Local<Context> context) {
Local<SharedArrayBuffer> obj =
SharedArrayBuffer::New(env->isolate(), data, size);

if (AssignToSharedArrayBuffer(env, context, obj).IsNothing())
return MaybeLocal<SharedArrayBuffer>();

return obj;
}

} // namespace worker
} // namespace node
66 changes: 66 additions & 0 deletions src/sharedarraybuffer-metadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef SRC_SHAREDARRAYBUFFER_METADATA_H_
#define SRC_SHAREDARRAYBUFFER_METADATA_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "node.h"
#include <memory>

namespace node {
namespace worker {

class SharedArrayBufferMetadata;

// This is an object associated with a SharedArrayBuffer, which keeps track
// of a cross-thread reference count. Once a SharedArrayBuffer is transferred
// for the first time (or is attempted to be transferred), one of these objects
// is created, and the SharedArrayBuffer is moved from internalized mode into
// externalized mode (i.e. the JS engine no longer frees the memory on its own).
//
// This will always be referred to using a std::shared_ptr, since it keeps
// a reference count and is guaranteed to be thread-safe.
typedef std::shared_ptr<SharedArrayBufferMetadata>
SharedArrayBufferMetadataReference;

class SharedArrayBufferMetadata
: public std::enable_shared_from_this<SharedArrayBufferMetadata> {
public:
static SharedArrayBufferMetadataReference ForIncomingSharedArrayBuffer(
Environment* env, v8::Local<v8::Context> context,
v8::Local<v8::SharedArrayBuffer> source);
~SharedArrayBufferMetadata();

// Create a SharedArrayBuffer object for a specific Environment and Context.
// The created SharedArrayBuffer will be in externalized mode and has
// a hidden object attached to it, during whose lifetime the reference
// count is increased by 1.
v8::MaybeLocal<v8::SharedArrayBuffer> GetSharedArrayBuffer(
Environment* env, v8::Local<v8::Context> context);

SharedArrayBufferMetadata(SharedArrayBufferMetadata&& other) = delete;
SharedArrayBufferMetadata& operator=(
SharedArrayBufferMetadata&& other) = delete;
SharedArrayBufferMetadata& operator=(
const SharedArrayBufferMetadata&) = delete;
SharedArrayBufferMetadata(const SharedArrayBufferMetadata&) = delete;

private:
explicit SharedArrayBufferMetadata(void* data, size_t size);

// Attach a lifetime tracker object with a reference count to `target`.
v8::Maybe<bool> AssignToSharedArrayBuffer(
Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::SharedArrayBuffer> target);

void* data = nullptr;
size_t size = 0;
};

} // namespace worker
} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS


#endif // SRC_SHAREDARRAYBUFFER_METADATA_H_

0 comments on commit b8ff855

Please sign in to comment.