Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
squash! quic: implement sendFD() support
Browse files Browse the repository at this point in the history
  • Loading branch information
addaleax committed Oct 7, 2019
1 parent 310497c commit d177d3d
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 63 deletions.
50 changes: 49 additions & 1 deletion doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ socket.on('ready', () => {
});
```

#### Call Results#
#### Call Results

A call on a socket that is not ready to send or no longer open may throw a
Not running Error.
Expand Down Expand Up @@ -1106,6 +1106,54 @@ added: REPLACEME

The `QuicServerSession` or `QuicClientSession`.

### quicstream.sendFD(path[, options])
<!-- YAML
added: REPLACEME
-->

* `fd` {number} A readable file descriptor.
* `options` {Object}
* `offset` {number} The offset position at which to begin reading.
Default: `-1`.
* `length` {number} The amount of data from the fd to send.
Default: `-1`.

Instead of using `quicstream` as a writable stream, send data from a given file
descriptor.

If `offset` is set to a non-negative number, reading starts from that position
and the file offset will not be advanced.
If `length` is set to a non-negative number, it gives the maximum number of
bytes that are read from the file.

The file descriptor is not closed when the stream is closed, so it will need
to be closed manually once it is no longer needed.
Using the same file descriptor concurrently for multiple streams
is not supported and may result in data loss. Re-using a file descriptor
after a stream has finished is supported.

### quicstream.sendFile(path[, options])
<!-- YAML
added: REPLACEME
-->

* `path` {string|Buffer}
* `options` {Object}
* `onError` {Function} Callback function invoked in the case of an
error before send.
* `offset` {number} The offset position at which to begin reading.
Default: `-1`.
* `length` {number} The amount of data from the fd to send.
Default: `-1`.

Instead of using `quicstream` as a writable stream, send data from a given file
path.

The `options.onError` callback will be called if the file could not be opened.
If `offset` is set to a non-negative number, reading starts from that position.
If `length` is set to a non-negative number, it gives the maximum number of
bytes that are read from the file.

### quicstream.unidirectional
<!-- YAML
added: REPLACEME
Expand Down
5 changes: 1 addition & 4 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -2151,14 +2151,11 @@ function processHeaders(oldHeaders) {
return headers;
}

function onFileCloseError(stream, err) {
stream.emit(err);
}

function onFileUnpipe() {
const stream = this.sink[kOwner];
if (stream.ownsFd)
this.source.close().catch(onFileCloseError.bind(stream));
this.source.close().catch(stream.destroy.bind(stream));
else
this.source.releaseFD();
}
Expand Down
51 changes: 39 additions & 12 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const { validateNumber } = require('internal/validators');
const util = require('util');
const assert = require('internal/assert');
const EventEmitter = require('events');
const fs = require('fs');
const { Duplex } = require('stream');
const {
createSecureContext: _createSecureContext
Expand Down Expand Up @@ -61,6 +62,7 @@ const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_OPT_VALUE,
ERR_INVALID_CALLBACK,
ERR_OUT_OF_RANGE,
ERR_QUIC_ERROR,
Expand Down Expand Up @@ -158,8 +160,6 @@ const kHandshakePost = Symbol('kHandshakePost');
const kInit = Symbol('kInit');
const kMaybeBind = Symbol('kMaybeBind');
const kMaybeReady = Symbol('kMaybeReady');
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead');
const kReady = Symbol('kReady');
const kReceiveStart = Symbol('kReceiveStart');
const kReceiveStop = Symbol('kReceiveStop');
Expand All @@ -168,7 +168,6 @@ const kRemoveStream = Symbol('kRemoveStream');
const kServerBusy = Symbol('kServerBusy');
const kSetHandle = Symbol('kSetHandle');
const kSetSocket = Symbol('kSetSocket');
const kStartFilePipe = Symbol('kStartFilePipe');
const kStreamClose = Symbol('kStreamClose');
const kStreamReset = Symbol('kStreamReset');
const kTrackWriteState = Symbol('kTrackWriteState');
Expand Down Expand Up @@ -2261,31 +2260,59 @@ class QuicStream extends Duplex {
streamOnResume.call(this);
}

sendFD(fd, { offset = -1, length = -1 } = {}) {
sendFile(path, options = {}) {
fs.open(path, 'r', QuicStream.#onFileOpened.bind(this, options));
}

static #onFileOpened = function(options, err, fd) {
const onError = options.onError;
if (err) {
if (onError)
onError(err);
else
this.destroy(err);
return;
}

if (this.destroyed || this.closed) {
fs.close(fd, (err) => { if (err) throw err; });
return;
}

this.sendFD(fd, options, true);
}

sendFD(fd, { offset = -1, length = -1 } = {}, ownsFd = false) {
if (this.destroyed || this.#closed)
return;

if (typeof offset !== 'number')
throw new ERR_INVALID_OPT_VALUE('options.offset', offset);
if (typeof length !== 'number')
throw new ERR_INVALID_OPT_VALUE('options.length', length);
validateNumber(fd, 'fd');

this[kUpdateTimer]();
this.ownsFd = false;
this.ownsFd = ownsFd;

// Close the writable side of the stream, but only as far as the writable
// stream implementation is concerned.
this._final = null;
debugger;
this.end();

defaultTriggerAsyncIdScope(this[async_id_symbol],
QuicStream[kStartFilePipe],
QuicStream.#startFilePipe,
this, fd, offset, length);
}

static [kStartFilePipe](stream, fd, offset, length) {
static #startFilePipe = (stream, fd, offset, length) => {
const handle = new FileHandle(fd, offset, length);
handle.onread = QuicStream[kOnPipedFileHandleRead];
handle.onread = QuicStream.#onPipedFileHandleRead;
handle.stream = stream;

const pipe = new StreamPipe(handle, stream[kHandle]);
pipe.onunpipe = QuicStream[kOnFileUnpipe];
pipe.onunpipe = QuicStream.#onFileUnpipe;
pipe.start();

// Exact length of the file doesn't matter here, since the
Expand All @@ -2294,15 +2321,15 @@ class QuicStream extends Duplex {
stream[kTrackWriteState](stream, 1);
}

static [kOnFileUnpipe]() { // Called on the StreamPipe instance.
static #onFileUnpipe = function() { // Called on the StreamPipe instance.
const stream = this.sink[owner_symbol];
if (stream.ownsFd)
this.source.close().catch((err) => stream.emit(err));
this.source.close().catch(stream.destroy.bind(stream));
else
this.source.releaseFD();
}

static [kOnPipedFileHandleRead]() {
static #onPipedFileHandleRead = function() {
const err = streamBaseState[kReadBytesOrError];
if (err < 0 && err !== UV_EOF) {
this.stream.destroy(errnoException(err, 'sendFD'));
Expand Down
115 changes: 69 additions & 46 deletions test/parallel/test-quic-send-fd.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,59 +12,82 @@ const key = fixtures.readKey('agent1-key.pem', 'binary');
const cert = fixtures.readKey('agent1-cert.pem', 'binary');
const ca = fixtures.readKey('ca1-cert.pem', 'binary');

const server = quic.createSocket({ port: 0, validateAddress: true });

server.listen({
key,
cert,
ca,
rejectUnauthorized: false,
maxCryptoBuffer: 4096,
alpn: 'meow'
});

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall((servername, alpn, cipher) => {
const stream = session.openStream({ halfOpen: false });
stream.sendFD(fs.openSync(__filename, 'r'));
stream.on('data', common.mustNotCall());
stream.on('finish', common.mustNotCall());
stream.on('close', common.mustCall());
stream.on('end', common.mustNotCall());
}));
const variants = [];
for (const variant of ['sendFD', 'sendFile']) {
for (const offset of [-1, 0, 100]) {
for (const length of [-1, 100]) {
variants.push({ variant, offset, length });
}
}
}

session.on('close', common.mustCall());
}));
for (const { variant, offset, length } of variants) {
const server = quic.createSocket({ port: 0, validateAddress: true });
let fd;

server.on('ready', common.mustCall(() => {
const client = quic.createSocket({
port: 0,
client: {
key,
cert,
ca,
alpn: 'meow'
}
server.listen({
key,
cert,
ca,
rejectUnauthorized: false,
maxCryptoBuffer: 4096,
alpn: 'meow'
});

const req = client.connect({
address: 'localhost',
port: server.address.port
});
server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall((servername, alpn, cipher) => {
const stream = session.openStream({ halfOpen: false });

req.on('stream', common.mustCall((stream) => {
const data = [];
stream.on('data', (chunk) => data.push(chunk));
stream.on('end', common.mustCall(() => {
assert.deepStrictEqual(Buffer.concat(data), fs.readFileSync(__filename));
stream.on('data', common.mustNotCall());
stream.on('finish', common.mustCall());
stream.on('close', common.mustCall());
stream.on('end', common.mustCall());

// TODO(addaleax): Figure out why .close() is insufficient.
client.destroy();
server.destroy();
if (variant === 'sendFD') {
fd = fs.openSync(__filename, 'r');
stream.sendFD(fd, { offset, length });
} else {
stream.sendFile(__filename, { offset, length });
}
}));

session.on('close', common.mustCall());
}));

req.on('close', common.mustCall());
}));
server.on('ready', common.mustCall(() => {
const client = quic.createSocket({
port: 0,
client: {
key,
cert,
ca,
alpn: 'meow'
}
});

const req = client.connect({
address: 'localhost',
port: server.address.port
});

req.on('stream', common.mustCall((stream) => {
const data = [];
stream.on('data', (chunk) => data.push(chunk));
stream.on('end', common.mustCall(() => {
let expectedContent = fs.readFileSync(__filename);
if (offset !== -1) expectedContent = expectedContent.slice(offset);
if (length !== -1) expectedContent = expectedContent.slice(0, length);
assert.deepStrictEqual(Buffer.concat(data), expectedContent);

stream.end();
client.close();
server.close();
if (fd !== undefined) fs.closeSync(fd);
}));
}));

req.on('close', common.mustCall());
}));

server.on('close', common.mustCall());
server.on('close', common.mustCall());
}

0 comments on commit d177d3d

Please sign in to comment.