Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer,n-api: release external buffers from BackingStore callback #33321

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,9 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
}

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
void Environment::SetImmediateThreadsafe(Fn&& cb, bool refed) {
auto callback =
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
native_immediates_threadsafe_.CreateCallback(std::move(cb), refed);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
Expand Down
29 changes: 17 additions & 12 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -743,19 +743,10 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
// exceptions, so we do not need to handle that.
RunAndClearInterrupts();

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe and this function being called.
// For the common case, it's worth checking the size first before establishing
// a mutex lock.
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
}

auto drain_list = [&]() {
auto drain_list = [&](NativeImmediateQueue* queue) {
TryCatchScope try_catch(this);
DebugSealHandleScope seal_handle_scope(isolate());
while (auto head = native_immediates_.Shift()) {
while (auto head = queue->Shift()) {
if (head->is_refed())
ref_count++;

Expand All @@ -773,12 +764,26 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
}
return false;
};
while (drain_list()) {}
while (drain_list(&native_immediates_)) {}

immediate_info()->ref_count_dec(ref_count);

if (immediate_info()->ref_count() == 0)
ToggleImmediateRef(false);

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe immediate list and this function being
// called. For the common case, it's worth checking the size first before
// establishing a mutex lock.
// This is intentionally placed after the `ref_count` handling, because when
// refed threadsafe immediates are created, they are not counted towards the
// count in immediate_info() either.
NativeImmediateQueue threadsafe_immediates;
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));
}
while (drain_list(&threadsafe_immediates)) {}
}

