-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmock.ts
144 lines (127 loc) · 4.71 KB
/
mock.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import { join } from 'path';
import { Worker, MessagePort } from 'worker_threads';
import { CollectorModuleInterface } from '../collector.controller';
import {
DEFAULT_GETTER_RETRY_INTERVAL,
DEFAULT_GETTER_PROCESSING_INTERVAL,
DEFAULT_GETTER_MAX_BLOCKS,
} from 'src/getter/getter.service';
import { LoggerService, STATUS_LOG_INTERVAL } from 'src/logger/logger.service';
import { ConfigService } from 'src/config/config.service';
import { ChainConfig } from 'src/config/config.types';
import { LoggerOptions } from 'pino';
import { MonitorService } from 'src/monitor/monitor.service';
/**
 * Module-wide settings of the Mock collector, as produced by
 * `loadGlobalMockConfig`. The getter-related values are used by
 * `loadWorkerData` as fallbacks for chains whose own getter configuration
 * leaves them unset.
 */
interface GlobalMockConfig {
// Global getter retry interval, or DEFAULT_GETTER_RETRY_INTERVAL when unset.
// NOTE(review): the time unit is not visible in this file — confirm.
retryInterval: number;
// Global getter processing interval, or DEFAULT_GETTER_PROCESSING_INTERVAL when unset.
processingInterval: number;
// Global getter block limit, or DEFAULT_GETTER_MAX_BLOCKS when unset.
// NOTE(review): `null` presumably means "no limit" — confirm against the getter service.
maxBlocks: number | null;
// Value of the 'privateKey' entry of the 'mock' AMB global properties.
privateKey: string;
}
/**
 * Payload handed to each `mock.worker.js` worker thread as its `workerData`.
 * One instance is assembled per configured chain by `loadWorkerData`.
 */
export interface MockWorkerData {
// Identifier of the chain this worker is responsible for (from the chain config).
chainId: string;
// `rpc` value of the chain config (presumably the RPC endpoint — confirm).
rpc: string;
// `resolver` value of the chain config; may be null.
resolver: string | null;
// Optional block bounds copied from the chain config.
// NOTE(review): exact semantics (inclusive/exclusive) live in the worker — confirm there.
startingBlock?: number;
stoppingBlock?: number;
// Getter settings: the chain-level value when set, otherwise the global mock value.
retryInterval: number;
processingInterval: number;
maxBlocks: number | null;
// 'incentivesAddress' entry of the 'mock' AMB configuration for this chain.
incentivesAddress: string;
// Private key copied from the global mock configuration.
privateKey: string;
// Port returned by `monitorService.attachToMonitor(chainId)`; it is listed in
// the worker's `transferList` when the worker is created.
monitorPort: MessagePort;
// Pino logger options taken from the logger service.
loggerOptions: LoggerOptions;
}
/**
 * Assembles the module-wide Mock configuration.
 *
 * The getter settings come from the global getter configuration, each one
 * falling back to its `DEFAULT_GETTER_*` constant when null/undefined. The
 * private key is read from the global properties of the 'mock' AMB entry.
 *
 * @param configService Source of the AMB and global configuration.
 * @returns The resolved global Mock configuration.
 * @throws Error when the 'mock' AMB configuration or its 'privateKey' is missing.
 */
function loadGlobalMockConfig(configService: ConfigService): GlobalMockConfig {
  const mockAmbConfig = configService.ambsConfig.get('mock');
  if (mockAmbConfig == null) {
    throw new Error(`Failed to load Mock module: 'mock' configuration not found.`);
  }

  // Resolve the getter settings before touching the private key, so a broken
  // global getter configuration surfaces first.
  const globalGetter = configService.globalConfig.getter;
  const getterSettings = {
    retryInterval: globalGetter.retryInterval ?? DEFAULT_GETTER_RETRY_INTERVAL,
    processingInterval:
      globalGetter.processingInterval ?? DEFAULT_GETTER_PROCESSING_INTERVAL,
    maxBlocks: globalGetter.maxBlocks ?? DEFAULT_GETTER_MAX_BLOCKS,
  };

  const privateKey = mockAmbConfig.globalProperties['privateKey'];
  if (privateKey == null) {
    throw new Error(`Failed to load Mock module: 'privateKey' missing`);
  }

  return { ...getterSettings, privateKey };
}
/**
 * Builds the `workerData` payload for the Mock collector worker of one chain.
 *
 * Chain-level getter settings take precedence; any that are null/undefined
 * fall back to the corresponding value of `globalConfig`.
 *
 * @param configService Used to look up the chain's 'mock' incentives address.
 * @param monitorService Used to obtain the monitor port for the chain.
 * @param loggerService Supplies the logger options forwarded to the worker.
 * @param chainConfig Configuration of the chain the worker will serve.
 * @param globalConfig Module-wide fallbacks and the private key.
 * @returns The payload to pass to the worker thread.
 */
