-
-
Notifications
You must be signed in to change notification settings - Fork 101
/
main.ts
225 lines (202 loc) · 6.53 KB
/
main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import { Pool, PoolClient } from "pg";
import { inspect } from "util";
import {
TaskList,
Worker,
Job,
WorkerPool,
WorkerOptions,
WorkerPoolOptions,
} from "./interfaces";
import deferred from "./deferred";
import SIGNALS from "./signals";
import { makeNewWorker } from "./worker";
import {
makeWithPgClientFromPool,
makeWithPgClientFromClient,
} from "./helpers";
import { CONCURRENT_JOBS } from "./config";
import { defaultLogger, Logger } from "./logger";
// Module-wide registry of worker pools. runTaskList pushes each pool it
// creates onto this list and a pool's release() removes it again; the signal
// handler walks the list to gracefully shut every pool down.
const allWorkerPools: Array<WorkerPool> = [];
// Exported for testing only
export { allWorkerPools as _allWorkerPools };
// True once the process signal handlers have been installed, so that repeated
// calls to registerSignalHandlers do not install them a second time.
let _registeredSignalHandlers = false;
// True once a handled signal has started a graceful shutdown. While set,
// registerSignalHandlers throws rather than allowing another pool to start.
let _shuttingDown = false;
/**
 * Installs, at most once per process, a listener for every signal listed in
 * SIGNALS. When one of those signals arrives the listener asks every pool in
 * `allWorkerPools` to shut down gracefully and then re-sends the same signal
 * to this process.
 *
 * @param logger - Receives debug/error output about handler registration and
 *   shutdown progress.
 * @throws Error if a shutdown is already in progress.
 */
function registerSignalHandlers(logger: Logger) {
  if (_shuttingDown) {
    throw new Error(
      "System has already gone into shutdown, should not be spawning new workers now!"
    );
  }
  if (!_registeredSignalHandlers) {
    _registeredSignalHandlers = true;
    for (const signal of SIGNALS) {
      logger.debug(`Registering signal handler for ${signal}`, {
        registeringSignalHandler: signal,
      });

      const onSignal = () => {
        logger.error(`Received '${signal}'; attempting graceful shutdown...`);
        // Detach this listener after 5 seconds regardless of how the shutdown
        // is going (presumably so a repeated signal is no longer intercepted).
        setTimeout(detach, 5000);
        if (!_shuttingDown) {
          _shuttingDown = true;
          const shutdowns = allWorkerPools.map(pool =>
            pool.gracefulShutdown(`Forced worker shutdown due to ${signal}`)
          );
          Promise.all(shutdowns).finally(() => {
            // Detach first so the signal we send ourselves is not caught by
            // this same listener.
            detach();
            logger.error(
              `Graceful shutdown attempted; killing self via ${signal}`
            );
            process.kill(process.pid, signal);
          });
        }
      };

      const detach = () => {
        logger.debug(`Removing signal handler for ${signal}`, {
          unregisteringSignalHandler: signal,
        });
        process.removeListener(signal, onSignal);
      };

      process.on(signal, onSignal);
    }
  }
}
/**
 * Starts a pool of workers that execute `tasks`, plus one dedicated database
 * connection that LISTENs on the "jobs:insert" channel and nudges a worker
 * whenever a notification arrives.
 *
 * @param tasks - Task handlers; the keys are reported as the supported task
 *   names.
 * @param pgPool - Pool that supplies both the listener connection and the
 *   connections used by the workers.
 * @param options - `logger` and `concurrency` are consumed here; all remaining
 *   options are forwarded to each worker.
 * @returns A handle exposing `release()` (stop listening, release all workers),
 *   `gracefulShutdown(message)` (additionally fail the jobs currently in
 *   progress with `message`) and `promise`, which resolves when `release()` is
 *   called.
 * @throws Error (from registerSignalHandlers) if the process is already
 *   shutting down.
 */
