Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib,src: improve writev() performance for Buffers #13187

Merged
merged 1 commit into from
May 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!isBuf) {
var newChunk = decodeChunk(state, chunk, encoding);
if (chunk !== newChunk) {
isBuf = true;
encoding = 'buffer';
chunk = newChunk;
}
Expand All @@ -335,7 +336,13 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {

if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = { chunk, encoding, callback: cb, next: null };
state.lastBufferedRequest = {
chunk,
encoding,
isBuf,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
Expand Down Expand Up @@ -438,11 +445,15 @@ function clearBuffer(stream, state) {
holder.entry = entry;

var count = 0;
var allBuffers = true;
while (entry) {
buffer[count] = entry;
if (!entry.isBuf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably do allBuffers = allBuffers && entry.isBuf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried something similar (continuing the rest of the loop without any conditional) and did not see any difference in performance.

allBuffers = false;
entry = entry.next;
count += 1;
}
buffer.allBuffers = allBuffers;

doWrite(stream, state, true, state.length, buffer, '', holder.finish);

Expand Down
21 changes: 15 additions & 6 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -726,13 +726,22 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
var err;

if (writev) {
var chunks = new Array(data.length << 1);
for (var i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
var allBuffers = data.allBuffers;
var chunks;
var i;
if (allBuffers) {
chunks = data;
for (i = 0; i < data.length; i++)
data[i] = data[i].chunk;
} else {
chunks = new Array(data.length << 1);
for (i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
}
err = this._handle.writev(req, chunks);
err = this._handle.writev(req, chunks, allBuffers);

// Retain chunks
if (err === 0) req._chunks = chunks;
Expand Down
150 changes: 87 additions & 63 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,92 +100,116 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {

Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
bool all_buffers = args[2]->IsTrue();

size_t count = chunks->Length() >> 1;
size_t count;
if (all_buffers)
count = chunks->Length();
else
count = chunks->Length() >> 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I’d really just write / 2, it’s clearer and the resulting code will be the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's copied from before these changes. It's also similar to what was already being used in js.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but in JS that has a different meaning. ¯\_(ツ)_/¯ If you want to keep it it’s fine.


MaybeStackBuffer<uv_buf_t, 16> bufs(count);
uv_buf_t* buf_list = *bufs;

// Determine storage size first
size_t storage_size = 0;
for (size_t i = 0; i < count; i++) {
storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);

Local<Value> chunk = chunks->Get(i * 2);

if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535)
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
else
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);

storage_size += chunk_size;
}
uint32_t bytes = 0;
size_t offset;
AsyncWrap* wrap;
WriteWrap* req_wrap;
int err;

if (storage_size > INT_MAX)
return UV_ENOBUFS;
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);

AsyncWrap* wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
env->set_init_trigger_id(wrap->get_id());
WriteWrap* req_wrap = WriteWrap::New(env,
req_wrap_obj,
this,
AfterWrite,
storage_size);
Local<Value> chunk = chunks->Get(i * 2);

uint32_t bytes = 0;
size_t offset = 0;
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: chunk->IsUint8Array()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was also copied from before these changes. I think changing it to check for a more general type is outside the scope of this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think changing it to check for a more general type is outside the scope of this PR.

It’s literally the same thing, just inlined. :)

Copy link
Contributor Author

@mscdex mscdex May 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually HasInstance() in master and v8.x use IsArrayBufferView() while v7.x and v6.x use IsUint8Array(). Ideally this could be backported more easily if we kept HasInstance()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right, we made that switch. Idc, you can also keep Buffer::HasInstance.

continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535)
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
else
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);

// Write buffer
if (Buffer::HasInstance(chunk)) {
storage_size += chunk_size;
}

if (storage_size > INT_MAX)
return UV_ENOBUFS;
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(i);
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
continue;
}

// Write string
offset = ROUND_UP(offset, WriteWrap::kAlignSize);
CHECK_LE(offset, storage_size);
char* str_storage = req_wrap->Extra(offset);
size_t str_size = storage_size - offset;

Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
str_size = StringBytes::Write(env->isolate(),
str_storage,
str_size,
string,
encoding);
bufs[i].base = str_storage;
bufs[i].len = str_size;
offset += str_size;
bytes += str_size;
// Try writing immediately without allocation
err = DoTryWrite(&buf_list, &count);
if (err != 0 || count == 0)
goto done;
}

int err = DoWrite(req_wrap, *bufs, count, nullptr);
wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
env->set_init_trigger_id(wrap->get_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);

offset = 0;
if (!all_buffers) {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(i * 2);

// Write buffer
if (Buffer::HasInstance(chunk)) {
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
continue;
}

// Write string
offset = ROUND_UP(offset, WriteWrap::kAlignSize);
CHECK_LE(offset, storage_size);
char* str_storage = req_wrap->Extra(offset);
size_t str_size = storage_size - offset;

Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
str_size = StringBytes::Write(env->isolate(),
str_storage,
str_size,
string,
encoding);
bufs[i].base = str_storage;
bufs[i].len = str_size;
offset += str_size;
bytes += str_size;
}
}

err = DoWrite(req_wrap, buf_list, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));

if (err)
req_wrap->Dispose();

done:
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}

if (err)
req_wrap->Dispose();
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));

return err;
}
Expand Down