async function loadWorkerData(
  configService: ConfigService,
  monitorService: MonitorService,
  loggerService: LoggerService,
  chainConfig: ChainConfig,
  globalConfig: GlobalMockConfig,
): Promise<MockWorkerData> {
  const { chainId, rpc } = chainConfig;

  //TODO implement if 'undefined' (see Wormhole/Polymer implementations)
  const incentivesAddress = configService.getAMBConfig(
    'mock',
    'incentivesAddress',
    chainId,
  ) as string;

  const { resolver, startingBlock, stoppingBlock } = chainConfig;

  // Per-chain getter overrides win over the module-wide values.
  const chainGetter = chainConfig.getter;
  const retryInterval = chainGetter.retryInterval ?? globalConfig.retryInterval;
  const processingInterval =
    chainGetter.processingInterval ?? globalConfig.processingInterval;
  const maxBlocks = chainGetter.maxBlocks ?? globalConfig.maxBlocks;
  const { privateKey } = globalConfig;

  // The monitor is only contacted once every synchronous lookup has succeeded.
  const monitorPort = await monitorService.attachToMonitor(chainId);

  return {
    chainId,
    rpc,
    resolver,
    startingBlock,
    stoppingBlock,
    retryInterval,
    processingInterval,
    maxBlocks,
    incentivesAddress,
    privateKey,
    monitorPort,
    loggerOptions: loggerService.loggerOptions,
  };
}
/**
 * Entry point of the Mock collector module.
 *
 * Loads the module-wide Mock configuration, then starts one
 * `mock.worker.js` worker thread per entry of `configService.chainsConfig`
 * (sequentially, in iteration order). Finally it schedules a periodic status
 * log, every `STATUS_LOG_INTERVAL`, listing which chains still have a live
 * worker.
 *
 * @param moduleInterface Services provided by the collector controller.
 * @throws Error when the global Mock configuration cannot be loaded.
 */
export default async (moduleInterface: CollectorModuleInterface) => {
  const { configService, monitorService, loggerService } = moduleInterface;

  const globalMockConfig = loadGlobalMockConfig(configService);

  // Worker registry keyed by chain id. An entry is set to `null` once its
  // worker has exited, so the chain is still listed (as inactive) in the
  // status log.
  const workers: Record<string, Worker | null> = {};

  for (const [chainId, chainConfig] of configService.chainsConfig) {
    const workerData = await loadWorkerData(
      configService,
      monitorService,
      loggerService,
      chainConfig,
      globalMockConfig,
    );

    const worker = new Worker(join(__dirname, 'mock.worker.js'), {
      workerData,
      // The monitor port is part of `workerData`, so it has to be transferred
      // to the worker rather than cloned.
      transferList: [workerData.monitorPort],
    });

    // Register and clear the worker under the SAME key (the chains-config
    // key), so that an exited worker can never remain listed as active.
    workers[chainId] = worker;

    worker.on('error', (error) =>
      loggerService.fatal(error, 'Error on mock collector service worker.'),
    );

    worker.on('exit', (exitCode) => {
      workers[chainId] = null;
      loggerService.info(
        { exitCode, chainId },
        `Mock collector service worker exited.`,
      );
    });
  }

  // Initiate status log interval
  const logStatus = () => {
    const activeWorkers: string[] = [];
    const inactiveWorkers: string[] = [];

    for (const [workerChainId, worker] of Object.entries(workers)) {
      if (worker != null) activeWorkers.push(workerChainId);
      else inactiveWorkers.push(workerChainId);
    }

    loggerService.info(
      { activeWorkers, inactiveWorkers },
      'Mock collector workers status.',
    );
  };
  setInterval(logStatus, STATUS_LOG_INTERVAL);
};