-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
62 lines (50 loc) · 1.67 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import figlet from 'figlet';
import { promisify } from 'util';;
import messageList from './src/data/index.js';
import { Message, MessageHeader, MessageBody } from './src/message/index.js';
import KafkaDataPipe from './src/pipes/kafka.js';
const APP_NAME = process.env.APP_NAME || 'ice_cream_pipeline';
const APP_VERSION = process.env.APP_VERSION || '0.0.1';
const GROUP_ID = process.env.KAFKA_GROUP_ID || 'sherbert_group';
const KAFKA_BOOTSTRAP_SERVER = process.env.KAFKA_BOOTSTRAP_SERVER;
const CLIENT_ID = process.env.KAFKA_CLIENT_ID;
const figletize = promisify(figlet);
const banner = await figletize(`${APP_NAME} v${APP_VERSION}`);
console.log(banner);
/********* MAIN **********/
const kafkaDP = new KafkaDataPipe({
BOOTSTRAP_SERVER: KAFKA_BOOTSTRAP_SERVER,
CLIENT_ID,
GROUP_ID
});
function until(timeout) {
return new Promise((resolve) => setTimeout(resolve, timeout));
}
const messageIterator = messageList.items[Symbol.iterator]();
(async function() {
let sequenceNo = 0;
await kafkaDP.open();
for (let item of messageIterator) {
console.info('Info: Processing item...', {
id: item.id,
name: item.name
});
await until(5000);
const myMessage = new Message(
new MessageHeader({
id: item.id,
eventType: 'create',
eventName: 'create.ice_cream'
}),
new MessageBody({
...item,
sequenceNo
})
);
kafkaDP.put({
topic: 'ingress',
message:JSON.stringify(myMessage.value())
});
sequenceNo++;
}
}())