Skip to content

Commit

Permalink
quic: implement QuicEndpoint Promise API
Browse files Browse the repository at this point in the history
This is the start of a conversion over to a fully Promise-centric API
for the QUIC implementation.

PR-URL: #34283
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
jasnell authored and cjihrig committed Jul 22, 2020
1 parent 59976b6 commit 3a101ef
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 20 deletions.
42 changes: 42 additions & 0 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,34 @@ The object will contain the properties:

If the `QuicEndpoint` is not bound, `quicendpoint.address` is an empty object.

#### quicendpoint.bind(\[options\])
<!-- YAML
added: REPLACEME
-->

Binds the `QuicEndpoint` if it has not already been bound. User code will
not typically be responsible for binding a `QuicEndpoint` as the owning
`QuicSocket` will do that automatically.

* `options` {object}
* `signal` {AbortSignal} Optionally allows the `bind()` to be canceled
using an `AbortController`.
* Returns: {Promise}

The `quicendpoint.bind()` function returns `Promise` that will be resolved
with the address once the bind operation is successful.

If the `QuicEndpoint` has been destroyed, or is destroyed while the `Promise`
is pending, the `Promise` will be rejected with an `ERR_INVALID_STATE` error.

If an `AbortSignal` is specified in the `options` and it is triggered while
the `Promise` is pending, the `Promise` will be rejected with an `AbortError`.

If `quicendpoint.bind()` is called again while a previously returned `Promise`
is still pending or has already successfully resolved, the previously returned
pending `Promise` will be returned. If the additional call to
`quicendpoint.bind()` contains an `AbortSignal`, the `signal` will be ignored.

#### quicendpoint.bound
<!-- YAML
added: REPLACEME
Expand All @@ -347,6 +375,20 @@ added: REPLACEME

Set to `true` if the `QuicEndpoint` is bound to the local UDP port.

#### quicendpoint.close()
<!-- YAML
added: REPLACEME
-->

Closes and destroys the `QuicEndpoint`. Returns a `Promise` that is resolved
once the `QuicEndpoint` has been destroyed, or rejects if the `QuicEndpoint`
is destroyed with an error.

* Returns: {Promise}

The `Promise` cannot be canceled. Once `quicendpoint.close()` is called, the
`QuicEndpoint` will be destroyed.