void Environment::RequestInterruptFromV8() {
Expand Down
2 changes: 1 addition & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ class Environment : public MemoryRetainer {
inline void SetUnrefImmediate(Fn&& cb);
template <typename Fn>
// This behaves like SetImmediate() but can be called from any thread.
inline void SetImmediateThreadsafe(Fn&& cb);
inline void SetImmediateThreadsafe(Fn&& cb, bool refed = true);
// This behaves like V8's Isolate::RequestInterrupt(), but also accounts for
// the event loop (i.e. combines the V8 function with SetImmediate()).
// The passed callback may not throw exceptions.
Expand Down
85 changes: 21 additions & 64 deletions src/js_native_api_v8.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,39 +371,6 @@ class Reference : public RefBase {
v8impl::Persistent<v8::Value> _persistent;
};

class ArrayBufferReference final : public Reference {
public:
// Same signatures for ctor and New() as Reference, except this only works
// with ArrayBuffers:
template <typename... Args>
explicit ArrayBufferReference(napi_env env,
v8::Local<v8::ArrayBuffer> value,
Args&&... args)
: Reference(env, value, std::forward<Args>(args)...) {}

template <typename... Args>
static ArrayBufferReference* New(napi_env env,
v8::Local<v8::ArrayBuffer> value,
Args&&... args) {
return new ArrayBufferReference(env, value, std::forward<Args>(args)...);
}

private:
inline void Finalize(bool is_env_teardown) override {
if (is_env_teardown) {
v8::HandleScope handle_scope(_env->isolate);
v8::Local<v8::Value> obj = Get();
CHECK(!obj.IsEmpty());
CHECK(obj->IsArrayBuffer());
v8::Local<v8::ArrayBuffer> ab = obj.As<v8::ArrayBuffer>();
if (ab->IsDetachable())
ab->Detach();
}

Reference::Finalize(is_env_teardown);
}
};

enum UnwrapAction {
KeepWrap,
RemoveWrap
Expand Down Expand Up @@ -2710,37 +2677,27 @@ napi_status napi_create_external_arraybuffer(napi_env env,
napi_finalize finalize_cb,
void* finalize_hint,
napi_value* result) {
NAPI_PREAMBLE(env);
CHECK_ARG(env, result);

v8::Isolate* isolate = env->isolate;
// The buffer will be freed with v8impl::ArrayBufferReference::New()
// below, hence this BackingStore does not need to free the buffer.
std::unique_ptr<v8::BackingStore> backing =
v8::ArrayBuffer::NewBackingStore(external_data,
byte_length,
[](void*, size_t, void*){},
nullptr);
v8::Local<v8::ArrayBuffer> buffer =
v8::ArrayBuffer::New(isolate, std::move(backing));
v8::Maybe<bool> marked = env->mark_arraybuffer_as_untransferable(buffer);
CHECK_MAYBE_NOTHING(env, marked, napi_generic_failure);

if (finalize_cb != nullptr) {
// Create a self-deleting weak reference that invokes the finalizer
// callback and detaches the ArrayBuffer if it still exists on Environment
// teardown.
v8impl::ArrayBufferReference::New(env,
buffer,
0,
true,
finalize_cb,
external_data,
finalize_hint);
}

*result = v8impl::JsValueFromV8LocalValue(buffer);
return GET_RETURN_STATUS(env);
// The API contract here is that the cleanup function runs on the JS thread,
// and is able to use napi_env. Implementing that properly is hard, so use the
// `Buffer` variant for easier implementation.
napi_value buffer;
napi_status status;
status = napi_create_external_buffer(
env,
byte_length,
external_data,
finalize_cb,
finalize_hint,
&buffer);
if (status != napi_ok) return status;
return napi_get_typedarray_info(
env,
buffer,
nullptr,
nullptr,
nullptr,
result,
nullptr);
}

napi_status napi_get_arraybuffer_info(napi_env env,
Expand Down
140 changes: 75 additions & 65 deletions src/node_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,109 +69,130 @@ using v8::Uint32;
using v8::Uint32Array;
using v8::Uint8Array;
using v8::Value;
using v8::WeakCallbackInfo;

namespace {

class CallbackInfo {
public:
~CallbackInfo();

static inline void Free(char* data, void* hint);
static inline CallbackInfo* New(Environment* env,
Local<ArrayBuffer> object,
FreeCallback callback,
char* data,
void* hint = nullptr);
static inline Local<ArrayBuffer> CreateTrackedArrayBuffer(
Environment* env,
char* data,
size_t length,
FreeCallback callback,
void* hint);

CallbackInfo(const CallbackInfo&) = delete;
CallbackInfo& operator=(const CallbackInfo&) = delete;

private:
static void CleanupHook(void* data);
static void WeakCallback(const WeakCallbackInfo<CallbackInfo>&);
inline void WeakCallback(Isolate* isolate);
inline void OnBackingStoreFree();
inline void CallAndResetCallback();
inline CallbackInfo(Environment* env,
Local<ArrayBuffer> object,
FreeCallback callback,
char* data,
void* hint);
Global<ArrayBuffer> persistent_;
FreeCallback const callback_;
Mutex mutex_; // Protects callback_.
FreeCallback callback_;
char* const data_;
void* const hint_;
Environment* const env_;
};


void CallbackInfo::Free(char* data, void*) {
::free(data);
}

Local<ArrayBuffer> CallbackInfo::CreateTrackedArrayBuffer(
Environment* env,
char* data,
size_t length,
FreeCallback callback,
void* hint) {
CHECK_NOT_NULL(callback);
CHECK_IMPLIES(data == nullptr, length == 0);

CallbackInfo* self = new CallbackInfo(env, callback, data, hint);
std::unique_ptr<BackingStore> bs =
ArrayBuffer::NewBackingStore(data, length, [](void*, size_t, void* arg) {
static_cast<CallbackInfo*>(arg)->OnBackingStoreFree();
}, self);
Local<ArrayBuffer> ab = ArrayBuffer::New(env->isolate(), std::move(bs));

// V8 simply ignores the BackingStore deleter callback if data == nullptr,
// but our API contract requires it being called.
if (data == nullptr) {
ab->Detach();
self->OnBackingStoreFree(); // This calls `callback` asynchronously.
} else {
// Store the ArrayBuffer so that we can detach it later.
self->persistent_.Reset(env->isolate(), ab);
self->persistent_.SetWeak();
}

CallbackInfo* CallbackInfo::New(Environment* env,
Local<ArrayBuffer> object,
FreeCallback callback,
char* data,
void* hint) {
return new CallbackInfo(env, object, callback, data, hint);
return ab;
}


CallbackInfo::CallbackInfo(Environment* env,
Local<ArrayBuffer> object,
FreeCallback callback,
char* data,
void* hint)
: persistent_(env->isolate(), object),
callback_(callback),
: callback_(callback),
data_(data),
hint_(hint),
env_(env) {
std::shared_ptr<BackingStore> obj_backing = object->GetBackingStore();
CHECK_EQ(data_, static_cast<char*>(obj_backing->Data()));
if (object->ByteLength() != 0)
CHECK_NOT_NULL(data_);

persistent_.SetWeak(this, WeakCallback, v8::WeakCallbackType::kParameter);
env->AddCleanupHook(CleanupHook, this);
env->isolate()->AdjustAmountOfExternalAllocatedMemory(sizeof(*this));
}


CallbackInfo::~CallbackInfo() {
persistent_.Reset();
env_->RemoveCleanupHook(CleanupHook, this);
}


void CallbackInfo::CleanupHook(void* data) {
CallbackInfo* self = static_cast<CallbackInfo*>(data);

{
HandleScope handle_scope(self->env_->isolate());
Local<ArrayBuffer> ab = self->persistent_.Get(self->env_->isolate());
CHECK(!ab.IsEmpty());
if (ab->IsDetachable())
if (!ab.IsEmpty() && ab->IsDetachable()) {
ab->Detach();
self->persistent_.Reset();
}
}

self->WeakCallback(self->env_->isolate());
// Call the callback in this case, but don't delete `this` yet because the
// BackingStore deleter callback will do so later.
self->CallAndResetCallback();
}

void CallbackInfo::CallAndResetCallback() {
FreeCallback callback;
{
Mutex::ScopedLock lock(mutex_);
callback = callback_;
callback_ = nullptr;
}
if (callback != nullptr) {
// Clean up all Environment-related state and run the callback.
env_->RemoveCleanupHook(CleanupHook, this);
int64_t change_in_bytes = -static_cast<int64_t>(sizeof(*this));
env_->isolate()->AdjustAmountOfExternalAllocatedMemory(change_in_bytes);

void CallbackInfo::WeakCallback(
const WeakCallbackInfo<CallbackInfo>& data) {
CallbackInfo* self = data.GetParameter();
self->WeakCallback(data.GetIsolate());
callback(data_, hint_);
}
}


void CallbackInfo::WeakCallback(Isolate* isolate) {
callback_(data_, hint_);
int64_t change_in_bytes = -static_cast<int64_t>(sizeof(*this));
isolate->AdjustAmountOfExternalAllocatedMemory(change_in_bytes);
delete this;
void CallbackInfo::OnBackingStoreFree() {
// This method should always release the memory for `this`.
std::unique_ptr<CallbackInfo> self { this };
Mutex::ScopedLock lock(mutex_);
// If callback_ == nullptr, that means that the callback has already run from
// the cleanup hook, and there is nothing left to do here besides to clean
// up the memory involved. In particular, the underlying `Environment` may
// be gone at this point, so don’t attempt to call SetImmediateThreadsafe().
if (callback_ == nullptr) return;

env_->SetImmediateThreadsafe([self = std::move(self)](Environment* env) {
CHECK_EQ(self->env_, env); // Consistency check.

self->CallAndResetCallback();
});
}


Expand Down Expand Up @@ -408,26 +429,15 @@ MaybeLocal<Object> New(Environment* env,
return Local<Object>();
}


// The buffer will be released by a CallbackInfo::New() below,
// hence this BackingStore callback is empty.
std::unique_ptr<BackingStore> backing =
ArrayBuffer::NewBackingStore(data,
length,
[](void*, size_t, void*){},
nullptr);
Local<ArrayBuffer> ab = ArrayBuffer::New(env->isolate(),
std::move(backing));
Local<ArrayBuffer> ab =
CallbackInfo::CreateTrackedArrayBuffer(env, data, length, callback, hint);
if (ab->SetPrivate(env->context(),
env->arraybuffer_untransferable_private_symbol(),
True(env->isolate())).IsNothing()) {
callback(data, hint);
return Local<Object>();
}
MaybeLocal<Uint8Array> ui = Buffer::New(env, ab, 0, length);

CallbackInfo::New(env, ab, callback, data, hint);

if (ui.IsEmpty())
return MaybeLocal<Object>();

Expand Down
2 changes: 1 addition & 1 deletion src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
env, std::move(snapshot));
Local<Value> args[] = { stream->object() };
taker->MakeCallback(env->ondone_string(), arraysize(args), args);
});
}, /* refed */ false);
});
args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
}
Expand Down
Loading