Skip to content

Commit

Permalink
refactor: converting NodeConnection
Browse files Browse the repository at this point in the history
* Related #512
* Related #495
* Related #234

[ci skip]
  • Loading branch information
tegefaulkes committed Jul 20, 2023
1 parent 045be37 commit 4c119d5
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 168 deletions.
175 changes: 65 additions & 110 deletions src/nodes/NodeConnection.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { NodeId } from './types';
import type Proxy from '../network/Proxy';
import type { Host, Hostname, Port } from '../network/types';
import type { Certificate } from '../keys/types';
import type GRPCClient from '../grpc/GRPCClient';
import type { ClientManifest } from '@/rpc/types';
import type { Host as QUICHost, Port as QUICPort } from '@matrixai/quic';
import type { QUICClientConfig } from './types';
import Logger from '@matrixai/logger';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import * as asyncInit from '@matrixai/async-init';
import { timedCancellable, context } from '@matrixai/contexts/dist/decorators';
import * as nodesErrors from './errors';
import * as grpcErrors from '../grpc/errors';
import { QUICClient } from '@matrixai/quic';
import RPCClient from '@/rpc/RPCClient';import * as nodesErrors from './errors';
import * as networkUtils from '../network/utils';
import { timerStart } from '../utils/index';
import * as rpcUtils from '../rpc/utils';

