From b00124b65acdc0e373c0241c6a8e7f6406f0627b Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 17 Jul 2024 09:27:12 +0200 Subject: [PATCH 01/12] feat: implement cluster-friendly engine --- .github/workflows/ci.yml | 11 + package-lock.json | 203 +++++- package.json | 3 + packages/socket.io-clustered-engine/LICENSE | 22 + packages/socket.io-clustered-engine/README.md | 185 +++++ .../socket.io-clustered-engine/compose.yaml | 5 + .../socket.io-clustered-engine/lib/cluster.ts | 65 ++ .../socket.io-clustered-engine/lib/engine.ts | 687 ++++++++++++++++++ .../socket.io-clustered-engine/lib/index.ts | 2 + .../socket.io-clustered-engine/lib/redis.ts | 200 +++++ .../socket.io-clustered-engine/package.json | 41 ++ .../test/cluster.ts | 83 +++ .../test/in-memory.ts | 363 +++++++++ .../socket.io-clustered-engine/test/redis.ts | 225 ++++++ .../socket.io-clustered-engine/test/util.ts | 17 + .../socket.io-clustered-engine/test/worker.js | 16 + .../socket.io-clustered-engine/tsconfig.json | 12 + 17 files changed, 2139 insertions(+), 1 deletion(-) create mode 100644 packages/socket.io-clustered-engine/LICENSE create mode 100644 packages/socket.io-clustered-engine/README.md create mode 100644 packages/socket.io-clustered-engine/compose.yaml create mode 100644 packages/socket.io-clustered-engine/lib/cluster.ts create mode 100644 packages/socket.io-clustered-engine/lib/engine.ts create mode 100644 packages/socket.io-clustered-engine/lib/index.ts create mode 100644 packages/socket.io-clustered-engine/lib/redis.ts create mode 100644 packages/socket.io-clustered-engine/package.json create mode 100644 packages/socket.io-clustered-engine/test/cluster.ts create mode 100644 packages/socket.io-clustered-engine/test/in-memory.ts create mode 100644 packages/socket.io-clustered-engine/test/redis.ts create mode 100644 packages/socket.io-clustered-engine/test/util.ts create mode 100644 packages/socket.io-clustered-engine/test/worker.js create mode 100644 packages/socket.io-clustered-engine/tsconfig.json diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ba343e3a12..709314738f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,6 +21,17 @@ jobs: - 18 - 20 + services: + redis: + image: redis:7 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + steps: - name: Checkout repository uses: actions/checkout@v4 diff --git a/package-lock.json b/package-lock.json index bc27df6094..c3efa6ae62 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,7 @@ "packages/socket.io-component-emitter", "packages/engine.io-parser", "packages/engine.io", + "packages/socket.io-clustered-engine", "packages/engine.io-client", "packages/socket.io-adapter", "packages/socket.io-parser", @@ -46,10 +47,12 @@ "express-session": "^1.18.0", "has-cors": "^1.1.0", "helmet": "^7.1.0", + "ioredis": "^5.4.1", "mocha": "^10.6.0", "node-forge": "^1.3.1", "nyc": "^17.0.0", "prettier": "^2.8.8", + "redis": "^4.6.15", "rimraf": "^6.0.0", "rollup": "^2.79.1", "rollup-plugin-terser": "^7.0.2", @@ -1846,6 +1849,12 @@ "node": ">=16" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "dev": true + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -2235,6 +2244,14 @@ "node": ">= 0.4" } }, + "node_modules/@msgpack/msgpack": { + "version": "2.8.0", + "resolved": "https://registry.npmjs.org/@msgpack/msgpack/-/msgpack-2.8.0.tgz", + "integrity": "sha512-h9u4u/jiIRKbq25PM+zymTyW6bhTzELvOoUd+AvYriWOAKpLGnIamaET3pnHYoI5iYphAHBI4ayx0MehR+VVPQ==", + "engines": { + "node": ">= 10" + } + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -2359,6 +2376,71 @@ "streamx": "^2.15.0" } }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "dev": true, + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.5.17", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.17.tgz", + "integrity": "sha512-IPvU9A31qRCZ7lds/x+ksuK/UMndd0EASveAvCvEtFFKIZjZ+m/a4a0L7S28KEWoR5ka8526hlSghDo4Hrc2Hg==", + "dev": true, + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/client/node_modules/yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==", + "dev": true + }, + "node_modules/@redis/graph": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.1.tgz", + "integrity": "sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==", + "dev": true, + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.6.tgz", + "integrity": "sha512-rcZO3bfQbm2zPRpqo82XbW8zg4G/w4W3tI7X8Mqleq9goQjAGLL7q/1n1ZX4dXEAmORVZ4s1+uKLaUOg7LrUhw==", + "dev": true, + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.1.6", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.6.tgz", + "integrity": "sha512-mZXCxbTYKBQ3M2lZnEddwEAks0Kc7nauire8q20oA0oA/LoA+E/b5Y5KZn232ztPb1FkIGqo12vh3Lf+Vw5iTw==", + "dev": true, + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz", + "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==", + "dev": true, + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@rollup/plugin-alias": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/@rollup/plugin-alias/-/plugin-alias-5.1.0.tgz", @@ -2640,6 +2722,10 @@ "@sinonjs/commons": "^3.0.0" } }, + "node_modules/@socket.io/clustered-engine": { + "resolved": "packages/socket.io-clustered-engine", + "link": true + }, "node_modules/@socket.io/component-emitter": { "resolved": "packages/socket.io-component-emitter", "link": true @@ -5349,6 +5435,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "dev": true, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/color-convert": { "version": "1.9.3", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz", @@ -5975,6 +6070,15 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "dev": true, + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -7645,6 +7749,15 @@ "node": "^16.13.0 || >=18.0.0" } }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "dev": true, + "engines": { + "node": ">= 4" + } + }, "node_modules/gensync": { "version": "1.0.0-beta.2", "resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz", @@ -8373,6 +8486,30 @@ "node": ">=8" } }, + "node_modules/ioredis": { + "version": "5.4.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.4.1.tgz", + "integrity": "sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==", + "dev": true, + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ip": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.1.tgz", @@ -9480,12 +9617,24 @@ "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", "dev": true }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "dev": true + }, "node_modules/lodash.flattendeep": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz", "integrity": "sha512-uHaJFihxmJcEX3kT4I23ABqKKalJ/zDrDg0lsFtc1h+3uw49SIJ5beyhx5ExVRti3AvKoOJngIj7xz3oylPdWQ==", "dev": true }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "dev": true + }, "node_modules/lodash.isequal": { "version": "4.5.0", "resolved": "https://registry.npmjs.org/lodash.isequal/-/lodash.isequal-4.5.0.tgz", @@ -11857,6 +12006,41 @@ "node": ">=8" } }, + "node_modules/redis": { + "version": "4.6.15", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.15.tgz", + "integrity": "sha512-2NtuOpMW3tnYzBw6S8mbXSX7RPzvVFCA2wFJq9oErushO2UeBkxObk+uvo7gv7n0rhWeOj/IzrHO8TjcFlRSOg==", + "dev": true, + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.5.17", + "@redis/graph": "1.1.1", + "@redis/json": "1.0.6", + "@redis/search": "1.1.6", + "@redis/time-series": "1.0.5" + } + }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "dev": true, + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dev": true, + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/regenerate": { "version": "1.4.2", "resolved": "https://registry.npmjs.org/regenerate/-/regenerate-1.4.2.tgz", @@ -12997,6 +13181,12 @@ "node": ">=8" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "dev": true + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", @@ -15008,7 +15198,7 @@ } }, "packages/engine.io-parser": { - "version": "5.2.2", + "version": "5.2.3", "license": "MIT", "devDependencies": { "prettier": "^3.3.2" @@ -15113,6 +15303,17 @@ "xmlhttprequest-ssl": "~2.0.0" } }, + "packages/socket.io-clustered-engine": { + "name": "@socket.io/clustered-engine", + "version": "0.0.1", + "license": "MIT", + "dependencies": { + "@msgpack/msgpack": "~2.8.0", + "debug": "~4.3.3", + "engine.io": "~6.6.0", + "engine.io-parser": "~5.2.3" + } + }, "packages/socket.io-component-emitter": { "name": "@socket.io/component-emitter", "version": "3.1.2", diff --git a/package.json b/package.json index f360ebfbfd..6ebf746e26 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "packages/socket.io-component-emitter", "packages/engine.io-parser", "packages/engine.io", + "packages/socket.io-clustered-engine", "packages/engine.io-client", "packages/socket.io-adapter", "packages/socket.io-parser", @@ -47,10 +48,12 @@ "express-session": "^1.18.0", "has-cors": "^1.1.0", "helmet": "^7.1.0", + "ioredis": "^5.4.1", "mocha": "^10.6.0", "node-forge": "^1.3.1", "nyc": "^17.0.0", "prettier": "^2.8.8", + "redis": "^4.6.15", "rimraf": "^6.0.0", "rollup": "^2.79.1", "rollup-plugin-terser": "^7.0.2", diff --git a/packages/socket.io-clustered-engine/LICENSE b/packages/socket.io-clustered-engine/LICENSE new file mode 100644 index 0000000000..0d20e09147 --- /dev/null +++ b/packages/socket.io-clustered-engine/LICENSE @@ -0,0 +1,22 @@ +(The MIT License) + +Copyright (c) 2024-present Guillermo Rauch and Socket.IO contributors + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +'Software'), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/packages/socket.io-clustered-engine/README.md b/packages/socket.io-clustered-engine/README.md new file mode 100644 index 0000000000..8527752a11 --- /dev/null +++ b/packages/socket.io-clustered-engine/README.md @@ -0,0 +1,185 @@ +# Socket.IO clustered engine + +A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions). + +**Table of contents** + + + * [Usage](#usage) + * [Node.js cluster](#nodejs-cluster) + * [Redis](#redis) + * [Node.js cluster & Redis](#nodejs-cluster--redis) + * [Options](#options) + * [How it works](#how-it-works) + * [License](#license) + + +## Usage + +### Node.js cluster + +```js +import cluster from "node:cluster"; +import process from "node:process"; +import { availableParallelism } from "node:os"; +import { setupPrimary, NodeClusterEngine } from "@socket.io/clustered-engine"; +import { createServer } from "node:http"; +import { Server } from "socket.io"; + +if (cluster.isPrimary) { + console.log(`Primary ${process.pid} is running`); + + const numCPUs = availableParallelism(); + + // fork workers + for (let i = 0; i < numCPUs; i++) { + cluster.fork(); + } + + // setup connection within the cluster + setupPrimary(); + + // needed for packets containing Buffer objects (you can ignore it if you only send plaintext objects) + cluster.setupPrimary({ + serialization: "advanced", + }); + + cluster.on("exit", (worker, code, signal) => { + console.log(`worker ${worker.process.pid} died`); + }); +} else { + const httpServer = createServer((req, res) => { + res.writeHead(404).end(); + }); + + const engine = new NodeClusterEngine(); + + engine.attach(httpServer, { + path: "/socket.io/" + }); + + const io = new Server(); + + io.bind(engine); + + // workers will share the same port + httpServer.listen(3000); + + console.log(`Worker ${process.pid} started`); +} +``` + +### Redis + +```js +import { createServer } from "node:http"; +import { createClient } from "redis"; +import { RedisEngine } from "@socket.io/clustered-engine"; +import { Server } from "socket.io"; + +const httpServer = createServer((req, res) => { + res.writeHead(404).end(); +}); + +const pubClient = createClient(); +const subClient = pubClient.duplicate(); + +await Promise.all([ + pubClient.connect(), + subClient.connect(), +]); + +const engine = new RedisEngine(pubClient, subClient); + +engine.attach(httpServer, { + path: "/socket.io/" +}); + +const io = new Server(); + +io.bind(engine); + +httpServer.listen(3000); +``` + +### Node.js cluster & Redis + +```js +import cluster from "node:cluster"; +import process from "node:process"; +import { availableParallelism } from "node:os"; +import { createClient } from "redis"; +import { setupPrimaryWithRedis, NodeClusterEngine } from "@socket.io/clustered-engine"; +import { createServer } from "node:http"; +import { Server } from "socket.io"; + +if (cluster.isPrimary) { + console.log(`Primary ${process.pid} is running`); + + const numCPUs = availableParallelism(); + + // fork workers + for (let i = 0; i < numCPUs; i++) { + cluster.fork(); + } + + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([ + pubClient.connect(), + subClient.connect(), + ]); + + // setup connection between and within the clusters + setupPrimaryWithRedis(pubClient, subClient); + + // needed for packets containing Buffer objects (you can ignore it if you only send plaintext objects) + cluster.setupPrimary({ + serialization: "advanced", + }); + + cluster.on("exit", (worker, code, signal) => { + console.log(`worker ${worker.process.pid} died`); + }); +} else { + const httpServer = createServer((req, res) => { + res.writeHead(404).end(); + }); + + const engine = new NodeClusterEngine(); + + engine.attach(httpServer, { + path: "/socket.io/" + }); + + const io = new Server(); + + io.bind(engine); + + // workers will share the same port + httpServer.listen(3000); + + console.log(`Worker ${process.pid} started`); +} +``` + +## Options + +| Name | Description | Default value | +|----------------------------|-----------------------------------------------------------------------|---------------| +| `responseTimeout` | The maximum waiting time for responses from other nodes, in ms. | `1000 ms` | +| `noopUpgradeInterval` | The delay between two "noop" packets when the client upgrades, in ms. | `200 ms` | +| `delayedConnectionTimeout` | The maximum waiting time for a successful upgrade, in ms. | `300 ms` | + +## How it works + +This engine extends the one provided by the `engine.io` package, so that sticky sessions are not required when scaling horizontally. + +The Node.js workers communicate via the IPC channel (or via Redis pub/sub) to check whether the Engine.IO session exists on another worker. In that case, the packets are forwarded to the worker which owns the session. + +Additionally, when a client starts with HTTP long-polling, the connection is delayed to allow the client to upgrade, so that the WebSocket connection ends up on the worker which owns the session. + +## License + +[MIT](LICENSE) diff --git a/packages/socket.io-clustered-engine/compose.yaml b/packages/socket.io-clustered-engine/compose.yaml new file mode 100644 index 0000000000..c5c85d2a9e --- /dev/null +++ b/packages/socket.io-clustered-engine/compose.yaml @@ -0,0 +1,5 @@ +services: + redis: + image: redis:7 + ports: + - "6379:6379" diff --git a/packages/socket.io-clustered-engine/lib/cluster.ts b/packages/socket.io-clustered-engine/lib/cluster.ts new file mode 100644 index 0000000000..8dde7b21e1 --- /dev/null +++ b/packages/socket.io-clustered-engine/lib/cluster.ts @@ -0,0 +1,65 @@ +import cluster from "node:cluster"; +import { type ServerOptions } from "engine.io"; +import { ClusterEngine, type Message } from "./engine"; +import debugModule from "debug"; + +const debug = debugModule("engine:cluster"); +const MESSAGE_SOURCE = "_eio"; +const kNodeId = Symbol("nodeId"); + +function ignoreError() {} + +export function setupPrimary() { + cluster.on("message", (sourceWorker, message: { _source?: string }) => { + if (message._source !== MESSAGE_SOURCE) { + debug("ignore message from unknown source"); + return; + } + + if (!sourceWorker[kNodeId]) { + sourceWorker[kNodeId] = (message as Message).senderId; + } + + // @ts-expect-error recipientId is not defined for all messages + let recipientId = (message as Message).recipientId; + if (recipientId) { + for (const worker of Object.values(cluster.workers)) { + if (worker[kNodeId] === recipientId) { + debug("forward message to worker %d", worker.id); + worker.send(message, null, ignoreError); + return; + } + } + } + + debug("forward message to all other workers"); + for (const worker of Object.values(cluster.workers)) { + if (worker.id !== sourceWorker.id) { + worker.send(message, null, ignoreError); + } + } + }); +} + +export class NodeClusterEngine extends ClusterEngine { + constructor(opts?: ServerOptions) { + super(opts); + + process.on("message", (message: Message & { _source?: string }) => { + if (message._source !== MESSAGE_SOURCE) { + debug("ignore message from unknown source"); + return; + } + + debug("received message: %j", message); + this.onMessage(message); + }); + } + + override publishMessage(message: Message & { _source?: string }) { + message._source = MESSAGE_SOURCE; + + debug("send message to primary"); + process.send(message, null, { swallowErrors: true }, ignoreError); + } +} diff --git a/packages/socket.io-clustered-engine/lib/engine.ts b/packages/socket.io-clustered-engine/lib/engine.ts new file mode 100644 index 0000000000..90ce6762ec --- /dev/null +++ b/packages/socket.io-clustered-engine/lib/engine.ts @@ -0,0 +1,687 @@ +import { Server, type ServerOptions, Socket, type Transport } from "engine.io"; +import { randomBytes } from "node:crypto"; +import { setTimeout, clearTimeout } from "node:timers"; +import { type IncomingMessage } from "node:http"; +import { type Packet } from "engine.io-parser"; +import debugModule from "debug"; + +const debug = debugModule("engine"); + +const kDelayed = Symbol("delayed"); +const kDelayedTimer = Symbol("delayedTimer"); +const kBuffer = Symbol("buffer"); +const kPacketListener = Symbol("packetListener"); +const kNoopTimer = Symbol("noopTimer"); +const kSenderId = Symbol("senderId"); + +type Brand = K & { __brand: T }; + +type NodeId = Brand; +type SessionId = Brand; +type RequestId = Brand; + +function randomId() { + return randomBytes(3).toString("hex"); +} + +enum MessageType { + ACQUIRE_LOCK = 0, + ACQUIRE_LOCK_RESPONSE, + DRAIN, + PACKET, + UPGRADE, + UPGRADE_RESPONSE, + CLOSE, +} + +export type Message = { + senderId: NodeId; +} & ( + | { + requestId: RequestId; + type: MessageType.ACQUIRE_LOCK; + data: { + sid: SessionId; + transportName: string; + type: "read" | "write"; + }; + } + | { + recipientId: NodeId; + requestId: RequestId; + type: MessageType.ACQUIRE_LOCK_RESPONSE; + data: { + success: boolean; + }; + } + | { + recipientId: NodeId; + type: MessageType.DRAIN; + data: { + sid: SessionId; + packets: Packet[]; + }; + } + | { + recipientId: NodeId; + type: MessageType.PACKET; + data: { + sid: SessionId; + packet: Packet; + }; + } + | { + requestId: RequestId; + recipientId: NodeId; + type: MessageType.UPGRADE; + data: { + sid: SessionId; + success: boolean; + }; + } + | { + requestId: RequestId; + recipientId: NodeId; + type: MessageType.UPGRADE_RESPONSE; + data: { + takeOver: boolean; + packets: Packet[]; + }; + } + | { + recipientId: NodeId; + type: MessageType.CLOSE; + data: { + sid: SessionId; + reason: string; + }; + } +); + +type ClusterRequest = { + timer: NodeJS.Timer; + onSuccess: (...args: any[]) => void; + onError: () => void; +}; + +function isClientLockable( + client: Socket, + transportName: string, + lockType: "read" | "write" +) { + switch (transportName) { + case "polling": + return ( + client.transport.name === "polling" && + (lockType === "write" || !client.transport.writable) + ); + case "websocket": + case "webtransport": + return ( + client.transport.name === "polling" && + !client.upgrading && + !client.upgraded + ); + } +} + +function isValidSessionId(str: string) { + return typeof str === "string" && str.length === 20; +} + +interface ClusterEngineOptions { + /** + * The maximum waiting time for responses from other nodes, in ms. + * + * @default 1000 + */ + responseTimeout?: number; + /** + * The delay between two "noop" packets when the client upgrades, in ms. + * + * @default 200 + */ + noopUpgradeInterval?: number; + /** + * The maximum waiting time for a successful upgrade, in ms. + * + * @default 300 + */ + delayedConnectionTimeout?: number; +} + +// @ts-expect-error onWebSocket() method is private in parent class +export abstract class ClusterEngine extends Server { + private readonly _opts: Required; + protected readonly _nodeId = randomId() as NodeId; + private readonly _requests = new Map(); + private readonly _remoteTransports = new Map(); + private _requestCount = 0; + + constructor(opts?: ServerOptions & ClusterEngineOptions) { + super(opts); + this._opts = Object.assign( + { + responseTimeout: 1000, + noopUpgradeInterval: 200, + delayedConnectionTimeout: 300, + }, + opts + ); + } + + protected onMessage(message: Message) { + if (message.senderId === this._nodeId) { + return; + } + debug("received: %j", message); + + switch (message.type) { + case MessageType.ACQUIRE_LOCK: { + const sid = message.data.sid; + const client = this.clients[sid]; + if (!client) { + return; + } + + const transportName = message.data.transportName; + const success = isClientLockable( + client, + transportName, + message.data.type + ); + + this.publishMessage({ + requestId: message.requestId, + senderId: this._nodeId, + recipientId: message.senderId, + type: MessageType.ACQUIRE_LOCK_RESPONSE, + data: { + success, + }, + }); + + switch (transportName) { + case "polling": { + if (message.data.type === "read") { + this._forwardFlushWhenPolling(client, sid, message.senderId); + } + break; + } + case "websocket": + case "webtransport": { + client.upgrading = true; + client[kNoopTimer] = setTimeout(() => { + debug("writing a noop packet to polling for fast upgrade"); + // @ts-expect-error sendPacket() is private + client.sendPacket("noop"); + }, this._opts.noopUpgradeInterval); + } + } + break; + } + + case MessageType.ACQUIRE_LOCK_RESPONSE: { + const requestId = message.requestId; + const request = this._requests.get(requestId); + if (!request) { + return; + } + this._requests.delete(requestId); + clearTimeout(request.timer); + if (message.data.success) { + request.onSuccess(message.senderId); + } else { + request.onError(); + } + break; + } + + case MessageType.DRAIN: { + const transport = this._remoteTransports.get(message.data.sid); + if (!transport) { + return; + } + if (transport.name === "polling") { + // HTTP long-polling can only be drained once + this._remoteTransports.delete(message.data.sid); + } + transport.send(message.data.packets); + break; + } + + case MessageType.PACKET: { + const client = this.clients[message.data.sid]; + if (!client) { + return; + } + if (client[kDelayed]) { + client[kBuffer].push(message.data.packet); + } else { + // @ts-expect-error onPacket() is private + client.onPacket(message.data.packet); + } + break; + } + + case MessageType.UPGRADE: { + const sid = message.data.sid; + const client = this.clients[sid]; + if (!client) { + return; + } + + clearInterval(client[kNoopTimer]); + client.upgrading = false; + + if (message.data.success) { + client.upgraded = true; + client.emit("upgrade"); + + if (client[kDelayed]) { + client[kDelayed] = false; + clearTimeout(client[kDelayedTimer]); + client.close(true); + delete this.clients[sid]; + + this.publishMessage({ + requestId: message.requestId, + senderId: this._nodeId, + recipientId: message.senderId, + type: MessageType.UPGRADE_RESPONSE, + data: { + takeOver: true, + packets: client[kBuffer], + }, + }); + } else { + this._forwardFlushWhenWebSocket(client, sid, message.senderId); + + this.publishMessage({ + requestId: message.requestId, + senderId: this._nodeId, + recipientId: message.senderId, + type: MessageType.UPGRADE_RESPONSE, + data: { + takeOver: false, + packets: [], + }, + }); + } + } + break; + } + + case MessageType.UPGRADE_RESPONSE: { + const requestId = message.requestId; + const request = this._requests.get(requestId); + if (!request) { + return; + } + this._requests.delete(requestId); + clearTimeout(request.timer); + request.onSuccess(message.data.takeOver, message.data.packets); + break; + } + + case MessageType.CLOSE: { + const client = this.clients[message.data.sid]; + if (!client) { + return; + } + this._doConnect(client); + // @ts-expect-error onClose() is private + client.onClose(message.data.reason); + break; + } + } + } + + private _forwardFlushWhenPolling( + client: Socket, + sid: SessionId, + senderId: NodeId + ) { + // @ts-expect-error req is private + client.transport.req = true; + client.transport.writable = true; + const oldSend = client.transport.send; + + client.transport.send = (packets) => { + this.publishMessage({ + senderId: this._nodeId, + recipientId: senderId, + type: MessageType.DRAIN, + data: { + sid, + packets, + }, + }); + // @ts-expect-error req is private + client.transport.req = null; + client.transport.writable = false; + client.transport.send = oldSend; + }; + + // @ts-expect-error flush() is private + client.flush(); + } + + private _forwardFlushWhenWebSocket( + client: Socket, + sid: SessionId, + senderId: NodeId + ) { + client.transport.writable = true; + client.transport.send = (packets) => { + this.publishMessage({ + senderId: this._nodeId, + recipientId: senderId, + type: MessageType.DRAIN, + data: { + sid, + packets, + }, + }); + }; + + // @ts-expect-error flush() is private + client.flush(); + } + + override verify( + req: IncomingMessage & { _query: Record }, + upgrade: boolean, + fn: (errorCode?: number, context?: any) => void + ): void { + super.verify(req, upgrade, (errorCode: number, errorContext: any) => { + if (errorCode !== Server.errors.UNKNOWN_SID) { + return fn(errorCode, errorContext); + } + + const sid = req._query.sid as SessionId; + if (!isValidSessionId(sid)) { + return fn(errorCode, errorContext); + } + + const transportName = req._query.transport; + const lockType = req.method === "GET" ? "read" : "write"; + + const onSuccess = async (senderId: NodeId) => { + if (upgrade) { + req[kSenderId] = senderId; + fn(); + } else { + const transport = this.createTransport(transportName, req); + this._hookTransport(sid, transport, lockType, senderId); + transport.onRequest(req); + } + }; + + this._acquireLock(sid, transportName, lockType, onSuccess, () => + fn(errorCode, errorContext) + ); + }); + } + + private _acquireLock( + sid: SessionId, + transportName: string, + lockType: "read" | "write", + onSuccess: (senderId: NodeId) => void, + onError: () => void + ) { + const requestId = ++this._requestCount as RequestId; + + const timer = setTimeout(() => { + this._requests.delete(requestId); + onError(); + }, this._opts.responseTimeout); + + this._requests.set(requestId, { + timer, + onSuccess, + onError, + }); + + this.publishMessage({ + requestId, + senderId: this._nodeId, + type: MessageType.ACQUIRE_LOCK, + data: { + sid, + transportName, + type: lockType, + }, + }); + } + + private _hookTransport( + sid: SessionId, + transport: Transport, + lockType: "read" | "write", + senderId: NodeId + ) { + if (lockType === "read") { + this._remoteTransports.set(sid, transport); + } + + transport.on("packet", async (packet: Packet) => + this._onPacket(sid, senderId, packet) + ); + transport.once("error", () => + this._onClose(sid, senderId, "transport error") + ); + transport.once("close", () => + this._onClose(sid, senderId, "transport close") + ); + } + + private _tryUpgrade( + transport: Transport, + onSuccess: () => void, + onError: () => void + ) { + debug("starting upgrade process"); + + const upgradeTimeoutTimer = setTimeout(() => { + debug("client did not complete upgrade - closing transport"); + transport.close(); + transport.removeAllListeners(); + onError(); + }, this.opts.upgradeTimeout); + + transport.on("packet", (packet) => { + if (packet.type === "ping" && packet.data === "probe") { + debug("got probe ping packet, sending pong"); + transport.send([{ type: "pong", data: "probe" }]); + } else if (packet.type === "upgrade") { + clearTimeout(upgradeTimeoutTimer); + transport.removeAllListeners(); + onSuccess(); + } else { + transport.removeAllListeners(); + transport.close(); + onError(); + } + }); + + transport.on("error", () => { + transport.removeAllListeners(); + onError(); + }); + + transport.on("close", () => { + transport.removeAllListeners(); + onError(); + }); + } + + private _onPacket(sid: SessionId, senderId: NodeId, packet: Packet) { + this.publishMessage({ + senderId: this._nodeId, + recipientId: senderId, + type: MessageType.PACKET, + data: { + sid, + packet, + }, + }); + } + + private _onClose( + sid: SessionId, + senderId: NodeId, + reason: "transport error" | "transport close" + ) { + this.publishMessage({ + senderId: this._nodeId, + recipientId: senderId, + type: MessageType.CLOSE, + data: { + sid, + reason, + }, + }); + } + + override onWebSocket(req: any, socket: any, websocket: any) { + const sid = req._query.sid; + if (!sid || this.clients[sid]) { + // @ts-expect-error onWebSocket() is private + return super.onWebSocket(req, socket, websocket); + } + + websocket.on("error", () => {}); + req.websocket = websocket; + + const transport = this.createTransport(req._query.transport, req); + const senderId = req[kSenderId]; + + this._tryUpgrade( + transport, + () => this._onUpgradeSuccess(sid, transport, req, senderId), + () => { + debug("upgrade failure"); + } + ); + } + + private _onUpgradeSuccess( + sid: SessionId, + transport: Transport, + req: any, + senderId: NodeId + ) { + debug("upgrade success"); + this._hookTransport(sid, transport, "read", senderId); + + const requestId = ++this._requestCount as RequestId; + + const onSuccess = (takeOver: boolean, packets: Packet[]) => { + if (takeOver) { + this._remoteTransports.delete(sid); + + const send = transport.send; + transport.send = () => {}; + const socket = new Socket(sid, this, transport, req, 4); + transport.send = send; + + this.clients[sid] = socket; + this.clientsCount++; + + socket.once("close", () => { + delete this.clients[sid]; + this.clientsCount--; + }); + + super.emit("connection", socket); + socket.emit("upgrade"); + for (const packet of packets) { + // @ts-expect-error onPacket() is private + socket.onPacket(packet); + } + } + }; + + const onError = () => { + transport.close(); + }; + + const timer = setTimeout(() => { + this._requests.delete(requestId); + onError(); + }, this._opts.responseTimeout); + + this._requests.set(requestId, { + timer, + onSuccess, + onError, + }); + + this.publishMessage({ + requestId, + senderId: this._nodeId, + recipientId: senderId, + type: MessageType.UPGRADE, + data: { + sid, + success: true, + }, + }); + } + + override emit(ev: string, ...args: any[]): boolean { + if (ev !== "connection") { + return super.emit(ev, ...args); + } + + const socket = args[0] as Socket; + + if (socket.transport.name === "websocket") { + return super.emit(ev, ...args); + } + + debug("delaying connection"); + + socket[kDelayed] = true; + socket[kBuffer] = []; + + socket[kPacketListener] = (packet: Packet) => { + socket[kBuffer].push(packet); + }; + + socket.on("packet", socket[kPacketListener]); + + socket[kDelayedTimer] = setTimeout( + () => this._doConnect(socket), + this._opts.delayedConnectionTimeout + ); + } + + private _doConnect(socket: Socket) { + if (!socket[kDelayed] || socket.readyState !== "open") { + return; + } + debug( + "the client has not upgraded yet, so the connection process is completed here" + ); + socket[kDelayed] = false; + socket.off("packet", socket[kPacketListener]); + clearTimeout(socket[kDelayedTimer]); + + super.emit("connection", socket); + + socket[kBuffer].forEach((packet: Packet) => { + // @ts-expect-error onPacket() method is private + socket.onPacket(packet); + }); + delete socket[kBuffer]; + + if (socket.upgraded) { + socket.emit("upgrade"); + } + } + + abstract publishMessage(message: Message): void; +} diff --git a/packages/socket.io-clustered-engine/lib/index.ts b/packages/socket.io-clustered-engine/lib/index.ts new file mode 100644 index 0000000000..500b8082bd --- /dev/null +++ b/packages/socket.io-clustered-engine/lib/index.ts @@ -0,0 +1,2 @@ +export { setupPrimary, NodeClusterEngine } from "./cluster"; +export { setupPrimaryWithRedis, RedisEngine } from "./redis"; diff --git a/packages/socket.io-clustered-engine/lib/redis.ts b/packages/socket.io-clustered-engine/lib/redis.ts new file mode 100644 index 0000000000..2123c307ee --- /dev/null +++ b/packages/socket.io-clustered-engine/lib/redis.ts @@ -0,0 +1,200 @@ +import { ClusterEngine, type Message } from "./engine"; +import { encode, decode } from "@msgpack/msgpack"; +import { type ServerOptions } from "engine.io"; +import cluster from "node:cluster"; +import { randomUUID } from "node:crypto"; +import debugModule from "debug"; + +const debug = debugModule("engine:redis"); +const MESSAGE_SOURCE = "_eio"; +const kNodeId = Symbol("nodeId"); + +function ignoreError() {} + +interface PrimaryWithRedisOptions { + /** + * The prefix for the Redis Pub/Sub channels. + * + * @default "engine.io" + */ + channelPrefix?: string; +} + +function channelName(prefix: string, nodeId?: string) { + if (nodeId) { + return prefix + "#" + nodeId + "#"; + } else { + return prefix + "#"; + } +} + +export function setupPrimaryWithRedis( + pubClient: any, + subClient: any, + opts?: PrimaryWithRedisOptions +) { + const primaryId = randomUUID(); + const prefix = opts?.channelPrefix || "engine.io"; + const channels = [channelName(prefix), channelName(prefix, primaryId)]; + + debug("subscribing to redis channels: %s", channels); + SUBSCRIBE(subClient, channels, (buffer: Buffer) => { + let message: Message & { _source?: string; _primaryId?: string }; + try { + message = decode(buffer) as Message; + } catch (e) { + debug("ignore malformed buffer"); + return; + } + + if (message._source !== MESSAGE_SOURCE) { + debug("ignore message from unknown source"); + return; + } + + if (message._primaryId === primaryId) { + debug("ignore message from self"); + return; + } + + debug("received message: %j", message); + + // @ts-expect-error recipientId is not defined for all messages + const recipientId = (message as Message).recipientId; + if (recipientId) { + for (const worker of Object.values(cluster.workers)) { + if (worker[kNodeId] === recipientId) { + debug("forward message to worker %d", worker.id); + worker.send(message, null, ignoreError); + return; + } + } + } + + debug("forward message to all workers"); + for (const worker of Object.values(cluster.workers)) { + worker.send(message, null, ignoreError); + } + }); + + cluster.on( + "message", + ( + sourceWorker, + message: Message & { _source?: string; _primaryId?: string } + ) => { + if (message._source !== MESSAGE_SOURCE) { + debug("ignore message from unknown source"); + return; + } + + if (!sourceWorker[kNodeId]) { + sourceWorker[kNodeId] = (message as Message).senderId; + } + + // @ts-expect-error recipientId is not defined for all messages + let recipientId = message.recipientId; + if (recipientId) { + for (const worker of Object.values(cluster.workers)) { + if (worker[kNodeId] === recipientId) { + debug("forward message to worker %d", worker.id); + worker.send(message, null, ignoreError); + return; + } + } + } + + debug("forward message to all other workers"); + for (const worker of Object.values(cluster.workers)) { + if (worker.id !== sourceWorker.id) { + worker.send(message, null, ignoreError); + } + } + + // @ts-expect-error recipientId is not defined for all messages + const channel = channelName(prefix, message.recipientId); + message._primaryId = primaryId; + + debug("publish message to channel %s", channel); + pubClient.publish(channel, encode(message)); + } + ); +} + +interface RedisEngineOptions extends ServerOptions { + /** + * The prefix for the Redis Pub/Sub channels. + * + * @default "engine.io" + */ + channelPrefix?: string; +} + +export class RedisEngine extends ClusterEngine { + private readonly _pubClient: any; + private readonly _channelPrefix: string; + + constructor(pubClient: any, subClient: any, opts?: RedisEngineOptions) { + super(opts); + this._pubClient = pubClient; + this._channelPrefix = opts?.channelPrefix || "engine.io"; + const channels = [ + channelName(this._channelPrefix), + channelName(this._channelPrefix, this._nodeId), + ]; + + debug("subscribing to redis channels: %s", channels); + SUBSCRIBE(subClient, channels, (buffer: Buffer) => { + let message: Message & { _source?: string; _primaryId?: string }; + try { + message = decode(buffer) as Message; + } catch (e) { + debug("ignore malformed buffer"); + return; + } + + if (message._source !== MESSAGE_SOURCE) { + debug("ignore message from unknown source"); + return; + } + + debug("received message: %j", message); + this.onMessage(message); + }); + } + + publishMessage(message: Message & { _source?: string }): void { + // @ts-expect-error recipientId is not defined for all messages + const channel = channelName(this._channelPrefix, message.recipientId); + + message._source = MESSAGE_SOURCE; + + debug("publish message to channel %s", channel); + this._pubClient.publish(channel, Buffer.from(encode(message))); + } +} + +const RETURN_BUFFERS = true; + +function SUBSCRIBE( + redisClient: any, + channels: string[], + listener: (message: Buffer) => void +) { + if (isRedisClient(redisClient)) { + redisClient.subscribe(channels, listener, RETURN_BUFFERS); + } else { + redisClient.subscribe(channels); + redisClient.on("messageBuffer", (_channel: Buffer, message: Buffer) => + listener(message) + ); + } +} + +/** + * Whether the redis client comes from the 'redis' or the 'ioredis' package + * @param redisClient + */ +function isRedisClient(redisClient: any) { + return typeof redisClient.sSubscribe === "function"; +} diff --git a/packages/socket.io-clustered-engine/package.json b/packages/socket.io-clustered-engine/package.json new file mode 100644 index 0000000000..cdb2a3085f --- /dev/null +++ b/packages/socket.io-clustered-engine/package.json @@ -0,0 +1,41 @@ +{ + "name": "@socket.io/clustered-engine", + "version": "0.0.1", + "description": "A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions)", + "type": "commonjs", + "license": "MIT", + "homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-clustered-engine#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/socketio/socket.io.git" + }, + "bugs": { + "url": "https://github.com/socketio/socket.io/issues" + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "files": [ + "dist/" + ], + "dependencies": { + "@msgpack/msgpack": "~2.8.0", + "debug": "~4.3.3", + "engine.io": "~6.6.0", + "engine.io-parser": "~5.2.3" + }, + "scripts": { + "compile": "rimraf ./dist && tsc", + "test": "npm run format:check && npm run compile && npm run test:unit", + "test:unit": "mocha --require ts-node/register test/*.ts", + "format:check": "prettier --check \"lib/**/*.ts\" \"test/**/*.ts\"", + "format:fix": "prettier --write \"lib/**/*.ts\" \"test/**/*.ts\"", + "prepack": "npm run compile" + }, + "engines": { + "node": ">=10.2.0" + }, + "keywords": [ + "socket.io", + "cluster" + ] +} diff --git a/packages/socket.io-clustered-engine/test/cluster.ts b/packages/socket.io-clustered-engine/test/cluster.ts new file mode 100644 index 0000000000..87f2385c2f --- /dev/null +++ b/packages/socket.io-clustered-engine/test/cluster.ts @@ -0,0 +1,83 @@ +import cluster from "node:cluster"; +import expect = require("expect.js"); +import { handshake, url } from "./util"; +import { setupPrimary } from "../lib"; + +cluster.setupPrimary({ + exec: "./test/worker.js", + // @ts-expect-error + serialization: "advanced", // needed for packets with Buffer objects +}); + +setupPrimary(); + +describe("cluster", () => { + beforeEach((done) => { + for (let i = 0; i < 3; i++) { + const worker = cluster.fork(); + + if (i === 2) { + worker.on("listening", () => done()); + } + } + }); + + afterEach((done) => { + for (const worker of Object.values(cluster.workers)) { + worker.kill(); + } + function onExit() { + if (Object.keys(cluster.workers).length === 0) { + cluster.off("exit", onExit); + done(); + } + } + cluster.on("exit", onExit); + }); + + it("should ping/pong", (done) => { + (async () => { + const sid = await handshake(3000); + + for (let i = 0; i < 10; i++) { + const pollRes = await fetch(url(3000, sid)); + expect(pollRes.status).to.eql(200); + const body = await pollRes.text(); + expect(body).to.eql("2"); + + const dataRes = await fetch(url(3000, sid), { + method: "POST", + body: "3", + }); + expect(dataRes.status).to.eql(200); + } + + done(); + })(); + }); + + it("should send and receive binary", (done) => { + (async () => { + const sid = await handshake(3000); + + const dataRes = await fetch(url(3000, sid), { + method: "POST", + body: "bAQIDBA==", // buffer <01 02 03 04> encoded as base64 + }); + expect(dataRes.status).to.eql(200); + + for (let i = 0; i < 100; i++) { + const pollRes = await fetch(url(3000, sid)); + expect(pollRes.status).to.eql(200); + const body = await pollRes.text(); + + if (body === "bAQIDBA==") { + done(); + break; + } else { + // ping packet + } + } + })(); + }); +}); diff --git a/packages/socket.io-clustered-engine/test/in-memory.ts b/packages/socket.io-clustered-engine/test/in-memory.ts new file mode 100644 index 0000000000..fe3a0ff95e --- /dev/null +++ b/packages/socket.io-clustered-engine/test/in-memory.ts @@ -0,0 +1,363 @@ +import { EventEmitter } from "node:events"; +import { createServer, Server } from "node:http"; +import expect = require("expect.js"); +import { WebSocket } from "ws"; +import { ClusterEngine, type Message } from "../lib/engine"; +import { type ServerOptions } from "engine.io"; +import { url, handshake } from "./util"; + +class InMemoryEngine extends ClusterEngine { + constructor(readonly eventBus: EventEmitter, opts?: ServerOptions) { + super(opts); + eventBus.on("message", (message) => this.onMessage(message)); + } + + publishMessage(message: Message) { + this.eventBus.emit("message", message); + } +} + +describe("in-memory", () => { + let engine1: ClusterEngine, + httpServer1: Server, + engine2: ClusterEngine, + httpServer2: Server, + engine3: ClusterEngine, + httpServer3: Server; + + beforeEach(() => { + const eventBus = new EventEmitter(); + + httpServer1 = createServer(); + engine1 = new InMemoryEngine(eventBus); + engine1.attach(httpServer1); + httpServer1.listen(3000); + + httpServer2 = createServer(); + engine2 = new InMemoryEngine(eventBus); + engine2.attach(httpServer2); + httpServer2.listen(3001); + + httpServer3 = createServer(); + engine3 = new InMemoryEngine(eventBus, { + pingInterval: 50, + }); + engine3.attach(httpServer3); + httpServer3.listen(3002); + }); + + afterEach(() => { + engine1.close(); + engine2.close(); + engine3.close(); + httpServer1.close(); + httpServer1.closeAllConnections(); + httpServer2.close(); + httpServer2.closeAllConnections(); + httpServer3.close(); + httpServer3.closeAllConnections(); + }); + + it("should work (read)", (done) => { + engine1.on("connection", (socket) => { + socket.send("hello"); + }); + + (async () => { + const sid = await handshake(3000); + + const res = await fetch(url(3001, sid)); + expect(res.status).to.eql(200); + + const body = await res.text(); + expect(body).to.eql("4hello"); + + done(); + })(); + }); + + it("should work (read - deferred)", (done) => { + engine1.on("connection", (socket) => { + setTimeout(() => { + socket.send("hello"); + }, 200); + }); + + (async () => { + const sid = await handshake(3000); + + const res = await fetch(url(3001, sid)); + expect(res.status).to.eql(200); + + const body = await res.text(); + expect(body).to.eql("4hello"); + + done(); + })(); + }); + + it("should work (write)", (done) => { + engine1.on("connection", (socket) => { + socket.on("message", (data) => { + expect(data).to.eql("hello"); + done(); + }); + }); + + (async () => { + const sid = await handshake(3000); + + const res = await fetch(url(3001, sid), { + method: "POST", + body: "4hello", + }); + expect(res.status).to.eql(200); + })(); + }); + + it("should work (write - multiple)", (done) => { + engine1.on("connection", (socket) => { + let packets = []; + + socket.on("message", (data) => { + packets.push(data); + if (packets.length === 6) { + expect(packets).to.eql(["1", "2", "3", "4", "5", "6"]); + done(); + } + }); + }); + + (async () => { + const sid = await handshake(3000); + + const res1 = await fetch(url(3001, sid), { + method: "POST", + body: "41\x1e42\x1e43", + }); + expect(res1.status).to.eql(200); + + const res2 = await fetch(url(3000, sid), { + method: "POST", + body: "44\x1e45", + }); + expect(res2.status).to.eql(200); + + const res3 = await fetch(url(3001, sid), { + method: "POST", + body: "46", + }); + expect(res3.status).to.eql(200); + })(); + }); + + it("should acquire read lock (different process)", (done) => { + (async () => { + const sid = await handshake(3000); + + const controller = new AbortController(); + fetch(url(3000, sid), { + signal: controller.signal, + }); + + const res = await fetch(url(3001, sid)); + expect(res.status).to.eql(400); + + controller.abort(); + done(); + })(); + }); + + it("should acquire read lock (same process)", (done) => { + (async () => { + const sid = await handshake(3000); + + const controller = new AbortController(); + fetch(url(3001, sid), { + signal: controller.signal, + }); + + const res = await fetch(url(3000, sid)); + expect(res.status).to.eql(400); + + controller.abort(); + done(); + })(); + }); + + it("should handle close from main process", (done) => { + engine1.on("connection", (socket) => { + setTimeout(() => { + socket.close(); + }, 100); + }); + + (async () => { + const sid = await handshake(3000); + + const res = await fetch(url(3001, sid)); + expect(res.status).to.eql(200); + + const body = await res.text(); + expect(body).to.eql("1"); + + done(); + })(); + }); + + it("should handle close from client", (done) => { + engine1.on("connection", (socket) => { + socket.on("close", (reason) => { + expect(reason).to.eql("transport error"); + done(); + }); + }); + + (async () => { + const sid = await handshake(3000); + + const controller = new AbortController(); + fetch(url(3001, sid), { + signal: controller.signal, + }); + + setTimeout(() => { + controller.abort(); + }, 100); + })(); + }); + + it("should ping/pong", function (done) { + (async () => { + const sid = await handshake(3002); + + for (let i = 0; i < 10; i++) { + const port1 = [3000, 3001, 3002][i % 3]; + const res1 = await fetch(url(port1, sid)); + expect(res1.status).to.eql(200); + const body1 = await res1.text(); + expect(body1).to.eql("2"); + + const port2 = [3000, 3001, 3002][(i + 1) % 3]; + const res2 = await fetch(url(port2, sid), { + method: "POST", + body: "3", + }); + expect(res2.status).to.eql(200); + } + + // @ts-expect-error + expect(engine1._requests.size).to.eql(0); + // @ts-expect-error + expect(engine2._requests.size).to.eql(0); + // @ts-expect-error + expect(engine3._requests.size).to.eql(0); + // @ts-expect-error + expect(engine1._remoteTransports.size).to.eql(0); + // @ts-expect-error + expect(engine2._remoteTransports.size).to.eql(0); + // @ts-expect-error + expect(engine3._remoteTransports.size).to.eql(0); + + done(); + })(); + }); + + it("should reject an invalid id", (done) => { + (async () => { + const res = await fetch(url(3001, "01234567890123456789")); + expect(res.status).to.eql(400); + + done(); + })(); + }); + + it("should upgrade", (done) => { + engine2.on("connection", (socket) => { + socket.on("upgrade", () => { + socket.send("hello"); + }); + + socket.on("message", (val) => { + expect(val).to.eql("hi"); + + socket.close(); + done(); + }); + }); + + (async () => { + const sid = await handshake(3000); + + const socket = new WebSocket( + `ws://localhost:3001/engine.io/?EIO=4&transport=websocket&sid=${sid}` + ); + + socket.onopen = () => { + socket.send("2probe"); + }; + + let i = 0; + + socket.onmessage = ({ data }) => { + switch (i++) { + case 0: + expect(data).to.eql("3probe"); + socket.send("5"); + break; + case 1: + expect(data).to.eql("4hello"); + socket.send("4hi"); + break; + } + }; + })(); + }); + + it("should upgrade and send buffered messages", (done) => { + engine2.on("connection", (socket) => { + socket.on("upgrade", () => { + socket.send("hello"); + }); + + socket.on("message", (val) => { + expect(val).to.eql("hi"); + + socket.close(); + done(); + }); + }); + + (async () => { + const sid = await handshake(3000); + + const res = await fetch(url(3001, sid), { + method: "POST", + body: "4hi", + }); + expect(res.status).to.eql(200); + + const socket = new WebSocket( + `ws://localhost:3001/engine.io/?EIO=4&transport=websocket&sid=${sid}` + ); + + socket.onopen = () => { + socket.send("2probe"); + }; + + let i = 0; + + socket.onmessage = ({ data }) => { + switch (i++) { + case 0: + expect(data).to.eql("3probe"); + socket.send("5"); + break; + case 1: + expect(data).to.eql("4hello"); + break; + } + }; + })(); + }); +}); diff --git a/packages/socket.io-clustered-engine/test/redis.ts b/packages/socket.io-clustered-engine/test/redis.ts new file mode 100644 index 0000000000..481b36fbd5 --- /dev/null +++ b/packages/socket.io-clustered-engine/test/redis.ts @@ -0,0 +1,225 @@ +import expect = require("expect.js"); +import { createServer } from "node:http"; +import { createClient } from "redis"; +import { handshake, url } from "./util"; +import { type ClusterEngine } from "../lib/engine"; +import { RedisEngine } from "../lib"; +import Redis from "ioredis"; + +describe("redis", () => { + let engine1: ClusterEngine, + engine2: ClusterEngine, + engine3: ClusterEngine, + cleanup: () => Promise; + + describe("redis package", () => { + beforeEach(async () => { + const pubClient = createClient(); + const subClient1 = pubClient.duplicate(); + const subClient2 = pubClient.duplicate(); + const subClient3 = pubClient.duplicate(); + + await Promise.all([ + pubClient.connect(), + subClient1.connect(), + subClient2.connect(), + subClient3.connect(), + ]); + + const httpServer1 = createServer(); + engine1 = new RedisEngine(pubClient, subClient1); + engine1.attach(httpServer1); + httpServer1.listen(3000); + + const httpServer2 = createServer(); + engine2 = new RedisEngine(pubClient, subClient2); + engine2.attach(httpServer2); + httpServer2.listen(3001); + + const httpServer3 = createServer(); + engine3 = new RedisEngine(pubClient, subClient3, { + pingInterval: 50, + }); + engine3.attach(httpServer3); + httpServer3.listen(3002); + + cleanup = () => { + engine1.close(); + engine2.close(); + engine3.close(); + httpServer1.close(); + httpServer1.closeAllConnections(); + httpServer2.close(); + httpServer2.closeAllConnections(); + httpServer3.close(); + httpServer3.closeAllConnections(); + + return Promise.all([ + pubClient.disconnect(), + subClient1.disconnect(), + subClient2.disconnect(), + subClient3.disconnect(), + ]).then(); + }; + }); + + afterEach(() => { + return cleanup(); + }); + + it("should ping/pong", (done) => { + (async () => { + const sid = await handshake(3002); + + for (let i = 0; i < 10; i++) { + const pollPort = [3000, 3001, 3002][i % 3]; + const pollRes = await fetch(url(pollPort, sid)); + expect(pollRes.status).to.eql(200); + const body = await pollRes.text(); + expect(body).to.eql("2"); + + const dataPort = [3000, 3001, 3002][(i + 1) % 3]; + const dataRes = await fetch(url(dataPort, sid), { + method: "POST", + body: "3", + }); + expect(dataRes.status).to.eql(200); + } + + done(); + })(); + }); + + it("should send and receive binary", (done) => { + engine1.on("connection", (socket) => { + socket.on("message", (val: any) => { + socket.send(val); + }); + }); + + (async () => { + const sid = await handshake(3000); + + const dataRes = await fetch(url(3001, sid), { + method: "POST", + body: "bAQIDBA==", // buffer <01 02 03 04> encoded as base64 + }); + expect(dataRes.status).to.eql(200); + + while (true) { + const pollRes = await fetch(url(3002, sid)); + expect(pollRes.status).to.eql(200); + const body = await pollRes.text(); + + if (body === "bAQIDBA==") { + done(); + break; + } else { + // ping packet + } + } + })(); + }); + }); + + describe("ioredis package", () => { + beforeEach(async () => { + const pubClient = new Redis(); + const subClient1 = pubClient.duplicate(); + const subClient2 = pubClient.duplicate(); + const subClient3 = pubClient.duplicate(); + + const httpServer1 = createServer(); + engine1 = new RedisEngine(pubClient, subClient1); + engine1.attach(httpServer1); + httpServer1.listen(3000); + + const httpServer2 = createServer(); + engine2 = new RedisEngine(pubClient, subClient2); + engine2.attach(httpServer2); + httpServer2.listen(3001); + + const httpServer3 = createServer(); + engine3 = new RedisEngine(pubClient, subClient3, { + pingInterval: 50, + }); + engine3.attach(httpServer3); + httpServer3.listen(3002); + + cleanup = async () => { + engine1.close(); + engine2.close(); + engine3.close(); + httpServer1.close(); + httpServer1.closeAllConnections(); + httpServer2.close(); + httpServer2.closeAllConnections(); + httpServer3.close(); + httpServer3.closeAllConnections(); + + pubClient.disconnect(); + subClient1.disconnect(); + subClient2.disconnect(); + subClient3.disconnect(); + }; + }); + + afterEach(() => { + return cleanup(); + }); + + it("should ping/pong", (done) => { + (async () => { + const sid = await handshake(3002); + + for (let i = 0; i < 10; i++) { + const pollPort = [3000, 3001, 3002][i % 3]; + const pollRes = await fetch(url(pollPort, sid)); + expect(pollRes.status).to.eql(200); + const body = await pollRes.text(); + expect(body).to.eql("2"); + + const dataPort = [3000, 3001, 3002][(i + 1) % 3]; + const dataRes = await fetch(url(dataPort, sid), { + method: "POST", + body: "3", + }); + expect(dataRes.status).to.eql(200); + } + + done(); + })(); + }); + + it("should send and receive binary", (done) => { + engine1.on("connection", (socket) => { + socket.on("message", (val: any) => { + socket.send(val); + }); + }); + + (async () => { + const sid = await handshake(3000); + + const dataRes = await fetch(url(3001, sid), { + method: "POST", + body: "bAQIDBA==", // buffer <01 02 03 04> encoded as base64 + }); + expect(dataRes.status).to.eql(200); + + while (true) { + const pollRes = await fetch(url(3002, sid)); + expect(pollRes.status).to.eql(200); + const body = await pollRes.text(); + + if (body === "bAQIDBA==") { + done(); + break; + } else { + // ping packet + } + } + })(); + }); + }); +}); diff --git a/packages/socket.io-clustered-engine/test/util.ts b/packages/socket.io-clustered-engine/test/util.ts new file mode 100644 index 0000000000..de72094276 --- /dev/null +++ b/packages/socket.io-clustered-engine/test/util.ts @@ -0,0 +1,17 @@ +import expect = require("expect.js"); + +export function url(port: number, sid?: string) { + let url = `http://localhost:${port}/engine.io/?EIO=4&transport=polling`; + if (sid) { + url += `&sid=${sid}`; + } + return url; +} + +export async function handshake(port: number) { + const res = await fetch(url(port)); + expect(res.status).to.eql(200); + + const body1 = await res.text(); + return JSON.parse(body1.substring(1)).sid; +} diff --git a/packages/socket.io-clustered-engine/test/worker.js b/packages/socket.io-clustered-engine/test/worker.js new file mode 100644 index 0000000000..24e428947c --- /dev/null +++ b/packages/socket.io-clustered-engine/test/worker.js @@ -0,0 +1,16 @@ +const { createServer } = require("node:http"); +const { NodeClusterEngine } = require("../dist/cluster"); + +const httpServer = createServer(); +const engine = new NodeClusterEngine({ + pingInterval: 50 +}); + +engine.on("connection", socket => { + socket.on("message", (val) => { + socket.send(val); + }); +}); + +engine.attach(httpServer); +httpServer.listen(3000); diff --git a/packages/socket.io-clustered-engine/tsconfig.json b/packages/socket.io-clustered-engine/tsconfig.json new file mode 100644 index 0000000000..ea21ecbbe1 --- /dev/null +++ b/packages/socket.io-clustered-engine/tsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "outDir": "./dist", + "module": "node16", + "target": "ES2022", + "declaration": true, + "strict": false + }, + "include": [ + "./lib/**/*" + ] +} From 7521ac227bf456ece7b395289e7e7ef691c50fd6 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 17 Jul 2024 09:53:34 +0200 Subject: [PATCH 02/12] chore(release): @socket.io/cluster-engine@0.1.0 --- CHANGELOG.md | 1 + CONTRIBUTING.md | 1 + package-lock.json | 20 ++++++++++++++++--- package.json | 2 +- .../socket.io-cluster-engine/CHANGELOG.md | 11 ++++++++++ .../LICENSE | 0 .../README.md | 17 ++++++++++++---- .../compose.yaml | 0 .../lib/cluster.ts | 0 .../lib/engine.ts | 0 .../lib/index.ts | 0 .../lib/redis.ts | 0 .../package.json | 4 ++-- .../test/cluster.ts | 0 .../test/in-memory.ts | 0 .../test/redis.ts | 0 .../test/util.ts | 0 .../test/worker.js | 0 .../tsconfig.json | 0 19 files changed, 46 insertions(+), 10 deletions(-) create mode 100644 packages/socket.io-cluster-engine/CHANGELOG.md rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/LICENSE (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/README.md (94%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/compose.yaml (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/lib/cluster.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/lib/engine.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/lib/index.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/lib/redis.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/package.json (94%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/test/cluster.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/test/in-memory.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/test/redis.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/test/util.ts (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/test/worker.js (100%) rename packages/{socket.io-clustered-engine => socket.io-cluster-engine}/tsconfig.json (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index b7f44eebc3..5ee16880db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,5 +10,6 @@ Here are the detailed changelogs for each package in this monorepo: | `socket.io` | [link](packages/socket.io/CHANGELOG.md) | | `socket.io-adapter` | [link](packages/socket.io-adapter/CHANGELOG.md) | | `socket.io-client` | [link](packages/socket.io-client/CHANGELOG.md) | +| `@socket.io/cluster-engine` | [link](packages/socket.io-cluster-engine/CHANGELOG.md) | | `@socket.io/component-emitter` | [link](packages/socket.io-component-emitter/History.md) | | `socket.io-parser` | [link](packages/socket.io-parser/CHANGELOG.md) | diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 066791c6d7..50e04ec0c1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -78,6 +78,7 @@ This repository is a [monorepo](https://en.wikipedia.org/wiki/Monorepo) which co | `socket.io` | The server-side implementation of the bidirectional channel, built on top on the `engine.io` package. | | `socket.io-adapter` | An extensible component responsible for broadcasting a packet to all connected clients, used by the `socket.io` package. | | `socket.io-client` | The client-side implementation of the bidirectional channel, built on top on the `engine.io-client` package. | +| `@socket.io/cluster-engine` | A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions) | | `@socket.io/component-emitter` | An `EventEmitter` implementation, similar to the one provided by [Node.js](https://nodejs.org/api/events.html) but for all platforms. | | `socket.io-parser` | The parser responsible for encoding and decoding Socket.IO packets, used by both the `socket.io` and `socket.io-client` packages. | diff --git a/package-lock.json b/package-lock.json index c3efa6ae62..39723d41f8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,7 +8,7 @@ "packages/socket.io-component-emitter", "packages/engine.io-parser", "packages/engine.io", - "packages/socket.io-clustered-engine", + "packages/socket.io-cluster-engine", "packages/engine.io-client", "packages/socket.io-adapter", "packages/socket.io-parser", @@ -2722,8 +2722,8 @@ "@sinonjs/commons": "^3.0.0" } }, - "node_modules/@socket.io/clustered-engine": { - "resolved": "packages/socket.io-clustered-engine", + "node_modules/@socket.io/cluster-engine": { + "resolved": "packages/socket.io-cluster-engine", "link": true }, "node_modules/@socket.io/component-emitter": { @@ -15303,9 +15303,23 @@ "xmlhttprequest-ssl": "~2.0.0" } }, + "packages/socket.io-cluster-engine": { + "version": "0.1.0", + "license": "MIT", + "dependencies": { + "@msgpack/msgpack": "~2.8.0", + "debug": "~4.3.3", + "engine.io": "~6.6.0", + "engine.io-parser": "~5.2.3" + }, + "engines": { + "node": ">=10.2.0" + } + }, "packages/socket.io-clustered-engine": { "name": "@socket.io/clustered-engine", "version": "0.0.1", + "extraneous": true, "license": "MIT", "dependencies": { "@msgpack/msgpack": "~2.8.0", diff --git a/package.json b/package.json index 6ebf746e26..615d8bb067 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "packages/socket.io-component-emitter", "packages/engine.io-parser", "packages/engine.io", - "packages/socket.io-clustered-engine", + "packages/socket.io-cluster-engine", "packages/engine.io-client", "packages/socket.io-adapter", "packages/socket.io-parser", diff --git a/packages/socket.io-cluster-engine/CHANGELOG.md b/packages/socket.io-cluster-engine/CHANGELOG.md new file mode 100644 index 0000000000..c7e6702616 --- /dev/null +++ b/packages/socket.io-cluster-engine/CHANGELOG.md @@ -0,0 +1,11 @@ +# History + +| Version | Release date | +|--------------------------|--------------| +| [0.1.0](#010-2024-07-17) | July 2024 | + +# Release notes + +## `0.1.0` (2024-07-17) + +Initial release! diff --git a/packages/socket.io-clustered-engine/LICENSE b/packages/socket.io-cluster-engine/LICENSE similarity index 100% rename from packages/socket.io-clustered-engine/LICENSE rename to packages/socket.io-cluster-engine/LICENSE diff --git a/packages/socket.io-clustered-engine/README.md b/packages/socket.io-cluster-engine/README.md similarity index 94% rename from packages/socket.io-clustered-engine/README.md rename to packages/socket.io-cluster-engine/README.md index 8527752a11..4fe3bdbfdf 100644 --- a/packages/socket.io-clustered-engine/README.md +++ b/packages/socket.io-cluster-engine/README.md @@ -1,10 +1,11 @@ -# Socket.IO clustered engine +# Socket.IO cluster engine A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions). **Table of contents** + * [Installation](#installation) * [Usage](#usage) * [Node.js cluster](#nodejs-cluster) * [Redis](#redis) @@ -14,6 +15,14 @@ A cluster-friendly engine to share load between multiple Node.js processes (with * [License](#license) +## Installation + +``` +npm i @socket.io/cluster-engine +``` + +NPM: https://npmjs.com/package/@socket.io/cluster-engine + ## Usage ### Node.js cluster @@ -22,7 +31,7 @@ A cluster-friendly engine to share load between multiple Node.js processes (with import cluster from "node:cluster"; import process from "node:process"; import { availableParallelism } from "node:os"; -import { setupPrimary, NodeClusterEngine } from "@socket.io/clustered-engine"; +import { setupPrimary, NodeClusterEngine } from "@socket.io/cluster-engine"; import { createServer } from "node:http"; import { Server } from "socket.io"; @@ -74,7 +83,7 @@ if (cluster.isPrimary) { ```js import { createServer } from "node:http"; import { createClient } from "redis"; -import { RedisEngine } from "@socket.io/clustered-engine"; +import { RedisEngine } from "@socket.io/cluster-engine"; import { Server } from "socket.io"; const httpServer = createServer((req, res) => { @@ -109,7 +118,7 @@ import cluster from "node:cluster"; import process from "node:process"; import { availableParallelism } from "node:os"; import { createClient } from "redis"; -import { setupPrimaryWithRedis, NodeClusterEngine } from "@socket.io/clustered-engine"; +import { setupPrimaryWithRedis, NodeClusterEngine } from "@socket.io/cluster-engine"; import { createServer } from "node:http"; import { Server } from "socket.io"; diff --git a/packages/socket.io-clustered-engine/compose.yaml b/packages/socket.io-cluster-engine/compose.yaml similarity index 100% rename from packages/socket.io-clustered-engine/compose.yaml rename to packages/socket.io-cluster-engine/compose.yaml diff --git a/packages/socket.io-clustered-engine/lib/cluster.ts b/packages/socket.io-cluster-engine/lib/cluster.ts similarity index 100% rename from packages/socket.io-clustered-engine/lib/cluster.ts rename to packages/socket.io-cluster-engine/lib/cluster.ts diff --git a/packages/socket.io-clustered-engine/lib/engine.ts b/packages/socket.io-cluster-engine/lib/engine.ts similarity index 100% rename from packages/socket.io-clustered-engine/lib/engine.ts rename to packages/socket.io-cluster-engine/lib/engine.ts diff --git a/packages/socket.io-clustered-engine/lib/index.ts b/packages/socket.io-cluster-engine/lib/index.ts similarity index 100% rename from packages/socket.io-clustered-engine/lib/index.ts rename to packages/socket.io-cluster-engine/lib/index.ts diff --git a/packages/socket.io-clustered-engine/lib/redis.ts b/packages/socket.io-cluster-engine/lib/redis.ts similarity index 100% rename from packages/socket.io-clustered-engine/lib/redis.ts rename to packages/socket.io-cluster-engine/lib/redis.ts diff --git a/packages/socket.io-clustered-engine/package.json b/packages/socket.io-cluster-engine/package.json similarity index 94% rename from packages/socket.io-clustered-engine/package.json rename to packages/socket.io-cluster-engine/package.json index cdb2a3085f..2be3a0d75d 100644 --- a/packages/socket.io-clustered-engine/package.json +++ b/packages/socket.io-cluster-engine/package.json @@ -1,6 +1,6 @@ { - "name": "@socket.io/clustered-engine", - "version": "0.0.1", + "name": "@socket.io/cluster-engine", + "version": "0.1.0", "description": "A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions)", "type": "commonjs", "license": "MIT", diff --git a/packages/socket.io-clustered-engine/test/cluster.ts b/packages/socket.io-cluster-engine/test/cluster.ts similarity index 100% rename from packages/socket.io-clustered-engine/test/cluster.ts rename to packages/socket.io-cluster-engine/test/cluster.ts diff --git a/packages/socket.io-clustered-engine/test/in-memory.ts b/packages/socket.io-cluster-engine/test/in-memory.ts similarity index 100% rename from packages/socket.io-clustered-engine/test/in-memory.ts rename to packages/socket.io-cluster-engine/test/in-memory.ts diff --git a/packages/socket.io-clustered-engine/test/redis.ts b/packages/socket.io-cluster-engine/test/redis.ts similarity index 100% rename from packages/socket.io-clustered-engine/test/redis.ts rename to packages/socket.io-cluster-engine/test/redis.ts diff --git a/packages/socket.io-clustered-engine/test/util.ts b/packages/socket.io-cluster-engine/test/util.ts similarity index 100% rename from packages/socket.io-clustered-engine/test/util.ts rename to packages/socket.io-cluster-engine/test/util.ts diff --git a/packages/socket.io-clustered-engine/test/worker.js b/packages/socket.io-cluster-engine/test/worker.js similarity index 100% rename from packages/socket.io-clustered-engine/test/worker.js rename to packages/socket.io-cluster-engine/test/worker.js diff --git a/packages/socket.io-clustered-engine/tsconfig.json b/packages/socket.io-cluster-engine/tsconfig.json similarity index 100% rename from packages/socket.io-clustered-engine/tsconfig.json rename to packages/socket.io-cluster-engine/tsconfig.json From be1e4cb24d115f0c0b09c41a2901a0985c9a2a91 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 17 Jul 2024 10:26:56 +0200 Subject: [PATCH 03/12] ci(publish): update the tag regex In order to also match "@socket.io/some-package@1.2.3". Reference: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#filter-pattern-cheat-sheet [skip ci] --- .github/workflows/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 1d8f9ea835..ca10175d4d 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -6,7 +6,7 @@ on: push: tags: # expected format: @ (example: socket.io@1.2.3) - - '*@*' + - '**@*' jobs: publish: From b9b16132c2042494f946413ecc687edfa10e40cb Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 17 Jul 2024 10:34:57 +0200 Subject: [PATCH 04/12] chore(socket.io-adapter): remove dist before compilation --- packages/socket.io-adapter/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/socket.io-adapter/package.json b/packages/socket.io-adapter/package.json index 6f243cf499..5019cbdf3e 100644 --- a/packages/socket.io-adapter/package.json +++ b/packages/socket.io-adapter/package.json @@ -21,7 +21,7 @@ "ws": "~8.17.1" }, "scripts": { - "compile": "tsc", + "compile": "rimraf ./dist && tsc", "test": "npm run format:check && npm run compile && nyc mocha --require ts-node/register test/*.ts", "format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'", "format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'", From 0af50758f6fa461a153603fd82c8616773ff149b Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 18 Jul 2024 10:03:57 +0200 Subject: [PATCH 05/12] chore(cluster-engine): update homepage URL --- packages/socket.io-cluster-engine/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/socket.io-cluster-engine/package.json b/packages/socket.io-cluster-engine/package.json index 2be3a0d75d..42261b7d08 100644 --- a/packages/socket.io-cluster-engine/package.json +++ b/packages/socket.io-cluster-engine/package.json @@ -4,7 +4,7 @@ "description": "A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions)", "type": "commonjs", "license": "MIT", - "homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-clustered-engine#readme", + "homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-cluster-engine#readme", "repository": { "type": "git", "url": "git+https://github.com/socketio/socket.io.git" From 1f09a3e97950663cd7c62a34c74af60dad8f0346 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 18 Jul 2024 10:05:26 +0200 Subject: [PATCH 06/12] ci(publish): compile all packages before publishing As some packages depend on the types of others. [skip ci] --- .github/workflows/publish.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index ca10175d4d..052d308a72 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -28,6 +28,9 @@ jobs: - name: Install dependencies run: npm ci + - name: Compile each package + run: npm run compile --workspaces --if-present + - name: Publish package run: npm publish --workspace=${GITHUB_REF_NAME%@*} --provenance --access public env: From 8b0a40fd4a13be2d22c855501bf226695d3b2b37 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 19 Jul 2024 08:42:38 +0200 Subject: [PATCH 07/12] docs: add examples with @socket.io/cluster-engine [skip ci] --- .../cluster-engine-node-cluster/README.md | 19 ++++++ .../cluster-engine-node-cluster/client.js | 26 ++++++++ .../cluster-engine-node-cluster/package.json | 12 ++++ .../cluster-engine-node-cluster/server.js | 63 ++++++++++++++++++ examples/cluster-engine-redis/README.md | 22 +++++++ examples/cluster-engine-redis/client.js | 26 ++++++++ examples/cluster-engine-redis/compose.yaml | 5 ++ examples/cluster-engine-redis/package.json | 14 ++++ examples/cluster-engine-redis/server.js | 65 +++++++++++++++++++ 9 files changed, 252 insertions(+) create mode 100644 examples/cluster-engine-node-cluster/README.md create mode 100644 examples/cluster-engine-node-cluster/client.js create mode 100644 examples/cluster-engine-node-cluster/package.json create mode 100644 examples/cluster-engine-node-cluster/server.js create mode 100644 examples/cluster-engine-redis/README.md create mode 100644 examples/cluster-engine-redis/client.js create mode 100644 examples/cluster-engine-redis/compose.yaml create mode 100644 examples/cluster-engine-redis/package.json create mode 100644 examples/cluster-engine-redis/server.js diff --git a/examples/cluster-engine-node-cluster/README.md b/examples/cluster-engine-node-cluster/README.md new file mode 100644 index 0000000000..4b8c2ca4d3 --- /dev/null +++ b/examples/cluster-engine-node-cluster/README.md @@ -0,0 +1,19 @@ +# Example with `@socket.io/cluster-engine` and Node.js cluster + +## How to use + +```bash +# run the server +$ node server.js + +# run the client +$ node client.js +``` + +## Explanation + +The `server.js` script will create one Socket.IO server per core, each listening on the same port (`3000`). + +With the default engine (provided by the `engine.io` package), sticky sessions would be required, so that each HTTP request of the same Engine.IO session reaches the same worker. + +The `NodeClusterEngine` is a custom engine which takes care of the synchronization between the servers by using [the IPC channel](https://nodejs.org/api/cluster.html#workersendmessage-sendhandle-options-callback) and removes the need for sticky sessions when scaling horizontally. diff --git a/examples/cluster-engine-node-cluster/client.js b/examples/cluster-engine-node-cluster/client.js new file mode 100644 index 0000000000..dd4abb5e9c --- /dev/null +++ b/examples/cluster-engine-node-cluster/client.js @@ -0,0 +1,26 @@ +import { io } from "socket.io-client"; + +const CLIENTS_COUNT = 3; + +for (let i = 0; i < CLIENTS_COUNT; i++) { + const socket = io("ws://localhost:3000/", { + // transports: ["polling"], + // transports: ["websocket"], + }); + + socket.on("connect", () => { + console.log(`connected as ${socket.id}`); + }); + + socket.on("disconnect", (reason) => { + console.log(`disconnected due to ${reason}`); + }); + + socket.on("hello", (socketId, workerId) => { + console.log(`received "hello" from ${socketId} (worker: ${workerId})`); + }); + + setInterval(() => { + socket.emit("hello"); + }, 2000); +} diff --git a/examples/cluster-engine-node-cluster/package.json b/examples/cluster-engine-node-cluster/package.json new file mode 100644 index 0000000000..a97882b356 --- /dev/null +++ b/examples/cluster-engine-node-cluster/package.json @@ -0,0 +1,12 @@ +{ + "private": true, + "name": "cluster-engine-node-cluster", + "version": "0.0.1", + "type": "module", + "dependencies": { + "@socket.io/cluster-adapter": "^0.2.2", + "@socket.io/cluster-engine": "^0.1.0", + "socket.io": "^4.7.5", + "socket.io-client": "^4.7.5" + } +} diff --git a/examples/cluster-engine-node-cluster/server.js b/examples/cluster-engine-node-cluster/server.js new file mode 100644 index 0000000000..d010f576c2 --- /dev/null +++ b/examples/cluster-engine-node-cluster/server.js @@ -0,0 +1,63 @@ +import cluster from "node:cluster"; +import process from "node:process"; +import { availableParallelism } from "node:os"; +import { + setupPrimary as setupPrimaryEngine, + NodeClusterEngine, +} from "@socket.io/cluster-engine"; +import { + setupPrimary as setupPrimaryAdapter, + createAdapter, +} from "@socket.io/cluster-adapter"; +import { createServer } from "node:http"; +import { Server } from "socket.io"; + +if (cluster.isPrimary) { + console.log(`Primary ${process.pid} is running`); + + const numCPUs = availableParallelism(); + + // fork workers + for (let i = 0; i < numCPUs; i++) { + cluster.fork(); + } + + setupPrimaryEngine(); + setupPrimaryAdapter(); + + // needed for packets containing Buffer objects (you can ignore it if you only send plaintext objects) + cluster.setupPrimary({ + serialization: "advanced", + }); + + cluster.on("exit", (worker, code, signal) => { + console.log(`worker ${worker.process.pid} died`); + }); +} else { + const httpServer = createServer((req, res) => { + res.writeHead(404).end(); + }); + + const engine = new NodeClusterEngine(); + + engine.attach(httpServer, { + path: "/socket.io/", + }); + + const io = new Server({ + adapter: createAdapter(), + }); + + io.bind(engine); + + io.on("connection", (socket) => { + socket.on("hello", () => { + socket.broadcast.emit("hello", socket.id, process.pid); + }); + }); + + // workers will share the same port + httpServer.listen(3000); + + console.log(`Worker ${process.pid} started`); +} diff --git a/examples/cluster-engine-redis/README.md b/examples/cluster-engine-redis/README.md new file mode 100644 index 0000000000..32b37fc918 --- /dev/null +++ b/examples/cluster-engine-redis/README.md @@ -0,0 +1,22 @@ +# Example with `@socket.io/cluster-engine` and Redis + +## How to use + +```bash +# start the redis server +$ docker compose up -d + +# run the server +$ node server.js + +# run the client +$ node client.js +``` + +## Explanation + +The `server.js` script will create 3 Socket.IO servers, each listening on a distinct port (`3001`, `3002` and `3003`), and a proxy server listening on port `3000` which randomly redirects to one of those servers. + +With the default engine (provided by the `engine.io` package), sticky sessions would be required, so that each HTTP request of the same Engine.IO session reaches the same server. + +The `RedisEngine` is a custom engine which takes care of the synchronization between the servers by using [Redis pub/sub](https://redis.io/docs/latest/develop/interact/pubsub/) and removes the need for sticky sessions when scaling horizontally. diff --git a/examples/cluster-engine-redis/client.js b/examples/cluster-engine-redis/client.js new file mode 100644 index 0000000000..dd4abb5e9c --- /dev/null +++ b/examples/cluster-engine-redis/client.js @@ -0,0 +1,26 @@ +import { io } from "socket.io-client"; + +const CLIENTS_COUNT = 3; + +for (let i = 0; i < CLIENTS_COUNT; i++) { + const socket = io("ws://localhost:3000/", { + // transports: ["polling"], + // transports: ["websocket"], + }); + + socket.on("connect", () => { + console.log(`connected as ${socket.id}`); + }); + + socket.on("disconnect", (reason) => { + console.log(`disconnected due to ${reason}`); + }); + + socket.on("hello", (socketId, workerId) => { + console.log(`received "hello" from ${socketId} (worker: ${workerId})`); + }); + + setInterval(() => { + socket.emit("hello"); + }, 2000); +} diff --git a/examples/cluster-engine-redis/compose.yaml b/examples/cluster-engine-redis/compose.yaml new file mode 100644 index 0000000000..c5c85d2a9e --- /dev/null +++ b/examples/cluster-engine-redis/compose.yaml @@ -0,0 +1,5 @@ +services: + redis: + image: redis:7 + ports: + - "6379:6379" diff --git a/examples/cluster-engine-redis/package.json b/examples/cluster-engine-redis/package.json new file mode 100644 index 0000000000..1bf3f4add2 --- /dev/null +++ b/examples/cluster-engine-redis/package.json @@ -0,0 +1,14 @@ +{ + "private": true, + "name": "cluster-engine-redis", + "version": "0.0.1", + "type": "module", + "dependencies": { + "@socket.io/cluster-engine": "^0.1.0", + "@socket.io/redis-adapter": "^8.3.0", + "http-proxy": "^1.18.1", + "redis": "^4.6.15", + "socket.io": "^4.7.5", + "socket.io-client": "^4.7.5" + } +} diff --git a/examples/cluster-engine-redis/server.js b/examples/cluster-engine-redis/server.js new file mode 100644 index 0000000000..d9b451dcaa --- /dev/null +++ b/examples/cluster-engine-redis/server.js @@ -0,0 +1,65 @@ +import { RedisEngine } from "@socket.io/cluster-engine"; +import { createServer } from "node:http"; +import { createClient } from "redis"; +import { Server } from "socket.io"; +import { createAdapter } from "@socket.io/redis-adapter"; +import proxyModule from "http-proxy"; + +const { createProxyServer } = proxyModule; + +async function initServer(port) { + const httpServer = createServer((req, res) => { + res.writeHead(404).end(); + }); + + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + const engine = new RedisEngine(pubClient, subClient); + + engine.attach(httpServer, { + path: "/socket.io/", + }); + + const io = new Server({ + adapter: createAdapter(pubClient, subClient), + }); + + io.bind(engine); + + io.on("connection", (socket) => { + socket.on("hello", () => { + socket.broadcast.emit("hello", socket.id, port); + }); + }); + + httpServer.listen(port); +} + +function initProxy() { + const proxy = createProxyServer(); + + function randomTarget() { + return [ + "http://localhost:3001", + "http://localhost:3002", + "http://localhost:3003", + ][Math.floor(Math.random() * 3)]; + } + + const httpServer = createServer((req, res) => { + proxy.web(req, res, { target: randomTarget() }); + }); + + httpServer.on("upgrade", function (req, socket, head) { + proxy.ws(req, socket, head, { target: randomTarget() }); + }); + + httpServer.listen(3000); +} + +await Promise.all([initServer(3001), initServer(3002), initServer(3003)]); + +initProxy(); From 6e9bff4fcf2a76ae71274af320f008aec1eadfce Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 19 Jul 2024 15:20:12 +0200 Subject: [PATCH 08/12] docs: add example with @socket.io/postgres-adapter [skip ci] --- examples/postgres-adapter-example/README.md | 25 ++++++++++++ examples/postgres-adapter-example/client.js | 31 ++++++++++++++ examples/postgres-adapter-example/cluster.js | 13 ++++++ .../postgres-adapter-example/compose.yaml | 7 ++++ .../postgres-adapter-example/package.json | 12 ++++++ examples/postgres-adapter-example/server.js | 40 +++++++++++++++++++ 6 files changed, 128 insertions(+) create mode 100644 examples/postgres-adapter-example/README.md create mode 100644 examples/postgres-adapter-example/client.js create mode 100644 examples/postgres-adapter-example/cluster.js create mode 100644 examples/postgres-adapter-example/compose.yaml create mode 100644 examples/postgres-adapter-example/package.json create mode 100644 examples/postgres-adapter-example/server.js diff --git a/examples/postgres-adapter-example/README.md b/examples/postgres-adapter-example/README.md new file mode 100644 index 0000000000..7b5bd78859 --- /dev/null +++ b/examples/postgres-adapter-example/README.md @@ -0,0 +1,25 @@ +# Example with `@socket.io/postgres-adapter` + +**Table of contents** + + + * [How to use](#how-to-use) + * [Documentation](#documentation) + + +## How to use + +```bash +# start the postgres server +$ docker compose up -d + +# run the cluster +$ node cluster.js + +# run the client +$ node client.js +``` + +## Documentation + +The documentation can be found here: https://socket.io/docs/v4/postgres-adapter/ diff --git a/examples/postgres-adapter-example/client.js b/examples/postgres-adapter-example/client.js new file mode 100644 index 0000000000..b6efb6b1fd --- /dev/null +++ b/examples/postgres-adapter-example/client.js @@ -0,0 +1,31 @@ +import { io } from "socket.io-client"; + +const CLIENTS_COUNT = 3; +const PORTS = [3000, 3001, 3002]; + +for (let i = 0; i < CLIENTS_COUNT; i++) { + const socket = io(`ws://localhost:${PORTS[i % 3]}`, { + // transports: ["polling"], + // transports: ["websocket"], + }); + + socket.on("connect", () => { + console.log(`connected as ${socket.id}`); + }); + + socket.on("connect_error", () => { + console.log(`connect_error`); + }); + + socket.on("disconnect", (reason) => { + console.log(`disconnected due to ${reason}`); + }); + + socket.on("hello", (socketId, pid) => { + console.log(`received "hello" from ${socketId} (process: ${pid})`); + }); + + setInterval(() => { + socket.emit("hello"); + }, 2000); +} diff --git a/examples/postgres-adapter-example/cluster.js b/examples/postgres-adapter-example/cluster.js new file mode 100644 index 0000000000..c1aa138135 --- /dev/null +++ b/examples/postgres-adapter-example/cluster.js @@ -0,0 +1,13 @@ +import cluster from 'node:cluster'; + +const SERVERS_COUNT = 3; + +cluster.setupPrimary({ + exec: 'server.js', +}); + +for (let i = 0; i < SERVERS_COUNT; i++) { + cluster.fork({ + PORT: 3000 + i + }); +} diff --git a/examples/postgres-adapter-example/compose.yaml b/examples/postgres-adapter-example/compose.yaml new file mode 100644 index 0000000000..84ebf53d4c --- /dev/null +++ b/examples/postgres-adapter-example/compose.yaml @@ -0,0 +1,7 @@ +services: + postgres: + image: postgres:14 + ports: + - "5432:5432" + environment: + POSTGRES_PASSWORD: "changeit" diff --git a/examples/postgres-adapter-example/package.json b/examples/postgres-adapter-example/package.json new file mode 100644 index 0000000000..a114f6bd39 --- /dev/null +++ b/examples/postgres-adapter-example/package.json @@ -0,0 +1,12 @@ +{ + "private": true, + "name": "postgres-adapter-example", + "version": "0.0.1", + "type": "module", + "dependencies": { + "@socket.io/postgres-adapter": "^0.4.0", + "pg": "^8.12.0", + "socket.io": "^4.7.5", + "socket.io-client": "^4.7.5" + } +} diff --git a/examples/postgres-adapter-example/server.js b/examples/postgres-adapter-example/server.js new file mode 100644 index 0000000000..6cd07fdc54 --- /dev/null +++ b/examples/postgres-adapter-example/server.js @@ -0,0 +1,40 @@ +import { Server } from "socket.io"; +import { createAdapter } from "@socket.io/postgres-adapter"; +import pg from "pg"; +import process from "node:process"; + +const PORT = process.env.PORT || 3000; + +const pool = new pg.Pool({ + user: "postgres", + host: "localhost", + database: "postgres", + password: "changeit", + port: 5432, +}); + +await pool.query(` + CREATE TABLE IF NOT EXISTS socket_io_attachments ( + id bigserial UNIQUE, + created_at timestamptz DEFAULT NOW(), + payload bytea + ); +`); + +pool.on("error", (err) => { + console.error("Postgres error", err); +}); + +const io = new Server({ + adapter: createAdapter(pool) +}); + +io.on("connection", (socket) => { + socket.on("hello", () => { + // send to anyone except the sender + socket.broadcast.emit("hello", socket.id, process.pid); + }); +}); + +io.listen(PORT); +console.log(`server listening on port ${PORT}`); From b7577556e377c52277df118164995acaed874fc0 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Mon, 22 Jul 2024 10:07:21 +0200 Subject: [PATCH 09/12] docs: add example with NestJS Reference: https://docs.nestjs.com/websockets/gateways [skip ci] --- examples/nestjs-example/.eslintrc.js | 25 +++++++ examples/nestjs-example/.gitignore | 56 ++++++++++++++ examples/nestjs-example/.prettierrc | 4 + examples/nestjs-example/README.md | 73 +++++++++++++++++++ examples/nestjs-example/nest-cli.json | 8 ++ examples/nestjs-example/package.json | 72 ++++++++++++++++++ .../nestjs-example/src/app.controller.spec.ts | 22 ++++++ examples/nestjs-example/src/app.controller.ts | 13 ++++ examples/nestjs-example/src/app.module.ts | 11 +++ examples/nestjs-example/src/app.service.ts | 8 ++ .../src/events/events.gateway.ts | 18 +++++ .../src/events/events.module.ts | 7 ++ examples/nestjs-example/src/main.ts | 15 ++++ examples/nestjs-example/test/app.e2e-spec.ts | 24 ++++++ examples/nestjs-example/test/jest-e2e.json | 9 +++ examples/nestjs-example/tsconfig.build.json | 4 + examples/nestjs-example/tsconfig.json | 21 ++++++ examples/nestjs-example/views/index.hbs | 47 ++++++++++++ 18 files changed, 437 insertions(+) create mode 100644 examples/nestjs-example/.eslintrc.js create mode 100644 examples/nestjs-example/.gitignore create mode 100644 examples/nestjs-example/.prettierrc create mode 100644 examples/nestjs-example/README.md create mode 100644 examples/nestjs-example/nest-cli.json create mode 100644 examples/nestjs-example/package.json create mode 100644 examples/nestjs-example/src/app.controller.spec.ts create mode 100644 examples/nestjs-example/src/app.controller.ts create mode 100644 examples/nestjs-example/src/app.module.ts create mode 100644 examples/nestjs-example/src/app.service.ts create mode 100644 examples/nestjs-example/src/events/events.gateway.ts create mode 100644 examples/nestjs-example/src/events/events.module.ts create mode 100644 examples/nestjs-example/src/main.ts create mode 100644 examples/nestjs-example/test/app.e2e-spec.ts create mode 100644 examples/nestjs-example/test/jest-e2e.json create mode 100644 examples/nestjs-example/tsconfig.build.json create mode 100644 examples/nestjs-example/tsconfig.json create mode 100644 examples/nestjs-example/views/index.hbs diff --git a/examples/nestjs-example/.eslintrc.js b/examples/nestjs-example/.eslintrc.js new file mode 100644 index 0000000000..259de13c73 --- /dev/null +++ b/examples/nestjs-example/.eslintrc.js @@ -0,0 +1,25 @@ +module.exports = { + parser: '@typescript-eslint/parser', + parserOptions: { + project: 'tsconfig.json', + tsconfigRootDir: __dirname, + sourceType: 'module', + }, + plugins: ['@typescript-eslint/eslint-plugin'], + extends: [ + 'plugin:@typescript-eslint/recommended', + 'plugin:prettier/recommended', + ], + root: true, + env: { + node: true, + jest: true, + }, + ignorePatterns: ['.eslintrc.js'], + rules: { + '@typescript-eslint/interface-name-prefix': 'off', + '@typescript-eslint/explicit-function-return-type': 'off', + '@typescript-eslint/explicit-module-boundary-types': 'off', + '@typescript-eslint/no-explicit-any': 'off', + }, +}; diff --git a/examples/nestjs-example/.gitignore b/examples/nestjs-example/.gitignore new file mode 100644 index 0000000000..4b56acfbeb --- /dev/null +++ b/examples/nestjs-example/.gitignore @@ -0,0 +1,56 @@ +# compiled output +/dist +/node_modules +/build + +# Logs +logs +*.log +npm-debug.log* +pnpm-debug.log* +yarn-debug.log* +yarn-error.log* +lerna-debug.log* + +# OS +.DS_Store + +# Tests +/coverage +/.nyc_output + +# IDEs and editors +/.idea +.project +.classpath +.c9/ +*.launch +.settings/ +*.sublime-workspace + +# IDE - VSCode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json + +# dotenv environment variable files +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# temp directory +.temp +.tmp + +# Runtime data +pids +*.pid +*.seed +*.pid.lock + +# Diagnostic reports (https://nodejs.org/api/report.html) +report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json diff --git a/examples/nestjs-example/.prettierrc b/examples/nestjs-example/.prettierrc new file mode 100644 index 0000000000..dcb72794f5 --- /dev/null +++ b/examples/nestjs-example/.prettierrc @@ -0,0 +1,4 @@ +{ + "singleQuote": true, + "trailingComma": "all" +} \ No newline at end of file diff --git a/examples/nestjs-example/README.md b/examples/nestjs-example/README.md new file mode 100644 index 0000000000..00a13b112a --- /dev/null +++ b/examples/nestjs-example/README.md @@ -0,0 +1,73 @@ +

