Skip to content

Commit

Permalink
src: turn JS stream into a full duplex
Browse files Browse the repository at this point in the history
Remove unused methods for reading data from `JSStream` and add
those required for emitting data or an EOF event to the JS side,
in essentially the same way that `LibuvStreamWrap` does it.

PR-URL: #16269
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax committed Oct 23, 2017
1 parent 127f83a commit 170bc31
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 35 deletions.
75 changes: 42 additions & 33 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,55 @@ JSStream::JSStream(Environment* env, Local<Object> obj)
StreamBase(env) {
node::Wrap(obj, this);
MakeWeak<JSStream>(this);

set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this });
}


JSStream::~JSStream() {
}


void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
buf->base = Malloc(size);
buf->len = size;
}


void JSStream::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
JSStream* wrap = static_cast<JSStream*>(ctx);
CHECK_NE(wrap, nullptr);
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

if (nread < 0) {
if (buf != nullptr && buf->base != nullptr)
free(buf->base);
wrap->EmitData(nread, Local<Object>(), Local<Object>());
return;
}

if (nread == 0) {
if (buf->base != nullptr)
free(buf->base);
return;
}

CHECK_LE(static_cast<size_t>(nread), buf->len);
char* base = node::Realloc(buf->base, nread);

CHECK_EQ(pending, UV_UNKNOWN_HANDLE);

Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
wrap->EmitData(nread, obj, Local<Object>());
}


void* JSStream::Cast() {
return static_cast<void*>(this);
}
Expand Down Expand Up @@ -134,37 +176,6 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {
}


static void FreeCallback(char* data, void* hint) {
// Intentional no-op
}


void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uv_buf_t buf;
wrap->OnAlloc(args[0]->Int32Value(), &buf);
Local<Object> vbuf = Buffer::New(
wrap->env(),
buf.base,
buf.len,
FreeCallback,
nullptr).ToLocalChecked();
return args.GetReturnValue().Set(vbuf);
}


void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

CHECK(Buffer::HasInstance(args[1]));
uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
wrap->OnRead(args[0]->Int32Value(), &buf);
}


void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
CHECK(args[0]->IsObject());
Expand Down Expand Up @@ -230,8 +241,6 @@ void JSStream::Initialize(Local<Object> target,

AsyncWrap::AddWrapMethods(env, t);

env->SetProtoMethod(t, "doAlloc", DoAlloc);
env->SetProtoMethod(t, "doRead", DoRead);
env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
Expand Down
8 changes: 6 additions & 2 deletions src/js_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ class JSStream : public AsyncWrap, public StreamBase {
AsyncWrap* GetAsyncWrap() override;

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoAlloc(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoRead(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoAfterWrite(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ReadBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
static void EmitEOF(const v8::FunctionCallbackInfo<v8::Value>& args);

static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);

template <class Wrap>
static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
};
Expand Down
22 changes: 22 additions & 0 deletions test/parallel/test-wrap-js-stream-duplex.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const StreamWrap = require('_stream_wrap');
const { PassThrough } = require('stream');
const { Socket } = require('net');

{
const wrap = new StreamWrap(new PassThrough());
assert(wrap instanceof Socket);
wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo')));
wrap.on('end', common.mustNotCall());
wrap.write('foo');
}

{
const wrap = new StreamWrap(new PassThrough());
assert(wrap instanceof Socket);
wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo')));
wrap.on('end', common.mustCall());
wrap.end('foo');
}

0 comments on commit 170bc31

Please sign in to comment.