Skip to content

Commit

Permalink
Make startParanetSync schedule paranetSync commands for each paranet
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksamagicka committed Apr 30, 2024
1 parent 5f4597b commit 32cdef1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 80 deletions.
97 changes: 68 additions & 29 deletions src/commands/paranet/paranet-sync-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
CONTENT_ASSET_HASH_FUNCTION_ID,
SIMPLE_ASSET_SYNC_PARAMETERS,
TRIPLE_STORE_REPOSITORIES,
PARANET_SYNC_KA_COUNT,
} from '../../constants/constants.js';

class StartParanetSyncCommands extends Command {
Expand All @@ -22,45 +23,83 @@ class StartParanetSyncCommands extends Command {
}

async execute(command) {
const { commandOperationId, paranetId, tokenId } = command.data;
const { commandOperationId, paranetId } = command.data;

this.logger.info(
`Paranet sync: Starting paranet sync command for ${paranetId} with operation id: ${commandOperationId}, token id: ${tokenId}`,
`Paranet sync: Starting paranet sync for operation ID: ${commandOperationId}`,
);

const { blockchain, contract } = this.ualService.resolveUal(paranetId);
const assertionIds = await this.blockchainModuleManager.getLatestAssertionId(
blockchain,
contract,
tokenId,
const contractKaCount = await this.blockchainModuleManager.getKnowledgeAssetsCount(
paranetId,
);
const [cachedKaCount] = await this.repositoryModuleManager.getOrCreateParanetById(
paranetId,
);

// Go through all except the last one
for (let stateIndex = assertionIds.length - 2; stateIndex > 0; stateIndex -= 1) {
await this.syncAsset(
blockchain,
contract,
tokenId,
assertionIds,
stateIndex,
if (cachedKaCount === contractKaCount) {
this.logger.info(
`Paranet sync: KA count from contract and in DB is the same, nothing to sync!`,
);
return Command.empty();
}

this.logger.info(`Paranet sync: Syncing ${contractKaCount - cachedKaCount + 1} assets...`);

const kaToUpdate = [];
for (let i = cachedKaCount; i <= contractKaCount; i += PARANET_SYNC_KA_COUNT) {
const nextKaArray = this.blockchainModuleManager.getKnowledgeAssetsWithPagination(
paranetId,
TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY,
false,
stateIndex === assertionIds.length - 2,
i,
PARANET_SYNC_KA_COUNT,
);
if (!nextKaArray.length) break;
kaToUpdate.push(...nextKaArray);
}

// Then sync the last one, but put it in the current repo
await this.syncAsset(
blockchain,
contract,
tokenId,
assertionIds,
assertionIds.length - 1,
paranetId,
null,
false,
);
kaToUpdate
.map((ka) => ka.tokenId)
.forEach(async (tokenId) => {
this.logger.info(
`Paranet sync: Syncing token id: ${tokenId} for ${paranetId} with operation id: ${commandOperationId}`,
);

const { blockchain, contract } = this.ualService.resolveUal(paranetId);
const assertionIds = await this.blockchainModuleManager.getLatestAssertionId(
blockchain,
contract,
tokenId,
);

// Go through all except the last one
for (let stateIndex = assertionIds.length - 2; stateIndex > 0; stateIndex -= 1) {
await this.syncAsset(
blockchain,
contract,
tokenId,
assertionIds,
stateIndex,
paranetId,
TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY,
false,
stateIndex === assertionIds.length - 2,
);
}

// Then sync the last one, but put it in the current repo
await this.syncAsset(
blockchain,
contract,
tokenId,
assertionIds,
assertionIds.length - 1,
paranetId,
null,
false,
);
});

// TODO: Save only successfull ones
await this.repositoryModuleManager.updateParanetKaCount(paranetId, contractKaCount);

return Command.repeat();
}
Expand Down
62 changes: 11 additions & 51 deletions src/commands/paranet/start-paranet-sync-commands.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import Command from '../command.js';
import {
ERROR_TYPE,
PARANET_SYNC_FREQUENCY_MILLS,
PARANET_SYNC_KA_COUNT,
} from '../../constants/constants.js';
import { ERROR_TYPE, PARANET_SYNC_FREQUENCY_MILLS } from '../../constants/constants.js';

class StartParanetSyncCommands extends Command {
constructor(ctx) {
Expand All @@ -27,54 +23,18 @@ class StartParanetSyncCommands extends Command {

const promises = [];
this.config.assetSync?.syncParanets.forEach(async (paranetId) => {
const contractKaCount = await this.blockchainModuleManager.getKnowledgeAssetsCount(
const commandData = {
paranetId,
operationId,
};

promises.append(
this.commandExecutor.add({
name: 'paranetSyncCommand',
data: commandData,
period: PARANET_SYNC_FREQUENCY_MILLS,
}),
);
const [cachedKaCount] = await this.repositoryModuleManager.getOrCreateParanetById(
paranetId,
);

if (cachedKaCount === contractKaCount) {
this.logger.info(
`Paranet sync: KA count from contract and in DB is the same, nothing to sync!`,
);
return Command.empty();
}

this.logger.info(
`Paranet sync: Syncing ${contractKaCount - cachedKaCount + 1} assets...`,
);

const kaToUpdate = [];
for (let i = cachedKaCount; i <= contractKaCount; i += PARANET_SYNC_KA_COUNT) {
const nextKaArray = this.blockchainModuleManager.getKnowledgeAssetsWithPagination(
paranetId,
i,
PARANET_SYNC_KA_COUNT,
);
if (!nextKaArray.length) break;
kaToUpdate.push(...nextKaArray);
}

kaToUpdate
.map((ka) => ka.tokenId)
.forEach((tokenId) => {
const commandData = {
paranetId,
operationId,
tokenId,
};

promises.append(
this.commandExecutor.add({
name: 'paranetSyncCommand',
data: commandData,
period: PARANET_SYNC_FREQUENCY_MILLS,
}),
);
});

await this.repositoryModuleManager.updateParanetKaCount(paranetId, contractKaCount);
});

await Promise.all(promises);
Expand Down

0 comments on commit 32cdef1

Please sign in to comment.