+ Nest Logo +

+ +[circleci-image]: https://img.shields.io/circleci/build/github/nestjs/nest/master?token=abc123def456 +[circleci-url]: https://circleci.com/gh/nestjs/nest + +

A progressive Node.js framework for building efficient and scalable server-side applications.

+

+NPM Version +Package License +NPM Downloads +CircleCI +Coverage +Discord +Backers on Open Collective +Sponsors on Open Collective + + Support us + +

+ + +## Description + +[Nest](https://github.com/nestjs/nest) framework TypeScript starter repository. + +## Installation + +```bash +$ npm install +``` + +## Running the app + +```bash +# development +$ npm run start + +# watch mode +$ npm run start:dev + +# production mode +$ npm run start:prod +``` + +## Test + +```bash +# unit tests +$ npm run test + +# e2e tests +$ npm run test:e2e + +# test coverage +$ npm run test:cov +``` + +## Support + +Nest is an MIT-licensed open source project. It can grow thanks to the sponsors and support by the amazing backers. If you'd like to join them, please [read more here](https://docs.nestjs.com/support). + +## Stay in touch + +- Author - [Kamil Myśliwiec](https://kamilmysliwiec.com) +- Website - [https://nestjs.com](https://nestjs.com/) +- Twitter - [@nestframework](https://twitter.com/nestframework) + +## License + +Nest is [MIT licensed](LICENSE). diff --git a/examples/nestjs-example/nest-cli.json b/examples/nestjs-example/nest-cli.json new file mode 100644 index 0000000000..f9aa683b1a --- /dev/null +++ b/examples/nestjs-example/nest-cli.json @@ -0,0 +1,8 @@ +{ + "$schema": "https://json.schemastore.org/nest-cli", + "collection": "@nestjs/schematics", + "sourceRoot": "src", + "compilerOptions": { + "deleteOutDir": true + } +} diff --git a/examples/nestjs-example/package.json b/examples/nestjs-example/package.json new file mode 100644 index 0000000000..5502028674 --- /dev/null +++ b/examples/nestjs-example/package.json @@ -0,0 +1,72 @@ +{ + "name": "nestjs-example", + "version": "0.0.1", + "description": "", + "author": "", + "private": true, + "license": "UNLICENSED", + "scripts": { + "build": "nest build", + "format": "prettier --write \"src/**/*.ts\" \"test/**/*.ts\"", + "start": "nest start", + "start:dev": "nest start --watch", + "start:debug": "nest start --debug --watch", + "start:prod": "node dist/main", + "lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix", + "test": "jest", + "test:watch": "jest --watch", + "test:cov": "jest --coverage", + "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", + "test:e2e": "jest --config ./test/jest-e2e.json" + }, + "dependencies": { + "@nestjs/common": "^10.0.0", + "@nestjs/core": "^10.0.0", + "@nestjs/platform-express": "^10.0.0", + "@nestjs/platform-socket.io": "^10.3.10", + "@nestjs/websockets": "^10.3.10", + "hbs": "^4.2.0", + "reflect-metadata": "^0.2.0", + "rxjs": "^7.8.1" + }, + "devDependencies": { + "@nestjs/cli": "^10.0.0", + "@nestjs/schematics": "^10.0.0", + "@nestjs/testing": "^10.0.0", + "@types/express": "^4.17.17", + "@types/jest": "^29.5.2", + "@types/node": "^20.3.1", + "@types/supertest": "^6.0.0", + "@typescript-eslint/eslint-plugin": "^7.0.0", + "@typescript-eslint/parser": "^7.0.0", + "eslint": "^8.42.0", + "eslint-config-prettier": "^9.0.0", + "eslint-plugin-prettier": "^5.0.0", + "jest": "^29.5.0", + "prettier": "^3.0.0", + "source-map-support": "^0.5.21", + "supertest": "^7.0.0", + "ts-jest": "^29.1.0", + "ts-loader": "^9.4.3", + "ts-node": "^10.9.1", + "tsconfig-paths": "^4.2.0", + "typescript": "^5.1.3" + }, + "jest": { + "moduleFileExtensions": [ + "js", + "json", + "ts" + ], + "rootDir": "src", + "testRegex": ".*\\.spec\\.ts$", + "transform": { + "^.+\\.(t|j)s$": "ts-jest" + }, + "collectCoverageFrom": [ + "**/*.(t|j)s" + ], + "coverageDirectory": "../coverage", + "testEnvironment": "node" + } +} diff --git a/examples/nestjs-example/src/app.controller.spec.ts b/examples/nestjs-example/src/app.controller.spec.ts new file mode 100644 index 0000000000..d22f3890a3 --- /dev/null +++ b/examples/nestjs-example/src/app.controller.spec.ts @@ -0,0 +1,22 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { AppController } from './app.controller'; +import { AppService } from './app.service'; + +describe('AppController', () => { + let appController: AppController; + + beforeEach(async () => { + const app: TestingModule = await Test.createTestingModule({ + controllers: [AppController], + providers: [AppService], + }).compile(); + + appController = app.get(AppController); + }); + + describe('root', () => { + it('should return "Hello World!"', () => { + expect(appController.getHello()).toBe('Hello World!'); + }); + }); +}); diff --git a/examples/nestjs-example/src/app.controller.ts b/examples/nestjs-example/src/app.controller.ts new file mode 100644 index 0000000000..124aec7a43 --- /dev/null +++ b/examples/nestjs-example/src/app.controller.ts @@ -0,0 +1,13 @@ +import { Controller, Get, Render } from '@nestjs/common'; +import { AppService } from './app.service'; + +@Controller() +export class AppController { + constructor(private readonly appService: AppService) {} + + @Get() + @Render('index') + root() { + return { message: 'Hello world2!' }; + } +} diff --git a/examples/nestjs-example/src/app.module.ts b/examples/nestjs-example/src/app.module.ts new file mode 100644 index 0000000000..a93eadfa59 --- /dev/null +++ b/examples/nestjs-example/src/app.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { AppController } from './app.controller'; +import { AppService } from './app.service'; +import { EventsModule } from './events/events.module'; + +@Module({ + imports: [EventsModule], + controllers: [AppController], + providers: [AppService], +}) +export class AppModule {} diff --git a/examples/nestjs-example/src/app.service.ts b/examples/nestjs-example/src/app.service.ts new file mode 100644 index 0000000000..927d7cca0b --- /dev/null +++ b/examples/nestjs-example/src/app.service.ts @@ -0,0 +1,8 @@ +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class AppService { + getHello(): string { + return 'Hello World!'; + } +} diff --git a/examples/nestjs-example/src/events/events.gateway.ts b/examples/nestjs-example/src/events/events.gateway.ts new file mode 100644 index 0000000000..728d2c9312 --- /dev/null +++ b/examples/nestjs-example/src/events/events.gateway.ts @@ -0,0 +1,18 @@ +import { + MessageBody, + SubscribeMessage, + WebSocketGateway, + WebSocketServer, +} from '@nestjs/websockets'; +import { Server } from 'socket.io'; + +@WebSocketGateway({}) +export class EventsGateway { + @WebSocketServer() + io: Server; + + @SubscribeMessage('hello') + handleEvent(@MessageBody() data: string): string { + return data.split('').reverse().join(''); + } +} diff --git a/examples/nestjs-example/src/events/events.module.ts b/examples/nestjs-example/src/events/events.module.ts new file mode 100644 index 0000000000..2b2d1cbb36 --- /dev/null +++ b/examples/nestjs-example/src/events/events.module.ts @@ -0,0 +1,7 @@ +import { Module } from '@nestjs/common'; +import { EventsGateway } from './events.gateway'; + +@Module({ + providers: [EventsGateway], +}) +export class EventsModule {} diff --git a/examples/nestjs-example/src/main.ts b/examples/nestjs-example/src/main.ts new file mode 100644 index 0000000000..010b6fd7a7 --- /dev/null +++ b/examples/nestjs-example/src/main.ts @@ -0,0 +1,15 @@ +import { NestFactory } from '@nestjs/core'; +import { AppModule } from './app.module'; +import { NestExpressApplication } from '@nestjs/platform-express'; +import { join } from 'node:path'; + +async function bootstrap() { + const app = await NestFactory.create(AppModule); + + app.setBaseViewsDir(join(__dirname, '..', 'views')); + app.setViewEngine('hbs'); + + await app.listen(3000); +} + +bootstrap(); diff --git a/examples/nestjs-example/test/app.e2e-spec.ts b/examples/nestjs-example/test/app.e2e-spec.ts new file mode 100644 index 0000000000..50cda62332 --- /dev/null +++ b/examples/nestjs-example/test/app.e2e-spec.ts @@ -0,0 +1,24 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { INestApplication } from '@nestjs/common'; +import * as request from 'supertest'; +import { AppModule } from './../src/app.module'; + +describe('AppController (e2e)', () => { + let app: INestApplication; + + beforeEach(async () => { + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [AppModule], + }).compile(); + + app = moduleFixture.createNestApplication(); + await app.init(); + }); + + it('/ (GET)', () => { + return request(app.getHttpServer()) + .get('/') + .expect(200) + .expect('Hello World!'); + }); +}); diff --git a/examples/nestjs-example/test/jest-e2e.json b/examples/nestjs-example/test/jest-e2e.json new file mode 100644 index 0000000000..e9d912f3e3 --- /dev/null +++ b/examples/nestjs-example/test/jest-e2e.json @@ -0,0 +1,9 @@ +{ + "moduleFileExtensions": ["js", "json", "ts"], + "rootDir": ".", + "testEnvironment": "node", + "testRegex": ".e2e-spec.ts$", + "transform": { + "^.+\\.(t|j)s$": "ts-jest" + } +} diff --git a/examples/nestjs-example/tsconfig.build.json b/examples/nestjs-example/tsconfig.build.json new file mode 100644 index 0000000000..64f86c6bd2 --- /dev/null +++ b/examples/nestjs-example/tsconfig.build.json @@ -0,0 +1,4 @@ +{ + "extends": "./tsconfig.json", + "exclude": ["node_modules", "test", "dist", "**/*spec.ts"] +} diff --git a/examples/nestjs-example/tsconfig.json b/examples/nestjs-example/tsconfig.json new file mode 100644 index 0000000000..95f5641cf7 --- /dev/null +++ b/examples/nestjs-example/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "module": "commonjs", + "declaration": true, + "removeComments": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "allowSyntheticDefaultImports": true, + "target": "ES2021", + "sourceMap": true, + "outDir": "./dist", + "baseUrl": "./", + "incremental": true, + "skipLibCheck": true, + "strictNullChecks": false, + "noImplicitAny": false, + "strictBindCallApply": false, + "forceConsistentCasingInFileNames": false, + "noFallthroughCasesInSwitch": false + } +} diff --git a/examples/nestjs-example/views/index.hbs b/examples/nestjs-example/views/index.hbs new file mode 100644 index 0000000000..0422a874d0 --- /dev/null +++ b/examples/nestjs-example/views/index.hbs @@ -0,0 +1,47 @@ + + + + + + App + + +

