Skip to content

Commit

Permalink
src: minor refactoring to StreamBase writes
Browse files Browse the repository at this point in the history
Instead of having per-request callbacks, always call a callback
on the `StreamBase` instance itself for `WriteWrap` and `ShutdownWrap`.

This makes `WriteWrap` cleanup consistent for all stream classes,
since the after-write callback is always the same now.

If special handling is needed for writes that happen to a sub-class,
`AfterWrite` can be overridden by that class, rather than that
class providing its own callback (e.g. updating the write
queue size for libuv streams).

If special handling is needed for writes that happen on another
stream instance, the existing `after_write_cb()` callback
is used for that (e.g. custom code after writing to the
transport from a TLS stream).

As a nice bonus, this also makes `WriteWrap` and `ShutdownWrap`
instances slightly smaller.

PR-URL: nodejs#17564
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax committed Dec 13, 2017
1 parent 785debc commit a58f89d
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 87 deletions.
2 changes: 1 addition & 1 deletion src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());

wrap->OnAfterWrite(w);
w->Done(0);
}


Expand Down
5 changes: 1 addition & 4 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -987,9 +987,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {

WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate());
auto AfterWrite = [](WriteWrap* req, int status) {
req->Dispose();
};
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
Expand All @@ -999,7 +996,7 @@ WriteWrap* Http2Session::AllocateSend() {
session(),
NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
// Max frame size + 9 bytes for the header
return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9);
return WriteWrap::New(env(), obj, stream_, size + 9);
}

void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {
Expand Down
12 changes: 10 additions & 2 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
}


inline void ShutdownWrap::OnDone(int status) {
stream()->AfterShutdown(this, status);
}


WriteWrap* WriteWrap::New(Environment* env,
Local<Object> obj,
StreamBase* wrap,
DoneCb cb,
size_t extra) {
size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
char* storage = new char[storage_size];

return new(storage) WriteWrap(env, obj, wrap, cb, storage_size);
return new(storage) WriteWrap(env, obj, wrap, storage_size);
}


Expand All @@ -171,6 +175,10 @@ size_t WriteWrap::ExtraSize() const {
return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
}

inline void WriteWrap::OnDone(int status) {
stream()->AfterWrite(this, status);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
21 changes: 9 additions & 12 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
env->set_init_trigger_async_id(wrap->get_async_id());
ShutdownWrap* req_wrap = new ShutdownWrap(env,
req_wrap_obj,
this,
AfterShutdown);
this);

int err = DoShutdown(req_wrap);
if (err)
Expand All @@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {


void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();

// The wrap and request objects should still be there.
Expand All @@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = {
Integer::New(env->isolate(), status),
wrap->GetObject(),
GetObject(),
req_wrap_obj
};

Expand Down Expand Up @@ -158,7 +156,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);

offset = 0;
if (!all_buffers) {
Expand Down Expand Up @@ -248,7 +246,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id());
// Allocate, or write rest
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
req_wrap = WriteWrap::New(env, req_wrap_obj, this);

err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
Expand Down Expand Up @@ -332,7 +330,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap();
if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);

data = req_wrap->Extra();

