From fcdb64e596cb9c5553a5c7b4ea81a83f14727346 Mon Sep 17 00:00:00 2001 From: Taylor Schley Date: Tue, 2 Aug 2022 11:23:00 -0500 Subject: [PATCH] feat(sse): add log listeners for SSE --- app/controllers/api/v1/sse.js | 24 ++++++++++++++++++++++++ index.js | 31 ++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/app/controllers/api/v1/sse.js b/app/controllers/api/v1/sse.js index 72eaa30..536190f 100644 --- a/app/controllers/api/v1/sse.js +++ b/app/controllers/api/v1/sse.js @@ -1,3 +1,7 @@ +const { conns } = require('../../../../helpers/bree-hooks'); + +let uuid = 0; + async function connect(ctx) { if (ctx.sse) { // likely not the best way to do this @@ -21,6 +25,8 @@ async function connect(ctx) { data: isActive(ctx) }); + conns.push({ id: uuid, sse: ctx.sse }); + // send bree events over sse for (const event of ['worker created', 'worker deleted']) { ctx.bree.on(event, (name) => { @@ -28,11 +34,29 @@ async function connect(ctx) { }); } + ctx.bree.on('worker message', (data) => { + ctx.sse.send({ event: 'worker message', data: JSON.stringify(data) }); + }); + + ctx.bree.on('worker error', (data) => { + ctx.sse.send({ event: 'worker error', data: JSON.stringify(data) }); + }); + ctx.sse.on('close', () => { ctx.logger.error('SSE closed'); + // remove from conns array) + const idx = conns.findIndex((conn) => conn.id === uuid); + + if (idx > 0) { + conns.splice(idx, 1); + } + clearInterval(interval); }); + + // bump uuid + uuid++; } } diff --git a/index.js b/index.js index bc9c604..ae51724 100644 --- a/index.js +++ b/index.js @@ -18,7 +18,36 @@ function plugin(opts, Bree) { const oldInit = Bree.prototype.init; Bree.prototype.init = async function () { - await oldInit.bind(this)(); + // hook error handler and message handler + const oldErrorHandler = this.config.errorHandler; + const oldWorkerMessageHandler = this.config.workerMessageHandler; + + this.config.errorHandler = function (error, data) { + if (oldErrorHandler) { + oldErrorHandler.call(this, error, data); + } + + this.emit('worker error', { + error, + name: data?.name, + data: data ? JSON.stringify(data) : undefined + }); + }; + + this.config.errorHandler = this.config.errorHandler.bind(this); + + this.config.workerMessageHandler = function (data) { + if (oldWorkerMessageHandler) { + oldWorkerMessageHandler.call(this, data); + } + + this.emit('worker message', data); + }; + + this.config.workerMessageHandler = + this.config.workerMessageHandler.bind(this); + + await oldInit.call(this); // assign bree to the context api.app.context.bree = this;