diff --git a/database/src/database.ts b/database/src/database.ts index e0dee5d..933ddc7 100644 --- a/database/src/database.ts +++ b/database/src/database.ts @@ -109,6 +109,11 @@ export class Database implements Loggable { return this.#sql.begin('ISOLATION LEVEL SERIALIZABLE', sql => fn(new Database(sql, this.#logger))); } + /** Closes the database connection. */ + async end() { + await this.#sql.end(); + } + @timed async generatePendingSession(hasExtendedScope: boolean) { const sql = this.#sql; const [first, ...rest] = @@ -604,21 +609,31 @@ export class Database implements Loggable { } } - async *listen(channel: string): AsyncGenerator { + async *listen(channel: string, signal: AbortSignal) { let resolver = Promise.withResolvers(); + + // eslint-disable-next-line func-style + const aborter = () => resolver.reject(); + signal.addEventListener('abort', aborter); + const listener = await this.#sql.listen( channel, payload => resolver.resolve(payload), () => resolver.resolve(''), ); - while (true) { - const payload = await resolver.promise; - // eslint-disable-next-line require-atomic-updates - resolver = Promise.withResolvers(); - if (yield payload) break; + try { + while (true) { + const payload = await resolver.promise; + // eslint-disable-next-line require-atomic-updates + resolver = Promise.withResolvers(); + yield payload; + } + } catch { + // NOTE: Intentionally empty. Abort signal invoked. + } finally { + signal.removeEventListener('abort', aborter); + await listener.unlisten(); } - - await listener.unlisten(); } } diff --git a/email/src/email.ts b/email/src/email.ts index 24a2fc2..7527f14 100644 --- a/email/src/email.ts +++ b/email/src/email.ts @@ -22,6 +22,10 @@ export class Emailer { this.#clientSecret = secret; } + get db() { + return this.#db; + } + /** Must be called within a transaction context for correctness. */ async #getLatestCredentials() { const creds = await this.#db.getDesignatedSenderCredentials(); diff --git a/email/src/main.ts b/email/src/main.ts index bd0a400..2e5f1e0 100644 --- a/email/src/main.ts +++ b/email/src/main.ts @@ -14,13 +14,12 @@ const sql = postgres(POSTGRES_URL, { ssl: 'prefer', types: { bigint: postgres.Bi const db = new Database(sql, logger); const emailer = new Emailer(db, GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET); -async function listenForDraftNotifications(db: Database, emailer: Emailer) { - const stream = db.listen('notify:draft'); - while (true) { - const result = await stream.next(true); - if (typeof result.done !== 'undefined' && result.done) break; +const controller = new AbortController(); +process.on('SIGINT', () => controller.abort()); - const email = await db.begin(async db => { +async function listenForDraftNotifications(emailer: Emailer, signal: AbortSignal) { + for await (const payload of emailer.db.listen('notify:draft', signal)) { + const email = await emailer.db.begin(async db => { const notif = await db.getOneDraftNotification(); if (notif === null) return null; @@ -55,19 +54,15 @@ async function listenForDraftNotifications(db: Database, emailer: Emailer) { return email; }); - const logger = db.logger.child({ payload: result.value }); + const logger = emailer.db.logger.child({ payload }); if (email === null) logger.warn({ email }); else logger.info({ email }); } } -async function listenForUserNotifications(db: Database, emailer: Emailer) { - const stream = db.listen('notify:user'); - while (true) { - const result = await stream.next(true); - if (typeof result.done !== 'undefined' && result.done) break; - - const email = await db.begin(async db => { +async function listenForUserNotifications(emailer: Emailer, signal: AbortSignal) { + for await (const payload of emailer.db.listen('notify:user', signal)) { + const email = await emailer.db.begin(async db => { const notif = await db.getOneUserNotification(); if (notif === null) return null; const email = await emailer.send( @@ -78,12 +73,17 @@ async function listenForUserNotifications(db: Database, emailer: Emailer) { assert(await db.dropUserNotification(notif.notif_id), 'cannot drop non-existent notification'); return email; }); - - const logger = db.logger.child({ payload: result.value }); + const logger = emailer.db.logger.child({ payload }); if (email === null) logger.warn({ email }); else logger.info({ email }); } } // Main event loop of the email worker -await Promise.all([listenForDraftNotifications(db, emailer), listenForUserNotifications(db, emailer)]); +await Promise.all([ + listenForDraftNotifications(emailer, controller.signal), + listenForUserNotifications(emailer, controller.signal), +]); +db.logger.warn('email workers shut down'); +await db.end(); +db.logger.warn('database connection shut down');