-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.js
70 lines (64 loc) · 1.55 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import {
getDb,
countMessagesInQueue,
countMessagesInStore1,
countMessagesInStore2,
clearStore,
} from "./db.js";
/**
 * Drains the `queue` table on a single database connection.
 *
 * Repeatedly removes the lowest-id queue row that is not locked by another
 * session (`FOR UPDATE SKIP LOCKED`) and copies its `body` into `store1`
 * when the row's `header.topic` is "topic1", or into `store2` otherwise.
 * Stops as soon as the dequeue statement returns no row.
 *
 * @param {number|string} workerNumber - Label used only in the log line
 *   written when this worker finds the queue empty.
 * @param {{ query: Function }} db - Connection exposing
 *   `query({ text, values })` that resolves to an object with a `rows` array.
 * @returns {Promise<void>} Resolves once the queue is empty and every insert
 *   issued by this worker has completed; rejects with the first query error.
 */
async function messageProcessor(workerNumber, db) {
  // Iterate instead of recursing so a long queue does not build an
  // ever-growing chain of pending async calls.
  while (true) {
    const message = await db.query({
      text: `
        DELETE FROM queue
        WHERE id = (
          SELECT id FROM queue
          ORDER BY id ASC
          FOR UPDATE SKIP LOCKED
          LIMIT 1
        )
        RETURNING *
      `,
    });

    if (message.rows.length === 0) {
      console.log(`No messages in the queue, ending worker ${workerNumber}`);
      return;
    }

    const [{ header, body }] = message.rows;
    // `store` is always one of two hard-coded table names, never user input,
    // so interpolating it into the statement text is safe.
    const store = header.topic === "topic1" ? "store1" : "store2";

    // The insert must be awaited: otherwise the worker could report completion
    // (and the caller could close the connection) while the write is still in
    // flight, and a failed insert would surface as an unhandled rejection.
    // NOTE(review): the delete and this insert are separate statements; if they
    // are not wrapped in a transaction by the caller, a failed insert loses the
    // already-dequeued message — confirm whether that is acceptable.
    await db.query({
      text: `INSERT INTO ${store} (body) VALUES ($1)`,
      values: [body],
    });
  }
}
// Driver script: reports the queue size, drains the queue with a fixed number
// of workers (one database connection each), then reports the resulting counts.
const WORKER_COUNT = 5;

await countMessagesInQueue();
console.log(`Starting to process messages using ${WORKER_COUNT} workers...`);

// Connections are collected inside the try block so that any connection that
// was opened is closed in `finally`, even if a later getDb() call fails.
const connections = [];
try {
  for (let i = 0; i < WORKER_COUNT; i += 1) {
    connections.push(await getDb());
  }

  // Wait for every worker to settle before continuing, so connections are
  // never closed underneath a worker that is still mid-query.
  const outcomes = await Promise.allSettled(
    connections.map((conn, index) => messageProcessor(index + 1, conn)),
  );
  const failure = outcomes.find((outcome) => outcome.status === "rejected");
  if (failure) {
    // Surface the first worker error; the final counts are skipped on failure.
    throw failure.reason;
  }

  console.log("Finished processing messages.");
  await countMessagesInQueue();
  await countMessagesInStore1();
  await countMessagesInStore2();
} finally {
  // allSettled: a failure to close one connection must neither prevent the
  // others from closing nor mask an error thrown from the try block.
  await Promise.allSettled(connections.map(async (conn) => conn.end()));
}