Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: refactor WriteWrap and ShutdownWrap #18676

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/net/tcp-raw-c2s.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ function client(type, len) {
fail(err, 'write');
}

function afterWrite(err, handle, req) {
function afterWrite(err, handle) {
if (err)
fail(err, 'write');

Expand Down
4 changes: 2 additions & 2 deletions benchmark/net/tcp-raw-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function main({ dur, len, type }) {
if (err)
fail(err, 'write');

writeReq.oncomplete = function(status, handle, req, err) {
writeReq.oncomplete = function(status, handle, err) {
if (err)
fail(err, 'write');
};
Expand Down Expand Up @@ -130,7 +130,7 @@ function main({ dur, len, type }) {
fail(err, 'write');
}

function afterWrite(err, handle, req) {
function afterWrite(err, handle) {
if (err)
fail(err, 'write');

Expand Down
8 changes: 4 additions & 4 deletions benchmark/net/tcp-raw-s2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ function main({ dur, len, type }) {
fail(err, 'write');
} else if (!writeReq.async) {
process.nextTick(function() {
afterWrite(null, clientHandle, writeReq);
afterWrite(0, clientHandle);
});
}
}

function afterWrite(status, handle, req, err) {
if (err)
fail(err, 'write');
function afterWrite(status, handle) {
if (status)
fail(status, 'write');

while (clientHandle.writeQueueSize === 0)
write();
Expand Down
9 changes: 4 additions & 5 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1399,20 +1399,19 @@ function trackWriteState(stream, bytes) {
session[kHandle].chunksSentSinceLastWrite = 0;
}

function afterDoStreamWrite(status, handle, req) {
function afterDoStreamWrite(status, handle) {
const stream = handle[kOwner];
const session = stream[kSession];

stream[kUpdateTimer]();

const { bytes } = req;
const { bytes } = this;
stream[kState].writeQueueSize -= bytes;

if (session !== undefined)
session[kState].writeQueueSize -= bytes;
if (typeof req.callback === 'function')
req.callback(null);
req.handle = undefined;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me wonder, is handle on WriteWrap used at all? I had a brief look at the C++ side and saw nothing. All I'm seeing is it being set/deleted all around. The only place I see it being used at all is process_wrap.

Admittedly I could be missing something more intricate here...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apapirovski I think it’s only for diagnostic purposes and preventing the handle from being garbage collected; I think at least in the case of JSStreams that could happen because that’s a weak handle and otherwise there might not be any backreference to it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to think about this... Since the socket & http2 implementations reference the handle on _handle and kHandle, this seems like it would mainly come into play if the stream is socket/session is destroyed? Would the handle still be needed in that case? I don't recall if this would still trigger oncomplete or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apapirovski I think it would call oncomplete, but not synchronously (in the case of libuv streams)?

But generally, it’s not a requirement that streams are only destroyed when they are explicitly closed… http2 objects + JSStream contain no strong Persistents, so they can be garbage collected at any time once there no longer is a reference to them, but that shouldn’t happen during a write, should it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's (very) possible that in certain situations the handle is the only thing referencing the stream and the WriteWrap is the only thing referencing the handle. I didn't really think about that originally... Was thinking too literally about its usage.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured no harm in putting this to practice... So far, at least as far as http2 is concerned, it seems like removing .handle on WriteWrap & ShutdownWrap is fine, even when running stress tests. I might play around with explicit global.gc() calls and study the code in more detail to understand how useful these references are.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apapirovski I can’t think of any way in which this would break HTTP/2, yes…

I’m a bit worried removing it might break async_hooks users… but then again, this really isn’t supposed to be public API. :/

if (typeof this.callback === 'function')
this.callback(null);
}

function streamOnResume() {
Expand Down
10 changes: 6 additions & 4 deletions lib/internal/wrap_js_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ class JSStreamWrap extends Socket {

const handle = this._handle;

this.stream.end(() => {
// Ensure that write was dispatched
setImmediate(() => {
setImmediate(() => {
// Ensure that write is dispatched asynchronously.
this.stream.end(() => {
this.finishShutdown(handle, 0);
});
});
Expand All @@ -137,7 +137,6 @@ class JSStreamWrap extends Socket {
doWrite(req, bufs) {
assert.strictEqual(this[kCurrentWriteRequest], null);
assert.strictEqual(this[kCurrentShutdownRequest], null);
this[kCurrentWriteRequest] = req;

const handle = this._handle;
const self = this;
Expand All @@ -149,6 +148,9 @@ class JSStreamWrap extends Socket {
this.stream.write(bufs[i], done);
this.stream.uncork();

// Only set the request here, because the `write()` calls could throw.
this[kCurrentWriteRequest] = req;

function done(err) {
if (!err && --pending !== 0)
return;
Expand Down
14 changes: 7 additions & 7 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ function onSocketFinish() {
}


function afterShutdown(status, handle, req) {
function afterShutdown(status, handle) {
var self = handle.owner;

debug('afterShutdown destroyed=%j', self.destroyed,
Expand Down Expand Up @@ -869,12 +869,12 @@ protoGetter('bytesWritten', function bytesWritten() {
});


function afterWrite(status, handle, req, err) {
function afterWrite(status, handle, err) {
var self = handle.owner;
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status);

if (req.async)
if (this.async)
self[kLastWriteQueueSize] = 0;

// callback may come after call to destroy.
Expand All @@ -884,9 +884,9 @@ function afterWrite(status, handle, req, err) {
}

if (status < 0) {
var ex = errnoException(status, 'write', req.error);
var ex = errnoException(status, 'write', this.error);
debug('write failure', ex);
self.destroy(ex, req.cb);
self.destroy(ex, this.cb);
return;
}

Expand All @@ -895,8 +895,8 @@ function afterWrite(status, handle, req, err) {
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite call cb');

if (req.cb)
req.cb.call(undefined);
if (this.cb)
this.cb.call(undefined);
}


Expand Down
1 change: 1 addition & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ class ModuleWrap;
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
V(shutdown_wrap_constructor_function, v8::Function) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tick_callback_function, v8::Function) \
V(timers_callback_function, v8::Function) \
Expand Down
7 changes: 1 addition & 6 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
req_wrap->object()
};

req_wrap->Dispatched();

TryCatch try_catch(env()->isolate());
Local<Value> value;
int value_int = UV_EPROTO;
Expand Down Expand Up @@ -127,8 +125,6 @@ int JSStream::DoWrite(WriteWrap* w,
bufs_arr
};

w->Dispatched();

TryCatch try_catch(env()->isolate());
Local<Value> value;
int value_int = UV_EPROTO;
Expand All @@ -154,9 +150,8 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {

template <class Wrap>
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
Wrap* w;
CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
Wrap* w = static_cast<Wrap*>(StreamReq::FromObject(args[0].As<Object>()));

w->Done(args[1]->Int32Value());
}
Expand Down
44 changes: 10 additions & 34 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1552,18 +1552,9 @@ void Http2Session::SendPendingData() {

chunks_sent_since_last_write_++;

// DoTryWrite may modify both the buffer list start itself and the
// base pointers/length of the individual buffers.
uv_buf_t* writebufs = *bufs;
if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) {
// All writes finished synchronously, nothing more to do here.
ClearOutgoing(0);
return;
}

WriteWrap* req = AllocateSend();
if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) {
req->Dispose();
StreamWriteResult res = underlying_stream()->Write(*bufs, count);
if (!res.async) {
ClearOutgoing(res.err);
}

DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
Expand Down Expand Up @@ -1649,15 +1640,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
chunks_sent_since_last_write_ = n;
}

// Allocates the data buffer used to pass outbound data to the i/o stream.
WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate());
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
return WriteWrap::New(env(), obj, static_cast<StreamBase*>(stream_));
}

// Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
Http2Scope h2scope(this);
Expand Down Expand Up @@ -1833,20 +1815,15 @@ inline void Http2Stream::Close(int32_t code) {
DEBUG_HTTP2STREAM2(this, "closed with code %d", code);
}


inline void Http2Stream::Shutdown() {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
DEBUG_HTTP2STREAM(this, "writable side shutdown");
}

int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
CHECK(!this->IsDestroyed());
req_wrap->Dispatched();
Shutdown();
{
Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
DEBUG_HTTP2STREAM(this, "writable side shutdown");
}
req_wrap->Done(0);
return 0;
}
Expand Down Expand Up @@ -2038,7 +2015,6 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap,
CHECK_EQ(send_handle, nullptr);
Http2Scope h2scope(this);
session_->SetChunksSinceLastWrite();
req_wrap->Dispatched();
if (!IsWritable()) {
req_wrap->Done(UV_EOF);
return 0;
Expand Down
9 changes: 4 additions & 5 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,6 @@ class Http2Stream : public AsyncWrap,

inline void Close(int32_t code);

// Shutdown the writable side of the stream
inline void Shutdown();

// Destroy this stream instance and free all held memory.
inline void Destroy();

Expand Down Expand Up @@ -818,6 +815,10 @@ class Http2Session : public AsyncWrap, public StreamListener {

inline void EmitStatistics();

inline StreamBase* underlying_stream() {
return static_cast<StreamBase*>(stream_);
}

void Start();
void Stop();
void Close(uint32_t code = NGHTTP2_NO_ERROR,
Expand Down Expand Up @@ -907,8 +908,6 @@ class Http2Session : public AsyncWrap, public StreamListener {
template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args);

WriteWrap* AllocateSend();

uv_loop_t* event_loop() const {
return env()->event_loop();
}
Expand Down
5 changes: 5 additions & 0 deletions src/req_wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ void ReqWrap<T>::Dispatched() {
req_.data = this;
}

template <typename T>
ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
return ContainerOf(&ReqWrap<T>::req_, req);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
2 changes: 2 additions & 0 deletions src/req_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap {
inline void Dispatched(); // Call this after the req has been dispatched.
T* req() { return &req_; }

static ReqWrap* from_req(T* req);

private:
friend class Environment;
ListNode<ReqWrap> req_wrap_queue_;
Expand Down
Loading