From 67f1d76956a8a5da9875b113371c8786ad579086 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 13 Feb 2018 01:23:50 +0100 Subject: [PATCH] src: introduce native-layer stream piping Provide a way to create pipes between native `StreamBase` instances that acts more directly than a `.pipe()` call would. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- node.gyp | 2 + src/async_wrap.h | 1 + src/env.h | 5 + src/node_http2.cc | 4 +- src/node_internals.h | 1 + src/stream_base-inl.h | 10 +- src/stream_base.h | 3 + src/stream_pipe.cc | 266 ++++++++++++++++++ src/stream_pipe.h | 68 +++++ test/sequential/test-async-wrap-getasyncid.js | 1 + 10 files changed, 358 insertions(+), 3 deletions(-) create mode 100644 src/stream_pipe.cc create mode 100644 src/stream_pipe.h diff --git a/node.gyp b/node.gyp index 5efe2323599cff..1b047fe9ac52f2 100644 --- a/node.gyp +++ b/node.gyp @@ -338,6 +338,7 @@ 'src/string_decoder.cc', 'src/string_search.cc', 'src/stream_base.cc', + 'src/stream_pipe.cc', 'src/stream_wrap.cc', 'src/tcp_wrap.cc', 'src/timer_wrap.cc', @@ -394,6 +395,7 @@ 'src/string_decoder-inl.h', 'src/stream_base.h', 'src/stream_base-inl.h', + 'src/stream_pipe.h', 'src/stream_wrap.h', 'src/tracing/agent.h', 'src/tracing/node_trace_buffer.h', diff --git a/src/async_wrap.h b/src/async_wrap.h index 608764bab5361c..f0689d32f3c69f 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -58,6 +58,7 @@ namespace node { V(SHUTDOWNWRAP) \ V(SIGNALWRAP) \ V(STATWATCHER) \ + V(STREAMPIPE) \ V(TCPCONNECTWRAP) \ V(TCPSERVERWRAP) \ V(TCPWRAP) \ diff --git a/src/env.h b/src/env.h index e0f6856f8d0b1d..4fc6b31ffebd21 100644 --- a/src/env.h +++ b/src/env.h @@ -222,6 +222,7 @@ struct PackageConfig { V(onstop_string, "onstop") \ V(onstreamclose_string, "onstreamclose") \ V(ontrailers_string, "ontrailers") \ + V(onunpipe_string, "onunpipe") \ V(onwrite_string, "onwrite") \ V(openssl_error_stack, "opensslErrorStack") \ V(output_string, "output") \ @@ -233,6 +234,8 @@ struct PackageConfig { V(pbkdf2_error_string, "PBKDF2 Error") \ V(pid_string, "pid") \ V(pipe_string, "pipe") \ + V(pipe_target_string, "pipeTarget") \ + V(pipe_source_string, "pipeSource") \ V(port_string, "port") \ V(preference_string, "preference") \ V(priority_string, "priority") \ @@ -255,9 +258,11 @@ struct PackageConfig { V(session_id_string, "sessionId") \ V(shell_string, "shell") \ V(signal_string, "signal") \ + V(sink_string, "sink") \ V(size_string, "size") \ V(sni_context_err_string, "Invalid SNI context") \ V(sni_context_string, "sni_context") \ + V(source_string, "source") \ V(stack_string, "stack") \ V(status_string, "status") \ V(stdio_string, "stdio") \ diff --git a/src/node_http2.cc b/src/node_http2.cc index 8dd222a692a52c..d6df93cf3804a7 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1813,7 +1813,9 @@ inline void Http2Stream::Close(int32_t code) { } int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) { - CHECK(!this->IsDestroyed()); + if (IsDestroyed()) + return UV_EPIPE; + { Http2Scope h2scope(this); flags_ |= NGHTTP2_STREAM_FLAG_SHUT; diff --git a/src/node_internals.h b/src/node_internals.h index 2faa6f93475ad7..79c2ce553200f3 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -120,6 +120,7 @@ struct sockaddr; V(serdes) \ V(signal_wrap) \ V(spawn_sync) \ + V(stream_pipe) \ V(stream_wrap) \ V(string_decoder) \ V(tcp_wrap) \ diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index f0d522a7b06b6c..7523b3a545355f 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -67,8 +67,14 @@ inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { inline StreamResource::~StreamResource() { while (listener_ != nullptr) { - listener_->OnStreamDestroy(); - RemoveStreamListener(listener_); + StreamListener* listener = listener_; + listener->OnStreamDestroy(); + // Remove the listener if it didn’t remove itself. This makes the logic + // logic in `OnStreamDestroy()` implementations easier, because they + // may call generic cleanup functions which can just remove the + // listener unconditionally. + if (listener == listener_) + RemoveStreamListener(listener_); } } diff --git a/src/stream_base.h b/src/stream_base.h index 96a7787e5bb41c..7264824265a579 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -141,6 +141,9 @@ class StreamListener { // This is called immediately before the stream is destroyed. virtual void OnStreamDestroy() {} + // The stream this is currently associated with, or nullptr if there is none. + inline StreamResource* stream() { return stream_; } + protected: // Pass along a read error to the `StreamListener` instance that was active // before this one. For example, a protocol parser does not care about read diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc new file mode 100644 index 00000000000000..8f0263cd9ae99b --- /dev/null +++ b/src/stream_pipe.cc @@ -0,0 +1,266 @@ +#include "stream_pipe.h" +#include "stream_base-inl.h" +#include "node_buffer.h" +#include "node_internals.h" + +using v8::Context; +using v8::External; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::Local; +using v8::Object; +using v8::Value; + +namespace node { + +StreamPipe::StreamPipe(StreamBase* source, + StreamBase* sink, + Local obj) + : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) { + MakeWeak(this); + + CHECK_NE(sink, nullptr); + CHECK_NE(source, nullptr); + + source->PushStreamListener(&readable_listener_); + sink->PushStreamListener(&writable_listener_); + + CHECK(sink->HasWantsWrite()); + + // Set up links between this object and the source/sink objects. + // In particular, this makes sure that they are garbage collected as a group, + // if that applies to the given streams (for example, Http2Streams use + // weak references). + obj->Set(env()->context(), env()->source_string(), source->GetObject()) + .FromJust(); + source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj) + .FromJust(); + obj->Set(env()->context(), env()->sink_string(), sink->GetObject()) + .FromJust(); + sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj) + .FromJust(); +} + +StreamPipe::~StreamPipe() { + CHECK(is_closed_); +} + +StreamBase* StreamPipe::source() { + return static_cast(readable_listener_.stream()); +} + +StreamBase* StreamPipe::sink() { + return static_cast(writable_listener_.stream()); +} + +void StreamPipe::Unpipe() { + if (is_closed_) + return; + + // Note that we cannot use virtual methods on `source` and `sink` here, + // because this function can be called from their destructors via + // `OnStreamDestroy()`. + + is_closed_ = true; + is_reading_ = false; + source()->RemoveStreamListener(&readable_listener_); + sink()->RemoveStreamListener(&writable_listener_); + + // Delay the JS-facing part with SetImmediate, because this might be from + // inside the garbage collector, so we can’t run JS here. + HandleScope handle_scope(env()->isolate()); + env()->SetImmediate([](Environment* env, void* data) { + StreamPipe* pipe = static_cast(data); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + Local object = pipe->object(); + + if (object->Has(env->context(), env->onunpipe_string()).FromJust()) { + pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked(); + } + + // Set all the links established in the constructor to `null`. + Local null = Null(env->isolate()); + + Local source_v; + Local sink_v; + source_v = object->Get(env->context(), env->source_string()) + .ToLocalChecked(); + sink_v = object->Get(env->context(), env->sink_string()) + .ToLocalChecked(); + CHECK(source_v->IsObject()); + CHECK(sink_v->IsObject()); + + object->Set(env->context(), env->source_string(), null).FromJust(); + object->Set(env->context(), env->sink_string(), null).FromJust(); + source_v.As()->Set(env->context(), + env->pipe_target_string(), + null).FromJust(); + sink_v.As()->Set(env->context(), + env->pipe_source_string(), + null).FromJust(); + }, static_cast(this), object()); +} + +uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + size_t size = std::min(suggested_size, pipe->wanted_data_); + CHECK_GT(size, 0); + return uv_buf_init(Malloc(size), size); +} + +void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf) { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + AsyncScope async_scope(pipe); + if (nread < 0) { + // EOF or error; stop reading and pass the error to the previous listener + // (which might end up in JS). + free(buf.base); + pipe->is_eof_ = true; + stream()->ReadStop(); + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); + // If we’re not writing, close now. Otherwise, we’ll do that in + // `OnStreamAfterWrite()`. + if (!pipe->is_writing_) { + pipe->ShutdownWritable(); + pipe->Unpipe(); + } + return; + } + + pipe->ProcessData(nread, buf); +} + +void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) { + uv_buf_t buffer = uv_buf_init(buf.base, nread); + StreamWriteResult res = sink()->Write(&buffer, 1); + if (!res.async) { + free(buf.base); + writable_listener_.OnStreamAfterWrite(nullptr, res.err); + } else { + is_writing_ = true; + is_reading_ = false; + res.wrap->SetAllocatedStorage(buf.base, buf.len); + source()->ReadStop(); + } +} + +void StreamPipe::ShutdownWritable() { + sink()->Shutdown(); +} + +void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w, + int status) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->is_writing_ = false; + if (pipe->is_eof_) { + AsyncScope async_scope(pipe); + pipe->ShutdownWritable(); + pipe->Unpipe(); + return; + } + + if (status != 0) { + CHECK_NE(previous_listener_, nullptr); + StreamListener* prev = previous_listener_; + pipe->Unpipe(); + prev->OnStreamAfterWrite(w, status); + return; + } +} + +void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w, + int status) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + CHECK_NE(previous_listener_, nullptr); + StreamListener* prev = previous_listener_; + pipe->Unpipe(); + prev->OnStreamAfterShutdown(w, status); +} + +void StreamPipe::ReadableListener::OnStreamDestroy() { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + if (!pipe->is_eof_) { + OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0)); + } +} + +void StreamPipe::WritableListener::OnStreamDestroy() { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->is_eof_ = true; + pipe->Unpipe(); +} + +void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->wanted_data_ = suggested_size; + if (pipe->is_reading_ || pipe->is_closed_) + return; + AsyncScope async_scope(pipe); + pipe->is_reading_ = true; + pipe->source()->ReadStart(); +} + +uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) { + CHECK_NE(previous_listener_, nullptr); + return previous_listener_->OnStreamAlloc(suggested_size); +} + +void StreamPipe::WritableListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf) { + CHECK_NE(previous_listener_, nullptr); + return previous_listener_->OnStreamRead(nread, buf); +} + +void StreamPipe::New(const FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + CHECK(args[0]->IsExternal()); + CHECK(args[1]->IsExternal()); + auto source = static_cast(args[0].As()->Value()); + auto sink = static_cast(args[1].As()->Value()); + + new StreamPipe(source, sink, args.This()); +} + +void StreamPipe::Start(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + pipe->is_closed_ = false; + if (pipe->wanted_data_ > 0) + pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_); +} + +void StreamPipe::Unpipe(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + pipe->Unpipe(); +} + +namespace { + +void InitializeStreamPipe(Local target, + Local unused, + Local context) { + Environment* env = Environment::GetCurrent(context); + + // Create FunctionTemplate for FileHandle::CloseReq + Local pipe = env->NewFunctionTemplate(StreamPipe::New); + Local stream_pipe_string = + FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe"); + env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe); + env->SetProtoMethod(pipe, "start", StreamPipe::Start); + AsyncWrap::AddWrapMethods(env, pipe); + pipe->SetClassName(stream_pipe_string); + pipe->InstanceTemplate()->SetInternalFieldCount(1); + target->Set(context, stream_pipe_string, pipe->GetFunction()).FromJust(); +} + +} // anonymous namespace + +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe, + node::InitializeStreamPipe) diff --git a/src/stream_pipe.h b/src/stream_pipe.h new file mode 100644 index 00000000000000..98d6dae11be841 --- /dev/null +++ b/src/stream_pipe.h @@ -0,0 +1,68 @@ +#ifndef SRC_STREAM_PIPE_H_ +#define SRC_STREAM_PIPE_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "stream_base.h" + +namespace node { + +class StreamPipe : public AsyncWrap { + public: + StreamPipe(StreamBase* source, StreamBase* sink, v8::Local obj); + ~StreamPipe(); + + void Unpipe(); + + static void New(const v8::FunctionCallbackInfo& args); + static void Start(const v8::FunctionCallbackInfo& args); + static void Unpipe(const v8::FunctionCallbackInfo& args); + + size_t self_size() const override { return sizeof(*this); } + + private: + StreamBase* source(); + StreamBase* sink(); + + void ShutdownWritable(); + void FlushToWritable(); + + bool is_reading_ = false; + bool is_writing_ = false; + bool is_eof_ = false; + bool is_closed_ = true; + + // Set a default value so that when we’re coming from Start(), we know + // that we don’t want to read just yet. + // This will likely need to be changed when supporting streams without + // `OnStreamWantsWrite()` support. + size_t wanted_data_ = 0; + + void ProcessData(size_t nread, const uv_buf_t& buf); + + class ReadableListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamDestroy() override; + }; + + class WritableListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamAfterWrite(WriteWrap* w, int status) override; + void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; + void OnStreamWantsWrite(size_t suggested_size) override; + void OnStreamDestroy() override; + }; + + ReadableListener readable_listener_; + WritableListener writable_listener_; +}; + +} // namespace node + +#endif + +#endif // SRC_STREAM_PIPE_H_ diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 66eaabec25d977..64c4fd5cd8ab50 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -35,6 +35,7 @@ common.crashOnUnhandledRejection(); delete providers.HTTP2STREAM; delete providers.HTTP2PING; delete providers.HTTP2SETTINGS; + delete providers.STREAMPIPE; const objKeys = Object.keys(providers); if (objKeys.length > 0)