From a784258444b052dfd31cca90db57b21dc38bb1eb Mon Sep 17 00:00:00 2001 From: Darshan Sen Date: Sun, 3 Oct 2021 10:24:58 +0530 Subject: [PATCH] src: remove usage of `AllocatedBuffer` from `stream_*` Signed-off-by: Darshan Sen PR-URL: https://github.com/nodejs/node/pull/40293 Reviewed-By: James M Snell Reviewed-By: Anna Henningsen --- src/stream_base-inl.h | 7 ++-- src/stream_base.cc | 89 ++++++++++++++++++++++++------------------- src/stream_base.h | 5 +-- src/stream_pipe.cc | 15 ++++---- src/stream_pipe.h | 3 +- 5 files changed, 63 insertions(+), 56 deletions(-) diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 28b0b209922cc2..22e5518b589714 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -3,7 +3,6 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS -#include "allocated_buffer-inl.h" #include "async_wrap-inl.h" #include "base_object-inl.h" #include "node.h" @@ -270,9 +269,9 @@ ShutdownWrap* ShutdownWrap::FromObject( return FromObject(base_obj->object()); } -void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) { - CHECK_NULL(storage_.data()); - storage_ = std::move(storage); +void WriteWrap::SetBackingStore(std::unique_ptr bs) { + CHECK(!backing_store_); + backing_store_ = std::move(bs); } void StreamReq::Done(int status, const char* error_str) { diff --git a/src/stream_base.cc b/src/stream_base.cc index b2992f9a051b6d..5bef709ef0a559 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -19,6 +19,7 @@ namespace node { using v8::Array; using v8::ArrayBuffer; +using v8::BackingStore; using v8::ConstructorBehavior; using v8::Context; using v8::DontDelete; @@ -29,6 +30,7 @@ using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::HandleScope; using v8::Integer; +using v8::Isolate; using v8::Local; using v8::MaybeLocal; using v8::Object; @@ -80,6 +82,8 @@ void StreamBase::SetWriteResult(const StreamWriteResult& res) { int StreamBase::Writev(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + Isolate* isolate = env->isolate(); + Local context = env->context(); CHECK(args[0]->IsObject()); CHECK(args[1]->IsArray()); @@ -102,21 +106,21 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { if (!all_buffers) { // Determine storage size first for (size_t i = 0; i < count; i++) { - Local chunk = chunks->Get(env->context(), i * 2).ToLocalChecked(); + Local chunk = chunks->Get(context, i * 2).ToLocalChecked(); if (Buffer::HasInstance(chunk)) continue; // Buffer chunk, no additional storage required // String chunk - Local string = chunk->ToString(env->context()).ToLocalChecked(); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(env->context(), i * 2 + 1).ToLocalChecked()); + Local string = chunk->ToString(context).ToLocalChecked(); + enum encoding encoding = ParseEncoding(isolate, + chunks->Get(context, i * 2 + 1).ToLocalChecked()); size_t chunk_size; if (encoding == UTF8 && string->Length() > 65535 && - !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size)) + !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) return 0; - else if (!StringBytes::StorageSize(env->isolate(), string, encoding) + else if (!StringBytes::StorageSize(isolate, string, encoding) .To(&chunk_size)) return 0; storage_size += chunk_size; @@ -126,20 +130,22 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { return UV_ENOBUFS; } else { for (size_t i = 0; i < count; i++) { - Local chunk = chunks->Get(env->context(), i).ToLocalChecked(); + Local chunk = chunks->Get(context, i).ToLocalChecked(); bufs[i].base = Buffer::Data(chunk); bufs[i].len = Buffer::Length(chunk); } } - AllocatedBuffer storage; - if (storage_size > 0) - storage = AllocatedBuffer::AllocateManaged(env, storage_size); + std::unique_ptr bs; + if (storage_size > 0) { + NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); + bs = ArrayBuffer::NewBackingStore(isolate, storage_size); + } offset = 0; if (!all_buffers) { for (size_t i = 0; i < count; i++) { - Local chunk = chunks->Get(env->context(), i * 2).ToLocalChecked(); + Local chunk = chunks->Get(context, i * 2).ToLocalChecked(); // Write buffer if (Buffer::HasInstance(chunk)) { @@ -150,13 +156,14 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { // Write string CHECK_LE(offset, storage_size); - char* str_storage = storage.data() + offset; - size_t str_size = storage.size() - offset; - - Local string = chunk->ToString(env->context()).ToLocalChecked(); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(env->context(), i * 2 + 1).ToLocalChecked()); - str_size = StringBytes::Write(env->isolate(), + char* str_storage = + static_cast(bs ? bs->Data() : nullptr) + offset; + size_t str_size = (bs ? bs->ByteLength() : 0) - offset; + + Local string = chunk->ToString(context).ToLocalChecked(); + enum encoding encoding = ParseEncoding(isolate, + chunks->Get(context, i * 2 + 1).ToLocalChecked()); + str_size = StringBytes::Write(isolate, str_storage, str_size, string, @@ -169,9 +176,8 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); SetWriteResult(res); - if (res.wrap != nullptr && storage_size > 0) { - res.wrap->SetAllocatedStorage(std::move(storage)); - } + if (res.wrap != nullptr && storage_size > 0) + res.wrap->SetBackingStore(std::move(bs)); return res.err; } @@ -216,6 +222,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { template int StreamBase::WriteString(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + Isolate* isolate = env->isolate(); CHECK(args[0]->IsObject()); CHECK(args[1]->IsString()); @@ -230,9 +237,9 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { // computing their actual size, rather than tripling the storage. size_t storage_size; if (enc == UTF8 && string->Length() > 65535 && - !StringBytes::Size(env->isolate(), string, enc).To(&storage_size)) + !StringBytes::Size(isolate, string, enc).To(&storage_size)) return 0; - else if (!StringBytes::StorageSize(env->isolate(), string, enc) + else if (!StringBytes::StorageSize(isolate, string, enc) .To(&storage_size)) return 0; @@ -248,7 +255,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { bool try_write = storage_size <= sizeof(stack_storage) && (!IsIPCPipe() || send_handle_obj.IsEmpty()); if (try_write) { - data_size = StringBytes::Write(env->isolate(), + data_size = StringBytes::Write(isolate, stack_storage, storage_size, string, @@ -274,18 +281,20 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { CHECK_EQ(count, 1); } - AllocatedBuffer data; + std::unique_ptr bs; if (try_write) { // Copy partial data - data = AllocatedBuffer::AllocateManaged(env, buf.len); - memcpy(data.data(), buf.base, buf.len); + NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); + bs = ArrayBuffer::NewBackingStore(isolate, buf.len); + memcpy(static_cast(bs->Data()), buf.base, buf.len); data_size = buf.len; } else { // Write it - data = AllocatedBuffer::AllocateManaged(env, storage_size); - data_size = StringBytes::Write(env->isolate(), - data.data(), + NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); + bs = ArrayBuffer::NewBackingStore(isolate, storage_size); + data_size = StringBytes::Write(isolate, + static_cast(bs->Data()), storage_size, string, enc); @@ -293,7 +302,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { CHECK_LE(data_size, storage_size); - buf = uv_buf_init(data.data(), data_size); + buf = uv_buf_init(static_cast(bs->Data()), data_size); uv_stream_t* send_handle = nullptr; @@ -312,9 +321,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { res.bytes += synchronously_written; SetWriteResult(res); - if (res.wrap != nullptr) { - res.wrap->SetAllocatedStorage(std::move(data)); - } + if (res.wrap != nullptr) + res.wrap->SetBackingStore(std::move(bs)); return res.err; } @@ -511,16 +519,17 @@ void StreamResource::ClearError() { uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { CHECK_NOT_NULL(stream_); Environment* env = static_cast(stream_)->stream_env(); - return AllocatedBuffer::AllocateManaged(env, suggested_size).release(); + return env->allocate_managed_buffer(suggested_size); } void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { CHECK_NOT_NULL(stream_); StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); - HandleScope handle_scope(env->isolate()); + Isolate* isolate = env->isolate(); + HandleScope handle_scope(isolate); Context::Scope context_scope(env->context()); - AllocatedBuffer buf(env, buf_); + std::unique_ptr bs = env->release_managed_buffer(buf_); if (nread <= 0) { if (nread < 0) @@ -528,10 +537,10 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { return; } - CHECK_LE(static_cast(nread), buf.size()); - buf.Resize(nread); + CHECK_LE(static_cast(nread), bs->ByteLength()); + bs = BackingStore::Reallocate(isolate, std::move(bs), nread); - stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer()); + stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs))); } diff --git a/src/stream_base.h b/src/stream_base.h index e0da891501ac85..019048fbbd90ba 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -4,7 +4,6 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "env.h" -#include "allocated_buffer.h" #include "async_wrap.h" #include "node.h" #include "util.h" @@ -90,7 +89,7 @@ class ShutdownWrap : public StreamReq { class WriteWrap : public StreamReq { public: - inline void SetAllocatedStorage(AllocatedBuffer&& storage); + inline void SetBackingStore(std::unique_ptr bs); inline WriteWrap( StreamBase* stream, @@ -105,7 +104,7 @@ class WriteWrap : public StreamReq { void OnDone(int status) override; private: - AllocatedBuffer storage_; + std::unique_ptr backing_store_; }; diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc index afd7ec36eef294..aa636dbb75e9ed 100644 --- a/src/stream_pipe.cc +++ b/src/stream_pipe.cc @@ -1,11 +1,11 @@ #include "stream_pipe.h" -#include "allocated_buffer-inl.h" #include "stream_base-inl.h" #include "node_buffer.h" #include "util-inl.h" namespace node { +using v8::BackingStore; using v8::Context; using v8::Function; using v8::FunctionCallbackInfo; @@ -118,13 +118,13 @@ uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); size_t size = std::min(suggested_size, pipe->wanted_data_); CHECK_GT(size, 0); - return AllocatedBuffer::AllocateManaged(pipe->env(), size).release(); + return pipe->env()->allocate_managed_buffer(size); } void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); - AllocatedBuffer buf(pipe->env(), buf_); + std::unique_ptr bs = pipe->env()->release_managed_buffer(buf_); if (nread < 0) { // EOF or error; stop reading and pass the error to the previous listener // (which might end up in JS). @@ -144,19 +144,20 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, return; } - pipe->ProcessData(nread, std::move(buf)); + pipe->ProcessData(nread, std::move(bs)); } -void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) { +void StreamPipe::ProcessData(size_t nread, + std::unique_ptr bs) { CHECK(uses_wants_write_ || pending_writes_ == 0); - uv_buf_t buffer = uv_buf_init(buf.data(), nread); + uv_buf_t buffer = uv_buf_init(static_cast(bs->Data()), nread); StreamWriteResult res = sink()->Write(&buffer, 1); pending_writes_++; if (!res.async) { writable_listener_.OnStreamAfterWrite(nullptr, res.err); } else { is_reading_ = false; - res.wrap->SetAllocatedStorage(std::move(buf)); + res.wrap->SetBackingStore(std::move(bs)); if (source() != nullptr) source()->ReadStop(); } diff --git a/src/stream_pipe.h b/src/stream_pipe.h index 36179c95f74a93..eb2e914e793175 100644 --- a/src/stream_pipe.h +++ b/src/stream_pipe.h @@ -4,7 +4,6 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "stream_base.h" -#include "allocated_buffer.h" namespace node { @@ -43,7 +42,7 @@ class StreamPipe : public AsyncWrap { // `OnStreamWantsWrite()` support. size_t wanted_data_ = 0; - void ProcessData(size_t nread, AllocatedBuffer&& buf); + void ProcessData(size_t nread, std::unique_ptr bs); class ReadableListener : public StreamListener { public: