Skip to content

Commit

Permalink
Use custom StreamListener instead of fixed buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
addaleax committed Jan 13, 2019
1 parent f84b416 commit d8b783f
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 134 deletions.
16 changes: 9 additions & 7 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,18 @@ function getFlags(ipv6Only) {
return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0;
}

function createHandle(fd, is_server, buf) {
function createHandle(fd, is_server) {
validateInt32(fd, 'fd', 0);
const type = TTYWrap.guessHandleType(fd);
if (type === 'PIPE') {
return new Pipe(
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET,
buf
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET
);
}

if (type === 'TCP') {
return new TCP(
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET,
buf
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
);
}

Expand Down Expand Up @@ -223,6 +221,10 @@ function initSocketHandle(self) {
self._handle[owner_symbol] = self;
self._handle.onread = onStreamRead;
self[async_id_symbol] = getNewAsyncId(self._handle);

if (self[kBuffer]) {
self._handle.useUserBuffer(self[kBuffer]);
}
}
}

Expand Down Expand Up @@ -904,8 +906,8 @@ Socket.prototype.connect = function(...args) {

if (!this._handle) {
this._handle = pipe ?
new Pipe(PipeConstants.SOCKET, this[kBuffer]) :
new TCP(TCPConstants.SOCKET, this[kBuffer]);
new Pipe(PipeConstants.SOCKET) :
new TCP(TCPConstants.SOCKET);
initSocketHandle(this);
}

Expand Down
23 changes: 0 additions & 23 deletions src/connection_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@ ConnectionWrap<WrapType, UVType>::ConnectionWrap(Environment* env,
reinterpret_cast<uv_stream_t*>(&handle_),
provider) {}

template <typename WrapType, typename UVType>
ConnectionWrap<WrapType, UVType>::ConnectionWrap(Environment* env,
Local<Object> object,
ProviderType provider,
uv_buf_t buf)
: LibuvStreamWrap(env,
object,
reinterpret_cast<uv_stream_t*>(&handle_),
provider,
buf) {}


template <typename WrapType, typename UVType>
void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,
Expand Down Expand Up @@ -127,23 +116,11 @@ template ConnectionWrap<PipeWrap, uv_pipe_t>::ConnectionWrap(
Local<Object> object,
ProviderType provider);

template ConnectionWrap<PipeWrap, uv_pipe_t>::ConnectionWrap(
Environment* env,
Local<Object> object,
ProviderType provider,
uv_buf_t buf);

template ConnectionWrap<TCPWrap, uv_tcp_t>::ConnectionWrap(
Environment* env,
Local<Object> object,
ProviderType provider);

template ConnectionWrap<TCPWrap, uv_tcp_t>::ConnectionWrap(
Environment* env,
Local<Object> object,
ProviderType provider,
uv_buf_t buf);

template void ConnectionWrap<PipeWrap, uv_pipe_t>::OnConnection(
uv_stream_t* handle, int status);

Expand Down
4 changes: 0 additions & 4 deletions src/connection_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ class ConnectionWrap : public LibuvStreamWrap {
ConnectionWrap(Environment* env,
v8::Local<v8::Object> object,
ProviderType provider);
ConnectionWrap(Environment* env,
v8::Local<v8::Object> object,
ProviderType provider,
uv_buf_t buf);

UVType handle_;
};
Expand Down
20 changes: 1 addition & 19 deletions src/pipe_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,7 @@ void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {
UNREACHABLE();
}

if (args.Length() > 1 && Buffer::HasInstance(args[1])) {
uv_buf_t buf;
buf.base = Buffer::Data(args[1]);
buf.len = Buffer::Length(args[1]);
new PipeWrap(env, args.This(), provider, ipc, buf);
} else {
new PipeWrap(env, args.This(), provider, ipc);
}
new PipeWrap(env, args.This(), provider, ipc);
}


Expand All @@ -170,17 +163,6 @@ PipeWrap::PipeWrap(Environment* env,
// Suggestion: uv_pipe_init() returns void.
}

PipeWrap::PipeWrap(Environment* env,
Local<Object> object,
ProviderType provider,
bool ipc,
uv_buf_t buf)
: ConnectionWrap(env, object, provider, buf) {
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
}


void PipeWrap::Bind(const FunctionCallbackInfo<Value>& args) {
PipeWrap* wrap;
Expand Down
5 changes: 0 additions & 5 deletions src/pipe_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
v8::Local<v8::Object> object,
ProviderType provider,
bool ipc);
PipeWrap(Environment* env,
v8::Local<v8::Object> object,
ProviderType provider,
bool ipc,
uv_buf_t buf);

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
15 changes: 3 additions & 12 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,13 @@ inline void StreamResource::EmitWantsWrite(size_t suggested_size) {
}

inline StreamBase::StreamBase(Environment* env) : env_(env) {
buf_.base = nullptr;
buf_.len = 0;
PushStreamListener(&default_listener_);
}

inline StreamBase::StreamBase(Environment* env, uv_buf_t buf)
: env_(env),
buf_(buf) {
PushStreamListener(&default_listener_);
}

inline Environment* StreamBase::stream_env() const {
return env_;
}

inline uv_buf_t StreamBase::stream_buf() const {
return buf_;
}

inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();

Expand Down Expand Up @@ -338,6 +326,9 @@ void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
env->SetProtoMethod(t,
"useUserBuffer",
JSMethod<Base, &StreamBase::UseUserBuffer>);
env->SetProtoMethod(t, "writev", JSMethod<Base, &StreamBase::Writev>);
env->SetProtoMethod(t,
"writeBuffer",
Expand Down
64 changes: 39 additions & 25 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
return ReadStop();
}

int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
CHECK(Buffer::HasInstance(args[0]));

uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
PushStreamListener(new CustomBufferJSListener(buf));
return 0;
}