Expand Down Expand Up @@ -393,7 +391,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {


void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();

HandleScope handle_scope(env->isolate());
Expand All @@ -405,19 +402,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
// Unref handle property
Local<Object> req_wrap_obj = req_wrap->object();
req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
wrap->OnAfterWrite(req_wrap);
OnAfterWrite(req_wrap, status);

Local<Value> argv[] = {
Integer::New(env->isolate(), status),
wrap->GetObject(),
GetObject(),
req_wrap_obj,
Undefined(env->isolate())
};

const char* msg = wrap->Error();
const char* msg = Error();
if (msg != nullptr) {
argv[3] = OneByteString(env->isolate(), msg);
wrap->ClearError();
ClearError();
}

if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
Expand Down
66 changes: 27 additions & 39 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,37 @@ namespace node {
// Forward declarations
class StreamBase;

template <class Req>
template<typename Base>
class StreamReq {
public:
typedef void (*DoneCb)(Req* req, int status);

explicit StreamReq(DoneCb cb) : cb_(cb) {
explicit StreamReq(StreamBase* stream) : stream_(stream) {
}

inline void Done(int status, const char* error_str = nullptr) {
Req* req = static_cast<Req*>(this);
Base* req = static_cast<Base*>(this);
Environment* env = req->env();
if (error_str != nullptr) {
req->object()->Set(env->error_string(),
OneByteString(env->isolate(), error_str));
}

cb_(req, status);
req->OnDone(status);
}

inline StreamBase* stream() const { return stream_; }

private:
DoneCb cb_;
StreamBase* const stream_;
};

class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
public StreamReq<ShutdownWrap> {
public:
ShutdownWrap(Environment* env,
v8::Local<v8::Object> req_wrap_obj,
StreamBase* wrap,
DoneCb cb)
StreamBase* stream)
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
StreamReq<ShutdownWrap>(cb),
wrap_(wrap) {
StreamReq<ShutdownWrap>(stream) {
Wrap(req_wrap_obj, this);
}

Expand All @@ -60,27 +58,22 @@ class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
return ContainerOf(&ShutdownWrap::req_, req);
}

inline StreamBase* wrap() const { return wrap_; }
size_t self_size() const override { return sizeof(*this); }

private:
StreamBase* const wrap_;
inline void OnDone(int status); // Just calls stream()->AfterShutdown()
};

class WriteWrap: public ReqWrap<uv_write_t>,
public StreamReq<WriteWrap> {
class WriteWrap : public ReqWrap<uv_write_t>,
public StreamReq<WriteWrap> {
public:
static inline WriteWrap* New(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb,
StreamBase* stream,
size_t extra = 0);
inline void Dispose();
inline char* Extra(size_t offset = 0);
inline size_t ExtraSize() const;

inline StreamBase* wrap() const { return wrap_; }

size_t self_size() const override { return storage_size_; }

static WriteWrap* from_req(uv_write_t* req) {
Expand All @@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap<uv_write_t>,

WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb)
StreamBase* stream)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb),
wrap_(wrap),
StreamReq<WriteWrap>(stream),
storage_size_(0) {
Wrap(obj, this);
}

inline void OnDone(int status); // Just calls stream()->AfterWrite()

protected:
WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb,
StreamBase* stream,
size_t storage_size)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb),
wrap_(wrap),
StreamReq<WriteWrap>(stream),
storage_size_(storage_size) {
Wrap(obj, this);
}
Expand All @@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap<uv_write_t>,
// WriteWrap. Ensure this never happens.
void operator delete(void* ptr) { UNREACHABLE(); }

StreamBase* const wrap_;
const size_t storage_size_;
};

Expand All @@ -151,7 +141,7 @@ class StreamResource {
void* ctx;
};

typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
typedef void (*ReadCb)(ssize_t nread,
const uv_buf_t* buf,
Expand All @@ -176,9 +166,9 @@ class StreamResource {
virtual void ClearError();

// Events
inline void OnAfterWrite(WriteWrap* w) {
inline void OnAfterWrite(WriteWrap* w, int status) {
if (!after_write_cb_.is_empty())
after_write_cb_.fn(w, after_write_cb_.ctx);
after_write_cb_.fn(w, status, after_write_cb_.ctx);
}

inline void OnAlloc(size_t size, uv_buf_t* buf) {
Expand Down Expand Up @@ -208,14 +198,12 @@ class StreamResource {
inline Callback<ReadCb> read_cb() { return read_cb_; }
inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }

private:
protected:
Callback<AfterWriteCb> after_write_cb_;
Callback<AllocCb> alloc_cb_;
Callback<ReadCb> read_cb_;
Callback<DestructCb> destruct_cb_;
uint64_t bytes_read_;

friend class StreamBase;
};

class StreamBase : public StreamResource {
Expand Down Expand Up @@ -253,6 +241,10 @@ class StreamBase : public StreamResource {
v8::Local<v8::Object> buf,
v8::Local<v8::Object> handle);

// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
virtual void AfterWrite(WriteWrap* req, int status);

protected:
explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
}
Expand All @@ -263,10 +255,6 @@ class StreamBase : public StreamResource {
virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject();

// Libuv callbacks
static void AfterShutdown(ShutdownWrap* req, int status);
static void AfterWrite(WriteWrap* req, int status);

// JS Methods
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
17 changes: 8 additions & 9 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
provider),
StreamBase(env),
stream_(stream) {
set_after_write_cb({ OnAfterWriteImpl, this });
set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this });
}
Expand Down Expand Up @@ -293,13 +292,13 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {

int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
int err;
err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown);
err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown);
req_wrap->Dispatched();
return err;
}


void LibuvStreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate());
Expand Down Expand Up @@ -354,9 +353,9 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
uv_stream_t* send_handle) {
int r;
if (send_handle == nullptr) {
r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite);
} else {
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterUvWrite);
}

if (!r) {
Expand All @@ -377,7 +376,7 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
}


void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) {
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = WriteWrap::from_req(req);
CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate());
Expand All @@ -386,9 +385,9 @@ void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) {
}


void LibuvStreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(ctx);
wrap->UpdateWriteQueueSize();
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
StreamBase::AfterWrite(w, status);
UpdateWriteQueueSize();
}

} // namespace node
Expand Down
Loading

0 comments on commit a58f89d

Please sign in to comment.