Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream_base: dispatch reqs in the stream impl #1558

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
req_wrap->object()
};

req_wrap->Dispatched();
Local<Value> res =
MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv);

Expand All @@ -95,6 +96,7 @@ int JSStream::DoWrite(WriteWrap* w,
bufs_arr
};

w->Dispatched();
Local<Value> res =
MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv);

Expand Down
4 changes: 0 additions & 4 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
AfterShutdown);

int err = DoShutdown(req_wrap);
req_wrap->Dispatched();
if (err)
delete req_wrap;
return err;
Expand Down Expand Up @@ -181,7 +180,6 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (bufs != bufs_)
delete[] bufs;

req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
req_wrap->object()->Set(env->bytes_string(),
Number::New(env->isolate(), bytes));
Expand Down Expand Up @@ -228,7 +226,6 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);

err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap->Dispatched();
req_wrap_obj->Set(env->async(), True(env->isolate()));

if (err)
Expand Down Expand Up @@ -347,7 +344,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
reinterpret_cast<uv_stream_t*>(send_handle));
}

req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));

if (err)
Expand Down
6 changes: 5 additions & 1 deletion src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {


int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
int err;
err = uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
req_wrap->Dispatched();
return err;
}


Expand Down Expand Up @@ -353,6 +356,7 @@ int StreamWrap::DoWrite(WriteWrap* w,
}
}

w->Dispatched();
UpdateWriteQueueSize();

return r;
Expand Down
32 changes: 29 additions & 3 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,38 @@ bool TLSWrap::InvokeQueued(int status) {
WriteItemList queue;
pending_write_items_.MoveBack(&queue);
while (WriteItem* wi = queue.PopFront()) {
wi->w_->Done(status);
if (wi->async_) {
wi->w_->Done(status);
} else {
CheckWriteItem* check = new CheckWriteItem(wi->w_, status);
int err = uv_check_init(env()->event_loop(), &check->check_);
check->check_.data = check;
if (err == 0)
err = uv_check_start(&check->check_, CheckWriteItem::CheckCb);

// No luck today, do it on next InvokeQueued
if (err) {
delete check;
pending_write_items_.PushBack(wi);
continue;
}
}
delete wi;
}

return true;
}


void TLSWrap::CheckWriteItem::CheckCb(uv_check_t* check) {
CheckWriteItem* c = reinterpret_cast<CheckWriteItem*>(check->data);

c->w_->Done(c->status_);
uv_close(reinterpret_cast<uv_handle_t*>(check), nullptr);
delete c;
}


void TLSWrap::NewSessionDoneCb() {
Cycle();
}
Expand Down Expand Up @@ -306,7 +330,6 @@ void TLSWrap::EncOut() {
for (size_t i = 0; i < count; i++)
buf[i] = uv_buf_init(data[i], size[i]);
int err = stream_->DoWrite(write_req, buf, count, nullptr);
write_req->Dispatched();

// Ignore errors, this should be already handled in js
if (err) {
Expand Down Expand Up @@ -557,7 +580,10 @@ int TLSWrap::DoWrite(WriteWrap* w,
}

// Queue callback to execute it on next tick
write_item_queue_.PushBack(new WriteItem(w));
WriteItem* item = new WriteItem(w);
WriteItem::SyncScope item_async(item);
write_item_queue_.PushBack(item);
w->Dispatched();

// Write queued data
if (empty) {
Expand Down
32 changes: 31 additions & 1 deletion src/tls_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,46 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
// Write callback queue's item
class WriteItem {
public:
explicit WriteItem(WriteWrap* w) : w_(w) {
class SyncScope {
public:
explicit SyncScope(WriteItem* item) : item_(item) {
item_->async_ = false;
}
~SyncScope() {
item_->async_ = true;
}

private:
WriteItem* item_;
};

explicit WriteItem(WriteWrap* w) : w_(w), async_(false) {
}
~WriteItem() {
w_ = nullptr;
}

WriteWrap* w_;
bool async_;
ListNode<WriteItem> member_;
};

class CheckWriteItem {
public:
CheckWriteItem(WriteWrap* w, int status) : w_(w), status_(status) {
}

~CheckWriteItem() {
w_ = nullptr;
}

static void CheckCb(uv_check_t* check);

WriteWrap* w_;
int status_;
uv_check_t check_;
};

TLSWrap(Environment* env,
Kind kind,
StreamBase* stream,
Expand Down