Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event table handling in initialize #3473

Open
wants to merge 2 commits into
base: release/v8.0.0-sigma
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ot-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class OTNode {
async initializeModules() {
const initializationPromises = [];
for (const moduleName in this.config.modules) {
if (!this.config.modules[moduleName].enabled) {
continue;
}

const moduleManagerName = `${moduleName}ModuleManager`;

const moduleManager = this.container.resolve(moduleManagerName);
Expand Down
20 changes: 18 additions & 2 deletions src/modules/base-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class BaseModuleManager {
this.moduleConfigValidation.validateModule(this.getName(), moduleConfig);

this.handlers = {};
this.initialized = true;
for (const implementationName in moduleConfig.implementation) {
if (!moduleConfig.implementation[implementationName].enabled) {
// eslint-disable-next-line no-continue
Expand All @@ -39,7 +40,23 @@ class BaseModuleManager {
const ModuleClass = (await import(implementationConfig.package)).default;
const module = new ModuleClass();
// eslint-disable-next-line no-await-in-loop
await module.initialize(implementationConfig.config, this.logger);
if (this.getName() === 'telemetry') {
try {
// eslint-disable-next-line no-await-in-loop
await module.initialize(implementationConfig.config, this.logger);
} catch (error) {
this.logger.error(
`Could not initialize the ${this.getName()} module. Error: ${
error.message
}`,
);
this.initialized = false;
break;
}
} else {
// eslint-disable-next-line no-await-in-loop
await module.initialize(implementationConfig.config, this.logger);
}
module.getImplementationName = () => implementationName;

this.logger.info(
Expand All @@ -50,7 +67,6 @@ class BaseModuleManager {
config: implementationConfig.config,
};
}
this.initialized = true;

return true;
} catch (error) {
Expand Down
52 changes: 52 additions & 0 deletions src/modules/telemetry/implementation/quest-telemetry.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,64 @@
import { Sender } from '@questdb/nodejs-client';
import axios from 'axios';

class QuestTelemetry {
async initialize(config, logger) {
this.config = config;
this.logger = logger;
this.localSender = Sender.fromConfig(this.config.localEndpoint);
await this.handleEventTable(this.config.localEndpoint);

if (this.config.sendToSignalingService) {
this.signalingServiceSender = Sender.fromConfig(this.config.signalingServiceEndpoint);
await this.handleEventTable(this.config.signalingServiceEndpoint);
}
}

async handleEventTable(endpoint) {
try {
const tables = await this.getTables(endpoint);

// Check if event table already exists
if (!tables.includes('event')) {
const createTableQuery = `
CREATE TABLE event (
operationId STRING,
blockchainId SYMBOL,
name STRING,
timestamp TIMESTAMP,
value1 STRING,
value2 STRING,
value3 STRING
) TIMESTAMP(timestamp) PARTITION BY DAY;
`;

const response = await axios.get(endpoint, {
params: { query: createTableQuery.trim() },
});

if (response?.ddl === 'OK') {
this.logger.info('Event table successfully created in QuestDB');
} else {
throw new Error(
`Could not create event table in QuestDB. Response: ${JSON.stringify(
response,
)}`,
);
}
}
} catch (error) {
throw new Error(
`Failed to handle event table for endpoint: ${endpoint}. Error: ${error.message}`,
);
}
}

async getTables(endpoint) {
try {
const response = await axios.get(endpoint, { params: { query: 'SHOW TABLES;' } });
return response?.dataset?.flat() || [];
} catch (error) {
throw new Error(`Failed to fetch all tables from QuestDB. Error: ${error.message}`);
}
}

Expand Down
28 changes: 15 additions & 13 deletions src/modules/telemetry/telemetry-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,23 @@ class TelemetryModuleManager extends BaseModuleManager {
async initialize() {
await super.initialize();

this.listenOnEvents((eventData) => {
this.sendTelemetryData(
eventData.operationId,
eventData.timestamp,
eventData.blockchainId,
eventData.lastEvent,
eventData.value1,
eventData.value2,
eventData.value3,
);
});
if (this.initialized) {
this.listenOnEvents((eventData) => {
this.sendTelemetryData(
eventData.operationId,
eventData.timestamp,
eventData.blockchainId,
eventData.lastEvent,
eventData.value1,
eventData.value2,
eventData.value3,
);
});
}
}

listenOnEvents(onEventReceived) {
if (this.config.modules.telemetry.enabled && this.initialized) {
if (this.initialized) {
return this.getImplementation().module.listenOnEvents(
this.eventEmitter,
onEventReceived,
Expand All @@ -36,7 +38,7 @@ class TelemetryModuleManager extends BaseModuleManager {
}

async sendTelemetryData(operationId, timestamp, blockchainId, name, value1, value2, value3) {
if (this.config.modules.telemetry.enabled && this.initialized) {
if (this.initialized) {
return this.getImplementation().module.sendTelemetryData(
operationId,
timestamp,
Expand Down
Loading