diff --git a/src/commands/paranet/paranet-sync-command.js b/src/commands/paranet/paranet-sync-command.js index 4db57beb53..811adf490b 100644 --- a/src/commands/paranet/paranet-sync-command.js +++ b/src/commands/paranet/paranet-sync-command.js @@ -8,6 +8,7 @@ import { CONTENT_ASSET_HASH_FUNCTION_ID, SIMPLE_ASSET_SYNC_PARAMETERS, TRIPLE_STORE_REPOSITORIES, + PARANET_SYNC_KA_COUNT, } from '../../constants/constants.js'; class StartParanetSyncCommands extends Command { @@ -22,45 +23,83 @@ class StartParanetSyncCommands extends Command { } async execute(command) { - const { commandOperationId, paranetId, tokenId } = command.data; + const { commandOperationId, paranetId } = command.data; this.logger.info( - `Paranet sync: Starting paranet sync command for ${paranetId} with operation id: ${commandOperationId}, token id: ${tokenId}`, + `Paranet sync: Starting paranet sync for operation ID: ${commandOperationId}`, ); - const { blockchain, contract } = this.ualService.resolveUal(paranetId); - const assertionIds = await this.blockchainModuleManager.getLatestAssertionId( - blockchain, - contract, - tokenId, + const contractKaCount = await this.blockchainModuleManager.getKnowledgeAssetsCount( + paranetId, + ); + const [cachedKaCount] = await this.repositoryModuleManager.getOrCreateParanetById( + paranetId, ); - // Go through all except the last one - for (let stateIndex = assertionIds.length - 2; stateIndex > 0; stateIndex -= 1) { - await this.syncAsset( - blockchain, - contract, - tokenId, - assertionIds, - stateIndex, + if (cachedKaCount === contractKaCount) { + this.logger.info( + `Paranet sync: KA count from contract and in DB is the same, nothing to sync!`, + ); + return Command.empty(); + } + + this.logger.info(`Paranet sync: Syncing ${contractKaCount - cachedKaCount + 1} assets...`); + + const kaToUpdate = []; + for (let i = cachedKaCount; i <= contractKaCount; i += PARANET_SYNC_KA_COUNT) { + const nextKaArray = this.blockchainModuleManager.getKnowledgeAssetsWithPagination( paranetId, - TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY, - false, - stateIndex === assertionIds.length - 2, + i, + PARANET_SYNC_KA_COUNT, ); + if (!nextKaArray.length) break; + kaToUpdate.push(...nextKaArray); } - // Then sync the last one, but put it in the current repo - await this.syncAsset( - blockchain, - contract, - tokenId, - assertionIds, - assertionIds.length - 1, - paranetId, - null, - false, - ); + kaToUpdate + .map((ka) => ka.tokenId) + .forEach(async (tokenId) => { + this.logger.info( + `Paranet sync: Syncing token id: ${tokenId} for ${paranetId} with operation id: ${commandOperationId}`, + ); + + const { blockchain, contract } = this.ualService.resolveUal(paranetId); + const assertionIds = await this.blockchainModuleManager.getLatestAssertionId( + blockchain, + contract, + tokenId, + ); + + // Go through all except the last one + for (let stateIndex = assertionIds.length - 2; stateIndex > 0; stateIndex -= 1) { + await this.syncAsset( + blockchain, + contract, + tokenId, + assertionIds, + stateIndex, + paranetId, + TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY, + false, + stateIndex === assertionIds.length - 2, + ); + } + + // Then sync the last one, but put it in the current repo + await this.syncAsset( + blockchain, + contract, + tokenId, + assertionIds, + assertionIds.length - 1, + paranetId, + null, + false, + ); + }); + + // TODO: Save only successfull ones + await this.repositoryModuleManager.updateParanetKaCount(paranetId, contractKaCount); return Command.repeat(); } diff --git a/src/commands/paranet/start-paranet-sync-commands.js b/src/commands/paranet/start-paranet-sync-commands.js index b37f826c4f..2ac76558d1 100644 --- a/src/commands/paranet/start-paranet-sync-commands.js +++ b/src/commands/paranet/start-paranet-sync-commands.js @@ -1,9 +1,5 @@ import Command from '../command.js'; -import { - ERROR_TYPE, - PARANET_SYNC_FREQUENCY_MILLS, - PARANET_SYNC_KA_COUNT, -} from '../../constants/constants.js'; +import { ERROR_TYPE, PARANET_SYNC_FREQUENCY_MILLS } from '../../constants/constants.js'; class StartParanetSyncCommands extends Command { constructor(ctx) { @@ -27,54 +23,18 @@ class StartParanetSyncCommands extends Command { const promises = []; this.config.assetSync?.syncParanets.forEach(async (paranetId) => { - const contractKaCount = await this.blockchainModuleManager.getKnowledgeAssetsCount( + const commandData = { paranetId, + operationId, + }; + + promises.append( + this.commandExecutor.add({ + name: 'paranetSyncCommand', + data: commandData, + period: PARANET_SYNC_FREQUENCY_MILLS, + }), ); - const [cachedKaCount] = await this.repositoryModuleManager.getOrCreateParanetById( - paranetId, - ); - - if (cachedKaCount === contractKaCount) { - this.logger.info( - `Paranet sync: KA count from contract and in DB is the same, nothing to sync!`, - ); - return Command.empty(); - } - - this.logger.info( - `Paranet sync: Syncing ${contractKaCount - cachedKaCount + 1} assets...`, - ); - - const kaToUpdate = []; - for (let i = cachedKaCount; i <= contractKaCount; i += PARANET_SYNC_KA_COUNT) { - const nextKaArray = this.blockchainModuleManager.getKnowledgeAssetsWithPagination( - paranetId, - i, - PARANET_SYNC_KA_COUNT, - ); - if (!nextKaArray.length) break; - kaToUpdate.push(...nextKaArray); - } - - kaToUpdate - .map((ka) => ka.tokenId) - .forEach((tokenId) => { - const commandData = { - paranetId, - operationId, - tokenId, - }; - - promises.append( - this.commandExecutor.add({ - name: 'paranetSyncCommand', - data: commandData, - period: PARANET_SYNC_FREQUENCY_MILLS, - }), - ); - }); - - await this.repositoryModuleManager.updateParanetKaCount(paranetId, contractKaCount); }); await Promise.all(promises);