Skip to content

Commit

Permalink
if already connected, connect() returns rpc, not false
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed Mar 18, 2021
1 parent 835fc13 commit 7a8a880
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ function inferPeerType(address: Address, meta: any): Data['inferredType'] {
class ConnHub {
private readonly _server: any;
private readonly _peers: Map<Address, Data>;
private readonly _rpcs: Map<Address, any>;
private readonly _notifyEvent: any;
private readonly _notifyEntries: any;
private _closed: boolean;
Expand All @@ -46,6 +47,7 @@ class ConnHub {
this._closed = false;
this._connectRetries = new Set<Address>();
this._peers = new Map<Address, Data>();
this._rpcs = new Map<Address, any>();
this._notifyEvent = Notify();
this._notifyEntries = Notify();
this._init();
Expand Down Expand Up @@ -134,7 +136,7 @@ class ConnHub {
}

if (!peer) {
// Here, weAreClient is surely `false`
// Here, isClient is surely `false`
debug('peer %s initiated an RPC connection with us', rpc.id);
}

Expand All @@ -149,6 +151,7 @@ class ConnHub {
const state = 'connected';
const disconnect: Data['disconnect'] = (cb) => rpc.close(true, cb ?? noop);
this._setPeer(address, {...data, state, disconnect});
this._rpcs.set(address, rpc);
debug('connected to %s', address);
this._notifyEvent({
type: state,
Expand All @@ -159,6 +162,7 @@ class ConnHub {
this._updateLiveEntries();

rpc.on('closed', () => {
this._rpcs.delete(address);
this._peers.delete(address);
debug('disconnected from %s', address);
this._notifyEvent({type: 'disconnected', address, key} as ListenEvent);
Expand All @@ -179,8 +183,33 @@ class ConnHub {

if (this._peers.has(address)) {
const peer = this._peers.get(address)!;
if (peer.state === 'connecting' || peer.state === 'connected') {
return false;
if (peer.state === 'connected') {
if (this._rpcs.has(address)) return this._rpcs.get(address);
else return false;
} else if (peer.state === 'connecting') {
return new Promise((resolve, reject) => {
let drainer: any;
setTimeout(() => {
if (drainer) drainer.abort();
resolve(false);
}, 60e3);
pull(
this._notifyEvent.listen(),
pull.filter(
(ev: ListenEvent) =>
ev.type === 'connected' && ev.address === address,
),
pull.take(1),
(drainer = pull.drain(
(ev: ListenEvent) => {
resolve(ev.details.rpc);
},
(err: any) => {
if (err && err !== true) reject(err);
},
)),
);
});
} else if (peer.state === 'disconnecting') {
// If disconnecting, schedule a connect() after disconnection completed
this._connectRetries.add(address);
Expand Down Expand Up @@ -228,6 +257,7 @@ class ConnHub {
} as ListenEvent);
this._updateLiveEntries();
}
this._rpcs.set(address, rpc);
return rpc;
}

Expand Down Expand Up @@ -343,6 +373,7 @@ class ConnHub {
);
this._closed = true;
this._peers.clear();
this._rpcs.clear();
this._notifyEvent.end();
this._notifyEntries.end();
debug('closed the ConnHub instance');
Expand Down

0 comments on commit 7a8a880

Please sign in to comment.