Skip to content

Commit

Permalink
quic: implement sendFD() support
Browse files Browse the repository at this point in the history
Fixes: nodejs#75
PR-URL: nodejs#150
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax authored and juanarbol committed Dec 17, 2019
1 parent af6cf27 commit a54ed7f
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 4 deletions.
50 changes: 49 additions & 1 deletion doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ socket.on('ready', () => {
});
```

#### Call Results#
#### Call Results

A call on a socket that is not ready to send or no longer open may throw a
Not running Error.
Expand Down Expand Up @@ -1106,6 +1106,54 @@ added: REPLACEME

The `QuicServerSession` or `QuicClientSession`.

### quicstream.sendFD(fd[, options])
<!-- YAML
added: REPLACEME
-->

* `fd` {number|FileHandle} A readable file descriptor.
* `options` {Object}
* `offset` {number} The offset position at which to begin reading.
Default: `-1`.
* `length` {number} The amount of data from the fd to send.
Default: `-1`.

Instead of using a `Quicstream` as a writable stream, send data from a given file
descriptor.

If `offset` is set to a non-negative number, reading starts from that position
and the file offset will not be advanced.
If `length` is set to a non-negative number, it gives the maximum number of
bytes that are read from the file.

The file descriptor or `FileHandle` is not closed when the stream is closed,
so it will need to be closed manually once it is no longer needed.
Using the same file descriptor concurrently for multiple streams
is not supported and may result in data loss. Re-using a file descriptor
after a stream has finished is supported.

### quicstream.sendFile(path[, options])
<!-- YAML
added: REPLACEME
-->

* `path` {string|Buffer|URL}
* `options` {Object}
* `onError` {Function} Callback function invoked in the case of an
error before send.
* `offset` {number} The offset position at which to begin reading.
Default: `-1`.
* `length` {number} The amount of data from the fd to send.
Default: `-1`.

Instead of using a `QuicStream` as a writable stream, send data from a given file
path.

The `options.onError` callback will be called if the file could not be opened.
If `offset` is set to a non-negative number, reading starts from that position.
If `length` is set to a non-negative number, it gives the maximum number of
bytes that are read from the file.

### quicstream.unidirectional
<!-- YAML
added: REPLACEME
Expand Down
95 changes: 92 additions & 3 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ const {
validateQuicClientSessionOptions,
validateQuicSocketOptions,
} = require('internal/quic/util');
const { validateNumber } = require('internal/validators');
const util = require('util');
const assert = require('internal/assert');
const EventEmitter = require('events');
const fs = require('fs');
const fsPromisesInternal = require('internal/fs/promises');
const { Duplex } = require('stream');
const {
createSecureContext: _createSecureContext
Expand All @@ -32,7 +35,7 @@ const {
translatePeerCertificate
} = require('_tls_common');
const {
defaultTriggerAsyncIdScope, // eslint-disable-line no-unused-vars
defaultTriggerAsyncIdScope,
symbols: {
async_id_symbol,
owner_symbol,
Expand All @@ -52,14 +55,15 @@ const {

const {
ShutdownWrap,
kReadBytesOrError, // eslint-disable-line no-unused-vars
streamBaseState // eslint-disable-line no-unused-vars
kReadBytesOrError,
streamBaseState
} = internalBinding('stream_wrap');

const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_OPT_VALUE,
ERR_INVALID_CALLBACK,
ERR_OUT_OF_RANGE,
ERR_QUIC_ERROR,
Expand All @@ -78,6 +82,10 @@ const {
exceptionWithHostPort
} = require('internal/errors');

const { FileHandle } = internalBinding('fs');
const { StreamPipe } = internalBinding('stream_pipe');
const { UV_EOF } = internalBinding('uv');

const {
QuicSocket: QuicSocketHandle,
initSecureContext,
Expand Down Expand Up @@ -2253,6 +2261,87 @@ class QuicStream extends Duplex {
streamOnResume.call(this);
}

sendFile(path, options = {}) {
fs.open(path, 'r', QuicStream.#onFileOpened.bind(this, options));
}

static #onFileOpened = function(options, err, fd) {
const onError = options.onError;
if (err) {
if (onError) {
this.close();
onError(err);
} else {
this.destroy(err);
}
return;
}

if (this.destroyed || this.closed) {
fs.close(fd, (err) => { if (err) throw err; });
return;
}

this.sendFD(fd, options, true);
}

sendFD(fd, { offset = -1, length = -1 } = {}, ownsFd = false) {
if (this.destroyed || this.#closed)
return;

if (typeof offset !== 'number')
throw new ERR_INVALID_OPT_VALUE('options.offset', offset);
if (typeof length !== 'number')
throw new ERR_INVALID_OPT_VALUE('options.length', length);

if (fd instanceof fsPromisesInternal.FileHandle)
fd = fd.fd;
else if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd);

this[kUpdateTimer]();
this.ownsFd = ownsFd;

// Close the writable side of the stream, but only as far as the writable
// stream implementation is concerned.
this._final = null;
this.end();

defaultTriggerAsyncIdScope(this[async_id_symbol],
QuicStream.#startFilePipe,
this, fd, offset, length);
}

static #startFilePipe = (stream, fd, offset, length) => {
const handle = new FileHandle(fd, offset, length);
handle.onread = QuicStream.#onPipedFileHandleRead;
handle.stream = stream;

const pipe = new StreamPipe(handle, stream[kHandle]);
pipe.onunpipe = QuicStream.#onFileUnpipe;
pipe.start();

// Exact length of the file doesn't matter here, since the
// stream is closing anyway - just use 1 to signify that
// a write does exist
stream[kTrackWriteState](stream, 1);
}

static #onFileUnpipe = function() { // Called on the StreamPipe instance.
const stream = this.sink[owner_symbol];
if (stream.ownsFd)
this.source.close().catch(stream.destroy.bind(stream));
else
this.source.releaseFD();
}

static #onPipedFileHandleRead = function() {
const err = streamBaseState[kReadBytesOrError];
if (err < 0 && err !== UV_EOF) {
this.stream.destroy(errnoException(err, 'sendFD'));
}
}

get resetReceived() {
return (this.#resetCode !== undefined) ?
{ code: this.#resetCode | 0, finalSize: this.#resetFinalSize | 0 } :
Expand Down

0 comments on commit a54ed7f

Please sign in to comment.