-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathevent-listener.ts
69 lines (60 loc) · 2.06 KB
/
event-listener.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import {injectable} from 'inversify'
import {Consumer, Kafka, logLevel} from 'kafkajs'
import {Observable, Observer} from 'rxjs'
import {Config} from '@libs/config'
export interface IEventListener {
consume: (
groupId: string,
topic: string | RegExp,
fromBeginning: boolean,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
) => Promise<Observable<any>>
disconnectAll: () => Promise<void>
isConnected: () => boolean
}
@injectable()
export class EventListener implements IEventListener {
private kafka = new Kafka({
brokers: [this.config.get('KAFKA_SEED_BROKER')],
logLevel: logLevel.WARN,
})
private consumers: Consumer[] = []
private consumerConnections: boolean[] = []
constructor(private readonly config: Config) {}
async consume(groupId: string, topic: string | RegExp, fromBeginning = false) {
const consumer = this.kafka.consumer({groupId})
this.consumers.push(consumer)
this.consumerConnections.push(false)
const index = this.consumerConnections.length - 1
consumer.on('consumer.disconnect', () => {
this.consumerConnections[index] = false
})
await consumer.connect()
console.log('connected to consumer for topic', topic)
this.consumerConnections[index] = true
await consumer.subscribe({topic, fromBeginning})
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return new Observable((observer: Observer<any>) => {
consumer.run({
eachMessage: async message => {
if (!message.message.value) return
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const event = JSON.parse(message.message.value.toString()) as any
console.log('[consume]:', topic, event)
observer.next(event)
},
})
})
}
async disconnectAll() {
await Promise.all(this.consumers.map(c => c.disconnect()))
this.consumers = []
this.consumerConnections = []
}
isConnected() {
for (const connected of this.consumerConnections) {
if (!connected) return false
}
return true
}
}