// TODO: extend an event system, use events for cleaning up.
/**
* Encapsulates the unidirectional client-side connection of one node to another.
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- False positive for T
interface NodeConnection<T extends GRPCClient> extends CreateDestroy {}
interface NodeConnection<M extends ClientManifest> extends CreateDestroy {}
@CreateDestroy()
class NodeConnection<T extends GRPCClient> {
class NodeConnection<M extends ClientManifest> {
public readonly host: Host;
public readonly port: Port;
/**
Expand All @@ -30,124 +31,84 @@ class NodeConnection<T extends GRPCClient> {
public readonly hostname?: Hostname;

protected logger: Logger;
protected destroyCallback: () => Promise<void>;
protected proxy: Proxy;
protected client: T;
protected quicClient: QUICClient;
protected rpcClient: RPCClient<M>;

static createNodeConnection<T extends GRPCClient>(
static createNodeConnection<M extends ClientManifest>(
{
targetNodeId,
targetHost,
targetPort,
targetHostname,
proxy,
clientFactory,
destroyCallback,
destroyTimeout,
quicClientConfig,
manifest,
logger,
}: {
targetNodeId: NodeId;
targetHost: Host;
targetPort: Port;
targetHostname?: Hostname;
proxy: Proxy;
clientFactory: (...args) => Promise<T>;
destroyCallback?: () => Promise<void>;
destroyTimeout?: number;
quicClientConfig: QUICClientConfig;
manifest: M;
logger?: Logger;
},
ctx?: Partial<ContextTimed>,
): PromiseCancellable<NodeConnection<T>>;
): PromiseCancellable<NodeConnection<M>>;
@timedCancellable(true, 20000)
static async createNodeConnection<T extends GRPCClient>(
static async createNodeConnection<M extends ClientManifest>(
{
targetNodeId,
targetHost,
targetPort,
targetHostname,
proxy,
clientFactory,
destroyCallback = async () => {},
destroyTimeout = 2000,
quicClientConfig,
manifest,
logger = new Logger(this.name),
}: {
targetNodeId: NodeId;
targetHost: Host;
targetPort: Port;
targetHostname?: Hostname;
proxy: Proxy;
clientFactory: (...args) => Promise<T>;
destroyCallback?: () => Promise<void>;
destroyTimeout?: number;
quicClientConfig: QUICClientConfig;
manifest: M;
logger?: Logger;
},
@context ctx: ContextTimed,
): Promise<NodeConnection<T>> {
): Promise<NodeConnection<M>> {
logger.info(`Creating ${this.name}`);
// Checking if attempting to connect to a wildcard IP
if (networkUtils.isHostWildcard(targetHost)) {
throw new nodesErrors.ErrorNodeConnectionHostWildcard();
}
const proxyConfig = {
host: proxy.getForwardHost(),
port: proxy.getForwardPort(),
authToken: proxy.authToken,
};
// 1. Ask fwdProxy for connection to target (the revProxy of other node)
// 2. Start sending hole-punching packets to the target (via the client start -
// this establishes a HTTP CONNECT request with the forward proxy)
// 3. Relay the proxy port to the broker/s (such that they can inform the other node)
// 4. Start sending hole-punching packets to other node (done in openConnection())
// Done in parallel
const nodeConnection = new this<T>({
// TODO: this needs to be updated to take a context,
// still uses old timer style.
const clientLogger = logger.getChild(RPCClient.name);
// TODO: Custom TLS validation with NodeId
// TODO: Idle timeout and connection timeout is the same thing from the `quic` perspective.
// THis means we need to race our timeout timer
const quicClient = await QUICClient.createQUICClient({
host: targetHost as unknown as QUICHost, // FIXME: better type conversion?
port: targetPort as unknown as QUICPort, // FIXME: better type conversion?
...quicClientConfig,
logger: logger.getChild(QUICClient.name),
});
const rpcClient = await RPCClient.createRPCClient<M>({
manifest,
middlewareFactory: rpcUtils.defaultClientMiddlewareWrapper(),
streamFactory: () => {
return quicClient.connection.streamNew();
},
logger: clientLogger,
});
const nodeConnection = new this<M>({
host: targetHost,
port: targetPort,
hostname: targetHostname,
proxy: proxy,
destroyCallback,
quicClient,
rpcClient,
logger,
});
let client: T;
try {
// TODO: this needs to be updated to take a context,
// still uses old timer style.
const clientLogger = logger.getChild(clientFactory.name);
client = await clientFactory({
nodeId: targetNodeId,
host: targetHost,
port: targetPort,
proxyConfig: proxyConfig,
// Think about this
logger: clientLogger,
destroyCallback: async () => {
clientLogger.debug(`GRPC client triggered destroyedCallback`);
if (
nodeConnection[asyncInit.status] !== 'destroying' &&
!nodeConnection[asyncInit.destroyed]
) {
await nodeConnection.destroy({ timeout: destroyTimeout });
}
},
// FIXME: this needs to be replaced with
// the GRPC timerCancellable update
timer: timerStart(ctx.timer.getTimeout()),
});
// 5. When finished, you have a connection to other node
// The GRPCClient is ready to be used for requests
} catch (e) {
await nodeConnection.destroy({ timeout: destroyTimeout });
// If the connection times out, re-throw this with a higher level nodes exception
if (e instanceof grpcErrors.ErrorGRPCClientTimeout) {
throw new nodesErrors.ErrorNodeConnectionTimeout(e.message, {
cause: e,
});
}
throw e;
}
// FIXME: we need a finally block here to do cleanup.
// TODO: This is due to chicken or egg problem
// see if we can move to CreateDestroyStartStop to resolve this
nodeConnection.client = client;
nodeConnection.rpcClient = rpcClient;
logger.info(`Created ${this.name}`);
return nodeConnection;
}
Expand All @@ -156,48 +117,43 @@ class NodeConnection<T extends GRPCClient> {
host,
port,
hostname,
proxy,
destroyCallback,
quicClient,
rpcClient,
logger,
}: {
host: Host;
port: Port;
hostname?: Hostname;
proxy: Proxy;
destroyCallback: () => Promise<void>;
quicClient: QUICClient;
rpcClient: RPCClient<M>;
logger: Logger;
}) {
this.logger = logger;
this.host = host;
this.port = port;
this.hostname = hostname;
this.proxy = proxy;
this.destroyCallback = destroyCallback;
this.quicClient = quicClient;
this.rpcClient = rpcClient;
}

public async destroy({
timeout,
force,
}: {
timeout?: number;
force?: boolean;
} = {}) {
this.logger.info(`Destroying ${this.constructor.name}`);
if (
this.client != null &&
this.client[asyncInit.status] !== 'destroying' &&
!this.client[asyncInit.destroyed]
) {
await this.client.destroy({ timeout });
}
this.logger.debug(`${this.constructor.name} triggered destroyedCallback`);
await this.destroyCallback();
await this.quicClient.destroy({ force });
await this.rpcClient.destroy();
this.logger.debug(`${this.constructor.name} triggered destroyed event`);
// TODO: trigger destroy event
this.logger.info(`Destroyed ${this.constructor.name}`);
}

/**
* Gets GRPCClient for this node connection
*/
public getClient(): T {
return this.client;
public getClient(): RPCClient<M> {
return this.rpcClient;
}

/**
Expand All @@ -207,11 +163,10 @@ class NodeConnection<T extends GRPCClient> {
*/
@ready(new nodesErrors.ErrorNodeConnectionDestroyed())
public getRootCertChain(): Array<Certificate> {
const connInfo = this.proxy.getConnectionInfoByProxy(this.host, this.port);
if (connInfo == null) {
throw new nodesErrors.ErrorNodeConnectionInfoNotExist();
}
return connInfo.remoteCertificates;
const connInfo = this.quicClient.connection.remoteInfo;
// TODO:
throw Error('TMP IMP');
// Return connInfo.remoteCertificates;
}
}

Expand Down
Loading

0 comments on commit 4c119d5

Please sign in to comment.