Skip to content

Commit

Permalink
fs: implemented WriteStream#writev
Browse files Browse the repository at this point in the history
Streams with writev allow many buffers to be pushed to underlying OS
APIs in one batch, in this case improving write throughput by an order
of magnitude. This is especially noticeable when writing many (small)
buffers.

PR-URL: #2167
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
  • Loading branch information
Ron Korving authored and trevnorris committed Sep 14, 2015
1 parent 40ec84d commit 05d30d5
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
44 changes: 44 additions & 0 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,50 @@ WriteStream.prototype._write = function(data, encoding, cb) {
};


function writev(fd, chunks, position, callback) {
function wrapper(err, written) {
// Retain a reference to chunks so that they can't be GC'ed too soon.
callback(err, written || 0, chunks);
}

const req = new FSReqWrap();
req.oncomplete = wrapper;
binding.writeBuffers(fd, chunks, position, req);
}


WriteStream.prototype._writev = function(data, cb) {
if (typeof this.fd !== 'number')
return this.once('open', function() {
this._writev(data, cb);
});

const self = this;
const len = data.length;
const chunks = new Array(len);
var size = 0;

for (var i = 0; i < len; i++) {
var chunk = data[i].chunk;

chunks[i] = chunk;
size += chunk.length;
}

writev(this.fd, chunks, this.pos, function(er, bytes) {
if (er) {
self.destroy();
return cb(er);
}
self.bytesWritten += bytes;
cb();
});

if (this.pos !== undefined)
this.pos += size;
};


WriteStream.prototype.destroy = ReadStream.prototype.destroy;
WriteStream.prototype.close = ReadStream.prototype.close;

Expand Down
55 changes: 55 additions & 0 deletions src/node_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,60 @@ static void WriteBuffer(const FunctionCallbackInfo<Value>& args) {
}


// Wrapper for writev(2).
//
// bytesWritten = writev(fd, chunks, position, callback)
// 0 fd integer. file descriptor
// 1 chunks array of buffers to write
// 2 position if integer, position to write at in the file.
// if null, write from the current position
static void WriteBuffers(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

CHECK(args[0]->IsInt32());
CHECK(args[1]->IsArray());

int fd = args[0]->Int32Value();
Local<Array> chunks = args[1].As<Array>();
int64_t pos = GET_OFFSET(args[2]);
Local<Value> req = args[3];

uint32_t chunkCount = chunks->Length();

uv_buf_t s_iovs[1024]; // use stack allocation when possible
uv_buf_t* iovs;

if (chunkCount > ARRAY_SIZE(s_iovs))
iovs = new uv_buf_t[chunkCount];
else
iovs = s_iovs;

for (uint32_t i = 0; i < chunkCount; i++) {
Local<Value> chunk = chunks->Get(i);

if (!Buffer::HasInstance(chunk)) {
if (iovs != s_iovs)
delete[] iovs;
return env->ThrowTypeError("Array elements all need to be buffers");
}

iovs[i] = uv_buf_init(Buffer::Data(chunk), Buffer::Length(chunk));
}

if (req->IsObject()) {
ASYNC_CALL(write, req, fd, iovs, chunkCount, pos)
if (iovs != s_iovs)
delete[] iovs;
return;
}

SYNC_CALL(write, nullptr, fd, iovs, chunkCount, pos)
if (iovs != s_iovs)
delete[] iovs;
args.GetReturnValue().Set(SYNC_RESULT);
}


// Wrapper for write(2).
//
// bytesWritten = write(fd, string, position, enc, callback)
Expand Down Expand Up @@ -1248,6 +1302,7 @@ void InitFs(Local<Object> target,
env->SetMethod(target, "readlink", ReadLink);
env->SetMethod(target, "unlink", Unlink);
env->SetMethod(target, "writeBuffer", WriteBuffer);
env->SetMethod(target, "writeBuffers", WriteBuffers);
env->SetMethod(target, "writeString", WriteString);

env->SetMethod(target, "chmod", Chmod);
Expand Down

0 comments on commit 05d30d5

Please sign in to comment.