#### quicendpoint.closing
<!-- YAML
added: REPLACEME
Expand Down
160 changes: 148 additions & 12 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const {
Error,
Map,
Number,
Promise,
RegExp,
Set,
Symbol,
Expand Down Expand Up @@ -104,6 +105,7 @@ const {
ERR_QUICSESSION_VERSION_NEGOTIATION,
ERR_TLS_DH_PARAM_SIZE,
},
hideStackFrames,
errnoException,
exceptionWithHostPort
} = require('internal/errors');
Expand Down Expand Up @@ -200,10 +202,14 @@ const {

const emit = EventEmitter.prototype.emit;

// TODO(@jasnell): Temporary while converting to Promises-based API
const { lookup } = require('dns').promises;

const kAfterLookup = Symbol('kAfterLookup');
const kAfterPreferredAddressLookup = Symbol('kAfterPreferredAddressLookup');
const kAddSession = Symbol('kAddSession');
const kAddStream = Symbol('kAddStream');
const kBind = Symbol('kBind');
const kClose = Symbol('kClose');
const kCert = Symbol('kCert');
const kClientHello = Symbol('kClientHello');
Expand Down Expand Up @@ -255,6 +261,14 @@ const kSocketDestroyed = 4;
let diagnosticPacketLossWarned = false;
let warnedVerifyHostnameIdentity = false;

let DOMException;

const lazyDOMException = hideStackFrames((message) => {
if (DOMException === undefined)
DOMException = internalBinding('messaging').DOMException;
return new DOMException(message);
});

assert(process.versions.ngtcp2 !== undefined);

// Called by the C++ internals when the QuicSocket is closed with
Expand Down Expand Up @@ -589,12 +603,27 @@ function lookupOrDefault(lookup, type) {
return lookup || (type === AF_INET6 ? lookup6 : lookup4);
}

function deferredClosePromise(state) {
return state.closePromise = new Promise((resolve, reject) => {
state.closePromiseResolve = resolve;
state.closePromiseReject = reject;
}).finally(() => {
state.closePromise = undefined;
state.closePromiseResolve = undefined;
state.closePromiseReject = undefined;
});
}

// QuicEndpoint wraps a UDP socket and is owned
// by a QuicSocket. It does not exist independently
// of the QuicSocket.
class QuicEndpoint {
[kInternalState] = {
state: kSocketUnbound,
bindPromise: undefined,
closePromise: undefined,
closePromiseResolve: undefined,
closePromiseReject: undefined,
socket: undefined,
udpSocket: undefined,
address: undefined,
Expand Down Expand Up @@ -645,15 +674,14 @@ class QuicEndpoint {
return customInspect(this, {
address: this.address,
fd: this.fd,
type: this[kInternalState].type === AF_INET6 ? 'udp6' : 'udp4'
type: this[kInternalState].type === AF_INET6 ? 'udp6' : 'udp4',
destroyed: this.destroyed,
bound: this.bound,
pending: this.pending,
}, depth, options);
}

// afterLookup is invoked when binding a QuicEndpoint. The first
// step to binding is to resolve the given hostname into an ip
// address. Once resolution is complete, the ip address needs to
// be passed on to the [kContinueBind] function or the QuicEndpoint
// needs to be destroyed.
// TODO(@jasnell): Remove once migration to Promise API is complete
static [kAfterLookup](err, ip) {
if (err) {
this.destroy(err);
Expand All @@ -662,10 +690,7 @@ class QuicEndpoint {
this[kContinueBind](ip);
}

// kMaybeBind binds the endpoint on-demand if it is not already
// bound. If it is bound, we return immediately, otherwise put
// the endpoint into the pending state and initiate the binding
// process by calling the lookup to resolve the IP address.
// TODO(@jasnell): Remove once migration to Promise API is complete
[kMaybeBind]() {
const state = this[kInternalState];
if (state.state !== kSocketUnbound)
Expand All @@ -674,8 +699,7 @@ class QuicEndpoint {
state.lookup(state.address, QuicEndpoint[kAfterLookup].bind(this));
}

// IP address resolution is completed and we're ready to finish
// binding to the local port.
// TODO(@jasnell): Remove once migration to Promise API is complete
[kContinueBind](ip) {
const state = this[kInternalState];
const udpHandle = state.udpSocket[internalDgram.kStateSymbol].handle;
Expand Down Expand Up @@ -704,6 +728,95 @@ class QuicEndpoint {
state.socket[kEndpointBound](this);
}

bind(options) {
const state = this[kInternalState];
if (state.bindPromise !== undefined)
return state.bindPromise;

return state.bindPromise = this[kBind]().finally(() => {
state.bindPromise = undefined;
});
}

// Binds the QuicEndpoint to the local port. Returns a Promise
// that is resolved once the QuicEndpoint binds, or rejects if
// binding was not successful. Calling bind() multiple times
// before the Promise is resolved will return the same Promise.
// Calling bind() after the endpoint is already bound will
// immediately return a resolved promise. Calling bind() after
// the endpoint has been destroyed will cause the Promise to
// be rejected.
async [kBind](options) {
const state = this[kInternalState];
if (this.destroyed)
throw new ERR_INVALID_STATE('QuicEndpoint is already destroyed');

if (state.state !== kSocketUnbound)
return this.address;

const { signal } = { ...options };
if (signal != null && !('aborted' in signal))
throw new ERR_INVALID_ARG_TYPE('options.signal', 'AbortSignal', signal);

// If an AbotSignal was passed in, check to make sure it is not already
// aborted before we continue on to do any work.
if (signal && signal.aborted)
throw new lazyDOMException('AbortError');

state.state = kSocketPending;

// TODO(@jasnell): Use passed in lookup function once everything
// has been converted to Promises-based API
const {
address: ip
} = await lookup(state.address, state.type === AF_INET6 ? 6 : 4);

// It's possible for the QuicEndpoint to have been destroyed while
// we were waiting for the DNS lookup to complete. If so, reject
// the Promise.
if (this.destroyed)
throw new ERR_INVALID_STATE('QuicEndpoint was destroyed');

// If an AbortSignal was passed in, check to see if it was triggered
// while we were waiting.
if (signal && signal.aborted) {
state.state = kSocketUnbound;
throw new lazyDOMException('AbortError');
}

// From here on, any errors are fatal for the QuicEndpoint. Keep in
// mind that this means that the Bind Promise will be rejected *and*
// the QuicEndpoint will be destroyed with an error.
try {
const udpHandle = state.udpSocket[internalDgram.kStateSymbol].handle;
if (udpHandle == null) {
// It's not clear what cases trigger this but it is possible.
throw new ERR_OPERATION_FAILED('Acquiring UDP socket handle failed');
}

const flags =
(state.reuseAddr ? UV_UDP_REUSEADDR : 0) |
(state.ipv6Only ? UV_UDP_IPV6ONLY : 0);

const ret = udpHandle.bind(ip, state.port, flags);
if (ret)
throw exceptionWithHostPort(ret, 'bind', ip, state.port);

// On Windows, the fd will be meaningless, but we always record it.
state.fd = udpHandle.fd;
state.state = kSocketBound;

// Notify the owning socket that the QuicEndpoint has been successfully
// bound to the local UDP port.
state.socket[kEndpointBound](this);

return this.address;
} catch (error) {
this.destroy(error);
throw error;
}
}

destroy(error) {
if (this.destroyed)
return;
Expand All @@ -727,12 +840,35 @@ class QuicEndpoint {
handle.ondone = () => {
state.udpSocket.close((err) => {
if (err) error = err;
if (error && typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();
state.socket[kEndpointClose](this, error);
});
};
handle.waitForPendingCallbacks();
}

// Closes the QuicEndpoint. Returns a Promise that is resolved
// once the QuicEndpoint closes, or rejects if it closes with
// an error. Calling close() multiple times before the Promise
// is resolved will return the same Promise. Calling close()
// after will return a rejected Promise.
close() {
return this[kInternalState].closePromise || this[kClose]();
}

[kClose]() {
if (this.destroyed) {
return Promise.reject(
new ERR_INVALID_STATE('QuicEndpoint is already destroyed'));
}
const promise = deferredClosePromise(this[kInternalState]);
this.destroy();
return promise;
}

// If the QuicEndpoint is bound, returns an object detailing
// the local IP address, port, and address type to which it
// is bound. Otherwise, returns an empty object.
Expand Down
18 changes: 10 additions & 8 deletions test/parallel/test-quic-quicendpoint-address.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ const { createQuicSocket } = require('net');

async function Test1(options, address) {
const server = createQuicSocket(options);
assert.strictEqual(server.endpoints.length, 1);
assert.strictEqual(server.endpoints[0].bound, false);
assert.deepStrictEqual({}, server.endpoints[0].address);

server.listen({ key, cert, ca, alpn: 'zzz' });
server.on('close', common.mustCall());

await once(server, 'ready');
assert.strictEqual(server.endpoints.length, 1);
const endpoint = server.endpoints[0];

assert.strictEqual(endpoint.bound, false);
assert.deepStrictEqual({}, endpoint.address);

await endpoint.bind();

assert.strictEqual(endpoint.bound, true);
assert.strictEqual(endpoint.destroyed, false);
assert.strictEqual(typeof endpoint.address.port, 'number');
assert.strictEqual(endpoint.address.address, address);
server.close();

await endpoint.close();

assert.strictEqual(endpoint.destroyed, true);
}

Expand Down

0 comments on commit 3a101ef

Please sign in to comment.