Skip to content

Commit

Permalink
feat: 🎸 add dependency injection to tablist
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Apr 22, 2022
1 parent 04b9608 commit c21aa92
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 24 deletions.
60 changes: 39 additions & 21 deletions src/tablist/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {BehaviorSubject, Observable, timer} from "rxjs";
import {pubsub} from "../pubsub";
import type {PubSub} from "../pubsub/types";

const enum CHANNEL {
Expand All @@ -8,15 +9,23 @@ const enum CHANNEL {
LIST,
/** Notify other tabs that this tab is alive. */
PING,
/** Call another tab. Establish a bi-directional private channel. */
CALL,
}

type HelloMessage = number;
type PingMessage = number;
type ListMessage = [id: number, peers: PeerList];
type CallMessage = [caller: number, callee: number];

type PeerList = Record<number, number>;

/** */
export interface TabListDependencies {
readonly bus: PubSub;
readonly newBus: (name: string) => PubSub;
readonly clock?: number;
}

export class TabList {
/** Unique ID of the current browser tab instance. Zero is not allowed. */
public readonly id: number = Math.ceil(Math.random() * 0xFFFFFFFFFFFFFF);
Expand All @@ -25,58 +34,67 @@ export class TabList {
protected readonly clock$: Observable<number>;
protected pingTimeout: number

/** Send other tabs a heartbeat. This is sent automatically in a loop. */
public ping = () => {
this.pubsub.pub(CHANNEL.PING, this.id);
this.deps.bus.pub(CHANNEL.PING, this.id);
}

public constructor(protected readonly pubsub: PubSub, clock: number = 200) {
public constructor(protected readonly deps: TabListDependencies) {
const {bus, clock = 200} = deps;
this.clock$ = timer(clock, clock);
this.pingTimeout = clock * 5;

// Listen for new peers joining.
pubsub.sub$<HelloMessage>(CHANNEL.HELLO).subscribe((id) => {
bus.sub$<HelloMessage>(CHANNEL.HELLO).subscribe((id) => {
const now = Date.now();
this.mergePeers({[id]: now}, now);
pubsub.pub<ListMessage>(CHANNEL.LIST, [this.id, this.peers]);
bus.pub<ListMessage>(CHANNEL.LIST, [this.id, this.peers]);
});

// Listen for peer list updates.
pubsub.sub$<ListMessage>(CHANNEL.LIST).subscribe(([id, peers]) => {
bus.sub$<ListMessage>(CHANNEL.LIST).subscribe(([id, peers]) => {
const now = Date.now();
peers[id] = now;
this.mergePeers(peers, now);
});

// Join the swarm.
this.pubsub.pub<HelloMessage>(CHANNEL.HELLO, this.id);
bus.pub<HelloMessage>(CHANNEL.HELLO, this.id);

// Send heartbeats to other tabs.
this.clock$.subscribe(this.ping);

// Listen to heartbeats from other tabs.
pubsub.sub$<PingMessage>(CHANNEL.PING).subscribe((id) => {
bus.sub$<PingMessage>(CHANNEL.PING).subscribe((id) => {
const now = Date.now();
this.mergePeers({[id]: now}, now);
});
}

public mergePeers(peers: PeerList, now: number = Date.now()): number {
let leader = 0;
const localList = this.peers;
for (const [key, time] of Object.entries(peers)) {
const id = Number(key);
if ((time + this.pingTimeout < now) || (id === this.id)) continue;
if (!localList[id]) localList[id] = time;
else if (localList[id] < time) localList[id] = time;
if (leader < id) leader = id;
}
for (const [key, time] of Object.entries(localList)) {
protected mergePeers(peers: PeerList, now: number = Date.now()): number {
let leader = this.id;
Object.assign(this.peers, peers);
for (const [key, time] of Object.entries(this.peers)) {
const id = Number(key);
if ((time + this.pingTimeout < now) || (id === this.id)) delete localList[id];
if ((time + this.pingTimeout < now) || (id === this.id)) delete this.peers[id];
if (leader < id) leader = id;
}
if (this.id > leader) leader = this.id;
if (leader !== this.leader$.getValue()) this.leader$.next(leader);
return leader;
}

private getCallNumber(tabId: number): string {
let id1 = this.id;
let id2 = tabId;
if (id1 > id2) [id1, id2] = [id2, id1];
return `${id1.toString(36)}-${id2.toString(36)}`;
}

/** Establish a bi-directional channel with another tab. */
public async call(tabId: number): Promise<PubSub> {
const callNumber = this.getCallNumber(tabId);
const channel = pubsub(callNumber);
this.deps.bus.pub<CallMessage>(CHANNEL.CALL, [this.id, tabId]);
return channel;
};
}
8 changes: 5 additions & 3 deletions stories/tablist.stories.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ export default {
title: 'tab list',
};

const channel = pubsub('test');
const list = new TabList(channel);
const bus = pubsub('test');
const list = new TabList({
bus,
});

list.leader$.subscribe((leader) => {
console.log('leader:', leader);
});

export const Default = () => {
const incoming$ = new BehaviorSubject([]);
channel.sub$(4).subscribe((data) => {
bus.sub$(4).subscribe((data) => {
console.log('data received:', data);
incoming$.next(incoming$.getValue().concat(data));
});
Expand Down

0 comments on commit c21aa92

Please sign in to comment.