-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.ts
97 lines (81 loc) · 2.92 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import {Mutex} from 'async-mutex';
import net from 'net';
const listenPort = 1337;
const subscriptions: Subscription[] = [];
const subMutex = new Mutex();
type Subscription = {
socket: net.Socket;
eventName: string;
};
async function addConnection(socket: net.Socket, eventName: string) {
await subMutex.acquire();
try {
console.log('Adding subscriber for ' + eventName);
subscriptions.push({
socket,
eventName,
});
} finally {
await subMutex.release();
}
socket.write('Subscription to ' + eventName + ' confirmed\r\n');
}
async function removeConnection(socket: net.Socket, eventName?: string) {
await subMutex.acquire();
try {
if (eventName) {
let idx = subscriptions.findIndex((val) => val.socket === socket && val.eventName === eventName);
while (idx !== -1) {
const deleted = subscriptions.splice(idx, 1);
socket.write('Unsubscribed from ' + deleted[0].eventName + '\r\n');
idx = subscriptions.findIndex((sub) => sub.socket === socket && sub.eventName === eventName);
}
} else {
// note: don't write to the socket here, we're probably already disconnected
let idx = subscriptions.findIndex((sub) => sub.socket === socket);
while (idx !== -1) {
subscriptions.splice(idx, 1);
idx = subscriptions.findIndex((val) => val.socket === socket);
}
}
} finally {
await subMutex.release();
}
console.log(subscriptions.length + ' subs remaining');
}
async function alertNoParam(eventName: string) {
await subMutex.acquire();
try {
console.log('Alerting for ' + eventName);
for (const sub of subscriptions) {
if (sub.eventName === eventName) {
sub.socket.write('alert: ' + eventName + '\r\n');
}
}
} finally {
await subMutex.release();
}
}
async function onClientConnect(socket: net.Socket) {
socket.on('close', async () => {
await removeConnection(socket);
});
socket.on('data', async (data) => {
const lines = data.toString().split('\r\n');
for (const line of lines) {
if (!line) continue;
if (line.startsWith('subscribe: ')) {
await addConnection(socket, line.substr(11));
} else if (line.startsWith('unsubscribe: ')) {
await removeConnection(socket, line.substr(13));
} else if (line.startsWith('alertNoParam: ')) {
await alertNoParam(line.substr(14));
} else {
console.error('Someone said something unknown: ' + line);
}
}
});
socket.write('Welcome!\r\n' + socket.remoteAddress + ':' + socket.remotePort + '\r\n');
}
const server = net.createServer(onClientConnect);
server.listen(listenPort, '0.0.0.0');