int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
Expand Down Expand Up @@ -295,7 +302,7 @@ void StreamBase::CallJSOnreadMethod(ssize_t nread,
DCHECK_EQ(static_cast<int32_t>(nread), nread);
DCHECK_LE(offset, INT32_MAX);

if (ab.IsEmpty() && buf_.base == nullptr) {
if (ab.IsEmpty()) {
DCHECK_EQ(offset, 0);
DCHECK_LE(nread, 0);
} else {
Expand Down Expand Up @@ -347,12 +354,7 @@ void StreamResource::ClearError() {


uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
StreamBase* stream = static_cast<StreamBase*>(stream_);
const uv_buf_t stream_buf = stream->stream_buf();
if (stream_buf.base != nullptr)
return stream_buf;
else
return uv_buf_init(Malloc(suggested_size), suggested_size);
return uv_buf_init(Malloc(suggested_size), suggested_size);
}


Expand All @@ -362,32 +364,44 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
Environment* env = stream->stream_env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
const uv_buf_t stream_buf = stream->stream_buf();
Local<ArrayBuffer> obj;

if (nread <= 0) {
if (stream_buf.base == nullptr)
free(buf.base);
if (nread == 0)
return;
} else if (stream_buf.base != nullptr) {
CHECK_LE(static_cast<size_t>(nread), stream_buf.len);
} else {
CHECK_LE(static_cast<size_t>(nread), buf.len);
free(buf.base);
if (nread < 0)
stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
return;
}

char* base = Realloc(buf.base, nread);
CHECK_LE(static_cast<size_t>(nread), buf.len);
char* base = Realloc(buf.base, nread);

obj = ArrayBuffer::New(
env->isolate(),
base,
nread,
// Transfer ownership to V8.
v8::ArrayBufferCreationMode::kInternalized);
}
Local<ArrayBuffer> obj = ArrayBuffer::New(
env->isolate(),
base,
nread,
v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8.
stream->CallJSOnreadMethod(nread, obj);
}


uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
return buffer_;
}


void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
CHECK_NOT_NULL(stream_);
CHECK_EQ(buf.base, buffer_.base);

StreamBase* stream = static_cast<StreamBase*>(stream_);
Environment* env = stream->stream_env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
}


void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
StreamReq* req_wrap, int status) {
StreamBase* stream = static_cast<StreamBase*>(stream_);
Expand Down
19 changes: 16 additions & 3 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
};


// An alternative listener that uses a custom, user-provided buffer
// for reading data.
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
public:
uv_buf_t OnStreamAlloc(size_t suggested_size) override;
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
void OnStreamDestroy() override { delete this; }

explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}

private:
uv_buf_t buffer_;
};


// A generic stream, comparable to JS land’s `Duplex` streams.
// A stream is always controlled through one `StreamListener` instance.
class StreamResource {
Expand Down Expand Up @@ -271,7 +286,6 @@ class StreamBase : public StreamResource {
// This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s.
Environment* stream_env() const;
uv_buf_t stream_buf() const;

// Shut down the current stream. This request can use an existing
// ShutdownWrap object (that was created in JS), or a new one will be created.
Expand Down Expand Up @@ -302,7 +316,6 @@ class StreamBase : public StreamResource {

protected:
explicit StreamBase(Environment* env);
explicit StreamBase(Environment* env, uv_buf_t buf);

// JS Methods
int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand All @@ -312,6 +325,7 @@ class StreamBase : public StreamResource {
int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
template <enum encoding enc>
int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base>
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down Expand Up @@ -341,7 +355,6 @@ class StreamBase : public StreamResource {

private:
Environment* env_;
uv_buf_t buf_;
EmitToJSStreamListener default_listener_;

void SetWriteResult(const StreamWriteResult& res);
Expand Down
13 changes: 0 additions & 13 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
stream_(stream) {
}

LibuvStreamWrap::LibuvStreamWrap(Environment* env,
Local<Object> object,
uv_stream_t* stream,
AsyncWrap::ProviderType provider,
uv_buf_t buf)
: HandleWrap(env,
object,
reinterpret_cast<uv_handle_t*>(stream),
provider),
StreamBase(env, buf),
stream_(stream) {
}


Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
Environment* env) {
Expand Down
5 changes: 0 additions & 5 deletions src/stream_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
v8::Local<v8::Object> object,
uv_stream_t* stream,
AsyncWrap::ProviderType provider);
LibuvStreamWrap(Environment* env,
v8::Local<v8::Object> object,
uv_stream_t* stream,
AsyncWrap::ProviderType provider,
uv_buf_t buf);

AsyncWrap* GetAsyncWrap() override;

Expand Down
17 changes: 1 addition & 16 deletions src/tcp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,7 @@ void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
UNREACHABLE();
}

if (args.Length() > 1 && Buffer::HasInstance(args[1])) {
uv_buf_t buf;
buf.base = Buffer::Data(args[1]);
buf.len = Buffer::Length(args[1]);
new TCPWrap(env, args.This(), provider, buf);
} else {
new TCPWrap(env, args.This(), provider);
}
new TCPWrap(env, args.This(), provider);
}


Expand All @@ -176,14 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
// Suggestion: uv_tcp_init() returns void.
}

TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider,
uv_buf_t buf)
: ConnectionWrap(env, object, provider, buf) {
int r = uv_tcp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
}


void TCPWrap::SetNoDelay(const FunctionCallbackInfo<Value>& args) {
TCPWrap* wrap;
Expand Down
Loading

0 comments on commit d8b783f

Please sign in to comment.