Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: minor refactoring to StreamBase #17564

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ void JSStream::OnReadImpl(ssize_t nread,
}


void* JSStream::Cast() {
return static_cast<void*>(this);
}


AsyncWrap* JSStream::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
Expand Down Expand Up @@ -181,7 +176,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());

wrap->OnAfterWrite(w);
w->Done(0);
}


Expand Down
1 change: 0 additions & 1 deletion src/js_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class JSStream : public AsyncWrap, public StreamBase {

~JSStream();

void* Cast() override;
bool IsAlive() override;
bool IsClosing() override;
int ReadStart() override;
Expand Down
5 changes: 1 addition & 4 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -987,9 +987,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {

WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate());
auto AfterWrite = [](WriteWrap* req, int status) {
req->Dispose();
};
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
Expand All @@ -999,7 +996,7 @@ WriteWrap* Http2Session::AllocateSend() {
session(),
NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
// Max frame size + 9 bytes for the header
return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9);
return WriteWrap::New(env(), obj, stream_, size + 9);
}

void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {
Expand Down
3 changes: 1 addition & 2 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,7 @@ class Http2Stream : public AsyncWrap,
return false;
}

AsyncWrap* GetAsyncWrap() override { return static_cast<AsyncWrap*>(this); }
void* Cast() override { return reinterpret_cast<void*>(this); }
AsyncWrap* GetAsyncWrap() override { return this; }

int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count,
uv_stream_t* send_handle) override;
Expand Down
12 changes: 10 additions & 2 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
}


inline void ShutdownWrap::OnDone(int status) {
stream()->AfterShutdown(this, status);
}


WriteWrap* WriteWrap::New(Environment* env,
Local<Object> obj,
StreamBase* wrap,
DoneCb cb,
size_t extra) {
size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
char* storage = new char[storage_size];

return new(storage) WriteWrap(env, obj, wrap, cb, storage_size);
return new(storage) WriteWrap(env, obj, wrap, storage_size);
}


Expand All @@ -171,6 +175,10 @@ size_t WriteWrap::ExtraSize() const {
return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
}

inline void WriteWrap::OnDone(int status) {
stream()->AfterWrite(this, status);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
21 changes: 9 additions & 12 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
env->set_init_trigger_async_id(wrap->get_async_id());
ShutdownWrap* req_wrap = new ShutdownWrap(env,
req_wrap_obj,
this,
AfterShutdown);
this);

int err = DoShutdown(req_wrap);
if (err)
Expand All @@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {


void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();

// The wrap and request objects should still be there.
Expand All @@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = {
Integer::New(env->isolate(), status),
wrap->GetObject(),
GetObject(),
req_wrap_obj
};

Expand Down Expand Up @@ -158,7 +156,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);

offset = 0;
if (!all_buffers) {
Expand Down Expand Up @@ -248,7 +246,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id());
// Allocate, or write rest
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
req_wrap = WriteWrap::New(env, req_wrap_obj, this);

err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
Expand Down Expand Up @@ -332,7 +330,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap();
if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);

data = req_wrap->Extra();

