-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathkafka-pubsub.ts
158 lines (145 loc) · 3.98 KB
/
kafka-pubsub.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import { PubSubEngine } from "graphql-subscriptions";
import {
Consumer,
Kafka,
Producer,
ProducerConfig,
IHeaders,
KafkaMessage,
ConsumerConfig,
} from "kafkajs";
import { PubSubAsyncIterator } from "./pubsub-async-iterator";
interface KafkaPubSubInput {
kafka: Kafka;
topic: string;
groupIdPrefix: string;
producerConfig?: ProducerConfig;
consumerConfig?: Omit<ConsumerConfig, "groupId">;
}
export type MessageHandler = (msg: KafkaMessage) => any;
interface SubscriptionsMap {
[subId: number]: [string, MessageHandler];
}
export class KafkaPubSub implements PubSubEngine {
private client: Kafka;
private subscriptionMap: SubscriptionsMap;
private channelSubscriptions: { [channel: string]: number[] };
private producer: Producer;
private consumer: Consumer;
private topic: string;
public static async create({
kafka,
topic,
groupIdPrefix,
producerConfig = {},
consumerConfig = {},
}: KafkaPubSubInput): Promise<KafkaPubSub> {
const pubsub = new KafkaPubSub({
kafka,
topic,
groupIdPrefix,
producerConfig,
consumerConfig,
});
await pubsub.connectProducer();
await pubsub.runConsumer(pubsub.topic);
return pubsub;
}
private constructor({
kafka,
topic,
groupIdPrefix,
producerConfig,
consumerConfig,
}: KafkaPubSubInput) {
this.client = kafka;
this.subscriptionMap = {};
this.channelSubscriptions = {};
this.topic = topic;
this.producer = this.client.producer(producerConfig);
this.consumer = this.client.consumer({
...consumerConfig,
// we need all consumers listening to all messages
groupId: `${groupIdPrefix}-${Math.ceil(Math.random() * 9999)}`,
});
}
/**
*
* @param channel to use for internal routing, besides topic
* @param payload event to send
* @param key the key of the event
* @param headers optional kafkajs headers
* @param sendOptions optional kafkajs producer.send options
*/
public async publish(
channel: string,
payload: string | Buffer,
headers?: IHeaders,
sendOptions?: object,
key?: string | Buffer,
): Promise<void> {
await this.producer.send({
messages: [
{
value: payload,
key,
headers: {
...headers,
channel,
},
},
],
topic: this.topic,
...sendOptions,
});
}
public async subscribe(
channel: string,
onMessage: MessageHandler,
_?: any
): Promise<number> {
const index = Object.keys(this.subscriptionMap).length;
this.subscriptionMap[index] = [channel, onMessage];
this.channelSubscriptions[channel] = (
this.channelSubscriptions[channel] || []
).concat(index);
return index;
}
public unsubscribe(index: number) {
const [channel] = this.subscriptionMap[index];
this.channelSubscriptions[channel] = this.channelSubscriptions[
channel
].filter((subId) => subId !== index);
}
public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers);
}
private onMessage(channel: string, message: KafkaMessage) {
const subscriptions = this.channelSubscriptions[channel];
if (!subscriptions) {
return;
} // no subscribers, don't publish msg
for (const subId of subscriptions) {
const [cnl, listener] = this.subscriptionMap[subId];
listener(message);
}
}
private async connectProducer() {
await this.producer.connect();
}
private async runConsumer(topic: string) {
await this.consumer.connect();
await this.consumer.subscribe({ topic });
await this.consumer.run({
eachMessage: async ({ message }) => {
// Using channel abstraction
if (message.headers?.channel) {
this.onMessage(message.headers.channel as string, message);
} else {
// No channel abstraction, publish over the whole topic
this.onMessage(topic, message);
}
},
});
}
}