Skip to content

Commit

Permalink
log not stopped services (#7480)
Browse files Browse the repository at this point in the history
Co-authored-by: Joan Gallego Girona <daneryl@gmail.com>
  • Loading branch information
Joao-vi and daneryl authored Nov 27, 2024
1 parent 83648ef commit a12c96e
Showing 1 changed file with 45 additions and 28 deletions.
73 changes: 45 additions & 28 deletions app/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,60 +48,77 @@ DB.connect(config.DBHOST, dbAuth)

systemLogger.info('[Worker] - ==> 📡 starting external services...');

const services: any[] = [
ocrManager,
new ATServiceListener(),
new InformationExtraction(),
new ConvertToPdfWorker(),
new DistributedLoop('preserve_integration', async () => preserveSync.syncAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}),
new DistributedLoop('toc_service', async () => tocService.processAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}),
new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
const services: Record<string, any> = {
ocr_manager: ocrManager,
at_service: new ATServiceListener(),
information_extractor: new InformationExtraction(),
convert_pdf: new ConvertToPdfWorker(),
preserve_integration: new DistributedLoop(
'preserve_integration',
async () => preserveSync.syncAllTenants(),
{
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}
),
toc_service: new DistributedLoop(
'toc_service',
async () => tocService.processAllTenants(),
{
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}
),
sync_job: new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 1000,
}),
];

const segmentationConnector = new PDFSegmentation();
pdf_segmentation: new PDFSegmentation(),
twitter_integration: new TwitterIntegration(),
};

const segmentationRepeater = new DistributedLoop(
services.segmentation_distributed_loop = new DistributedLoop(
'segmentation_repeat',
segmentationConnector.segmentPdfs,
services.pdf_segmentation.segmentPdfs,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 }
);
services.push(segmentationConnector, segmentationRepeater);

const twitterIntegration = new TwitterIntegration();
const twitterRepeater = new DistributedLoop(
services.twitter_distributed_loop = new DistributedLoop(
'twitter_repeat',
twitterIntegration.addTweetsRequestsToQueue,
services.twitter_integration.addTweetsRequestsToQueue,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 }
);
services.push(twitterIntegration, twitterRepeater);

services.forEach(service => service.start());
Object.values(services).forEach(service => service.start());

process.on('SIGINT', async () => {
systemLogger.info(
'[Worker Graceful shutdown] - Received SIGINT, waiting for graceful stop...'
);

const stopPromises = Promise.all(services.map(async service => service.stop()));
const stoppedServices: string[] = [];

const stopPromises = Promise.all(
Object.entries(services).map(async ([name, service]) => {
await service.stop();
stoppedServices.push(name);
})
);
const firstToFinish = await Promise.race([stopPromises, sleep(10_000)]);

if (Array.isArray(firstToFinish)) {
systemLogger.info('[Worker Graceful shutdown] - Services stopped successfully!');
} else {
const notStoppedServices = Object.keys(services)
.filter(service => !stoppedServices.includes(service))
.join(', ');

systemLogger.info(
'[Worker Graceful shutdown] - Some services did not stop in time, initiating forceful shutdown...'
`[Worker Graceful shutdown] - These services [${notStoppedServices}] did not stop in time, initiating forceful shutdown...`
);
}

Expand Down

0 comments on commit a12c96e

Please sign in to comment.