-
Notifications
You must be signed in to change notification settings - Fork 2
/
sqs-loop.js
101 lines (87 loc) · 2.64 KB
/
sqs-loop.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
'use strict';
const aws = require('aws-sdk');
const bb = require('bluebird');
const STOP_STRING = 'PLXSTAHPKTHXBYE';
const KEEP_STRING = 'CHANGEFRIGHTENME';
function stop_loop() {
return bb.reject(STOP_STRING);
}
function keep() {
return bb.reject(KEEP_STRING);
}
function readMessage(params) {
const sqs = new aws.SQS();
return sqs.receiveMessage(params).promise();
}
function removeMessage(url, msg) {
const sqs = new aws.SQS();
var params = {
QueueUrl: url,
ReceiptHandle: msg.ReceiptHandle
};
return sqs.deleteMessage(params).promise();
}
function readMessageLoop(params, cb) {
function promiseLoop() {
let stop = false;
return readMessage(params)
.then(res => {
if (!res.Messages) {
return [];
}
const sent = res.Messages.map(msg => {
try {
return cb(msg).then(() => bb.resolve(msg)).reflect();
} catch(e) {
return bb.reject(e);
}
});
return bb.all(sent);
})
.then(responses => {
if (responses.length === 0) {
return;
}
const fulfiled = [];
responses.forEach(res => {
if (res.isFulfilled()) {
fulfiled.push(res.value());
return;
}
if (res.reason() === STOP_STRING) {
stop = true;
return;
}
if (res.reason() === KEEP_STRING) {
return;
}
});
return bb.all(fulfiled.map(res => removeMessage(params.QueueUrl, res)));
})
.then(() => {
if (stop) {
return;
}
return promiseLoop();
});
}
return promiseLoop();
}
function loop(params, cb) {
const sqs = new aws.SQS();
let promise = bb.resolve(params.QueueUrl);
if (!params.QueueUrl) {
promise = promise
.then(() => sqs.createQueue({ QueueName: params.QueueName }).promise())
.then(res => res.QueueUrl);
}
const _params = JSON.parse(JSON.stringify(params));
delete _params.QueueName;
return promise
.then(url => {
_params.QueueUrl = url;
return _params;
})
.then(params => readMessageLoop(params, cb));
}
module.exports = { aws, loop, stop: stop_loop, keep };