diff --git a/lib/_http_server.js b/lib/_http_server.js index 9541993df53321..111a8525c47d62 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -680,7 +680,7 @@ function onSocketPause() { function unconsume(parser, socket) { if (socket._handle) { if (parser._consumed) - parser.unconsume(socket._handle._externalStream); + parser.unconsume(); parser._consumed = false; socket.removeListener('pause', onSocketPause); socket.removeListener('resume', onSocketResume); diff --git a/src/connection_wrap.cc b/src/connection_wrap.cc index 8de77f361dcde4..a6cf67ceee2477 100644 --- a/src/connection_wrap.cc +++ b/src/connection_wrap.cc @@ -3,6 +3,7 @@ #include "connect_wrap.h" #include "env-inl.h" #include "pipe_wrap.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "tcp_wrap.h" #include "util-inl.h" diff --git a/src/js_stream.cc b/src/js_stream.cc index 7d1115f12ac3e2..9e67a2094ded89 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -25,9 +25,6 @@ JSStream::JSStream(Environment* env, Local obj) StreamBase(env) { node::Wrap(obj, this); MakeWeak(this); - - set_alloc_cb({ OnAllocImpl, this }); - set_read_cb({ OnReadImpl, this }); } @@ -35,45 +32,6 @@ JSStream::~JSStream() { } -void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { - buf->base = Malloc(size); - buf->len = size; -} - - -void JSStream::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - JSStream* wrap = static_cast(ctx); - CHECK_NE(wrap, nullptr); - Environment* env = wrap->env(); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - if (nread < 0) { - if (buf != nullptr && buf->base != nullptr) - free(buf->base); - wrap->EmitData(nread, Local(), Local()); - return; - } - - if (nread == 0) { - if (buf->base != nullptr) - free(buf->base); - return; - } - - CHECK_LE(static_cast(nread), buf->len); - char* base = node::Realloc(buf->base, nread); - - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - - Local obj = Buffer::New(env, base, nread).ToLocalChecked(); - wrap->EmitData(nread, obj, Local()); -} - - AsyncWrap* JSStream::GetAsyncWrap() { return static_cast(this); } @@ -212,18 +170,19 @@ void JSStream::ReadBuffer(const FunctionCallbackInfo& args) { char* data = Buffer::Data(args[0]); int len = Buffer::Length(args[0]); - do { - uv_buf_t buf; + // Repeatedly ask the stream's owner for memory, copy the data that we + // just read from JS into those buffers and emit them as reads. + while (len != 0) { + uv_buf_t buf = wrap->EmitAlloc(len); ssize_t avail = len; - wrap->EmitAlloc(len, &buf); if (static_cast(buf.len) < avail) avail = buf.len; memcpy(buf.base, data, avail); data += avail; len -= avail; - wrap->EmitRead(avail, &buf); - } while (len != 0); + wrap->EmitRead(avail, buf); + } } @@ -231,7 +190,7 @@ void JSStream::EmitEOF(const FunctionCallbackInfo& args) { JSStream* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); - wrap->EmitRead(UV_EOF, nullptr); + wrap->EmitRead(UV_EOF); } diff --git a/src/node_http2.cc b/src/node_http2.cc index bd7eeee8655e52..bd2e93a13c208b 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -531,24 +531,12 @@ Http2Session::Http2Session(Environment* env, outgoing_buffers_.reserve(32); } -void Http2Session::Unconsume() { - if (stream_ != nullptr) { - DEBUG_HTTP2SESSION(this, "unconsuming the i/o stream"); - stream_->set_destruct_cb({ nullptr, nullptr }); - stream_->set_alloc_cb({ nullptr, nullptr }); - stream_->set_read_cb({ nullptr, nullptr }); - stream_->Unconsume(); - stream_ = nullptr; - } -} - Http2Session::~Http2Session() { CHECK_EQ(flags_ & SESSION_STATE_HAS_SCOPE, 0); if (!object().IsEmpty()) ClearWrap(object()); persistent().Reset(); CHECK(persistent().IsEmpty()); - Unconsume(); DEBUG_HTTP2SESSION(this, "freeing nghttp2 session"); nghttp2_session_del(session_); } @@ -646,7 +634,8 @@ void Http2Session::Close(uint32_t code, bool socket_closed) { DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code); CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0); } else { - Unconsume(); + if (stream_ != nullptr) + stream_->RemoveStreamListener(this); } // If there are outstanding pings, those will need to be canceled, do @@ -1044,22 +1033,38 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, stream->statistics_.received_bytes += len; - // There is a single large array buffer for the entire data read from the - // network; create a slice of that array buffer and emit it as the - // received data buffer. - CHECK(!session->stream_buf_ab_.IsEmpty()); - size_t offset = reinterpret_cast(data) - session->stream_buf_; - // Verify that the data offset is inside the current read buffer. - CHECK_LE(offset, session->stream_buf_size_); - - Local buf = - Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked(); - - stream->EmitData(len, buf, Local()); - if (!stream->IsReading()) - stream->inbound_consumed_data_while_paused_ += len; - else - nghttp2_session_consume_stream(handle, id, len); + // Repeatedly ask the stream's owner for memory, and copy the read data + // into those buffers. + // The typical case is actually the exception here; Http2StreamListeners + // know about the HTTP2 session associated with this stream, so they know + // about the larger from-socket read buffer, so they do not require copying. + do { + uv_buf_t buf = stream->EmitAlloc(len); + ssize_t avail = len; + if (static_cast(buf.len) < avail) + avail = buf.len; + + // `buf.base == nullptr` is the default Http2StreamListener's way + // of saying that it wants a pointer to the raw original. + // Since it has access to the original socket buffer from which the data + // was read in the first place, it can use that to minizime ArrayBuffer + // allocations. + if (LIKELY(buf.base == nullptr)) + buf.base = reinterpret_cast(const_cast(data)); + else + memcpy(buf.base, data, avail); + data += avail; + len -= avail; + stream->EmitRead(avail, buf); + + // If the stream owner (e.g. the JS Http2Stream) wants more data, just + // tell nghttp2 that all data has been consumed. Otherwise, defer until + // more data is being requested. + if (stream->IsReading()) + nghttp2_session_consume_stream(handle, id, avail); + else + stream->inbound_consumed_data_while_paused_ += avail; + } while (len != 0); } return 0; } @@ -1129,6 +1134,38 @@ inline void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) { } } +uv_buf_t Http2StreamListener::OnStreamAlloc(size_t size) { + // See the comments in Http2Session::OnDataChunkReceived + // (which is the only possible call site for this method). + return uv_buf_init(nullptr, size); +} + +void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + Http2Stream* stream = static_cast(stream_); + Http2Session* session = stream->session(); + Environment* env = stream->env(); + + if (nread < 0) { + PassReadErrorToPreviousListener(nread); + return; + } + + CHECK(!session->stream_buf_ab_.IsEmpty()); + + // There is a single large array buffer for the entire data read from the + // network; create a slice of that array buffer and emit it as the + // received data buffer. + size_t offset = buf.base - session->stream_buf_.base; + + // Verify that the data offset is inside the current read buffer. + CHECK_LE(offset, session->stream_buf_.len); + CHECK_LE(offset + buf.len, session->stream_buf_.len); + + Local buffer = + Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked(); + + stream->CallJSOnreadMethod(nread, buffer); +} Http2Stream::SubmitTrailers::SubmitTrailers( Http2Session* session, @@ -1257,7 +1294,7 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) { return; if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - stream->EmitData(UV_EOF, Local(), Local()); + stream->EmitRead(UV_EOF); } } @@ -1378,16 +1415,15 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { } // Callback used when data has been written to the stream. -void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) { - Http2Session* session = static_cast(ctx); - DEBUG_HTTP2SESSION2(session, "write finished with status %d", status); +void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { + DEBUG_HTTP2SESSION2(this, "write finished with status %d", status); // Inform all pending writes about their completion. - session->ClearOutgoing(status); + ClearOutgoing(status); - if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) { + if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) { // Schedule a new write if nghttp2 wants to send data. - session->MaybeScheduleWrite(); + MaybeScheduleWrite(); } } @@ -1625,97 +1661,76 @@ WriteWrap* Http2Session::AllocateSend() { Local obj = env()->write_wrap_constructor_function() ->NewInstance(env()->context()).ToLocalChecked(); - return WriteWrap::New(env(), obj, stream_); -} - -// Allocates the data buffer used to receive inbound data from the i/o stream -void Http2Session::OnStreamAllocImpl(size_t suggested_size, - uv_buf_t* buf, - void* ctx) { - Http2Session* session = static_cast(ctx); - CHECK_EQ(session->stream_buf_, nullptr); - CHECK_EQ(session->stream_buf_size_, 0); - buf->base = session->stream_buf_ = Malloc(suggested_size); - buf->len = session->stream_buf_size_ = suggested_size; - session->IncrementCurrentSessionMemory(suggested_size); + return WriteWrap::New(env(), obj, static_cast(stream_)); } // Callback used to receive inbound data from the i/o stream -void Http2Session::OnStreamReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - Http2Session* session = static_cast(ctx); - Http2Scope h2scope(session); - CHECK_NE(session->stream_, nullptr); - DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread); +void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + Http2Scope h2scope(this); + CHECK_NE(stream_, nullptr); + DEBUG_HTTP2SESSION2(this, "receiving %d bytes", nread); + IncrementCurrentSessionMemory(buf.len); + CHECK(stream_buf_ab_.IsEmpty()); + if (nread <= 0) { - free(session->stream_buf_); + free(buf.base); if (nread < 0) { - uv_buf_t tmp_buf = uv_buf_init(nullptr, 0); - session->prev_read_cb_.fn(nread, - &tmp_buf, - pending, - session->prev_read_cb_.ctx); + PassReadErrorToPreviousListener(nread); } } else { // Only pass data on if nread > 0 + // Makre sure that there was no read previously active. + CHECK_EQ(stream_buf_.base, nullptr); + CHECK_EQ(stream_buf_.len, 0); + + // Remember the current buffer, so that OnDataChunkReceived knows the + // offset of a DATA frame's data into the socket read buffer. + stream_buf_ = uv_buf_init(buf.base, nread); + // Verify that currently: There is memory allocated into which // the data has been read, and that memory buffer is at least as large // as the amount of data we have read, but we have not yet made an // ArrayBuffer out of it. - CHECK_NE(session->stream_buf_, nullptr); - CHECK_EQ(session->stream_buf_, buf->base); - CHECK_EQ(session->stream_buf_size_, buf->len); - CHECK_GE(session->stream_buf_size_, static_cast(nread)); - CHECK(session->stream_buf_ab_.IsEmpty()); + CHECK_LE(static_cast(nread), stream_buf_.len); - Environment* env = session->env(); - Isolate* isolate = env->isolate(); + Isolate* isolate = env()->isolate(); HandleScope scope(isolate); - Local context = env->context(); - Context::Scope context_scope(context); + Context::Scope context_scope(env()->context()); // Create an array buffer for the read data. DATA frames will be emitted // as slices of this array buffer to avoid having to copy memory. - session->stream_buf_ab_ = + stream_buf_ab_ = ArrayBuffer::New(isolate, - session->stream_buf_, - session->stream_buf_size_, + buf.base, + nread, v8::ArrayBufferCreationMode::kInternalized); - uv_buf_t buf_ = uv_buf_init(buf->base, nread); - session->statistics_.data_received += nread; - ssize_t ret = session->Write(&buf_, 1); + statistics_.data_received += nread; + ssize_t ret = Write(&stream_buf_, 1); // Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef // ssize_t to int. Cast here so that the < 0 check actually works on // Windows. if (static_cast(ret) < 0) { - DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret); + DEBUG_HTTP2SESSION2(this, "fatal error receiving data: %d", ret); - Local argv[1] = { + Local argv[] = { Integer::New(isolate, ret), }; - session->MakeCallback(env->error_string(), arraysize(argv), argv); + MakeCallback(env()->error_string(), arraysize(argv), argv); } else { - DEBUG_HTTP2SESSION2(session, "processed %d bytes. wants more? %d", ret, - nghttp2_session_want_read(**session)); + DEBUG_HTTP2SESSION2(this, "processed %d bytes. wants more? %d", ret, + nghttp2_session_want_read(session_)); } } // Since we are finished handling this write, reset the stream buffer. // The memory has either been free()d or was handed over to V8. - session->DecrementCurrentSessionMemory(session->stream_buf_size_); - session->stream_buf_ = nullptr; - session->stream_buf_size_ = 0; - session->stream_buf_ab_ = Local(); -} + DecrementCurrentSessionMemory(buf.len); -void Http2Session::OnStreamDestructImpl(void* ctx) { - Http2Session* session = static_cast(ctx); - session->stream_ = nullptr; + stream_buf_ab_ = Local(); + stream_buf_ = uv_buf_init(nullptr, 0); } // Every Http2Session session is tightly bound to a single i/o StreamBase @@ -1724,14 +1739,7 @@ void Http2Session::OnStreamDestructImpl(void* ctx) { // C++ layer via the StreamBase API. void Http2Session::Consume(Local external) { StreamBase* stream = static_cast(external->Value()); - stream->Consume(); - stream_ = stream; - prev_alloc_cb_ = stream->alloc_cb(); - prev_read_cb_ = stream->read_cb(); - stream->set_alloc_cb({ Http2Session::OnStreamAllocImpl, this }); - stream->set_read_cb({ Http2Session::OnStreamReadImpl, this }); - stream->set_after_write_cb({ Http2Session::OnStreamAfterWriteImpl, this }); - stream->set_destruct_cb({ Http2Session::OnStreamDestructImpl, this }); + stream->PushStreamListener(this); DEBUG_HTTP2SESSION(this, "i/o stream consumed"); } @@ -1769,6 +1777,8 @@ Http2Stream::Http2Stream( if (options & STREAM_OPTION_GET_TRAILERS) flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS; + PushStreamListener(&stream_listener_); + if (options & STREAM_OPTION_EMPTY_PAYLOAD) Shutdown(); session->AddStream(this); diff --git a/src/node_http2.h b/src/node_http2.h index 9ca5c5c06e8a61..b22539f5119919 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -539,6 +539,12 @@ class Http2Priority { nghttp2_priority_spec spec; }; +class Http2StreamListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; +}; + class Http2Stream : public AsyncWrap, public StreamBase { public: @@ -751,6 +757,8 @@ class Http2Stream : public AsyncWrap, int64_t fd_offset_ = 0; int64_t fd_length_ = -1; + Http2StreamListener stream_listener_; + friend class Http2Session; }; @@ -802,7 +810,7 @@ class Http2Stream::Provider::Stream : public Http2Stream::Provider { }; -class Http2Session : public AsyncWrap { +class Http2Session : public AsyncWrap, public StreamListener { public: Http2Session(Environment* env, Local wrap, @@ -876,21 +884,11 @@ class Http2Session : public AsyncWrap { size_t self_size() const override { return sizeof(*this); } - char* stream_alloc() { - return stream_buf_; - } - inline void GetTrailers(Http2Stream* stream, uint32_t* flags); - static void OnStreamAllocImpl(size_t suggested_size, - uv_buf_t* buf, - void* ctx); - static void OnStreamReadImpl(ssize_t nread, - const uv_buf_t* bufs, - uv_handle_type pending, - void* ctx); - static void OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx); - static void OnStreamDestructImpl(void* ctx); + // Handle reads/writes from the underlying network transport. + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamAfterWrite(WriteWrap* w, int status) override; // The JavaScript API static void New(const FunctionCallbackInfo& args); @@ -1078,16 +1076,12 @@ class Http2Session : public AsyncWrap { int flags_ = SESSION_STATE_NONE; // The StreamBase instance being used for i/o - StreamBase* stream_; - StreamResource::Callback prev_alloc_cb_; - StreamResource::Callback prev_read_cb_; padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE; // use this to allow timeout tracking during long-lasting writes uint32_t chunks_sent_since_last_write_ = 0; - char* stream_buf_ = nullptr; - size_t stream_buf_size_ = 0; + uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0); v8::Local stream_buf_ab_; size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; @@ -1103,6 +1097,7 @@ class Http2Session : public AsyncWrap { void ClearOutgoing(int status); friend class Http2Scope; + friend class Http2StreamListener; }; class Http2SessionPerformanceEntry : public PerformanceEntry { diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 9debb8a205ef1c..d4044f8bbeea7b 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -144,7 +144,7 @@ struct StringPtr { }; -class Parser : public AsyncWrap { +class Parser : public AsyncWrap, public StreamListener { public: Parser(Environment* env, Local wrap, enum http_parser_type type) : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_HTTPPARSER), @@ -494,14 +494,7 @@ class Parser : public AsyncWrap { Local stream_obj = args[0].As(); StreamBase* stream = static_cast(stream_obj->Value()); CHECK_NE(stream, nullptr); - - stream->Consume(); - - parser->prev_alloc_cb_ = stream->alloc_cb(); - parser->prev_read_cb_ = stream->read_cb(); - - stream->set_alloc_cb({ OnAllocImpl, parser }); - stream->set_read_cb({ OnReadImpl, parser }); + stream->PushStreamListener(parser); } @@ -510,22 +503,10 @@ class Parser : public AsyncWrap { ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder()); // Already unconsumed - if (parser->prev_alloc_cb_.is_empty()) + if (parser->stream_ == nullptr) return; - // Restore stream's callbacks - if (args.Length() == 1 && args[0]->IsExternal()) { - Local stream_obj = args[0].As(); - StreamBase* stream = static_cast(stream_obj->Value()); - CHECK_NE(stream, nullptr); - - stream->set_alloc_cb(parser->prev_alloc_cb_); - stream->set_read_cb(parser->prev_read_cb_); - stream->Unconsume(); - } - - parser->prev_alloc_cb_.clear(); - parser->prev_read_cb_.clear(); + parser->stream_->RemoveStreamListener(parser); } @@ -544,33 +525,19 @@ class Parser : public AsyncWrap { protected: static const size_t kAllocBufferSize = 64 * 1024; - static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { - Parser* parser = static_cast(ctx); - Environment* env = parser->env(); + uv_buf_t OnStreamAlloc(size_t suggested_size) override { + if (env()->http_parser_buffer() == nullptr) + env()->set_http_parser_buffer(new char[kAllocBufferSize]); - if (env->http_parser_buffer() == nullptr) - env->set_http_parser_buffer(new char[kAllocBufferSize]); - - buf->base = env->http_parser_buffer(); - buf->len = kAllocBufferSize; + return uv_buf_init(env()->http_parser_buffer(), kAllocBufferSize); } - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - Parser* parser = static_cast(ctx); - HandleScope scope(parser->env()->isolate()); + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { + HandleScope scope(env()->isolate()); if (nread < 0) { - uv_buf_t tmp_buf; - tmp_buf.base = nullptr; - tmp_buf.len = 0; - parser->prev_read_cb_.fn(nread, - &tmp_buf, - pending, - parser->prev_read_cb_.ctx); + PassReadErrorToPreviousListener(nread); return; } @@ -578,27 +545,27 @@ class Parser : public AsyncWrap { if (nread == 0) return; - parser->current_buffer_.Clear(); - Local ret = parser->Execute(buf->base, nread); + current_buffer_.Clear(); + Local ret = Execute(buf.base, nread); // Exception if (ret.IsEmpty()) return; - Local obj = parser->object(); - Local cb = obj->Get(kOnExecute); + Local cb = + object()->Get(env()->context(), kOnExecute).ToLocalChecked(); if (!cb->IsFunction()) return; // Hooks for GetCurrentBuffer - parser->current_buffer_len_ = nread; - parser->current_buffer_data_ = buf->base; + current_buffer_len_ = nread; + current_buffer_data_ = buf.base; - parser->MakeCallback(cb.As(), 1, &ret); + MakeCallback(cb.As(), 1, &ret); - parser->current_buffer_len_ = 0; - parser->current_buffer_data_ = nullptr; + current_buffer_len_ = 0; + current_buffer_data_ = nullptr; } @@ -713,8 +680,6 @@ class Parser : public AsyncWrap { Local current_buffer_; size_t current_buffer_len_; char* current_buffer_data_; - StreamResource::Callback prev_alloc_cb_; - StreamResource::Callback prev_read_cb_; // These are helper functions for filling `http_parser_settings`, which turn // a member function of Parser into a C-style HTTP parser callback. diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index c5958a2271a83e..016ce480b6a809 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -29,6 +29,7 @@ #include "node_buffer.h" #include "node_wrap.h" #include "connect_wrap.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "util-inl.h" diff --git a/src/process_wrap.cc b/src/process_wrap.cc index b01ef56270767e..314131e1dd319f 100644 --- a/src/process_wrap.cc +++ b/src/process_wrap.cc @@ -22,6 +22,7 @@ #include "env-inl.h" #include "handle_wrap.h" #include "node_wrap.h" +#include "stream_base-inl.h" #include "util-inl.h" #include diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index cdcff67cc55e66..287978a87034eb 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -25,6 +25,87 @@ using v8::Value; using AsyncHooks = Environment::AsyncHooks; + +inline StreamListener::~StreamListener() { + if (stream_ != nullptr) + stream_->RemoveStreamListener(this); +} + +inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamRead(nread, + uv_buf_init(nullptr, 0), + UV_UNKNOWN_HANDLE); +} + + +inline StreamResource::~StreamResource() { + while (listener_ != nullptr) { + listener_->OnStreamDestroy(); + RemoveStreamListener(listener_); + } +} + +inline void StreamResource::PushStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + CHECK_EQ(listener->stream_, nullptr); + + listener->previous_listener_ = listener_; + listener->stream_ = this; + + listener_ = listener; +} + +inline void StreamResource::RemoveStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + + StreamListener* previous; + StreamListener* current; + + // Remove from the linked list. + for (current = listener_, previous = nullptr; + /* No loop condition because we want a crash if listener is not found */ + ; previous = current, current = current->previous_listener_) { + CHECK_NE(current, nullptr); + if (current == listener) { + if (previous != nullptr) + previous->previous_listener_ = current->previous_listener_; + else + listener_ = listener->previous_listener_; + break; + } + } + + listener->stream_ = nullptr; + listener->previous_listener_ = nullptr; +} + + +inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { + return listener_->OnStreamAlloc(suggested_size); +} + +inline void StreamResource::EmitRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + if (nread > 0) + bytes_read_ += static_cast(nread); + listener_->OnStreamRead(nread, buf, pending); +} + +inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { + listener_->OnStreamAfterWrite(w, status); +} + + +inline StreamBase::StreamBase(Environment* env) : env_(env) { + PushStreamListener(&default_listener_); +} + +inline Environment* StreamBase::stream_env() const { + return env_; +} + template void StreamBase::AddMethods(Environment* env, Local t, @@ -70,8 +151,8 @@ void StreamBase::AddMethods(Environment* env, Local(), attributes); - env->SetProtoMethod(t, "readStart", JSMethod); - env->SetProtoMethod(t, "readStop", JSMethod); + env->SetProtoMethod(t, "readStart", JSMethod); + env->SetProtoMethod(t, "readStop", JSMethod); if ((flags & kFlagNoShutdown) == 0) env->SetProtoMethod(t, "shutdown", JSMethod); if ((flags & kFlagHasWritev) != 0) diff --git a/src/stream_base.cc b/src/stream_base.cc index 0fb801ddd57445..9acf2273abd78b 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -34,12 +34,12 @@ template int StreamBase::WriteString( const FunctionCallbackInfo& args); -int StreamBase::ReadStart(const FunctionCallbackInfo& args) { +int StreamBase::ReadStartJS(const FunctionCallbackInfo& args) { return ReadStart(); } -int StreamBase::ReadStop(const FunctionCallbackInfo& args) { +int StreamBase::ReadStopJS(const FunctionCallbackInfo& args) { return ReadStop(); } @@ -437,9 +437,9 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { } -void StreamBase::EmitData(ssize_t nread, - Local buf, - Local handle) { +void StreamBase::CallJSOnreadMethod(ssize_t nread, + Local buf, + Local handle) { Environment* env = env_; Local argv[] = { @@ -490,4 +490,43 @@ void StreamResource::ClearError() { // No-op } + +uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) { + return uv_buf_init(Malloc(suggested_size), suggested_size); +} + +void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + // This cannot be virtual because it is just as valid to override the other + // OnStreamRead() callback. + CHECK(0 && "OnStreamRead() needs to be implemented"); +} + +void StreamListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + CHECK_EQ(pending, UV_UNKNOWN_HANDLE); + OnStreamRead(nread, buf); +} + + +void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + CHECK_NE(stream_, nullptr); + StreamBase* stream = static_cast(stream_); + Environment* env = stream->stream_env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + if (nread <= 0) { + free(buf.base); + if (nread < 0) + stream->CallJSOnreadMethod(nread, Local()); + return; + } + + CHECK_LE(static_cast(nread), buf.len); + + Local obj = Buffer::New(env, buf.base, nread).ToLocalChecked(); + stream->CallJSOnreadMethod(nread, obj); +} + } // namespace node diff --git a/src/stream_base.h b/src/stream_base.h index d063176b04a4db..0b176d11819fca 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -15,6 +15,7 @@ namespace node { // Forward declarations class StreamBase; +class StreamResource; template class StreamReq { @@ -123,38 +124,78 @@ class WriteWrap : public ReqWrap, const size_t storage_size_; }; -class StreamResource { + +// This is the generic interface for objects that control Node.js' C++ streams. +// For example, the default `EmitToJSStreamListener` emits a stream's data +// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream. +class StreamListener { public: - template - struct Callback { - Callback() : fn(nullptr), ctx(nullptr) {} - Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {} - Callback(const Callback&) = default; - - inline bool is_empty() { return fn == nullptr; } - inline void clear() { - fn = nullptr; - ctx = nullptr; - } + virtual ~StreamListener(); + + // This is called when a stream wants to allocate memory immediately before + // reading data into the freshly allocated buffer (i.e. it is always followed + // by a `OnStreamRead()` call). + // This memory may be statically or dynamically allocated; for example, + // a protocol parser may want to read data into a static buffer if it knows + // that all data is going to be fully handled during the next + // `OnStreamRead()` call. + // The returned buffer does not need to contain `suggested_size` bytes. + // The default implementation of this method returns a buffer that has exactly + // the suggested size and is allocated using malloc(). + virtual uv_buf_t OnStreamAlloc(size_t suggested_size); + + // `OnStreamRead()` is called when data is available on the socket and has + // been read into the buffer provided by `OnStreamAlloc()`. + // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer + // with base nullpptr in case of an error. + // `nread` is the number of read bytes (which is at most the buffer length), + // or, if negative, a libuv error code. + // The variant with a `uv_handle_type` argument is used by libuv-backed + // streams for handle transfers (e.g. passing net.Socket instances between + // cluster workers). For all other streams, overriding the simple variant + // should be sufficient. + // By default, the second variant crashes if `pending` is set and otherwise + // calls the simple variant. + virtual void OnStreamRead(ssize_t nread, + const uv_buf_t& buf) = 0; + virtual void OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending); + + // This is called once a Write has finished. `status` may be 0 or, + // if negative, a libuv error code. + virtual void OnStreamAfterWrite(WriteWrap* w, int status) {} + + // This is called immediately before the stream is destroyed. + virtual void OnStreamDestroy() {} - T fn; - void* ctx; - }; + protected: + // Pass along a read error to the `StreamListener` instance that was active + // before this one. For example, a protocol parser does not care about read + // errors and may instead want to let the original handler + // (e.g. the JS handler) take care of the situation. + void PassReadErrorToPreviousListener(ssize_t nread); - typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx); - typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); - typedef void (*ReadCb)(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - typedef void (*DestructCb)(void* ctx); + StreamResource* stream_ = nullptr; + StreamListener* previous_listener_ = nullptr; - StreamResource() : bytes_read_(0) { - } - virtual ~StreamResource() { - if (!destruct_cb_.is_empty()) - destruct_cb_.fn(destruct_cb_.ctx); - } + friend class StreamResource; +}; + + +// A default emitter that just pushes data chunks as Buffer instances to +// JS land via the handle’s .ondata method. +class EmitToJSStreamListener : public StreamListener { + public: + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; +}; + + +// A generic stream, comparable to JS land’s `Duplex` streams. +// A stream is always controlled through one `StreamListener` instance. +class StreamResource { + public: + virtual ~StreamResource(); virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); @@ -162,50 +203,45 @@ class StreamResource { uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) = 0; - virtual const char* Error() const; - virtual void ClearError(); - - // Events - inline void EmitAfterWrite(WriteWrap* w, int status) { - if (!after_write_cb_.is_empty()) - after_write_cb_.fn(w, status, after_write_cb_.ctx); - } - inline void EmitAlloc(size_t size, uv_buf_t* buf) { - if (!alloc_cb_.is_empty()) - alloc_cb_.fn(size, buf, alloc_cb_.ctx); - } - - inline void EmitRead(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending = UV_UNKNOWN_HANDLE) { - if (nread > 0) - bytes_read_ += static_cast(nread); - if (!read_cb_.is_empty()) - read_cb_.fn(nread, buf, pending, read_cb_.ctx); - } - - inline void set_after_write_cb(Callback c) { - after_write_cb_ = c; - } + // Start reading from the underlying resource. This is called by the consumer + // when more data is desired. + virtual int ReadStart() = 0; + // Stop reading from the underlying resource. This is called by the + // consumer when its buffers are full and no more data can be handled. + virtual int ReadStop() = 0; - inline void set_alloc_cb(Callback c) { alloc_cb_ = c; } - inline void set_read_cb(Callback c) { read_cb_ = c; } - inline void set_destruct_cb(Callback c) { destruct_cb_ = c; } + // Optionally, this may provide an error message to be used for + // failing writes. + virtual const char* Error() const; + // Clear the current error (i.e. that would be returned by Error()). + virtual void ClearError(); - inline Callback after_write_cb() { return after_write_cb_; } - inline Callback alloc_cb() { return alloc_cb_; } - inline Callback read_cb() { return read_cb_; } - inline Callback destruct_cb() { return destruct_cb_; } + // Transfer ownership of this tream to `listener`. The previous listener + // will not receive any more callbacks while the new listener was active. + void PushStreamListener(StreamListener* listener); + // Remove a listener, and, if this was the currently active one, + // transfer ownership back to the previous listener. + void RemoveStreamListener(StreamListener* listener); protected: - Callback after_write_cb_; - Callback alloc_cb_; - Callback read_cb_; - Callback destruct_cb_; - uint64_t bytes_read_; + // Call the current listener's OnStreamAlloc() method. + uv_buf_t EmitAlloc(size_t suggested_size); + // Call the current listener's OnStreamRead() method and update the + // stream's read byte counter. + void EmitRead(ssize_t nread, + const uv_buf_t& buf = uv_buf_init(nullptr, 0), + uv_handle_type pending = UV_UNKNOWN_HANDLE); + // Call the current listener's OnStreamAfterWrite() method. + void EmitAfterWrite(WriteWrap* w, int status); + + StreamListener* listener_ = nullptr; + uint64_t bytes_read_ = 0; + + friend class StreamListener; }; + class StreamBase : public StreamResource { public: enum Flags { @@ -224,40 +260,29 @@ class StreamBase : public StreamResource { virtual bool IsIPCPipe(); virtual int GetFD(); - virtual int ReadStart() = 0; - virtual int ReadStop() = 0; - - inline void Consume() { - CHECK_EQ(consumed_, false); - consumed_ = true; - } - - inline void Unconsume() { - CHECK_EQ(consumed_, true); - consumed_ = false; - } - - void EmitData(ssize_t nread, - v8::Local buf, - v8::Local handle); + void CallJSOnreadMethod( + ssize_t nread, + v8::Local buf, + v8::Local handle = v8::Local()); // These are called by the respective {Write,Shutdown}Wrap class. virtual void AfterShutdown(ShutdownWrap* req, int status); virtual void AfterWrite(WriteWrap* req, int status); - protected: - explicit StreamBase(Environment* env) : env_(env), consumed_(false) { - } + // This is named `stream_env` to avoid name clashes, because a lot of + // subclasses are also `BaseObject`s. + Environment* stream_env() const; - virtual ~StreamBase() = default; + protected: + explicit StreamBase(Environment* env); // One of these must be implemented virtual AsyncWrap* GetAsyncWrap() = 0; virtual v8::Local GetObject(); // JS Methods - int ReadStart(const v8::FunctionCallbackInfo& args); - int ReadStop(const v8::FunctionCallbackInfo& args); + int ReadStartJS(const v8::FunctionCallbackInfo& args); + int ReadStopJS(const v8::FunctionCallbackInfo& args); int Shutdown(const v8::FunctionCallbackInfo& args); int Writev(const v8::FunctionCallbackInfo& args); int WriteBuffer(const v8::FunctionCallbackInfo& args); @@ -280,7 +305,7 @@ class StreamBase : public StreamResource { private: Environment* env_; - bool consumed_; + EmitToJSStreamListener default_listener_; }; } // namespace node diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index b639d945004cfa..0be73f9114adb1 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -93,8 +93,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, provider), StreamBase(env), stream_(stream) { - set_alloc_cb({ OnAllocImpl, this }); - set_read_cb({ OnReadImpl, this }); + PushStreamListener(this); } @@ -157,23 +156,18 @@ int LibuvStreamWrap::ReadStop() { void LibuvStreamWrap::OnAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { + size_t suggested_size, + uv_buf_t* buf) { LibuvStreamWrap* wrap = static_cast(handle->data); HandleScope scope(wrap->env()->isolate()); Context::Scope context_scope(wrap->env()->context()); CHECK_EQ(wrap->stream(), reinterpret_cast(handle)); - return wrap->EmitAlloc(suggested_size, buf); + *buf = wrap->EmitAlloc(suggested_size); } -void LibuvStreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { - buf->base = node::Malloc(size); - buf->len = size; -} - template static Local AcceptHandle(Environment* env, LibuvStreamWrap* parent) { @@ -196,51 +190,41 @@ static Local AcceptHandle(Environment* env, LibuvStreamWrap* parent) { } -void LibuvStreamWrap::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - LibuvStreamWrap* wrap = static_cast(ctx); - Environment* env = wrap->env(); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local pending_obj; +void LibuvStreamWrap::OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); - if (nread < 0) { - if (buf->base != nullptr) - free(buf->base); - wrap->EmitData(nread, Local(), pending_obj); + if (nread <= 0) { + free(buf.base); + if (nread < 0) + CallJSOnreadMethod(nread, Local()); return; } - if (nread == 0) { - if (buf->base != nullptr) - free(buf->base); - return; - } + CHECK_LE(static_cast(nread), buf.len); - CHECK_LE(static_cast(nread), buf->len); - char* base = node::Realloc(buf->base, nread); + Local pending_obj; if (pending == UV_TCP) { - pending_obj = AcceptHandle(env, wrap); + pending_obj = AcceptHandle(env(), this); } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle(env, wrap); + pending_obj = AcceptHandle(env(), this); } else if (pending == UV_UDP) { - pending_obj = AcceptHandle(env, wrap); + pending_obj = AcceptHandle(env(), this); } else { CHECK_EQ(pending, UV_UNKNOWN_HANDLE); } - Local obj = Buffer::New(env, base, nread).ToLocalChecked(); - wrap->EmitData(nread, obj, pending_obj); + Local obj = Buffer::New(env(), buf.base, nread).ToLocalChecked(); + CallJSOnreadMethod(nread, obj, pending_obj); } void LibuvStreamWrap::OnRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf) { + ssize_t nread, + const uv_buf_t* buf) { LibuvStreamWrap* wrap = static_cast(handle->data); HandleScope scope(wrap->env()->isolate()); Context::Scope context_scope(wrap->env()->context()); @@ -263,7 +247,7 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle, } } - wrap->EmitRead(nread, buf, type); + wrap->EmitRead(nread, *buf, type); } diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 0146d41c6e8c7b..129006b1600c6c 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -33,7 +33,9 @@ namespace node { -class LibuvStreamWrap : public HandleWrap, public StreamBase { +class LibuvStreamWrap : public HandleWrap, + public StreamListener, + public StreamBase { public: static void Initialize(v8::Local target, v8::Local unused, @@ -79,9 +81,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { uv_stream_t* stream, AsyncWrap::ProviderType provider); - ~LibuvStreamWrap() { - } - AsyncWrap* GetAsyncWrap() override; static void AddMethods(Environment* env, @@ -105,11 +104,16 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { static void AfterUvShutdown(uv_shutdown_t* req, int status); // Resource interface implementation - static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); + void OnStreamRead(ssize_t nread, + const uv_buf_t& buf) override { + CHECK(0 && "must not be called"); + } + void OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) override; + void OnStreamAfterWrite(WriteWrap* w, int status) override { + previous_listener_->OnStreamAfterWrite(w, status); + } void AfterWrite(WriteWrap* req_wrap, int status) override; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 3a0a3f295e2c72..a0a58fb1b5cc8d 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -27,6 +27,7 @@ #include "node_buffer.h" #include "node_wrap.h" #include "connect_wrap.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "util-inl.h" diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 47362a47de6ddb..a38aa7a4c3484a 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -59,7 +59,6 @@ TLSWrap::TLSWrap(Environment* env, SSLWrap(env, sc, kind), StreamBase(env), sc_(sc), - stream_(stream), enc_in_(nullptr), enc_out_(nullptr), write_size_(0), @@ -78,14 +77,7 @@ TLSWrap::TLSWrap(Environment* env, SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); - stream_->Consume(); - stream_->set_after_write_cb({ OnAfterWriteImpl, this }); - stream_->set_alloc_cb({ OnAllocImpl, this }); - stream_->set_read_cb({ OnReadImpl, this }); - stream_->set_destruct_cb({ OnDestructImpl, this }); - - set_alloc_cb({ OnAllocSelf, this }); - set_read_cb({ OnReadSelf, this }); + stream->PushStreamListener(this); InitSSL(); } @@ -100,19 +92,6 @@ TLSWrap::~TLSWrap() { #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB sni_context_.Reset(); #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB - - // See test/parallel/test-tls-transport-destroy-after-own-gc.js: - // If this TLSWrap is garbage collected, we cannot allow callbacks to be - // called on this stream. - - if (stream_ == nullptr) - return; - stream_->set_destruct_cb({ nullptr, nullptr }); - stream_->set_after_write_cb({ nullptr, nullptr }); - stream_->set_alloc_cb({ nullptr, nullptr }); - stream_->set_read_cb({ nullptr, nullptr }); - stream_->set_destruct_cb({ nullptr, nullptr }); - stream_->Unconsume(); } @@ -214,15 +193,13 @@ void TLSWrap::Receive(const FunctionCallbackInfo& args) { char* data = Buffer::Data(args[0]); size_t len = Buffer::Length(args[0]); - uv_buf_t buf; - // Copy given buffer entirely or partiall if handle becomes closed while (len > 0 && wrap->IsAlive() && !wrap->IsClosing()) { - wrap->stream_->EmitAlloc(len, &buf); + uv_buf_t buf = wrap->OnStreamAlloc(len); size_t copy = buf.len > len ? len : buf.len; memcpy(buf.base, data, copy); buf.len = copy; - wrap->stream_->EmitRead(buf.len, &buf); + wrap->OnStreamRead(copy, buf); data += copy; len -= copy; @@ -315,7 +292,7 @@ void TLSWrap::EncOut() { ->NewInstance(env()->context()).ToLocalChecked(); WriteWrap* write_req = WriteWrap::New(env(), req_wrap_obj, - stream_); + static_cast(stream_)); uv_buf_t buf[arraysize(data)]; for (size_t i = 0; i < count; i++) @@ -332,7 +309,7 @@ void TLSWrap::EncOut() { } -void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) { +void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) { // We should not be getting here after `DestroySSL`, because all queued writes // must be invoked with UV_ECANCELED CHECK_NE(ssl_, nullptr); @@ -429,12 +406,11 @@ void TLSWrap::ClearOut() { while (read > 0) { int avail = read; - uv_buf_t buf; - EmitAlloc(avail, &buf); + uv_buf_t buf = EmitAlloc(avail); if (static_cast(buf.len) < avail) avail = buf.len; memcpy(buf.base, current, avail); - EmitRead(avail, &buf); + EmitRead(avail, buf); // Caveat emptor: OnRead() calls into JS land which can result in // the SSL context object being destroyed. We have to carefully @@ -450,7 +426,7 @@ void TLSWrap::ClearOut() { int flags = SSL_get_shutdown(ssl_); if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) { eof_ = true; - EmitRead(UV_EOF, nullptr); + EmitRead(UV_EOF); } // We need to check whether an error occurred or the connection was @@ -532,22 +508,24 @@ AsyncWrap* TLSWrap::GetAsyncWrap() { bool TLSWrap::IsIPCPipe() { - return stream_->IsIPCPipe(); + return static_cast(stream_)->IsIPCPipe(); } int TLSWrap::GetFD() { - return stream_->GetFD(); + return static_cast(stream_)->GetFD(); } bool TLSWrap::IsAlive() { - return ssl_ != nullptr && stream_ != nullptr && stream_->IsAlive(); + return ssl_ != nullptr && + stream_ != nullptr && + static_cast(stream_)->IsAlive(); } bool TLSWrap::IsClosing() { - return stream_->IsClosing(); + return static_cast(stream_)->IsClosing(); } @@ -646,62 +624,16 @@ int TLSWrap::DoWrite(WriteWrap* w, } -void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) { - TLSWrap* wrap = static_cast(ctx); - wrap->EncOutAfterWrite(w, status); -} - - -void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { - TLSWrap* wrap = static_cast(ctx); - - if (wrap->ssl_ == nullptr) { - *buf = uv_buf_init(nullptr, 0); - return; - } - - size_t size = 0; - buf->base = crypto::NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size); - buf->len = size; -} - - -void TLSWrap::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - TLSWrap* wrap = static_cast(ctx); - wrap->DoRead(nread, buf, pending); -} - - -void TLSWrap::OnDestructImpl(void* ctx) { - TLSWrap* wrap = static_cast(ctx); - wrap->clear_stream(); -} - - -void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) { - buf->base = node::Malloc(suggested_size); - buf->len = suggested_size; -} - +uv_buf_t TLSWrap::OnStreamAlloc(size_t suggested_size) { + CHECK_NE(ssl_, nullptr); -void TLSWrap::OnReadSelf(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - TLSWrap* wrap = static_cast(ctx); - Local buf_obj; - if (buf != nullptr) - buf_obj = Buffer::New(wrap->env(), buf->base, buf->len).ToLocalChecked(); - wrap->EmitData(nread, buf_obj, Local()); + size_t size = suggested_size; + char* base = crypto::NodeBIO::FromBIO(enc_in_)->PeekWritable(&size); + return uv_buf_init(base, size); } -void TLSWrap::DoRead(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { +void TLSWrap::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { if (nread < 0) { // Error should be emitted only after all data was read ClearOut(); @@ -713,13 +645,13 @@ void TLSWrap::DoRead(ssize_t nread, eof_ = true; } - EmitRead(nread, nullptr); + EmitRead(nread); return; } // Only client connections can receive data if (ssl_ == nullptr) { - EmitRead(UV_EPROTO, nullptr); + EmitRead(UV_EPROTO); return; } @@ -814,6 +746,9 @@ void TLSWrap::DestroySSL(const FunctionCallbackInfo& args) { // Destroy the SSL structure and friends wrap->SSLWrap::DestroySSL(); + + if (wrap->stream_ != nullptr) + wrap->stream_->RemoveStreamListener(wrap); } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index ae83c82c3226fd..a1f0b99e86beec 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -48,7 +48,8 @@ class NodeBIO; class TLSWrap : public AsyncWrap, public crypto::SSLWrap, - public StreamBase { + public StreamBase, + public StreamListener { public: ~TLSWrap() override; @@ -76,8 +77,6 @@ class TLSWrap : public AsyncWrap, size_t self_size() const override { return sizeof(*this); } - void clear_stream() { stream_ = nullptr; } - protected: static const int kClearOutChunkSize = 16384; @@ -98,7 +97,6 @@ class TLSWrap : public AsyncWrap, static void SSLInfoCallback(const SSL* ssl_, int where, int ret); void InitSSL(); void EncOut(); - void EncOutAfterWrite(WriteWrap* req_wrap, int status); bool ClearIn(); void ClearOut(); bool InvokeQueued(int status, const char* error_str = nullptr); @@ -119,20 +117,9 @@ class TLSWrap : public AsyncWrap, bool IsIPCPipe() override; // Resource implementation - static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx); - static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadSelf(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - static void OnDestructImpl(void* ctx); - - void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); + void OnStreamAfterWrite(WriteWrap* w, int status) override; + uv_buf_t OnStreamAlloc(size_t size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; v8::Local GetSSLError(int status, int* err, std::string* msg); @@ -154,7 +141,6 @@ class TLSWrap : public AsyncWrap, #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB crypto::SecureContext* sc_; - StreamBase* stream_; BIO* enc_in_; BIO* enc_out_; std::vector pending_cleartext_input_; diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index 111c568cb52a8a..d12f0b956348f5 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -26,6 +26,7 @@ #include "node_buffer.h" #include "node_wrap.h" #include "req_wrap-inl.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "util-inl.h" diff --git a/test/parallel/test-tls-socket-destroy.js b/test/parallel/test-tls-socket-destroy.js index f62b6f905296db..6f1d4b4186b74f 100644 --- a/test/parallel/test-tls-socket-destroy.js +++ b/test/parallel/test-tls-socket-destroy.js @@ -19,6 +19,7 @@ const server = net.createServer(common.mustCall((conn) => { const socket = new tls.TLSSocket(conn, options); socket.once('data', common.mustCall(() => { socket._destroySSL(); // Should not crash. + socket.destroy(); server.close(); })); }));