Status:

+

Transport:

+ + + + From 7fd75e6aac02e6c8345063902d6927a5bae168e1 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Mon, 22 Jul 2024 11:28:11 +0200 Subject: [PATCH 10/12] docs(changelog): add changelog for socket.io-parser@3.3.4 Diff: https://github.com/Automattic/socket.io-parser/compare/3.3.3...3.3.4 [skip ci] --- packages/socket.io-parser/CHANGELOG.md | 70 +++++++++++++------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/packages/socket.io-parser/CHANGELOG.md b/packages/socket.io-parser/CHANGELOG.md index 59a8ab827c..ac5fe76b7e 100644 --- a/packages/socket.io-parser/CHANGELOG.md +++ b/packages/socket.io-parser/CHANGELOG.md @@ -1,48 +1,48 @@ # History -## 2023 +| Version | Release date | +|-------------------------------------------------------------------------------------------------------------|----------------| +| [3.3.4](#334-2024-07-22) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) | July 2024 | +| [4.2.4](#424-2023-05-31) | May 2023 | +| [3.4.3](#343-2023-05-22) (from the [3.4.x](https://github.com/socketio/socket.io-parser/tree/3.4.x) branch) | May 2023 | +| [4.2.3](#423-2023-05-22) | May 2023 | +| [4.2.2](#422-2023-01-19) | January 2023 | +| [3.3.3](#333-2022-11-09) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) | November 2022 | +| [3.4.2](#342-2022-11-09) (from the [3.4.x](https://github.com/socketio/socket.io-parser/tree/3.4.x) branch) | November 2022 | +| [4.0.5](#405-2022-06-27) (from the [4.0.x](https://github.com/socketio/socket.io-parser/tree/4.0.x) branch) | June 2022 | +| [4.2.1](#421-2022-06-27) | June 2022 | +| [4.2.0](#420-2022-04-17) | April 2022 | +| [4.1.2](#412-2022-02-17) | February 2022 | +| [3.3.3](#333-2022-11-09) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) | November 2022 | +| [3.4.2](#342-2022-11-09) (from the [3.4.x](https://github.com/socketio/socket.io-parser/tree/3.4.x) branch) | November 2022 | +| [4.0.5](#405-2022-06-27) (from the [4.0.x](https://github.com/socketio/socket.io-parser/tree/4.0.x) branch) | June 2022 | +| [4.2.1](#421-2022-06-27) | June 2022 | +| [4.2.0](#420-2022-04-17) | April 2022 | +| [4.1.2](#412-2022-02-17) | February 2022 | +| [4.1.1](#411-2021-10-14) | October 2021 | +| [4.1.0](#410-2021-10-11) | October 2021 | +| [4.0.4](#404-2021-01-15) | January 2021 | +| [3.3.2](#332-2021-01-09) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) | January 2021 | +| [4.0.3](#403-2021-01-05) | January 2021 | +| [4.0.2](#402-2020-11-25) | November 2020 | +| [4.0.1](#401-2020-11-05) | November 2020 | +| [3.3.1](#331-2020-09-30) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) | September 2020 | +| [**4.0.0**](#400-2020-09-28) | September 2020 | +| [3.4.1](#341-2020-05-13) | May 2020 | +| [3.4.0](#340-2019-09-20) | September 2019 | +| [3.3.0](#330-2018-11-07) | November 2018 | -- [4.2.4](#424-2023-05-31) (May 2023) -- [3.4.3](#343-2023-05-22) (May 2023) (from the [3.4.x](https://github.com/socketio/socket.io-parser/tree/3.4.x) branch) -- [4.2.3](#423-2023-05-22) (May 2023) -- [4.2.2](#422-2023-01-19) (Jan 2023) -## 2022 - -- [3.3.3](#333-2022-11-09) (Nov 2022) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) -- [3.4.2](#342-2022-11-09) (Nov 2022) (from the [3.4.x](https://github.com/socketio/socket.io-parser/tree/3.4.x) branch) -- [4.0.5](#405-2022-06-27) (Jun 2022) (from the [4.0.x](https://github.com/socketio/socket.io-parser/tree/4.0.x) branch) -- [4.2.1](#421-2022-06-27) (Jun 2022) -- [4.2.0](#420-2022-04-17) (Apr 2022) -- [4.1.2](#412-2022-02-17) (Feb 2022) - -## 2021 - -- [4.1.1](#411-2021-10-14) (Oct 2021) -- [4.1.0](#410-2021-10-11) (Oct 2021) -- [4.0.4](#404-2021-01-15) (Jan 2021) -- [3.3.2](#332-2021-01-09) (Jan 2021) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) -- [4.0.3](#403-2021-01-05) (Jan 2021) - -## 2020 - -- [4.0.2](#402-2020-11-25) (Nov 2020) -- [4.0.1](#401-2020-11-05) (Nov 2020) -- [3.3.1](#331-2020-09-30) (Sep 2020) (from the [3.3.x](https://github.com/socketio/socket.io-parser/tree/3.3.x) branch) -- [**4.0.0**](#400-2020-09-28) (Sep 2020) -- [3.4.1](#341-2020-05-13) (May 2020) - -## 2019 +# Release notes -- [3.4.0](#340-2019-09-20) (Sep 2019) +## [3.3.4](https://github.com/Automattic/socket.io-parser/compare/3.3.3...3.3.4) (2024-07-22) -## 2018 -- [3.3.0](#330-2018-11-07) (Nov 2018) +### Bug Fixes +* check the format of the event name ([#125](https://github.com/Automattic/socket.io-parser/issues/125)) ([ee00660](https://github.com/Automattic/socket.io-parser/commit/ee006607495eca4ec7262ad080dd3a91439a5ba4)) -# Release notes ## [4.2.4](https://github.com/socketio/socket.io-parser/compare/4.2.3...4.2.4) (2023-05-31) From b79d80aa5989b21c22f4901a2d1b53cf51351020 Mon Sep 17 00:00:00 2001 From: KartikeSingh Date: Mon, 24 Jun 2024 11:01:14 +0530 Subject: [PATCH 11/12] docs: fix conjunction with fastify example (#5057) [skip ci] --- packages/socket.io/Readme.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/socket.io/Readme.md b/packages/socket.io/Readme.md index 19c0aeddc7..cf120cd71a 100644 --- a/packages/socket.io/Readme.md +++ b/packages/socket.io/Readme.md @@ -160,7 +160,9 @@ called `io`. ```js const app = require('fastify')(); app.register(require('fastify-socket.io')); -app.io.on('connection', () => { /* … */ }); +app.ready().then(() => { + app.io.on('connection', () => { /* … */ }); +}) app.listen(3000); ``` From 582655f679ccc43f0a9cbef1f13ea3cde07dc2e1 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 26 Jul 2024 09:26:47 +0200 Subject: [PATCH 12/12] test(cluster-engine): fix flaky test cleanup --- packages/socket.io-cluster-engine/test/cluster.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/socket.io-cluster-engine/test/cluster.ts b/packages/socket.io-cluster-engine/test/cluster.ts index 87f2385c2f..ac51f24e79 100644 --- a/packages/socket.io-cluster-engine/test/cluster.ts +++ b/packages/socket.io-cluster-engine/test/cluster.ts @@ -3,6 +3,8 @@ import expect = require("expect.js"); import { handshake, url } from "./util"; import { setupPrimary } from "../lib"; +const WORKER_COUNT = 3; + cluster.setupPrimary({ exec: "./test/worker.js", // @ts-expect-error @@ -13,7 +15,7 @@ setupPrimary(); describe("cluster", () => { beforeEach((done) => { - for (let i = 0; i < 3; i++) { + for (let i = 0; i < WORKER_COUNT; i++) { const worker = cluster.fork(); if (i === 2) { @@ -23,16 +25,18 @@ describe("cluster", () => { }); afterEach((done) => { - for (const worker of Object.values(cluster.workers)) { - worker.kill(); - } + let i = 0; function onExit() { - if (Object.keys(cluster.workers).length === 0) { + if (++i === WORKER_COUNT) { cluster.off("exit", onExit); done(); } } cluster.on("exit", onExit); + + for (const worker of Object.values(cluster.workers)) { + worker.kill(); + } }); it("should ping/pong", (done) => {