diff --git a/src/tablist/index.ts b/src/tablist/index.ts index c0f4d0e..72cddce 100644 --- a/src/tablist/index.ts +++ b/src/tablist/index.ts @@ -1,4 +1,5 @@ import {BehaviorSubject, Observable, timer} from "rxjs"; +import {pubsub} from "../pubsub"; import type {PubSub} from "../pubsub/types"; const enum CHANNEL { @@ -8,15 +9,23 @@ const enum CHANNEL { LIST, /** Notify other tabs that this tab is alive. */ PING, + /** Call another tab. Establish a bi-directional private channel. */ + CALL, } type HelloMessage = number; type PingMessage = number; type ListMessage = [id: number, peers: PeerList]; +type CallMessage = [caller: number, callee: number]; type PeerList = Record; -/** */ +export interface TabListDependencies { + readonly bus: PubSub; + readonly newBus: (name: string) => PubSub; + readonly clock?: number; +} + export class TabList { /** Unique ID of the current browser tab instance. Zero is not allowed. */ public readonly id: number = Math.ceil(Math.random() * 0xFFFFFFFFFFFFFF); @@ -25,58 +34,67 @@ export class TabList { protected readonly clock$: Observable; protected pingTimeout: number + /** Send other tabs a heartbeat. This is sent automatically in a loop. */ public ping = () => { - this.pubsub.pub(CHANNEL.PING, this.id); + this.deps.bus.pub(CHANNEL.PING, this.id); } - public constructor(protected readonly pubsub: PubSub, clock: number = 200) { + public constructor(protected readonly deps: TabListDependencies) { + const {bus, clock = 200} = deps; this.clock$ = timer(clock, clock); this.pingTimeout = clock * 5; // Listen for new peers joining. - pubsub.sub$(CHANNEL.HELLO).subscribe((id) => { + bus.sub$(CHANNEL.HELLO).subscribe((id) => { const now = Date.now(); this.mergePeers({[id]: now}, now); - pubsub.pub(CHANNEL.LIST, [this.id, this.peers]); + bus.pub(CHANNEL.LIST, [this.id, this.peers]); }); // Listen for peer list updates. - pubsub.sub$(CHANNEL.LIST).subscribe(([id, peers]) => { + bus.sub$(CHANNEL.LIST).subscribe(([id, peers]) => { const now = Date.now(); peers[id] = now; this.mergePeers(peers, now); }); // Join the swarm. - this.pubsub.pub(CHANNEL.HELLO, this.id); + bus.pub(CHANNEL.HELLO, this.id); // Send heartbeats to other tabs. this.clock$.subscribe(this.ping); // Listen to heartbeats from other tabs. - pubsub.sub$(CHANNEL.PING).subscribe((id) => { + bus.sub$(CHANNEL.PING).subscribe((id) => { const now = Date.now(); this.mergePeers({[id]: now}, now); }); } - public mergePeers(peers: PeerList, now: number = Date.now()): number { - let leader = 0; - const localList = this.peers; - for (const [key, time] of Object.entries(peers)) { - const id = Number(key); - if ((time + this.pingTimeout < now) || (id === this.id)) continue; - if (!localList[id]) localList[id] = time; - else if (localList[id] < time) localList[id] = time; - if (leader < id) leader = id; - } - for (const [key, time] of Object.entries(localList)) { + protected mergePeers(peers: PeerList, now: number = Date.now()): number { + let leader = this.id; + Object.assign(this.peers, peers); + for (const [key, time] of Object.entries(this.peers)) { const id = Number(key); - if ((time + this.pingTimeout < now) || (id === this.id)) delete localList[id]; + if ((time + this.pingTimeout < now) || (id === this.id)) delete this.peers[id]; if (leader < id) leader = id; } - if (this.id > leader) leader = this.id; if (leader !== this.leader$.getValue()) this.leader$.next(leader); return leader; } + + private getCallNumber(tabId: number): string { + let id1 = this.id; + let id2 = tabId; + if (id1 > id2) [id1, id2] = [id2, id1]; + return `${id1.toString(36)}-${id2.toString(36)}`; + } + + /** Establish a bi-directional channel with another tab. */ + public async call(tabId: number): Promise { + const callNumber = this.getCallNumber(tabId); + const channel = pubsub(callNumber); + this.deps.bus.pub(CHANNEL.CALL, [this.id, tabId]); + return channel; + }; } diff --git a/stories/tablist.stories.js b/stories/tablist.stories.js index 13aa170..f60ca41 100644 --- a/stories/tablist.stories.js +++ b/stories/tablist.stories.js @@ -7,8 +7,10 @@ export default { title: 'tab list', }; -const channel = pubsub('test'); -const list = new TabList(channel); +const bus = pubsub('test'); +const list = new TabList({ + bus, +}); list.leader$.subscribe((leader) => { console.log('leader:', leader); @@ -16,7 +18,7 @@ list.leader$.subscribe((leader) => { export const Default = () => { const incoming$ = new BehaviorSubject([]); - channel.sub$(4).subscribe((data) => { + bus.sub$(4).subscribe((data) => { console.log('data received:', data); incoming$.next(incoming$.getValue().concat(data)); });