From 3b11e32f0bc663e865f909fba5b05c3177bb2dd3 Mon Sep 17 00:00:00 2001 From: Taylor Schley Date: Wed, 8 Jun 2022 15:49:51 -0500 Subject: [PATCH] feat(sse): add server-sent events endpoint --- README.md | 11 +++--- api.js | 4 +-- app/controllers/api/v1/index.js | 3 +- app/controllers/api/v1/sse.js | 16 +++++++++ config/api.js | 53 ++++++++++++++++++++++++---- config/index.js | 6 ++++ index.js | 11 ++---- package.json | 2 ++ routes/api/v1/index.js | 2 ++ test/api/v1/index.js | 1 + test/api/v1/no-jwt.js | 14 ++++++++ test/api/v1/sse.js | 61 +++++++++++++++++++++++++++++++++ test/plugin/options.js | 4 ++- test/utils.js | 13 +++++-- yarn.lock | 24 +++++++++++++ 15 files changed, 199 insertions(+), 26 deletions(-) create mode 100644 app/controllers/api/v1/sse.js create mode 100644 test/api/v1/no-jwt.js create mode 100644 test/api/v1/sse.js diff --git a/README.md b/README.md index 82f99e5..c9b5d81 100644 --- a/README.md +++ b/README.md @@ -39,10 +39,11 @@ The API will start automatically when the Bree constructor is called. ## Options -| Option | Type | Description | -|:------:|:------:|----------------------------------------------------------------------------------------------| -| port | Number | The port the API will listen on. Default: `62893` | -| jwt | Object | Configurations for JWT. Only option is `secret` which will be the secret used to verify JWT. | +| Option | Type | Description | +| :------: | :------: | ---------------------------------------------------------------------------------------------- | +| port | Number | The port the API will listen on. Default: `62893` | +| jwt | Object | Configurations for JWT. Only option is `secret` which will be the secret used to verify JWT. | +| sse | Object | Configurations for SSE. See [koa-sse][] for list of options. | ## API @@ -62,3 +63,5 @@ Check out the [API Docs](https://documenter.getpostman.com/view/17142435/TzzDLbN [yarn]: https://yarnpkg.com/ [Bree]: https://jobscheduler.net/#/ + +[koa-sse]: https://github.com/yklykl530/koa-sse diff --git a/api.js b/api.js index c4f77e8..9c88f23 100644 --- a/api.js +++ b/api.js @@ -10,7 +10,7 @@ const ip = require('ip'); const logger = require('./helpers/logger'); const apiConfig = require('./config/api'); -const api = new API(apiConfig); +const api = new API(apiConfig()); if (!module.parent) { const graceful = new Graceful({ @@ -25,7 +25,7 @@ if (!module.parent) { if (process.send) process.send('ready'); const { port } = api.server.address(); logger.info( - `Lad API server listening on ${port} (LAN: ${ip.address()}:${port})` + `Bree API server listening on ${port} (LAN: ${ip.address()}:${port})` ); } catch (error) { logger.error(error); diff --git a/app/controllers/api/v1/index.js b/app/controllers/api/v1/index.js index 5edd213..7477338 100644 --- a/app/controllers/api/v1/index.js +++ b/app/controllers/api/v1/index.js @@ -1,9 +1,10 @@ const config = require('./config'); const jobs = require('./jobs'); const control = require('./control'); +const sse = require('./sse'); const test = (ctx) => { ctx.body = { breeExists: Boolean(ctx.bree) }; }; -module.exports = { config, test, jobs, control }; +module.exports = { config, test, jobs, control, sse }; diff --git a/app/controllers/api/v1/sse.js b/app/controllers/api/v1/sse.js new file mode 100644 index 0000000..d9d1989 --- /dev/null +++ b/app/controllers/api/v1/sse.js @@ -0,0 +1,16 @@ +async function connect(ctx) { + if (ctx.sse) { + // send bree events over sse + for (const event of ['worker created', 'worker deleted']) { + ctx.bree.on(event, (name) => { + ctx.sse.send({ event, data: { name } }); + }); + } + + ctx.sse.on('close', () => { + ctx.logger.error('SSE closed'); + }); + } +} + +module.exports = { connect }; diff --git a/config/api.js b/config/api.js index 91e17a8..a0887d2 100644 --- a/config/api.js +++ b/config/api.js @@ -1,15 +1,54 @@ const sharedConfig = require('@ladjs/shared-config'); const jwt = require('koa-jwt'); +const sse = require('koa-sse-stream'); const logger = require('../helpers/logger'); const routes = require('../routes'); const config = require('../config'); -module.exports = { - ...sharedConfig('API'), - routes: routes.api, - logger, - hookBeforeRoutes(app) { - app.use(jwt(config.jwt)); - } +module.exports = (opts = {}) => { + const sseMiddleware = sse({ ...config.sse, ...opts.sse }); + const jwtMiddleware = jwt({ + ...config.jwt, + ...opts.jwt, + getToken(ctx, _) { + // pull token off of url if it is the sse endpoint + if (ctx.url.indexOf('/v1/sse') === 0) { + const splitUrl = ctx.url.split('/'); + + if (splitUrl.length === 4) { + return splitUrl[3]; + } + } + + return null; + } + }); + + return { + ...sharedConfig('API'), + port: config.port, + ...opts, + routes: routes.api, + logger, + hookBeforeRoutes(app) { + app.use((ctx, next) => { + // return early if jwt is set to false + if (!opts.jwt && typeof opts.jwt === 'boolean') { + return next(); + } + + return jwtMiddleware(ctx, next); + }); + + app.use((ctx, next) => { + // only do this on sse route + if (ctx.url.indexOf('/v1/sse') === 0) { + return sseMiddleware(ctx, next); + } + + return next(); + }); + } + }; }; diff --git a/config/index.js b/config/index.js index 8b56558..5feffce 100644 --- a/config/index.js +++ b/config/index.js @@ -30,6 +30,12 @@ const config = { secret: env.JWT_SECRET }, + // sse options + sse: { + maxClients: 10_000, + pingInterval: 60_000 + }, + // store IP address // storeIPAddress: { diff --git a/index.js b/index.js index 9682037..e3eaf8f 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,4 @@ const API = require('@ladjs/api'); -const jwt = require('koa-jwt'); const api = require('./api'); const apiConfig = require('./config/api'); @@ -10,17 +9,11 @@ function plugin(opts, Bree) { opts = { port: config.port, jwt: config.jwt, + sse: config.sse, ...opts }; - const api = new API({ - ...apiConfig, - port: opts.port, - jwt: opts.jwt, - hookBeforeRoutes(app) { - app.use(jwt(opts.jwt)); - } - }); + const api = new API(apiConfig(opts)); const oldInit = Bree.prototype.init; diff --git a/package.json b/package.json index 51d7498..a987088 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "humanize-string": "^3.0.0", "ip": "^1.1.5", "koa-jwt": "^4.0.1", + "koa-sse-stream": "^0.2.0", "lodash": "^4.17.20", "markdown-it": "^13.0.1", "markdown-it-emoji": "^2.0.0", @@ -55,6 +56,7 @@ "eslint-formatter-pretty": "^4.0.0", "eslint-plugin-compat": "^4.0.2", "eslint-plugin-no-smart-quotes": "^1.1.0", + "eventsource": "^2.0.2", "fixpack": "^4.0.0", "get-port": "^5.1.1", "gulp": "^4.0.2", diff --git a/routes/api/v1/index.js b/routes/api/v1/index.js index df98ba5..c2458b2 100644 --- a/routes/api/v1/index.js +++ b/routes/api/v1/index.js @@ -29,4 +29,6 @@ router.post('/run/:jobName', api.v1.control.run); router.post('/restart', api.v1.control.restart); router.post('/restart/:jobName', api.v1.control.restart); +router.get('/sse/:token', api.v1.sse.connect); + module.exports = router; diff --git a/test/api/v1/index.js b/test/api/v1/index.js index e7c4666..6b46a2b 100644 --- a/test/api/v1/index.js +++ b/test/api/v1/index.js @@ -13,6 +13,7 @@ test.before(async (t) => { test('fails when no creds are presented', async (t) => { const { api } = t.context; const res = await api.get('/v1/test'); + t.is(res.status, 401); }); diff --git a/test/api/v1/no-jwt.js b/test/api/v1/no-jwt.js new file mode 100644 index 0000000..7f56d9c --- /dev/null +++ b/test/api/v1/no-jwt.js @@ -0,0 +1,14 @@ +const test = require('ava'); + +const utils = require('../../utils'); + +test.before(async (t) => { + await utils.setupApiServer(t, {}, { jwt: false }); +}); + +test('works when no creds are presented', async (t) => { + const { api } = t.context; + const res = await api.get('/v1/test'); + + t.is(res.status, 200); +}); diff --git a/test/api/v1/sse.js b/test/api/v1/sse.js new file mode 100644 index 0000000..8e337ec --- /dev/null +++ b/test/api/v1/sse.js @@ -0,0 +1,61 @@ +const path = require('path'); +const { once } = require('events'); +const test = require('ava'); +const jwt = require('jsonwebtoken'); + +const config = require('../../../config'); + +const utils = require('../../utils'); + +const rootUrl = '/v1/sse'; + +test.before(async (t) => { + await utils.setupApiServer(t, { + jobs: [ + { name: 'done', path: path.join(utils.root, 'basic.js') }, + { + name: 'delayed', + path: path.join(utils.root, 'basic.js'), + timeout: 100 + }, + { + name: 'waiting', + path: path.join(utils.root, 'basic.js'), + interval: 100 + }, + { + name: 'active', + path: path.join(utils.root, 'long.js') + } + ] + }); + t.context.token = jwt.sign({}, config.jwt.secret); + + t.context.api = t.context.api.auth(t.context.token, { type: 'bearer' }); + + t.context.bree.start(); +}); + +test('successfully connect to sse', async (t) => { + const es = utils.setupEventSource(t, rootUrl); + + await once(es, 'open'); + + t.pass(); +}); + +const eventsMacro = test.macro({ + async exec(t, event) { + const es = utils.setupEventSource(t, rootUrl); + + await once(es, event); + + t.pass(); + }, + title(_, event) { + return `successfully listen to "${event}" messages`; + } +}); + +test(eventsMacro, 'worker created'); +test(eventsMacro, 'worker deleted'); diff --git a/test/plugin/options.js b/test/plugin/options.js index e854248..d181303 100644 --- a/test/plugin/options.js +++ b/test/plugin/options.js @@ -9,11 +9,13 @@ test('can modify options', (t) => { Bree.extend(plugin, { port: 3000, - jwt: { secret: 'thisisasecret' } + jwt: { secret: 'thisisasecret' }, + sse: { maxClients: 100 } }); const bree = new Bree(baseConfig); t.is(bree.api.config.port, 3000); t.is(bree.api.config.jwt.secret, 'thisisasecret'); + t.is(bree.api.config.sse.maxClients, 100); }); diff --git a/test/utils.js b/test/utils.js index 4360fa1..16332d8 100644 --- a/test/utils.js +++ b/test/utils.js @@ -3,18 +3,19 @@ const path = require('path'); const request = require('supertest'); const getPort = require('get-port'); +const EventSource = require('eventsource'); // // setup utilities // -exports.setupApiServer = async (t, config = {}) => { +exports.setupApiServer = async (t, config = {}, pluginConfig = {}) => { // Must require here in order to load changes made during setup const Bree = require('bree'); const { plugin } = require('../'); const port = await getPort(); - Bree.extend(plugin, { port }); + Bree.extend(plugin, { port, ...pluginConfig }); const bree = new Bree({ ...baseConfig, ...config }); @@ -34,3 +35,11 @@ const baseConfig = { jobs: ['basic'] }; exports.baseConfig = baseConfig; + +exports.setupEventSource = (t, endpoint) => { + const { token, bree } = t.context; + + return new EventSource( + `http://${bree.api.config.serverHost}:${bree.api.config.port}${endpoint}/${token}` + ); +}; diff --git a/yarn.lock b/yarn.lock index c387281..b812dad 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3508,6 +3508,11 @@ elliptic@^6.5.3: minimalistic-assert "^1.0.1" minimalistic-crypto-utils "^1.0.1" +emitter-component@^1.1.1: + version "1.1.1" + resolved "https://registry.npmjs.org/emitter-component/-/emitter-component-1.1.1.tgz#065e2dbed6959bf470679edabeaf7981d1003ab6" + integrity sha512-G+mpdiAySMuB7kesVRLuyvYRqDmshB7ReKEVuyBPkzQlmiDiLrt7hHHIy4Aff552bgknVN7B2/d3lzhGO5dvpQ== + emittery@^0.11.0: version "0.11.0" resolved "https://registry.npmjs.org/emittery/-/emittery-0.11.0.tgz#eb5f756a200d3431de2c6e850cb2d8afd97a03b9" @@ -4341,6 +4346,11 @@ events@^3.0.0: resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400" integrity sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q== +eventsource@^2.0.2: + version "2.0.2" + resolved "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz#76dfcc02930fb2ff339520b6d290da573a9e8508" + integrity sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA== + evp_bytestokey@^1.0.0, evp_bytestokey@^1.0.3: version "1.0.3" resolved "https://registry.yarnpkg.com/evp_bytestokey/-/evp_bytestokey-1.0.3.tgz#7fcbdb198dc71959432efe13842684e0525acb02" @@ -6851,6 +6861,13 @@ koa-passport@^4.1.4: dependencies: passport "^0.4.0" +koa-sse-stream@^0.2.0: + version "0.2.0" + resolved "https://registry.npmjs.org/koa-sse-stream/-/koa-sse-stream-0.2.0.tgz#1effd4e72b18c138e2a0710dfa7fdd9ea0dd26b5" + integrity sha512-Dyp3YS5HfnpE17Mk8HWl+6IMVgO/VU5rGXPKjgej+BrfdZZfRv2AXAm6osd380xcjhcXJgaHg23ntdLbpjCIDQ== + dependencies: + stream "0.0.2" + koa-unless@^1.0.7: version "1.0.7" resolved "https://registry.yarnpkg.com/koa-unless/-/koa-unless-1.0.7.tgz#b9df375e2b4da3043918d48622520c2c0b79f032" @@ -11310,6 +11327,13 @@ stream-shift@^1.0.0: resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d" integrity sha512-AiisoFqQ0vbGcZgQPY1cdP2I76glaVA/RauYR4G4thNFgkTqr90yXTo4LYX60Jl+sIlPNHHdGSwo01AvbKUSVQ== +stream@0.0.2: + version "0.0.2" + resolved "https://registry.npmjs.org/stream/-/stream-0.0.2.tgz#7f5363f057f6592c5595f00bc80a27f5cec1f0ef" + integrity sha1-f1Nj8Ff2WSxVlfALyAon9c7B8O8= + dependencies: + emitter-component "^1.1.1" + streaming-json-stringify@3: version "3.1.0" resolved "https://registry.yarnpkg.com/streaming-json-stringify/-/streaming-json-stringify-3.1.0.tgz#80200437a993cc39c4fe00263b7b3b903ac87af5"