-
Notifications
You must be signed in to change notification settings - Fork 62
/
Copy pathmetricCollector.ts
137 lines (117 loc) · 4.13 KB
/
metricCollector.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import bull from 'bull';
import * as Logger from 'bunyan';
import { EventEmitter } from 'events';
import IoRedis from 'ioredis';
import { register as globalRegister, Registry } from 'prom-client';
import { logger as globalLogger } from './logger';
import { getJobCompleteStats, getStats, makeGuages, QueueGauges } from './queueGauges';
export interface MetricCollectorOptions extends Omit<bull.QueueOptions, 'redis'> {
metricPrefix: string;
redis: string;
autoDiscover: boolean;
logger: Logger;
}
export interface QueueData<T = unknown> {
queue: bull.Queue<T>;
name: string;
prefix: string;
}
export class MetricCollector {
private readonly logger: Logger;
private readonly defaultRedisClient: IoRedis.Redis;
private readonly redisUri: string;
private readonly bullOpts: Omit<bull.QueueOptions, 'redis'>;
private readonly queuesByName: Map<string, QueueData<unknown>> = new Map();
private get queues(): QueueData<unknown>[] {
return [...this.queuesByName.values()];
}
private readonly myListeners: Set<(id: string) => Promise<void>> = new Set();
private readonly guages: QueueGauges;
constructor(
queueNames: string[],
opts: MetricCollectorOptions,
registers: Registry[] = [globalRegister],
) {
const { logger, autoDiscover, redis, metricPrefix, ...bullOpts } = opts;
this.redisUri = redis;
this.defaultRedisClient = new IoRedis(this.redisUri);
this.defaultRedisClient.setMaxListeners(32);
this.bullOpts = bullOpts;
this.logger = logger || globalLogger;
this.addToQueueSet(queueNames);
this.guages = makeGuages(metricPrefix, registers);
}
private createClient(_type: 'client' | 'subscriber' | 'bclient', redisOpts?: IoRedis.RedisOptions): IoRedis.Redis {
if (_type === 'client') {
return this.defaultRedisClient!;
}
return new IoRedis(this.redisUri, redisOpts);
}
private addToQueueSet(names: string[]): void {
for (const name of names) {
if (this.queuesByName.has(name)) {
continue;
}
this.logger.info('added queue', name);
this.queuesByName.set(name, {
name,
queue: new bull(name, {
...this.bullOpts,
createClient: this.createClient.bind(this),
}),
prefix: this.bullOpts.prefix || 'bull',
});
}
}
public async discoverAll(): Promise<void> {
const keyPattern = new RegExp(`^${this.bullOpts.prefix}:([^:]+):(id|failed|active|waiting|stalled-check)$`);
this.logger.info({ pattern: keyPattern.source }, 'running queue discovery');
const keyStream = this.defaultRedisClient.scanStream({
match: `${this.bullOpts.prefix}:*:*`,
});
// tslint:disable-next-line:await-promise tslint does not like Readable's here
for await (const keyChunk of keyStream) {
for (const key of keyChunk) {
const match = keyPattern.exec(key);
if (match && match[1]) {
this.addToQueueSet([match[1]]);
}
}
}
}
private async onJobComplete(queue: QueueData, id: string): Promise<void> {
try {
const job = await queue.queue.getJob(id);
if (!job) {
this.logger.warn({ job: id }, 'unable to find job from id');
return;
}
await getJobCompleteStats(queue.prefix, queue.name, job, this.guages);
} catch (err) {
this.logger.error({ err, job: id }, 'unable to fetch completed job');
}
}
public collectJobCompletions(): void {
for (const q of this.queues) {
const cb = this.onJobComplete.bind(this, q);
this.myListeners.add(cb);
q.queue.on('global:completed', cb);
}
}
public async updateAll(): Promise<void> {
const updatePromises = this.queues.map(q => getStats(q.prefix, q.name, q.queue, this.guages));
await Promise.all(updatePromises);
}
public async ping(): Promise<void> {
await this.defaultRedisClient.ping();
}
public async close(): Promise<void> {
this.defaultRedisClient.disconnect();
for (const q of this.queues) {
for (const l of this.myListeners) {
(q.queue as any as EventEmitter).removeListener('global:completed', l);
}
}
await Promise.all(this.queues.map(q => q.queue.close()));
}
}