diff --git a/ot-node.js b/ot-node.js index bb05cad1b..afd0c8def 100644 --- a/ot-node.js +++ b/ot-node.js @@ -112,6 +112,10 @@ class OTNode { async initializeModules() { const initializationPromises = []; for (const moduleName in this.config.modules) { + if (!this.config.modules[moduleName].enabled) { + continue; + } + const moduleManagerName = `${moduleName}ModuleManager`; const moduleManager = this.container.resolve(moduleManagerName); diff --git a/src/modules/base-module-manager.js b/src/modules/base-module-manager.js index 2bdc901d6..a028bec7c 100644 --- a/src/modules/base-module-manager.js +++ b/src/modules/base-module-manager.js @@ -15,6 +15,7 @@ class BaseModuleManager { this.moduleConfigValidation.validateModule(this.getName(), moduleConfig); this.handlers = {}; + this.initialized = true; for (const implementationName in moduleConfig.implementation) { if (!moduleConfig.implementation[implementationName].enabled) { // eslint-disable-next-line no-continue @@ -39,7 +40,23 @@ class BaseModuleManager { const ModuleClass = (await import(implementationConfig.package)).default; const module = new ModuleClass(); // eslint-disable-next-line no-await-in-loop - await module.initialize(implementationConfig.config, this.logger); + if (this.getName() === 'telemetry') { + try { + // eslint-disable-next-line no-await-in-loop + await module.initialize(implementationConfig.config, this.logger); + } catch (error) { + this.logger.error( + `Could not initialize the ${this.getName()} module. Error: ${ + error.message + }`, + ); + this.initialized = false; + break; + } + } else { + // eslint-disable-next-line no-await-in-loop + await module.initialize(implementationConfig.config, this.logger); + } module.getImplementationName = () => implementationName; this.logger.info( @@ -50,7 +67,6 @@ class BaseModuleManager { config: implementationConfig.config, }; } - this.initialized = true; return true; } catch (error) { diff --git a/src/modules/telemetry/implementation/quest-telemetry.js b/src/modules/telemetry/implementation/quest-telemetry.js index 13ae80192..3d08f2aed 100644 --- a/src/modules/telemetry/implementation/quest-telemetry.js +++ b/src/modules/telemetry/implementation/quest-telemetry.js @@ -1,12 +1,64 @@ import { Sender } from '@questdb/nodejs-client'; +import axios from 'axios'; class QuestTelemetry { async initialize(config, logger) { this.config = config; this.logger = logger; this.localSender = Sender.fromConfig(this.config.localEndpoint); + await this.handleEventTable(this.config.localEndpoint); + if (this.config.sendToSignalingService) { this.signalingServiceSender = Sender.fromConfig(this.config.signalingServiceEndpoint); + await this.handleEventTable(this.config.signalingServiceEndpoint); + } + } + + async handleEventTable(endpoint) { + try { + const tables = await this.getTables(endpoint); + + // Check if event table already exists + if (!tables.includes('event')) { + const createTableQuery = ` + CREATE TABLE event ( + operationId STRING, + blockchainId SYMBOL, + name STRING, + timestamp TIMESTAMP, + value1 STRING, + value2 STRING, + value3 STRING + ) TIMESTAMP(timestamp) PARTITION BY DAY; + `; + + const response = await axios.get(endpoint, { + params: { query: createTableQuery.trim() }, + }); + + if (response?.ddl === 'OK') { + this.logger.info('Event table successfully created in QuestDB'); + } else { + throw new Error( + `Could not create event table in QuestDB. Response: ${JSON.stringify( + response, + )}`, + ); + } + } + } catch (error) { + throw new Error( + `Failed to handle event table for endpoint: ${endpoint}. Error: ${error.message}`, + ); + } + } + + async getTables(endpoint) { + try { + const response = await axios.get(endpoint, { params: { query: 'SHOW TABLES;' } }); + return response?.dataset?.flat() || []; + } catch (error) { + throw new Error(`Failed to fetch all tables from QuestDB. Error: ${error.message}`); } } diff --git a/src/modules/telemetry/telemetry-module-manager.js b/src/modules/telemetry/telemetry-module-manager.js index 7c80fd415..e9af223bb 100644 --- a/src/modules/telemetry/telemetry-module-manager.js +++ b/src/modules/telemetry/telemetry-module-manager.js @@ -13,21 +13,23 @@ class TelemetryModuleManager extends BaseModuleManager { async initialize() { await super.initialize(); - this.listenOnEvents((eventData) => { - this.sendTelemetryData( - eventData.operationId, - eventData.timestamp, - eventData.blockchainId, - eventData.lastEvent, - eventData.value1, - eventData.value2, - eventData.value3, - ); - }); + if (this.initialized) { + this.listenOnEvents((eventData) => { + this.sendTelemetryData( + eventData.operationId, + eventData.timestamp, + eventData.blockchainId, + eventData.lastEvent, + eventData.value1, + eventData.value2, + eventData.value3, + ); + }); + } } listenOnEvents(onEventReceived) { - if (this.config.modules.telemetry.enabled && this.initialized) { + if (this.initialized) { return this.getImplementation().module.listenOnEvents( this.eventEmitter, onEventReceived, @@ -36,7 +38,7 @@ class TelemetryModuleManager extends BaseModuleManager { } async sendTelemetryData(operationId, timestamp, blockchainId, name, value1, value2, value3) { - if (this.config.modules.telemetry.enabled && this.initialized) { + if (this.initialized) { return this.getImplementation().module.sendTelemetryData( operationId, timestamp,