export function runTaskList(
  tasks: TaskList,
  pgPool: Pool,
  options: WorkerPoolOptions = {}
): WorkerPool {
  const { logger = defaultLogger } = options;
  logger.debug(`Worker pool options are ${inspect(options)}`, { options });
  const { concurrency = CONCURRENT_JOBS, ...workerOptions } = options;

  // Clean up when certain signals occur
  registerSignalHandlers(logger);

  const promise = deferred();
  const workers: Array<Worker> = [];

  // The client currently subscribed to "jobs:insert", if any.
  let listenForChangesClient: PoolClient | null = null;
  // Cleared on release so that a connection attempt (or retry) still in flight
  // does not establish a new listener after the pool has been shut down.
  let shouldListen = true;

  const unlistenForChanges = async () => {
    shouldListen = false;
    if (listenForChangesClient) {
      const client = listenForChangesClient;
      listenForChangesClient = null;
      // Unsubscribe from jobs:insert message
      try {
        await client.query('UNLISTEN "jobs:insert"');
      } catch (e) {
        // Ignore: best effort, the client is handed back to the pool regardless
      }
      await client.release();
    }
  };

  const listenForChanges = (
    err: Error | undefined,
    client: PoolClient,
    release: () => void
  ) => {
    if (!shouldListen) {
      // The pool was released while this connection was being established;
      // hand the client straight back instead of leaking it.
      if (!err) {
        release();
      }
      return;
    }
    if (err) {
      logger.error(
        `Error connecting with notify listener (trying again in 5 seconds): ${err.message}`,
        { error: err }
      );
      // Try again in 5 seconds, unless we have been released in the meantime
      setTimeout(() => {
        if (shouldListen) {
          pgPool.connect(listenForChanges);
        }
      }, 5000);
      return;
    }
    listenForChangesClient = client;
    client.on("notification", () => {
      if (listenForChangesClient === client) {
        // Find a worker that's available
        workers.some(worker => worker.nudge());
      }
    });

    // Subscribe to jobs:insert message
    client.query('LISTEN "jobs:insert"').catch((e: Error) => {
      logger.error(
        `Error subscribing to jobs:insert notifications: ${e.message}`,
        { error: e }
      );
    });

    // On error, release this client and try again
    client.on("error", (e: Error) => {
      if (listenForChangesClient !== client) {
        // This handler outlives the subscription. If the client has already
        // been given back (release or an earlier error) it must not be
        // released a second time, nor trigger a new listener connection.
        return;
      }
      logger.error(`Error with database notify listener: ${e.message}`, {
        error: e,
      });
      listenForChangesClient = null;
      try {
        release();
      } catch (releaseError) {
        logger.error(`Error occurred releasing client: ${releaseError.stack}`, {
          error: releaseError,
        });
      }
      pgPool.connect(listenForChanges);
    });

    const supportedTaskNames = Object.keys(tasks);
    logger.info(
      `Worker connected and looking for jobs... (task names: '${supportedTaskNames.join(
        "', '"
      )}')`
    );
  };

  // Create a client dedicated to listening for new jobs.
  pgPool.connect(listenForChanges);

  // This is a representation of us that can be interacted with externally
  const workerPool = {
    release: async () => {
      // Start unsubscribing right away, but never let a failure there prevent
      // the workers from being released.
      const unlistened = unlistenForChanges().catch((e: Error) => {
        logger.error(`Error occurred releasing notify listener: ${e.message}`, {
          error: e,
        });
      });
      promise.resolve();
      await Promise.all(workers.map(worker => worker.release()));
      await unlistened;
      const idx = allWorkerPools.indexOf(workerPool);
      // indexOf yields -1 when we were already removed (release called twice);
      // splice(-1, 1) would then remove some other pool from the list.
      if (idx >= 0) {
        allWorkerPools.splice(idx, 1);
      }
    },

    // Make sure we clean up after ourselves even if a signal is caught
    async gracefulShutdown(message: string) {
      try {
        logger.debug(`Attempting graceful shutdown`);
        // Release all our workers' jobs
        const workerIds = workers.map(worker => worker.workerId);
        const jobsInProgress: Array<Job> = workers
          .map(worker => worker.getActiveJob())
          .filter((job): job is Job => !!job);
        // Remove all the workers - we're shutting them down manually. Their
        // release is deliberately not awaited: the jobs they hold are failed
        // via the query below rather than being waited on.
        workers.splice(0, workers.length).forEach(worker => {
          worker.release();
        });
        logger.debug(`Releasing the jobs '${workerIds.join(", ")}'`, {
          workerIds,
        });
        const { rows: cancelledJobs } = await pgPool.query(
          `
SELECT graphile_worker.fail_job(job_queues.locked_by, jobs.id, $2)
FROM graphile_worker.jobs
INNER JOIN graphile_worker.job_queues ON (job_queues.queue_name = jobs.queue_name)
WHERE job_queues.locked_by = ANY($1::text[]) AND jobs.id = ANY($3::int[]);
`,
          [workerIds, message, jobsInProgress.map(job => job.id)]
        );
        logger.debug(`Cancelled ${cancelledJobs.length} jobs`, {
          cancelledJobs,
        });
        logger.debug("Jobs released");
      } catch (e) {
        logger.error(`Error occurred during graceful shutdown: ${e.message}`, {
          error: e,
        });
      }
      // Remove ourself from the list of worker pools. Awaited so that callers
      // (notably the signal handler, which kills the process afterwards) only
      // continue once the cleanup has actually finished.
      await this.release();
    },

    promise,
  };

  // Ensure that during a forced shutdown we get cleaned up too
  allWorkerPools.push(workerPool);

  // Spawn our workers; they can share clients from the pool.
  const withPgClient = makeWithPgClientFromPool(pgPool);
  for (let i = 0; i < concurrency; i++) {
    workers.push(makeNewWorker(tasks, withPgClient, workerOptions));
  }

  // TODO: handle when a worker shuts down (spawn a new one)

  return workerPool;
}
/**
 * Creates a single worker bound to the given, already checked-out `client`
 * and returns that worker's `promise`.
 *
 * @param tasks - Task handlers made available to the worker.
 * @param client - Database client the worker runs all of its queries on.
 * @param options - Options forwarded to the worker unchanged.
 * @returns The `promise` property of the newly created worker.
 */
export const runTaskListOnce = (
  tasks: TaskList,
  client: PoolClient,
  options: WorkerOptions = {}
) => {
  const withPgClient = makeWithPgClientFromClient(client);
  // The trailing `false` is passed through to makeNewWorker as its fourth
  // argument (presumably disabling continuous polling — confirm in ./worker).
  const worker = makeNewWorker(tasks, withPgClient, options, false);
  return worker.promise;
};