Skip to content

Commit

Permalink
Added sync commands for paranets
Browse files Browse the repository at this point in the history
  • Loading branch information
djordjekovac committed Apr 16, 2024
1 parent 44a6720 commit f91a685
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 1 deletion.
15 changes: 15 additions & 0 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@
"commandExecutorVerboseLoggingEnabled": false,
"appDataPath": "data",
"logLevel": "info",
"assetSync": {
"syncParanets": []
},
"auth": {
"ipBasedAuthEnabled": true,
"tokenBasedAuthEnabled": false,
Expand Down Expand Up @@ -309,6 +312,9 @@
"commandExecutorVerboseLoggingEnabled": false,
"appDataPath": "data",
"logLevel": "trace",
"assetSync": {
"syncParanets": []
},
"auth": {
"ipBasedAuthEnabled": true,
"tokenBasedAuthEnabled": false,
Expand Down Expand Up @@ -475,6 +481,9 @@
"commandExecutorVerboseLoggingEnabled": false,
"appDataPath": "data",
"logLevel": "trace",
"assetSync": {
"syncParanets": []
},
"auth": {
"ipBasedAuthEnabled": true,
"tokenBasedAuthEnabled": false,
Expand Down Expand Up @@ -641,6 +650,9 @@
"commandExecutorVerboseLoggingEnabled": false,
"appDataPath": "data",
"logLevel": "trace",
"assetSync": {
"syncParanets": []
},
"auth": {
"ipBasedAuthEnabled": true,
"tokenBasedAuthEnabled": false,
Expand Down Expand Up @@ -807,6 +819,9 @@
"commandExecutorVerboseLoggingEnabled": false,
"appDataPath": "data",
"logLevel": "trace",
"assetSync": {
"syncParanets": []
},
"auth": {
"ipBasedAuthEnabled": true,
"tokenBasedAuthEnabled": false,
Expand Down
28 changes: 28 additions & 0 deletions ot-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,34 @@ class OTNode {
await autoUpdaterCommand.execute();
}

initializeParanets() {
const paranetIdService = this.container.resolve('paranetIdService');
const blockchainModuleManager = this.container.resolve('blockchainModuleManager');
const tripleStoreService = this.container.resolve('tripleStoreService');
const tripleStoreModuleManager = this.container.resolve('tripleStoreModuleManager');
const invalidParanets = [];
const validParanets = [];
this.config.assetSync?.syncParanets.forEach((paranetId) => {
if (!paranetIdService.isUAL(paranetId)) {
invalidParanets.push(paranetId);
this.logger.warn('');
} else {
const { blockchainId } = paranetIdService.resolveUAL(paranetId);
if (!blockchainModuleManager.getImplementationNames().includes(blockchainId)) {
invalidParanets.push(paranetId);
this.logger.warn('');
} else {
validParanets.push(paranetId);
const repository = paranetIdService.getParanetRepositoryName(paranetId);
tripleStoreModuleManager.initializeRepository(repository);
}
}
});
this.config.assetSync.syncParanets = validParanets;

tripleStoreService.initializeRepositories();
}

stop(code = 0) {
this.logger.info('Stopping node...');
process.exit(code);
Expand Down
206 changes: 206 additions & 0 deletions src/commands/paranet/paranet-sync-command.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import Command from '../command.js';
import { ERROR_TYPE, PARANET_SYNC_FREQUENCY_MILLS } from '../../constants/constants.js';

class StartParanetSyncCommands extends Command {
constructor(ctx) {
super(ctx);
this.commandExecutor = ctx.commandExecutor;
this.blockchainModuleManager = ctx.blockchainModuleManager;
this.tripleStoreService = ctx.tripleStoreService;

this.errorType = ERROR_TYPE.PARANET.PARANET_SYNC_ERROR;
}

async execute(command) {
const { operationId, paranetId } = command.data;

this.logger.info(
`Paranet sync: Starting paranet sync command for ${paranetId} with operation id: ${operationId}`,
);

// get missed token ids for paranet

// schedule get commands for each asset

// store in paranet repository

return Command.repeat();
}

// async syncAsset(tokenId, blockchain, contract) {
// const assertionIds = await this.blockchainModuleManager.getLatestAssertionId(
// blockchain,
// contract,
// tokenId,
// );
//
// // eslint-disable-next-line for-direction
// for (let stateIndex = assertionIds.length - 1; stateIndex < 0; stateIndex -= 1) {
// try {
// // if (
// // await this.repositoryModuleManager.isStateSynced(
// // blockchain,
// // contract,
// // tokenId,
// // stateIndex,
// // )
// // ) {
// // this.logger.trace(
// // `ASSET_SYNC: StateIndex: ${stateIndex} for tokenId: ${tokenId} already synced blockchain: ${blockchain}`,
// // );
// // await this.repositoryModuleManager.updateAssetSyncRecord(
// // blockchain,
// // contract,
// // tokenId,
// // stateIndex,
// // ASSET_SYNC_PARAMETERS.STATUS.COMPLETED,
// // true,
// // );
// // continue;
// // }
//
// const statePresentInParanetRepository = await this.tripleStoreService.assetExists(
// paranet,
// tokenId,
// stateIndex,
// assertionIds,
// );
//
// if (await this.isStatePresentInRepository(tokenId, stateIndex, assertionIds)) {
// this.logger.trace(
// `ASSET_SYNC: StateIndex: ${stateIndex} for tokenId: ${tokenId} found in triple store blockchain: ${blockchain}`,
// );
// await this.repositoryModuleManager.createAssetSyncRecord(
// blockchain,
// contract,
// tokenId,
// stateIndex,
// ASSET_SYNC_PARAMETERS.STATUS.COMPLETED,
// true,
// );
// continue;
// }
//
// const ual = this.ualService.deriveUAL(blockchain, contract, tokenId);
// this.logger.debug(
// `ASSET_SYNC: Fetching state index: ${stateIndex + 1} of ${
// assertionIds.length
// } for asset with ual: ${ual}. blockchain: ${blockchain}`,
// );
// const assertionId = assertionIds[stateIndex];
//
// const operationId = await this.operationIdService.generateOperationId(
// OPERATION_ID_STATUS.GET.GET_START,
// );
//
// await Promise.all([
// this.operationIdService.updateOperationIdStatus(
// operationId,
// blockchain,
// OPERATION_ID_STATUS.GET.GET_INIT_START,
// ),
//
// this.repositoryModuleManager.createAssetSyncRecord(
// blockchain,
// contract,
// tokenId,
// stateIndex,
// ASSET_SYNC_PARAMETERS.STATUS.IN_PROGRESS,
// ),
//
// this.repositoryModuleManager.createOperationRecord(
// this.getService.getOperationName(),
// operationId,
// OPERATION_STATUS.IN_PROGRESS,
// ),
// ]);
//
// const hashFunctionId = CONTENT_ASSET_HASH_FUNCTION_ID;
//
// this.logger.debug(
// `ASSET_SYNC: Get for ${ual} with operation id ${operationId} initiated. blockchain: ${blockchain}`,
// );
//
// await this.commandExecutor.add({
// name: 'networkGetCommand',
// sequence: [],
// delay: 0,
// data: {
// operationId,
// id: ual,
// blockchain,
// contract,
// tokenId,
// state: assertionId,
// hashFunctionId,
// assertionId,
// assetSync: true,
// stateIndex,
// assetSyncInsertedByCommand: true,
// },
// transactional: false,
// });
//
// await this.operationIdService.updateOperationIdStatus(
// operationId,
// blockchain,
// OPERATION_ID_STATUS.GET.GET_INIT_END,
// );
//
// let attempt = 0;
// let getResult;
// do {
// await setTimeout(ASSET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS);
//
// getResult = await this.operationIdService.getOperationIdRecord(operationId);
// attempt += 1;
// } while (
// attempt < ASSET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS &&
// getResult?.status !== OPERATION_ID_STATUS.FAILED &&
// getResult?.status !== OPERATION_ID_STATUS.COMPLETED
// );
// } catch (error) {
// this.logger.warn(
// `ASSET_SYNC: Unable to sync tokenId: ${tokenId}, for contract: ${contract} state index: ${stateIndex} blockchain: ${blockchain}, error: ${error}`,
// );
// await this.repositoryModuleManager.updateAssetSyncRecord(
// blockchain,
// contract,
// tokenId,
// stateIndex,
// ASSET_SYNC_PARAMETERS.STATUS.FAILED,
// true,
// );
// }
// }
// }

/**
* Recover system from failure
* @param command
* @param error
*/
async recover(command) {
this.logger.warn(`Failed to execute ${command.name}. Error: ${command.message}`);

return Command.repeat();
}

/**
* Builds default paranetSyncCommands
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'paranetSyncCommands',
data: {},
transactional: false,
period: PARANET_SYNC_FREQUENCY_MILLS,
};
Object.assign(command, map);
return command;
}
}

export default StartParanetSyncCommands;
69 changes: 69 additions & 0 deletions src/commands/paranet/start-paranet-sync-commands.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import Command from '../command.js';
import { ERROR_TYPE, PARANET_SYNC_FREQUENCY_MILLS } from '../../constants/constants.js';

class StartParanetSyncCommands extends Command {
constructor(ctx) {
super(ctx);
this.commandExecutor = ctx.commandExecutor;
this.blockchainModuleManager = ctx.blockchainModuleManager;

this.errorType = ERROR_TYPE.PARANET.START_PARANET_SYNC_ERROR;
}

async execute() {
const operationId = this.operationIdService.generateId();

this.logger.info(
`Paranet sync: Starting Paranet sync command for operation id: ${operationId}`,
);

await this.commandExecutor.delete('paranetSyncCommand');

await Promise.all(
this.config.assetSync?.syncParanets.map(async (paranetId) => {
// validate paranet id before scheduling paranet sync command

const commandData = {
paranetId,
operationId,
};

return this.commandExecutor.add({
name: 'paranetSyncCommand',
data: commandData,
period: PARANET_SYNC_FREQUENCY_MILLS,
});
}),
);

return Command.empty();
}

/**
* Recover system from failure
* @param command
* @param error
*/
async recover(command) {
this.logger.warn(`Failed to execute ${command.name}. Error: ${command.message}`);

return Command.repeat();
}

/**
* Builds default startParanetSyncCommands
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'startParanetSyncCommands',
data: {},
transactional: false,
};
Object.assign(command, map);
return command;
}
}

export default StartParanetSyncCommands;
6 changes: 6 additions & 0 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ export const HIGH_TRAFFIC_OPERATIONS_NUMBER_PER_HOUR = 16000;

export const SHARDING_TABLE_CHECK_COMMAND_FREQUENCY_MINUTES = 30;

export const PARANET_SYNC_FREQUENCY_MILLS = 5 * 1000;

export const SEND_TELEMETRY_COMMAND_FREQUENCY_MINUTES = 15;

export const PEER_RECORD_UPDATE_DELAY = 30 * 60 * 1000; // 30 minutes
Expand Down Expand Up @@ -346,6 +348,10 @@ export const ERROR_TYPE = {
SUBMIT_UPDATE_COMMIT_ERROR: 'SubmitUpdateCommitError',
SUBMIT_UPDATE_COMMIT_SEND_TX_ERROR: 'SubmitUpdateCommitSendTxError',
},
PARANET: {
START_PARANET_SYNC_ERROR: 'StartParanetSyncError',
PARANET_SYNC_ERROR: 'ParanetSyncError',
},
};
export const OPERATION_ID_STATUS = {
PENDING: 'PENDING',
Expand Down
6 changes: 5 additions & 1 deletion src/modules/triple-store/implementation/ot-triple-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ class OtTripleStore {

initializeRepositories() {
for (const repository of Object.keys(this.repositories)) {
this.initializeSparqlEndpoints(repository);
this.initializeRepository(repository);
}
}

initializeRepository(repository) {
this.initializeSparqlEndpoints(repository);
}

initializeSparqlEndpoints() {
throw Error('initializeSparqlEndpoints not implemented');
}
Expand Down
Loading

0 comments on commit f91a685

Please sign in to comment.