diff --git a/src/js_stream.cc b/src/js_stream.cc index d3d43ac6c9d8f2..a279970c1bbfca 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -27,6 +27,9 @@ JSStream::JSStream(Environment* env, Local obj) StreamBase(env) { node::Wrap(obj, this); MakeWeak(this); + + set_alloc_cb({ OnAllocImpl, this }); + set_read_cb({ OnReadImpl, this }); } @@ -34,6 +37,45 @@ JSStream::~JSStream() { } +void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { + buf->base = Malloc(size); + buf->len = size; +} + + +void JSStream::OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + JSStream* wrap = static_cast(ctx); + CHECK_NE(wrap, nullptr); + Environment* env = wrap->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + if (nread < 0) { + if (buf != nullptr && buf->base != nullptr) + free(buf->base); + wrap->EmitData(nread, Local(), Local()); + return; + } + + if (nread == 0) { + if (buf->base != nullptr) + free(buf->base); + return; + } + + CHECK_LE(static_cast(nread), buf->len); + char* base = node::Realloc(buf->base, nread); + + CHECK_EQ(pending, UV_UNKNOWN_HANDLE); + + Local obj = Buffer::New(env, base, nread).ToLocalChecked(); + wrap->EmitData(nread, obj, Local()); +} + + void* JSStream::Cast() { return static_cast(this); } @@ -134,37 +176,6 @@ void JSStream::New(const FunctionCallbackInfo& args) { } -static void FreeCallback(char* data, void* hint) { - // Intentional no-op -} - - -void JSStream::DoAlloc(const FunctionCallbackInfo& args) { - JSStream* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); - - uv_buf_t buf; - wrap->OnAlloc(args[0]->Int32Value(), &buf); - Local vbuf = Buffer::New( - wrap->env(), - buf.base, - buf.len, - FreeCallback, - nullptr).ToLocalChecked(); - return args.GetReturnValue().Set(vbuf); -} - - -void JSStream::DoRead(const FunctionCallbackInfo& args) { - JSStream* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); - - CHECK(Buffer::HasInstance(args[1])); - uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1])); - wrap->OnRead(args[0]->Int32Value(), &buf); -} - - void JSStream::DoAfterWrite(const FunctionCallbackInfo& args) { JSStream* wrap; CHECK(args[0]->IsObject()); @@ -230,8 +241,6 @@ void JSStream::Initialize(Local target, AsyncWrap::AddWrapMethods(env, t); - env->SetProtoMethod(t, "doAlloc", DoAlloc); - env->SetProtoMethod(t, "doRead", DoRead); env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite); env->SetProtoMethod(t, "finishWrite", Finish); env->SetProtoMethod(t, "finishShutdown", Finish); diff --git a/src/js_stream.h b/src/js_stream.h index fc0b7abe15a633..a4a67ae3372620 100644 --- a/src/js_stream.h +++ b/src/js_stream.h @@ -38,12 +38,16 @@ class JSStream : public AsyncWrap, public StreamBase { AsyncWrap* GetAsyncWrap() override; static void New(const v8::FunctionCallbackInfo& args); - static void DoAlloc(const v8::FunctionCallbackInfo& args); - static void DoRead(const v8::FunctionCallbackInfo& args); static void DoAfterWrite(const v8::FunctionCallbackInfo& args); static void ReadBuffer(const v8::FunctionCallbackInfo& args); static void EmitEOF(const v8::FunctionCallbackInfo& args); + static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + template static void Finish(const v8::FunctionCallbackInfo& args); }; diff --git a/test/parallel/test-wrap-js-stream-duplex.js b/test/parallel/test-wrap-js-stream-duplex.js new file mode 100644 index 00000000000000..6bd860e6ba1f56 --- /dev/null +++ b/test/parallel/test-wrap-js-stream-duplex.js @@ -0,0 +1,22 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const StreamWrap = require('_stream_wrap'); +const { PassThrough } = require('stream'); +const { Socket } = require('net'); + +{ + const wrap = new StreamWrap(new PassThrough()); + assert(wrap instanceof Socket); + wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo'))); + wrap.on('end', common.mustNotCall()); + wrap.write('foo'); +} + +{ + const wrap = new StreamWrap(new PassThrough()); + assert(wrap instanceof Socket); + wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo'))); + wrap.on('end', common.mustCall()); + wrap.end('foo'); +}