Skip to content

Commit

Permalink
feat(email): enable graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
BastiDood committed Aug 3, 2024
1 parent 16f95be commit d31ba61
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 25 deletions.
31 changes: 23 additions & 8 deletions database/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ export class Database implements Loggable {
return this.#sql.begin('ISOLATION LEVEL SERIALIZABLE', sql => fn(new Database(sql, this.#logger)));
}

/** Closes the database connection. */
async end() {
await this.#sql.end();
}

@timed async generatePendingSession(hasExtendedScope: boolean) {
const sql = this.#sql;
const [first, ...rest] =
Expand Down Expand Up @@ -604,21 +609,31 @@ export class Database implements Loggable {
}
}

async *listen(channel: string): AsyncGenerator<string, void, boolean> {
async *listen(channel: string, signal: AbortSignal) {
let resolver = Promise.withResolvers<string>();

// eslint-disable-next-line func-style
const aborter = () => resolver.reject();
signal.addEventListener('abort', aborter);

const listener = await this.#sql.listen(
channel,
payload => resolver.resolve(payload),
() => resolver.resolve(''),
);

while (true) {
const payload = await resolver.promise;
// eslint-disable-next-line require-atomic-updates
resolver = Promise.withResolvers();
if (yield payload) break;
try {
while (true) {
const payload = await resolver.promise;
// eslint-disable-next-line require-atomic-updates
resolver = Promise.withResolvers();
yield payload;
}
} catch {
// NOTE: Intentionally empty. Abort signal invoked.
} finally {
signal.removeEventListener('abort', aborter);
await listener.unlisten();
}

await listener.unlisten();
}
}
4 changes: 4 additions & 0 deletions email/src/email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ export class Emailer {
this.#clientSecret = secret;
}

get db() {
return this.#db;
}

/** Must be called within a transaction context for correctness. */
async #getLatestCredentials() {
const creds = await this.#db.getDesignatedSenderCredentials();
Expand Down
34 changes: 17 additions & 17 deletions email/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ const sql = postgres(POSTGRES_URL, { ssl: 'prefer', types: { bigint: postgres.Bi
const db = new Database(sql, logger);
const emailer = new Emailer(db, GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET);

async function listenForDraftNotifications(db: Database, emailer: Emailer) {
const stream = db.listen('notify:draft');
while (true) {
const result = await stream.next(true);
if (typeof result.done !== 'undefined' && result.done) break;
const controller = new AbortController();
process.on('SIGINT', () => controller.abort());

const email = await db.begin(async db => {
async function listenForDraftNotifications(emailer: Emailer, signal: AbortSignal) {
for await (const payload of emailer.db.listen('notify:draft', signal)) {
const email = await emailer.db.begin(async db => {
const notif = await db.getOneDraftNotification();
if (notif === null) return null;

Expand Down Expand Up @@ -55,19 +54,15 @@ async function listenForDraftNotifications(db: Database, emailer: Emailer) {
return email;
});

const logger = db.logger.child({ payload: result.value });
const logger = emailer.db.logger.child({ payload });
if (email === null) logger.warn({ email });
else logger.info({ email });
}
}

async function listenForUserNotifications(db: Database, emailer: Emailer) {
const stream = db.listen('notify:user');
while (true) {
const result = await stream.next(true);
if (typeof result.done !== 'undefined' && result.done) break;

const email = await db.begin(async db => {
async function listenForUserNotifications(emailer: Emailer, signal: AbortSignal) {
for await (const payload of emailer.db.listen('notify:user', signal)) {
const email = await emailer.db.begin(async db => {
const notif = await db.getOneUserNotification();
if (notif === null) return null;
const email = await emailer.send(
Expand All @@ -78,12 +73,17 @@ async function listenForUserNotifications(db: Database, emailer: Emailer) {
assert(await db.dropUserNotification(notif.notif_id), 'cannot drop non-existent notification');
return email;
});

const logger = db.logger.child({ payload: result.value });
const logger = emailer.db.logger.child({ payload });
if (email === null) logger.warn({ email });
else logger.info({ email });
}
}

// Main event loop of the email worker
await Promise.all([listenForDraftNotifications(db, emailer), listenForUserNotifications(db, emailer)]);
await Promise.all([
listenForDraftNotifications(emailer, controller.signal),
listenForUserNotifications(emailer, controller.signal),
]);
db.logger.warn('email workers shut down');
await db.end();
db.logger.warn('database connection shut down');

0 comments on commit d31ba61

Please sign in to comment.