Skip to content

Commit

Permalink
feat(sse): add server-sent events endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
shadowgate15 committed Jun 8, 2022
1 parent c0a3863 commit 3b11e32
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 26 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ The API will start automatically when the Bree constructor is called.

## Options

| Option | Type | Description |
|:------:|:------:|----------------------------------------------------------------------------------------------|
| port | Number | The port the API will listen on. Default: `62893` |
| jwt | Object | Configurations for JWT. Only option is `secret` which will be the secret used to verify JWT. |
| Option | Type | Description |
| :------: | :------: | ---------------------------------------------------------------------------------------------- |
| port | Number | The port the API will listen on. Default: `62893` |
| jwt | Object | Configurations for JWT. Only option is `secret` which will be the secret used to verify JWT. |
| sse | Object | Configurations for SSE. See [koa-sse][] for list of options. |

## API

Expand All @@ -62,3 +63,5 @@ Check out the [API Docs](https://documenter.getpostman.com/view/17142435/TzzDLbN
[yarn]: https://yarnpkg.com/

[Bree]: https://jobscheduler.net/#/

[koa-sse]: https://github.com/yklykl530/koa-sse
4 changes: 2 additions & 2 deletions api.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const ip = require('ip');
const logger = require('./helpers/logger');
const apiConfig = require('./config/api');

const api = new API(apiConfig);
const api = new API(apiConfig());

if (!module.parent) {
const graceful = new Graceful({
Expand All @@ -25,7 +25,7 @@ if (!module.parent) {
if (process.send) process.send('ready');
const { port } = api.server.address();
logger.info(
`Lad API server listening on ${port} (LAN: ${ip.address()}:${port})`
`Bree API server listening on ${port} (LAN: ${ip.address()}:${port})`
);
} catch (error) {
logger.error(error);
Expand Down
3 changes: 2 additions & 1 deletion app/controllers/api/v1/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const config = require('./config');
const jobs = require('./jobs');
const control = require('./control');
const sse = require('./sse');

const test = (ctx) => {
ctx.body = { breeExists: Boolean(ctx.bree) };
};

module.exports = { config, test, jobs, control };
module.exports = { config, test, jobs, control, sse };
16 changes: 16 additions & 0 deletions app/controllers/api/v1/sse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
async function connect(ctx) {
if (ctx.sse) {
// send bree events over sse
for (const event of ['worker created', 'worker deleted']) {
ctx.bree.on(event, (name) => {
ctx.sse.send({ event, data: { name } });
});
}

ctx.sse.on('close', () => {
ctx.logger.error('SSE closed');
});
}
}

module.exports = { connect };
53 changes: 46 additions & 7 deletions config/api.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,54 @@
const sharedConfig = require('@ladjs/shared-config');
const jwt = require('koa-jwt');
const sse = require('koa-sse-stream');

const logger = require('../helpers/logger');
const routes = require('../routes');
const config = require('../config');

module.exports = {
...sharedConfig('API'),
routes: routes.api,
logger,
hookBeforeRoutes(app) {
app.use(jwt(config.jwt));
}
module.exports = (opts = {}) => {
const sseMiddleware = sse({ ...config.sse, ...opts.sse });
const jwtMiddleware = jwt({
...config.jwt,
...opts.jwt,
getToken(ctx, _) {
// pull token off of url if it is the sse endpoint
if (ctx.url.indexOf('/v1/sse') === 0) {
const splitUrl = ctx.url.split('/');

if (splitUrl.length === 4) {
return splitUrl[3];
}
}

return null;
}
});

return {
...sharedConfig('API'),
port: config.port,
...opts,
routes: routes.api,
logger,
hookBeforeRoutes(app) {
app.use((ctx, next) => {
// return early if jwt is set to false
if (!opts.jwt && typeof opts.jwt === 'boolean') {
return next();
}

return jwtMiddleware(ctx, next);
});

app.use((ctx, next) => {
// only do this on sse route
if (ctx.url.indexOf('/v1/sse') === 0) {
return sseMiddleware(ctx, next);
}

return next();
});
}
};
};
6 changes: 6 additions & 0 deletions config/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ const config = {
secret: env.JWT_SECRET
},

// sse options
sse: {
maxClients: 10_000,
pingInterval: 60_000
},

// store IP address
// <https://github.com/ladjs/store-ip-address>
storeIPAddress: {
Expand Down
11 changes: 2 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const API = require('@ladjs/api');
const jwt = require('koa-jwt');

const api = require('./api');
const apiConfig = require('./config/api');
Expand All @@ -10,17 +9,11 @@ function plugin(opts, Bree) {
opts = {
port: config.port,
jwt: config.jwt,
sse: config.sse,
...opts
};

const api = new API({
...apiConfig,
port: opts.port,
jwt: opts.jwt,
hookBeforeRoutes(app) {
app.use(jwt(opts.jwt));
}
});
const api = new API(apiConfig(opts));

const oldInit = Bree.prototype.init;

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"humanize-string": "^3.0.0",
"ip": "^1.1.5",
"koa-jwt": "^4.0.1",
"koa-sse-stream": "^0.2.0",
"lodash": "^4.17.20",
"markdown-it": "^13.0.1",
"markdown-it-emoji": "^2.0.0",
Expand All @@ -55,6 +56,7 @@
"eslint-formatter-pretty": "^4.0.0",
"eslint-plugin-compat": "^4.0.2",
"eslint-plugin-no-smart-quotes": "^1.1.0",
"eventsource": "^2.0.2",
"fixpack": "^4.0.0",
"get-port": "^5.1.1",
"gulp": "^4.0.2",
Expand Down
2 changes: 2 additions & 0 deletions routes/api/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ router.post('/run/:jobName', api.v1.control.run);
router.post('/restart', api.v1.control.restart);
router.post('/restart/:jobName', api.v1.control.restart);

router.get('/sse/:token', api.v1.sse.connect);

module.exports = router;
1 change: 1 addition & 0 deletions test/api/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ test.before(async (t) => {
test('fails when no creds are presented', async (t) => {
const { api } = t.context;
const res = await api.get('/v1/test');

t.is(res.status, 401);
});

Expand Down
14 changes: 14 additions & 0 deletions test/api/v1/no-jwt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const test = require('ava');

const utils = require('../../utils');

test.before(async (t) => {
await utils.setupApiServer(t, {}, { jwt: false });
});

test('works when no creds are presented', async (t) => {
const { api } = t.context;
const res = await api.get('/v1/test');

t.is(res.status, 200);
});
61 changes: 61 additions & 0 deletions test/api/v1/sse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const path = require('path');
const { once } = require('events');
const test = require('ava');
const jwt = require('jsonwebtoken');

const config = require('../../../config');

const utils = require('../../utils');

const rootUrl = '/v1/sse';

test.before(async (t) => {
await utils.setupApiServer(t, {
jobs: [
{ name: 'done', path: path.join(utils.root, 'basic.js') },
{
name: 'delayed',
path: path.join(utils.root, 'basic.js'),
timeout: 100
},
{
name: 'waiting',
path: path.join(utils.root, 'basic.js'),
interval: 100
},
{
name: 'active',
path: path.join(utils.root, 'long.js')
}
]
});
t.context.token = jwt.sign({}, config.jwt.secret);

t.context.api = t.context.api.auth(t.context.token, { type: 'bearer' });

t.context.bree.start();
});

test('successfully connect to sse', async (t) => {
const es = utils.setupEventSource(t, rootUrl);

await once(es, 'open');

t.pass();
});

const eventsMacro = test.macro({
async exec(t, event) {
const es = utils.setupEventSource(t, rootUrl);

await once(es, event);

t.pass();
},
title(_, event) {
return `successfully listen to "${event}" messages`;
}
});

test(eventsMacro, 'worker created');
test(eventsMacro, 'worker deleted');
4 changes: 3 additions & 1 deletion test/plugin/options.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ test('can modify options', (t) => {

Bree.extend(plugin, {
port: 3000,
jwt: { secret: 'thisisasecret' }
jwt: { secret: 'thisisasecret' },
sse: { maxClients: 100 }
});

const bree = new Bree(baseConfig);

t.is(bree.api.config.port, 3000);
t.is(bree.api.config.jwt.secret, 'thisisasecret');
t.is(bree.api.config.sse.maxClients, 100);
});
13 changes: 11 additions & 2 deletions test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
const path = require('path');
const request = require('supertest');
const getPort = require('get-port');
const EventSource = require('eventsource');

//
// setup utilities
//
exports.setupApiServer = async (t, config = {}) => {
exports.setupApiServer = async (t, config = {}, pluginConfig = {}) => {
// Must require here in order to load changes made during setup
const Bree = require('bree');
const { plugin } = require('../');

const port = await getPort();

Bree.extend(plugin, { port });
Bree.extend(plugin, { port, ...pluginConfig });

const bree = new Bree({ ...baseConfig, ...config });

Expand All @@ -34,3 +35,11 @@ const baseConfig = {
jobs: ['basic']
};
exports.baseConfig = baseConfig;

exports.setupEventSource = (t, endpoint) => {
const { token, bree } = t.context;

return new EventSource(
`http://${bree.api.config.serverHost}:${bree.api.config.port}${endpoint}/${token}`
);
};
24 changes: 24 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3508,6 +3508,11 @@ elliptic@^6.5.3:
minimalistic-assert "^1.0.1"
minimalistic-crypto-utils "^1.0.1"

emitter-component@^1.1.1:
version "1.1.1"
resolved "https://registry.npmjs.org/emitter-component/-/emitter-component-1.1.1.tgz#065e2dbed6959bf470679edabeaf7981d1003ab6"
integrity sha512-G+mpdiAySMuB7kesVRLuyvYRqDmshB7ReKEVuyBPkzQlmiDiLrt7hHHIy4Aff552bgknVN7B2/d3lzhGO5dvpQ==

emittery@^0.11.0:
version "0.11.0"
resolved "https://registry.npmjs.org/emittery/-/emittery-0.11.0.tgz#eb5f756a200d3431de2c6e850cb2d8afd97a03b9"
Expand Down Expand Up @@ -4341,6 +4346,11 @@ events@^3.0.0:
resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400"
integrity sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==

eventsource@^2.0.2:
version "2.0.2"
resolved "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz#76dfcc02930fb2ff339520b6d290da573a9e8508"
integrity sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==

evp_bytestokey@^1.0.0, evp_bytestokey@^1.0.3:
version "1.0.3"
resolved "https://registry.yarnpkg.com/evp_bytestokey/-/evp_bytestokey-1.0.3.tgz#7fcbdb198dc71959432efe13842684e0525acb02"
Expand Down Expand Up @@ -6851,6 +6861,13 @@ koa-passport@^4.1.4:
dependencies:
passport "^0.4.0"

koa-sse-stream@^0.2.0:
version "0.2.0"
resolved "https://registry.npmjs.org/koa-sse-stream/-/koa-sse-stream-0.2.0.tgz#1effd4e72b18c138e2a0710dfa7fdd9ea0dd26b5"
integrity sha512-Dyp3YS5HfnpE17Mk8HWl+6IMVgO/VU5rGXPKjgej+BrfdZZfRv2AXAm6osd380xcjhcXJgaHg23ntdLbpjCIDQ==
dependencies:
stream "0.0.2"

koa-unless@^1.0.7:
version "1.0.7"
resolved "https://registry.yarnpkg.com/koa-unless/-/koa-unless-1.0.7.tgz#b9df375e2b4da3043918d48622520c2c0b79f032"
Expand Down Expand Up @@ -11310,6 +11327,13 @@ stream-shift@^1.0.0:
resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d"
integrity sha512-AiisoFqQ0vbGcZgQPY1cdP2I76glaVA/RauYR4G4thNFgkTqr90yXTo4LYX60Jl+sIlPNHHdGSwo01AvbKUSVQ==

stream@0.0.2:
version "0.0.2"
resolved "https://registry.npmjs.org/stream/-/stream-0.0.2.tgz#7f5363f057f6592c5595f00bc80a27f5cec1f0ef"
integrity sha1-f1Nj8Ff2WSxVlfALyAon9c7B8O8=
dependencies:
emitter-component "^1.1.1"

streaming-json-stringify@3:
version "3.1.0"
resolved "https://registry.yarnpkg.com/streaming-json-stringify/-/streaming-json-stringify-3.1.0.tgz#80200437a993cc39c4fe00263b7b3b903ac87af5"
Expand Down

0 comments on commit 3b11e32

Please sign in to comment.