-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathevent-dispatcher.ts
46 lines (38 loc) · 1.05 KB
/
event-dispatcher.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import {injectable} from 'inversify'
import {Kafka, logLevel, Message} from 'kafkajs'
import {Config} from '@libs/config'
/**
 * Contract for a component that publishes messages to a topic and exposes
 * its connection state.
 */
export interface IEventDispatcher {
/**
 * Publishes `messages` to `topic`.
 * The returned promise settles once the dispatch attempt has finished.
 */
dispatch: (topic: string, messages: Message[]) => Promise<void>
/** Closes the underlying connection. */
disconnect: () => Promise<void>
/** Reports whether the dispatcher currently considers itself connected. */
isConnected: () => boolean
}
/**
 * Kafka-backed event dispatcher.
 *
 * Owns a single kafkajs producer whose seed broker is read from the
 * `KAFKA_SEED_BROKER` configuration key. A connection attempt is started
 * eagerly on construction, and `dispatch` (re)connects on demand if the
 * producer is not connected at call time.
 */
@injectable()
export class EventDispatcher implements IEventDispatcher {
  private readonly kafka: Kafka
  private readonly producer: ReturnType<Kafka['producer']>
  /** True once `producer.connect()` has resolved and until `disconnect()` completes. */
  private connected = false
  /** In-flight connection attempt, shared so concurrent callers do not each open one. */
  private connecting?: Promise<void>

  /**
   * @param config Configuration source; `KAFKA_SEED_BROKER` is read from it.
   */
  constructor(private readonly config: Config) {
    // Built here rather than in field initializers: under standard class-field
    // semantics (`useDefineForClassFields`) initializers run before the
    // `config` parameter property is assigned.
    this.kafka = new Kafka({
      brokers: [this.config.get('KAFKA_SEED_BROKER')],
      logLevel: logLevel.WARN,
    })
    this.producer = this.kafka.producer()

    // Eager, best-effort connect. A constructor cannot await, so a failure is
    // logged here instead of surfacing as an unhandled rejection; the next
    // `dispatch` call retries the connection and reports the error to its caller.
    this.connect().catch((error: unknown) => {
      console.error('[connect]', error)
    })
  }

  /**
   * Sends `messages` to `topic`, connecting the producer first if needed.
   *
   * @param topic Kafka topic to publish to.
   * @param messages Messages to publish.
   * @throws Rejects with whatever `producer.connect()` or `producer.send()` rejects with.
   */
  async dispatch(topic: string, messages: Message[]): Promise<void> {
    await this.connect()
    await this.producer.send({topic, messages})
    console.log('[dispatch]', topic, messages)
  }

  /**
   * Disconnects the producer and marks this dispatcher as not connected.
   *
   * @throws Rejects with whatever `producer.disconnect()` rejects with; the
   * connected flag is left unchanged in that case.
   */
  async disconnect(): Promise<void> {
    await this.producer.disconnect()
    this.connected = false
  }

  /** @returns Whether the producer has been connected and not since disconnected via this class. */
  isConnected(): boolean {
    return this.connected
  }

  /**
   * Connects the producer if it is not already connected.
   *
   * Concurrent callers share a single in-flight attempt. The shared promise
   * is cleared when the attempt settles, so a failed attempt can be retried
   * by a later call.
   */
  private async connect(): Promise<void> {
    if (this.connected) {
      return
    }
    if (!this.connecting) {
      this.connecting = this.producer
        .connect()
        .then(() => {
          this.connected = true
        })
        .finally(() => {
          this.connecting = undefined
        })
    }
    await this.connecting
  }
}