Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: allow reading data into a static buffer #25436

Merged
merged 1 commit into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 46 additions & 11 deletions benchmark/net/net-s2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,84 @@ const common = require('../common.js');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [64, 102400, 1024 * 1024 * 16],
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
type: ['utf', 'asc', 'buf'],
recvbuflen: [0, 64 * 1024, 1024 * 1024],
recvbufgenfn: ['true', 'false'],
dur: [5]
});

var chunk;
var encoding;
var recvbuf;
var received = 0;

function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
if (isFinite(recvbuflen) && recvbuflen > 0)
recvbuf = Buffer.alloc(recvbuflen);

function main({ dur, len, type }) {
switch (type) {
case 'buf':
chunk = Buffer.alloc(len, 'x');
chunk = Buffer.alloc(sendchunklen, 'x');
break;
case 'utf':
encoding = 'utf8';
chunk = 'ü'.repeat(len / 2);
chunk = 'ü'.repeat(sendchunklen / 2);
break;
case 'asc':
encoding = 'ascii';
chunk = 'x'.repeat(len);
chunk = 'x'.repeat(sendchunklen);
break;
default:
throw new Error(`invalid type: ${type}`);
}

const reader = new Reader();
const writer = new Writer();
var writer;
var socketOpts;
if (recvbuf === undefined) {
writer = new Writer();
socketOpts = { port: PORT };
} else {
let buffer = recvbuf;
if (recvbufgenfn === 'true') {
let bufidx = -1;
const bufpool = [
recvbuf,
Buffer.from(recvbuf),
Buffer.from(recvbuf),
];
buffer = () => {
bufidx = (bufidx + 1) % bufpool.length;
return bufpool[bufidx];
};
}
socketOpts = {
port: PORT,
onread: {
buffer,
callback: function(nread, buf) {
received += nread;
}
}
};
}

// The actual benchmark.
const server = net.createServer((socket) => {
reader.pipe(socket);
});

server.listen(PORT, () => {
const socket = net.connect(PORT);
const socket = net.connect(socketOpts);
socket.on('connect', () => {
bench.start();

socket.pipe(writer);
if (recvbuf === undefined)
socket.pipe(writer);

setTimeout(() => {
const bytes = writer.received;
const bytes = received;
const gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits);
process.exit(0);
Expand All @@ -58,12 +94,11 @@ function main({ dur, len, type }) {
const net = require('net');

function Writer() {
this.received = 0;
this.writable = true;
}

Writer.prototype.write = function(chunk, encoding, cb) {
this.received += chunk.length;
received += chunk.length;

if (typeof encoding === 'function')
encoding();
Expand Down
36 changes: 36 additions & 0 deletions doc/api/net.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,9 @@ for the [`'connect'`][] event **once**.
<!-- YAML
added: v0.1.90
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/25436
description: Added `onread` option.
- version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6021
description: The `hints` option defaults to `0` in all cases now.
Expand Down Expand Up @@ -629,6 +632,39 @@ For [IPC][] connections, available `options` are:
See [Identifying paths for IPC connections][]. If provided, the TCP-specific
options above are ignored.

For both types, available `options` include:

* `onread` {Object} - If specified, incoming data is stored in a single `buffer`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An example showing this in use would be ideal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. Let me know if it suffices.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! looks good

and passed to the supplied `callback` when data arrives on the socket.
Note: this will cause the streaming functionality to not provide any data,
however events like `'error'`, `'end'`, and `'close'` will still be emitted
as normal and methods like `pause()` and `resume()` will also behave as
expected.
* `buffer` {Buffer|Uint8Array|Function} - Either a reusable chunk of memory to
use for storing incoming data or a function that returns such.
* `callback` {Function} This function is called for every chunk of incoming
data. Two arguments are passed to it: the number of bytes written to
`buffer` and a reference to `buffer`. Return `false` from this function to
implicitly `pause()` the socket. This function will be executed in the
global context.

mscdex marked this conversation as resolved.
Show resolved Hide resolved
Following is an example of a client using the `onread` option:

```js
const net = require('net');
net.connect({
port: 80,
onread: {
// Reuses a 4KiB Buffer for every read from the socket
buffer: Buffer.alloc(4 * 1024),
callback: function(nread, buf) {
// Received data is available in `buf` from 0 to `nread`
console.log(buf.toString('utf8', 0, nread));
}
}
});
```

#### socket.connect(path[, connectListener])

* `path` {string} Path the client should connect to. See
Expand Down
31 changes: 26 additions & 5 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
setUnrefTimeout,
getTimerDuration
} = require('internal/timers');
const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers');

const kMaybeDestroy = Symbol('kMaybeDestroy');
Expand All @@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');

const debug = require('internal/util/debuglog').debuglog('stream');
const kBuffer = Symbol('kBuffer');
const kBufferGen = Symbol('kBufferGen');
const kBufferCb = Symbol('kBufferCb');

function handleWriteReq(req, data, encoding) {
const { handle } = req;
Expand Down Expand Up @@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) {
stream[kUpdateTimer]();

if (nread > 0 && !stream.destroyed) {
const offset = streamBaseState[kArrayBufferOffset];
const buf = new FastBuffer(arrayBuffer, offset, nread);
if (!stream.push(buf)) {
let ret;
let result;
const userBuf = stream[kBuffer];
if (userBuf) {
result = (stream[kBufferCb](nread, userBuf) !== false);
const bufGen = stream[kBufferGen];
if (bufGen !== null) {
const nextBuf = bufGen();
if (isUint8Array(nextBuf))
stream[kBuffer] = ret = nextBuf;
}
} else {
const offset = streamBaseState[kArrayBufferOffset];
const buf = new FastBuffer(arrayBuffer, offset, nread);
result = stream.push(buf);
}
if (!result) {
handle.reading = false;
if (!stream.destroyed) {
const err = handle.readStop();
Expand All @@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) {
}
}

return;
return ret;
}

if (nread === 0) {
Expand Down Expand Up @@ -241,5 +259,8 @@ module.exports = {
kUpdateTimer,
kHandle,
kSession,
setStreamTimeout
setStreamTimeout,
kBuffer,
kBufferCb,
kBufferGen
};
Loading