Expand Down Expand Up @@ -393,7 +391,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {


void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();

HandleScope handle_scope(env->isolate());
Expand All @@ -405,19 +402,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
// Unref handle property
Local<Object> req_wrap_obj = req_wrap->object();
req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
wrap->OnAfterWrite(req_wrap);
OnAfterWrite(req_wrap, status);

Local<Value> argv[] = {
Integer::New(env->isolate(), status),
wrap->GetObject(),
GetObject(),
req_wrap_obj,
Undefined(env->isolate())
};

const char* msg = wrap->Error();
const char* msg = Error();
if (msg != nullptr) {
argv[3] = OneByteString(env->isolate(), msg);
wrap->ClearError();
ClearError();
}

if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
Expand Down
70 changes: 27 additions & 43 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,37 @@ namespace node {
// Forward declarations
class StreamBase;

template <class Req>
template<typename Base>
class StreamReq {
public:
typedef void (*DoneCb)(Req* req, int status);

explicit StreamReq(DoneCb cb) : cb_(cb) {
explicit StreamReq(StreamBase* stream) : stream_(stream) {
}

inline void Done(int status, const char* error_str = nullptr) {
Req* req = static_cast<Req*>(this);
Base* req = static_cast<Base*>(this);
Environment* env = req->env();
if (error_str != nullptr) {
req->object()->Set(env->error_string(),
OneByteString(env->isolate(), error_str));
}

cb_(req, status);
req->OnDone(status);
}

inline StreamBase* stream() const { return stream_; }

private:
DoneCb cb_;
StreamBase* const stream_;
};

class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
public StreamReq<ShutdownWrap> {
public:
ShutdownWrap(Environment* env,
v8::Local<v8::Object> req_wrap_obj,
StreamBase* wrap,
DoneCb cb)
StreamBase* stream)
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
StreamReq<ShutdownWrap>(cb),
wrap_(wrap) {
StreamReq<ShutdownWrap>(stream) {
Wrap(req_wrap_obj, this);
}

Expand All @@ -60,27 +58,22 @@ class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
return ContainerOf(&ShutdownWrap::req_, req);
}

inline StreamBase* wrap() const { return wrap_; }
size_t self_size() const override { return sizeof(*this); }

private:
StreamBase* const wrap_;
inline void OnDone(int status); // Just calls stream()->AfterShutdown()
};

class WriteWrap: public ReqWrap<uv_write_t>,
public StreamReq<WriteWrap> {
class WriteWrap : public ReqWrap<uv_write_t>,
public StreamReq<WriteWrap> {
public:
static inline WriteWrap* New(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb,
StreamBase* stream,
size_t extra = 0);
inline void Dispose();
inline char* Extra(size_t offset = 0);
inline size_t ExtraSize() const;

inline StreamBase* wrap() const { return wrap_; }

size_t self_size() const override { return storage_size_; }

static WriteWrap* from_req(uv_write_t* req) {
Expand All @@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap<uv_write_t>,

WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb)
StreamBase* stream)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb),
wrap_(wrap),
StreamReq<WriteWrap>(stream),
storage_size_(0) {
Wrap(obj, this);
}

inline void OnDone(int status); // Just calls stream()->AfterWrite()

protected:
WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb,
StreamBase* stream,
size_t storage_size)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb),
wrap_(wrap),
StreamReq<WriteWrap>(stream),
storage_size_(storage_size) {
Wrap(obj, this);
}
Expand All @@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap<uv_write_t>,
// WriteWrap. Ensure this never happens.
void operator delete(void* ptr) { UNREACHABLE(); }

StreamBase* const wrap_;
const size_t storage_size_;
};

Expand All @@ -151,7 +141,7 @@ class StreamResource {
void* ctx;
};

typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
typedef void (*ReadCb)(ssize_t nread,
const uv_buf_t* buf,
Expand All @@ -176,9 +166,9 @@ class StreamResource {
virtual void ClearError();

// Events
inline void OnAfterWrite(WriteWrap* w) {
inline void OnAfterWrite(WriteWrap* w, int status) {
if (!after_write_cb_.is_empty())
after_write_cb_.fn(w, after_write_cb_.ctx);
after_write_cb_.fn(w, status, after_write_cb_.ctx);
}

inline void OnAlloc(size_t size, uv_buf_t* buf) {
Expand Down Expand Up @@ -208,14 +198,12 @@ class StreamResource {
inline Callback<ReadCb> read_cb() { return read_cb_; }
inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }

private:
protected:
Callback<AfterWriteCb> after_write_cb_;
Callback<AllocCb> alloc_cb_;
Callback<ReadCb> read_cb_;
Callback<DestructCb> destruct_cb_;
uint64_t bytes_read_;

friend class StreamBase;
};

class StreamBase : public StreamResource {
Expand All @@ -231,7 +219,6 @@ class StreamBase : public StreamResource {
v8::Local<v8::FunctionTemplate> target,
int flags = kFlagNone);

virtual void* Cast() = 0;
virtual bool IsAlive() = 0;
virtual bool IsClosing() = 0;
virtual bool IsIPCPipe();
Expand All @@ -250,13 +237,14 @@ class StreamBase : public StreamResource {
consumed_ = false;
}

template <class Outer>
inline Outer* Cast() { return static_cast<Outer*>(Cast()); }

void EmitData(ssize_t nread,
v8::Local<v8::Object> buf,
v8::Local<v8::Object> handle);

// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
virtual void AfterWrite(WriteWrap* req, int status);

protected:
explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
}
Expand All @@ -267,10 +255,6 @@ class StreamBase : public StreamResource {
virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject();

// Libuv callbacks
static void AfterShutdown(ShutdownWrap* req, int status);
static void AfterWrite(WriteWrap* req, int status);

// JS Methods
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
Loading