From 5e575785e234c895b2cdd84b7c8a6be7174350b8 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Mon, 23 Feb 2015 23:09:44 +0300 Subject: [PATCH 1/6] streams: introduce StreamWrap and JSStream Introduce a way to wrap plain-js `stream.Duplex` streams into C++ StreamBase's child class. With such method at hand it is now possible to pass `stream.Duplex` instance as a `socket` parameter to `tls.connect()`. --- lib/_stream_wrap.js | 95 +++++++++++++ lib/_tls_wrap.js | 11 +- node.gyp | 3 + src/async-wrap.h | 1 + src/env.h | 7 + src/js_stream.cc | 199 ++++++++++++++++++++++++++++ src/js_stream.h | 30 ++++- src/node_wrap.h | 5 + src/stream_base.cc | 26 +++- src/stream_base.h | 16 +-- src/stream_wrap.cc | 18 +-- src/stream_wrap.h | 10 +- src/tls_wrap.cc | 22 ++- src/tls_wrap.h | 11 +- test/parallel/test-tls-js-stream.js | 69 ++++++++++ 15 files changed, 471 insertions(+), 52 deletions(-) create mode 100644 lib/_stream_wrap.js create mode 100644 test/parallel/test-tls-js-stream.js diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js new file mode 100644 index 00000000000000..c8f40fe45c7bdc --- /dev/null +++ b/lib/_stream_wrap.js @@ -0,0 +1,95 @@ +const util = require('util'); +const Socket = require('net').Socket; +const JSStream = process.binding('js_stream').JSStream; + +function StreamWrap(stream) { + var handle = new JSStream(); + + this.stream = stream; + + var self = this; + handle.close = function(cb) { + cb(); + }; + handle.isAlive = function() { + return self.isAlive(); + }; + handle.isClosing = function() { + return self.isClosing(); + }; + handle.onreadstart = function() { + return self.readStart(); + }; + handle.onreadstop = function() { + return self.readStop(); + }; + handle.onshutdown = function(req) { + return self.shutdown(req); + }; + handle.onwrite = function(req, bufs) { + return self.write(req, bufs); + }; + + this.stream.pause(); + this.stream.on('data', function(chunk) { + self._handle.readBuffer(chunk); + }); + this.stream.once('end', function() { + self._handle.emitEOF(); + }); + + Socket.call(this, { + handle: handle + }); +} +util.inherits(StreamWrap, Socket); +exports.StreamWrap = StreamWrap; + +StreamWrap.prototype.isAlive = function isAlive() { + return this.readable && this.writable; +}; + +StreamWrap.prototype.isClosing = function isClosing() { + return !this.isAlive(); +}; + +StreamWrap.prototype.readStart = function readStart() { + this.stream.resume(); + return 0; +}; + +StreamWrap.prototype.readStop = function readStop() { + this.stream.pause(); + return 0; +}; + +StreamWrap.prototype.shutdown = function shutdown(req) { + var self = this; + + this.stream.end(function() { + // Ensure that write was dispatched + setImmediate(function() { + self._handle.finishShutdown(req, 0); + }); + }); +}; + +StreamWrap.prototype.write = function write(req, bufs) { + var pending = bufs.length; + var self = this; + + bufs.forEach(function(buf) { + self.stream.write(buf, done); + }); + + function done() { + if (--pending !== 0) + return; + + // Ensure that write was dispatched + setImmediate(function() { + self._handle.doAfterWrite(req); + self._handle.finishWrite(req, 0); + }); + } +}; diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 10221b99c30847..386d6446e64a6d 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -7,6 +7,8 @@ const tls = require('tls'); const util = require('util'); const listenerCount = require('events').listenerCount; const common = require('_tls_common'); +const StreamWrap = require('_stream_wrap').StreamWrap; +const Duplex = require('stream').Duplex; const debug = util.debuglog('tls'); const Timer = process.binding('timer_wrap').Timer; const tls_wrap = process.binding('tls_wrap'); @@ -224,6 +226,10 @@ function TLSSocket(socket, options) { this.authorized = false; this.authorizationError = null; + // Wrap plain JS Stream into StreamWrap + if (socket instanceof Duplex) + socket = new StreamWrap(socket); + // Just a documented property to make secure sockets // distinguishable from regular ones. this.encrypted = true; @@ -280,7 +286,8 @@ TLSSocket.prototype._wrapHandle = function(handle) { // Proxy HandleWrap, PipeWrap and TCPWrap methods proxiedMethods.forEach(function(name) { res[name] = function methodProxy() { - return handle[name].apply(handle, arguments); + if (handle[name]) + return handle[name].apply(handle, arguments); }; }); @@ -373,7 +380,7 @@ TLSSocket.prototype._init = function(socket) { this.setTimeout(options.handshakeTimeout, this._handleTimeout); // Socket already has some buffered data - emulate receiving it - if (socket && socket._readableState.length) { + if (socket && socket._readableState && socket._readableState.length) { var buf; while ((buf = socket.read()) !== null) ssl.receive(buf); diff --git a/node.gyp b/node.gyp index 996121ee45cfe7..4af27d8de093c2 100644 --- a/node.gyp +++ b/node.gyp @@ -56,6 +56,7 @@ 'lib/_stream_duplex.js', 'lib/_stream_transform.js', 'lib/_stream_passthrough.js', + 'lib/_stream_wrap.js', 'lib/string_decoder.js', 'lib/sys.js', 'lib/timers.js', @@ -95,6 +96,7 @@ 'src/fs_event_wrap.cc', 'src/cares_wrap.cc', 'src/handle_wrap.cc', + 'src/js_stream.cc', 'src/node.cc', 'src/node_buffer.cc', 'src/node_constants.cc', @@ -132,6 +134,7 @@ 'src/env.h', 'src/env-inl.h', 'src/handle_wrap.h', + 'src/js_stream.h', 'src/node.h', 'src/node_buffer.h', 'src/node_constants.h', diff --git a/src/async-wrap.h b/src/async-wrap.h index 86748a5fefd89b..5e898fe4c24534 100644 --- a/src/async-wrap.h +++ b/src/async-wrap.h @@ -17,6 +17,7 @@ namespace node { V(FSREQWRAP) \ V(GETADDRINFOREQWRAP) \ V(GETNAMEINFOREQWRAP) \ + V(JSSTREAM) \ V(PIPEWRAP) \ V(PROCESSWRAP) \ V(QUERYWRAP) \ diff --git a/src/env.h b/src/env.h index c9b4cc0736301c..18fed1830476de 100644 --- a/src/env.h +++ b/src/env.h @@ -107,6 +107,8 @@ namespace node { V(ipv4_string, "IPv4") \ V(ipv6_lc_string, "ipv6") \ V(ipv6_string, "IPv6") \ + V(isalive_string, "isAlive") \ + V(isclosing_string, "isClosing") \ V(issuer_string, "issuer") \ V(issuercert_string, "issuerCertificate") \ V(kill_signal_string, "killSignal") \ @@ -141,9 +143,13 @@ namespace node { V(onnewsessiondone_string, "onnewsessiondone") \ V(onocspresponse_string, "onocspresponse") \ V(onread_string, "onread") \ + V(onreadstart_string, "onreadstart") \ + V(onreadstop_string, "onreadstop") \ V(onselect_string, "onselect") \ + V(onshutdown_string, "onshutdown") \ V(onsignal_string, "onsignal") \ V(onstop_string, "onstop") \ + V(onwrite_string, "onwrite") \ V(output_string, "output") \ V(order_string, "order") \ V(owner_string, "owner") \ @@ -225,6 +231,7 @@ namespace node { V(context, v8::Context) \ V(domain_array, v8::Array) \ V(fs_stats_constructor_function, v8::Function) \ + V(jsstream_constructor_template, v8::FunctionTemplate) \ V(module_load_list_array, v8::Array) \ V(pipe_constructor_template, v8::FunctionTemplate) \ V(process_object, v8::Object) \ diff --git a/src/js_stream.cc b/src/js_stream.cc index 3cc3a895fc5054..38ab847954d056 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -3,19 +3,218 @@ #include "async-wrap.h" #include "env.h" #include "env-inl.h" +#include "node_buffer.h" #include "stream_base.h" #include "v8.h" namespace node { +using v8::Array; using v8::Context; +using v8::External; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; using v8::Handle; +using v8::HandleScope; +using v8::Local; using v8::Object; using v8::Value; + +JSStream::JSStream(Environment* env, Handle obj, AsyncWrap* parent) + : StreamBase(env), + AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM, parent) { + node::Wrap(obj, this); +} + + +JSStream::~JSStream() { +} + + +void* JSStream::Cast() { + return static_cast(this); +} + + +AsyncWrap* JSStream::GetAsyncWrap() { + return static_cast(this); +} + + +bool JSStream::IsAlive() { + return MakeCallback(env()->isalive_string(), 0, nullptr)->IsTrue(); +} + + +bool JSStream::IsClosing() { + return MakeCallback(env()->isclosing_string(), 0, nullptr)->IsTrue(); +} + + +int JSStream::ReadStart() { + return MakeCallback(env()->onreadstart_string(), 0, nullptr)->Int32Value(); +} + + +int JSStream::ReadStop() { + return MakeCallback(env()->onreadstop_string(), 0, nullptr)->Int32Value(); +} + + +int JSStream::DoShutdown(ShutdownWrap* req_wrap) { + HandleScope scope(env()->isolate()); + + Local argv[] = { + req_wrap->object() + }; + + Local res = + MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv); + + return res->Int32Value(); +} + + +int JSStream::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) { + CHECK_EQ(send_handle, nullptr); + + HandleScope scope(env()->isolate()); + + Local bufs_arr = Array::New(env()->isolate(), count); + for (size_t i = 0; i < count; i++) + bufs_arr->Set(i, Buffer::New(env(), bufs[0].base, bufs[0].len)); + + Local argv[] = { + w->object(), + bufs_arr + }; + + Local res = + MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv); + + return res->Int32Value(); +} + + +void JSStream::New(const FunctionCallbackInfo& args) { + // This constructor should not be exposed to public javascript. + // Therefore we assert that we are not trying to call this as a + // normal function. + CHECK(args.IsConstructCall()); + Environment* env = Environment::GetCurrent(args); + JSStream* wrap; + + if (args.Length() == 0) { + wrap = new JSStream(env, args.This(), nullptr); + } else if (args[0]->IsExternal()) { + void* ptr = args[0].As()->Value(); + wrap = new JSStream(env, args.This(), static_cast(ptr)); + } else { + UNREACHABLE(); + } + CHECK(wrap); +} + + +static void FreeCallback(char* data, void* hint) { + // Intentional no-op +} + + +void JSStream::DoAlloc(const FunctionCallbackInfo& args) { + JSStream* wrap = Unwrap(args.Holder()); + + uv_buf_t buf; + wrap->OnAlloc(args[0]->Int32Value(), &buf); + args.GetReturnValue().Set(Buffer::New(wrap->env(), + buf.base, + buf.len, + FreeCallback, + nullptr)); +} + + +void JSStream::DoRead(const FunctionCallbackInfo& args) { + JSStream* wrap = Unwrap(args.Holder()); + + CHECK(Buffer::HasInstance(args[1])); + uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1])); + wrap->OnRead(args[0]->Int32Value(), &buf); +} + + +void JSStream::DoAfterWrite(const FunctionCallbackInfo& args) { + JSStream* wrap = Unwrap(args.Holder()); + WriteWrap* w = Unwrap(args[0].As()); + + wrap->OnAfterWrite(w); +} + + +template +void JSStream::Finish(const FunctionCallbackInfo& args) { + Wrap* w = Unwrap(args[0].As()); + + w->Done(args[0]->Int32Value()); +} + + +void JSStream::ReadBuffer(const FunctionCallbackInfo& args) { + JSStream* wrap = Unwrap(args.Holder()); + + CHECK(Buffer::HasInstance(args[0])); + char* data = Buffer::Data(args[0]); + int len = Buffer::Length(args[0]); + + do { + uv_buf_t buf; + ssize_t avail = len; + wrap->OnAlloc(len, &buf); + if (static_cast(buf.len) < avail) + avail = buf.len; + + memcpy(buf.base, data, avail); + data += avail; + len -= avail; + wrap->OnRead(avail, &buf); + } while (len != 0); +} + + +void JSStream::EmitEOF(const FunctionCallbackInfo& args) { + JSStream* wrap = Unwrap(args.Holder()); + + wrap->OnRead(UV_EOF, nullptr); +} + + void JSStream::Initialize(Handle target, Handle unused, Handle context) { + Environment* env = Environment::GetCurrent(context); + + Local t = env->NewFunctionTemplate(New); + t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream")); + t->InstanceTemplate()->SetInternalFieldCount(1); + + env->SetProtoMethod(t, "doAlloc", DoAlloc); + env->SetProtoMethod(t, "doRead", DoRead); + env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite); + env->SetProtoMethod(t, "finishWrite", Finish); + env->SetProtoMethod(t, "finishShutdown", Finish); + env->SetProtoMethod(t, "readBuffer", ReadBuffer); + env->SetProtoMethod(t, "emitEOF", EmitEOF); + + StreamBase::AddMethods(env, t); + target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"), + t->GetFunction()); + env->set_jsstream_constructor_template(t); } } // namespace node + +NODE_MODULE_CONTEXT_AWARE_BUILTIN(js_stream, node::JSStream::Initialize) diff --git a/src/js_stream.h b/src/js_stream.h index 6a2d3bfb4fc18e..8e2ff13258b219 100644 --- a/src/js_stream.h +++ b/src/js_stream.h @@ -8,11 +8,39 @@ namespace node { -class JSStream : public StreamBase { +class JSStream : public StreamBase, public AsyncWrap { public: static void Initialize(v8::Handle target, v8::Handle unused, v8::Handle context); + + void* Cast() override; + bool IsAlive() override; + bool IsClosing() override; + int ReadStart() override; + int ReadStop() override; + + int DoShutdown(ShutdownWrap* req_wrap) override; + int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) override; + + protected: + JSStream(Environment* env, v8::Handle obj, AsyncWrap* parent); + ~JSStream(); + + AsyncWrap* GetAsyncWrap() override; + + static void New(const v8::FunctionCallbackInfo& args); + static void DoAlloc(const v8::FunctionCallbackInfo& args); + static void DoRead(const v8::FunctionCallbackInfo& args); + static void DoAfterWrite(const v8::FunctionCallbackInfo& args); + static void ReadBuffer(const v8::FunctionCallbackInfo& args); + static void EmitEOF(const v8::FunctionCallbackInfo& args); + + template + static void Finish(const v8::FunctionCallbackInfo& args); }; } // namespace node diff --git a/src/node_wrap.h b/src/node_wrap.h index ddd7bd16e0d8c5..58b042a63b475a 100644 --- a/src/node_wrap.h +++ b/src/node_wrap.h @@ -3,6 +3,7 @@ #include "env.h" #include "env-inl.h" +#include "js_stream.h" #include "pipe_wrap.h" #include "tcp_wrap.h" #include "tty_wrap.h" @@ -40,6 +41,10 @@ namespace node { env->tls_wrap_constructor_template()->HasInstance(obj)) { \ TLSWrap* const wrap = Unwrap(obj); \ BODY \ + } else if (env->jsstream_constructor_template().IsEmpty() == false && \ + env->jsstream_constructor_template()->HasInstance(obj)) { \ + JSStream* const wrap = Unwrap(obj); \ + BODY \ } \ }); \ } while (0) diff --git a/src/stream_base.cc b/src/stream_base.cc index 0a1324bb5872c5..82b1d65396c352 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -5,6 +5,7 @@ #include "node_buffer.h" #include "env.h" #include "env-inl.h" +#include "js_stream.h" #include "string_bytes.h" #include "tls_wrap.h" #include "util.h" @@ -34,6 +35,8 @@ template void StreamBase::AddMethods(Environment* env, Handle t); template void StreamBase::AddMethods(Environment* env, Handle t); +template void StreamBase::AddMethods(Environment* env, + Handle t); template @@ -488,8 +491,29 @@ void StreamBase::EmitData(ssize_t nread, } -AsyncWrap* StreamBase::GetAsyncWrap() { +bool StreamBase::IsIPCPipe() { + return false; +} + + +int StreamBase::GetFD() { + return -1; +} + + +int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { + // No TryWrite by default + return 0; +} + + +const char* StreamResource::Error() const { return nullptr; } + +void StreamResource::ClearError() { + // No-op +} + } // namespace node diff --git a/src/stream_base.h b/src/stream_base.h index d6b3a555b0596b..87aae0597376fd 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -106,13 +106,13 @@ class StreamResource { virtual ~StreamResource() = default; virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; - virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0; + virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); virtual int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) = 0; - virtual const char* Error() const = 0; - virtual void ClearError() = 0; + virtual const char* Error() const; + virtual void ClearError(); // Events inline void OnAfterWrite(WriteWrap* w) { @@ -127,7 +127,7 @@ class StreamResource { inline void OnRead(size_t nread, const uv_buf_t* buf, - uv_handle_type pending) { + uv_handle_type pending = UV_UNKNOWN_HANDLE) { if (read_cb_ != nullptr) read_cb_(nread, buf, pending, read_ctx_); } @@ -163,10 +163,10 @@ class StreamBase : public StreamResource { v8::Handle target); virtual void* Cast() = 0; - virtual bool IsAlive() const = 0; - virtual bool IsClosing() const = 0; - virtual bool IsIPCPipe() const = 0; - virtual int GetFD() const = 0; + virtual bool IsAlive() = 0; + virtual bool IsClosing() = 0; + virtual bool IsIPCPipe(); + virtual int GetFD(); virtual int ReadStart() = 0; virtual int ReadStop() = 0; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 3b50f638eb0fc7..c8ea8d228fe081 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -84,7 +84,7 @@ void StreamWrap::AddMethods(Environment* env, } -int StreamWrap::GetFD() const { +int StreamWrap::GetFD() { int fd = -1; #if !defined(_WIN32) if (stream() != nullptr) @@ -94,12 +94,12 @@ int StreamWrap::GetFD() const { } -bool StreamWrap::IsAlive() const { +bool StreamWrap::IsAlive() { return HandleWrap::IsAlive(this); } -bool StreamWrap::IsClosing() const { +bool StreamWrap::IsClosing() { return uv_is_closing(reinterpret_cast(stream())); } @@ -114,7 +114,7 @@ AsyncWrap* StreamWrap::GetAsyncWrap() { } -bool StreamWrap::IsIPCPipe() const { +bool StreamWrap::IsIPCPipe() { return is_named_pipe_ipc(); } @@ -359,16 +359,6 @@ void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { wrap->UpdateWriteQueueSize(); } - -const char* StreamWrap::Error() const { - return nullptr; -} - - -void StreamWrap::ClearError() { - // No-op -} - } // namespace node NODE_MODULE_CONTEXT_AWARE_BUILTIN(stream_wrap, node::StreamWrap::Initialize) diff --git a/src/stream_wrap.h b/src/stream_wrap.h index ca673b4ef11879..99561e843ae11d 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -19,11 +19,11 @@ class StreamWrap : public HandleWrap, public StreamBase { v8::Handle unused, v8::Handle context); - int GetFD() const override; + int GetFD() override; void* Cast() override; - bool IsAlive() const override; - bool IsClosing() const override; - bool IsIPCPipe() const override; + bool IsAlive() override; + bool IsClosing() override; + bool IsIPCPipe() override; // JavaScript functions int ReadStart() override; @@ -36,8 +36,6 @@ class StreamWrap : public HandleWrap, public StreamBase { uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) override; - const char* Error() const override; - void ClearError() override; inline uv_stream_t* stream() const { return stream_; diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index ab8db6951bdc30..86056723df18ff 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -216,7 +216,7 @@ void TLSWrap::Receive(const FunctionCallbackInfo& args) { size_t copy = buf.len > len ? len : buf.len; memcpy(buf.base, data, copy); buf.len = copy; - wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE); + wrap->stream_->OnRead(buf.len, &buf); data += copy; len -= copy; @@ -414,7 +414,7 @@ void TLSWrap::ClearOut() { if (static_cast(buf.len) < avail) avail = buf.len; memcpy(buf.base, out, avail); - OnRead(avail, &buf, UV_UNKNOWN_HANDLE); + OnRead(avail, &buf); read -= avail; } @@ -423,7 +423,7 @@ void TLSWrap::ClearOut() { int flags = SSL_get_shutdown(ssl_); if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) { eof_ = true; - OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE); + OnRead(UV_EOF, nullptr); } if (read == -1) { @@ -495,22 +495,22 @@ AsyncWrap* TLSWrap::GetAsyncWrap() { } -bool TLSWrap::IsIPCPipe() const { +bool TLSWrap::IsIPCPipe() { return stream_->IsIPCPipe(); } -int TLSWrap::GetFD() const { +int TLSWrap::GetFD() { return stream_->GetFD(); } -bool TLSWrap::IsAlive() const { +bool TLSWrap::IsAlive() { return stream_->IsAlive(); } -bool TLSWrap::IsClosing() const { +bool TLSWrap::IsClosing() { return stream_->IsClosing(); } @@ -536,12 +536,6 @@ void TLSWrap::ClearError() { } -int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { - // TODO(indutny): Support it - return 0; -} - - int TLSWrap::DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, @@ -668,7 +662,7 @@ void TLSWrap::DoRead(ssize_t nread, HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); - OnRead(nread, nullptr, UV_UNKNOWN_HANDLE); + OnRead(nread, nullptr); return; } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 42452055ced275..73a9f84ec0d1b7 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -32,16 +32,15 @@ class TLSWrap : public crypto::SSLWrap, v8::Handle context); void* Cast() override; - int GetFD() const override; - bool IsAlive() const override; - bool IsClosing() const override; + int GetFD() override; + bool IsAlive() override; + bool IsClosing() override; // JavaScript functions int ReadStart() override; int ReadStop() override; int DoShutdown(ShutdownWrap* req_wrap) override; - int DoTryWrite(uv_buf_t** bufs, size_t* count) override; int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, @@ -78,7 +77,7 @@ class TLSWrap : public crypto::SSLWrap, TLSWrap(Environment* env, Kind kind, - StreamBase* steram, + StreamBase* stream, v8::Handle stream_obj, v8::Handle sc); @@ -104,7 +103,7 @@ class TLSWrap : public crypto::SSLWrap, } AsyncWrap* GetAsyncWrap() override; - bool IsIPCPipe() const override; + bool IsIPCPipe() override; // Resource implementation static void OnAfterWriteImpl(WriteWrap* w, void* ctx); diff --git a/test/parallel/test-tls-js-stream.js b/test/parallel/test-tls-js-stream.js new file mode 100644 index 00000000000000..479ea0abe2724b --- /dev/null +++ b/test/parallel/test-tls-js-stream.js @@ -0,0 +1,69 @@ +var assert = require('assert'); +var stream = require('stream'); +var tls = require('tls'); +var fs = require('fs'); +var net = require('net'); + +var common = require('../common'); + +var connected = { + client: 0, + server: 0 +}; + +var server = tls.createServer({ + key: fs.readFileSync(common.fixturesDir + '/keys/agent1-key.pem'), + cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem') +}, function(c) { + console.log('new client'); + connected.server++; + c.end('ohai'); +}).listen(common.PORT, function() { + var raw = net.connect(common.PORT); + + var pending = false; + raw.on('readable', function() { + if (pending) + p._read(); + }); + + var p = new stream.Duplex({ + read: function read() { + pending = false; + + var chunk = raw.read(); + if (chunk) { + console.log('read', chunk); + this.push(chunk); + } else { + pending = true; + } + }, + write: function write(data, enc, cb) { + console.log('write', data, enc); + raw.write(data, enc, cb); + } + }); + + var socket = tls.connect({ + socket: raw, + rejectUnauthorized: false + }, function() { + console.log('client secure'); + + connected.client++; + + socket.end('hello'); + socket.resume(); + }); + + socket.once('close', function() { + console.log('client close'); + server.close(); + }); +}); + +process.once('exit', function() { + assert.equal(connected.client, 1); + assert.equal(connected.server, 1); +}); From d9711b155817d2b40f63f88433d43c079844d07e Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Mon, 23 Feb 2015 23:35:20 +0300 Subject: [PATCH 2/6] fix --- test/parallel/test-tls-js-stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-tls-js-stream.js b/test/parallel/test-tls-js-stream.js index 479ea0abe2724b..7caa7e3f1944d7 100644 --- a/test/parallel/test-tls-js-stream.js +++ b/test/parallel/test-tls-js-stream.js @@ -46,7 +46,7 @@ var server = tls.createServer({ }); var socket = tls.connect({ - socket: raw, + socket: p, rejectUnauthorized: false }, function() { console.log('client secure'); From 866e9574e502ffb967cb49d54ebb1f04f6884171 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Mon, 23 Feb 2015 23:56:50 +0300 Subject: [PATCH 3/6] fixes --- lib/_stream_wrap.js | 5 ++++- lib/_tls_wrap.js | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index c8f40fe45c7bdc..0b713737d884ee 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -43,7 +43,10 @@ function StreamWrap(stream) { }); } util.inherits(StreamWrap, Socket); -exports.StreamWrap = StreamWrap; +module.exports = StreamWrap; + +// require('_stream_wrap').StreamWrap +StreamWrap.StreamWrap = StreamWrap; StreamWrap.prototype.isAlive = function isAlive() { return this.readable && this.writable; diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 386d6446e64a6d..7d8ac85bffa097 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -227,7 +227,7 @@ function TLSSocket(socket, options) { this.authorizationError = null; // Wrap plain JS Stream into StreamWrap - if (socket instanceof Duplex) + if (!(socket instanceof net.Socket) && socket instanceof Duplex) socket = new StreamWrap(socket); // Just a documented property to make secure sockets From 3310b4874969cc6bdc037ffce5a560dd48e889a9 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 24 Feb 2015 22:06:37 +0300 Subject: [PATCH 4/6] fixes --- lib/_stream_wrap.js | 24 +++++++++++++++++++++--- lib/_tls_wrap.js | 4 ++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 0b713737d884ee..e4e5490f4dff3b 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -1,6 +1,7 @@ const util = require('util'); const Socket = require('net').Socket; const JSStream = process.binding('js_stream').JSStream; +const uv = process.binding('uv'); function StreamWrap(stream) { var handle = new JSStream(); @@ -37,6 +38,9 @@ function StreamWrap(stream) { this.stream.once('end', function() { self._handle.emitEOF(); }); + this.stream.on('error', function(err) { + self.emit('error', err); + }); Socket.call(this, { handle: handle @@ -75,6 +79,7 @@ StreamWrap.prototype.shutdown = function shutdown(req) { self._handle.finishShutdown(req, 0); }); }); + return 0; }; StreamWrap.prototype.write = function write(req, bufs) { @@ -85,14 +90,27 @@ StreamWrap.prototype.write = function write(req, bufs) { self.stream.write(buf, done); }); - function done() { - if (--pending !== 0) + function done(err) { + if (err || --pending !== 0) return; + // Ensure that this is called once in case of error + pending = 0; + // Ensure that write was dispatched setImmediate(function() { + var errCode = 0; + if (err) { + if (err.code && uv['UV_' + err.code]) + errCode = uv['UV_' + err.code]; + else + errCode = uv.UV_EPIPE; + } + self._handle.doAfterWrite(req); - self._handle.finishWrite(req, 0); + self._handle.finishWrite(req, errCode); }); } + + return 0; }; diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 7d8ac85bffa097..41421e19880890 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -395,6 +395,10 @@ TLSSocket.prototype._init = function(socket) { self._connecting = false; self.emit('connect'); }); + + socket.on('error', function(err) { + self._tlsError(err); + }); } // Assume `tls.connect()` From b9c004b035873085766854ea231086aa20e04869 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 24 Feb 2015 22:11:23 +0300 Subject: [PATCH 5/6] fix --- lib/_stream_wrap.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index e4e5490f4dff3b..b644bea44bebe8 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -91,7 +91,7 @@ StreamWrap.prototype.write = function write(req, bufs) { }); function done(err) { - if (err || --pending !== 0) + if (!err && --pending !== 0) return; // Ensure that this is called once in case of error From cef4c461cfe5b0347ed075592fcea0afc0f4b027 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 24 Feb 2015 22:30:27 +0300 Subject: [PATCH 6/6] fix --- lib/_stream_wrap.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index b644bea44bebe8..c3dcfe51b6c9b5 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -86,9 +86,11 @@ StreamWrap.prototype.write = function write(req, bufs) { var pending = bufs.length; var self = this; + self.stream.cork(); bufs.forEach(function(buf) { self.stream.write(buf, done); }); + self.stream.uncork(); function done(err) { if (!err && --pending !== 0)