Skip to content

Commit

Permalink
Merge pull request #2975 from OriginTrail/feature/sharding-table-heal…
Browse files Browse the repository at this point in the history
…th-check

Implement sharding table periodical check
  • Loading branch information
NZT48 authored Feb 13, 2024
2 parents f5529b5 + 770773e commit f802f43
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 5 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "6.2.0+hotfix.11",
"version": "6.2.0",
"description": "OTNode V6",
"main": "index.js",
"type": "module",
Expand Down
2 changes: 1 addition & 1 deletion src/commands/common/send-telemetry-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class SendTelemetryCommand extends Command {
}

/**
* Builds default otnodeUpdateCommand
* Builds default sendTelemetryCommand
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
Expand Down
143 changes: 143 additions & 0 deletions src/commands/common/sharding-table-check-command.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import Command from '../command.js';
import { SHARDING_TABLE_CHECK_COMMAND_FREQUENCY_MINUTES } from '../../constants/constants.js';

class ShardingTableCheckCommand extends Command {
constructor(ctx) {
super(ctx);
this.logger = ctx.logger;
this.config = ctx.config;
this.blockchainModuleManager = ctx.blockchainModuleManager;
this.repositoryModuleManager = ctx.repositoryModuleManager;
this.hashingService = ctx.hashingService;
}

/**
* Checks sharding table size on blockchain and compares to local
* If not equal, removes local and pulls new from blockchain
* @param command
*/
async execute() {
try {
const promises = this.blockchainModuleManager
.getImplementationNames()
.map(async (blockchainId) => {
this.logger.debug(
`Performing sharding table check for blockchain ${blockchainId}.`,
);
const shardingTableLength =
await this.blockchainModuleManager.getShardingTableLength(blockchainId);
const totalNodesNumber = await this.repositoryModuleManager.getPeersCount(
blockchainId,
);

if (shardingTableLength !== totalNodesNumber) {
this.logger.debug(
`Sharding table check for blockchain ${blockchainId} - difference between local sharding table
(${totalNodesNumber} nodes) and blockchain sharding table (${shardingTableLength} nodes).`,
);
this.logger.debug(
`Removing nodes from local sharding table for blockchain ${blockchainId}.`,
);
await this.repositoryModuleManager.removeShardingTablePeerRecords(
blockchainId,
);

let startingIdentityId =
await this.blockchainModuleManager.getShardingTableHead(blockchainId);
const pageSize = 10;
const shardingTable = [];

this.logger.debug(
`Started pulling ${shardingTableLength} nodes from blockchain sharding table.`,
);

let sliceIndex = 0;

while (shardingTable.length < shardingTableLength) {
// eslint-disable-next-line no-await-in-loop
const nodes = await this.blockchainModuleManager.getShardingTablePage(
blockchainId,
startingIdentityId,
pageSize,
);
shardingTable.push(
...nodes.slice(sliceIndex).filter((node) => node.nodeId !== '0x'),
);
sliceIndex = 1;
startingIdentityId = nodes[nodes.length - 1].identityId;
}

this.logger.debug(
`Finished pulling ${shardingTable.length} nodes from blockchain sharding table.`,
);

await this.repositoryModuleManager.createManyPeerRecords(
await Promise.all(
shardingTable.map(async (peer) => {
const nodeId = this.blockchainModuleManager.convertHexToAscii(
blockchainId,
peer.nodeId,
);

const sha256 = await this.hashingService.callHashFunction(
1,
nodeId,
);

return {
peerId: nodeId,
blockchainId,
ask: this.blockchainModuleManager.convertFromWei(
blockchainId,
peer.ask,
'ether',
),
stake: this.blockchainModuleManager.convertFromWei(
blockchainId,
peer.stake,
'ether',
),
sha256,
};
}),
),
);
}
});

await Promise.all(promises);
} catch (error) {
await this.handleError(error.message);
}
return Command.repeat();
}

async recover(command) {
await this.handleError(command.message);

return Command.repeat();
}

async handleError(errorMessage) {
this.logger.error(`Error in sharding table check command: ${errorMessage}`);
}

/**
* Builds default shardingTableCheckCommand
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'shardingTableCheckCommand',
delay: 0,
data: {},
period: SHARDING_TABLE_CHECK_COMMAND_FREQUENCY_MINUTES * 60 * 1000,
transactional: false,
};
Object.assign(command, map);
return command;
}
}

export default ShardingTableCheckCommand;
3 changes: 3 additions & 0 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ export const NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES = 60;

export const HIGH_TRAFFIC_OPERATIONS_NUMBER_PER_HOUR = 16000;

export const SHARDING_TABLE_CHECK_COMMAND_FREQUENCY_MINUTES = 30;

export const SEND_TELEMETRY_COMMAND_FREQUENCY_MINUTES = 15;

export const PEER_RECORD_UPDATE_DELAY = 30 * 60 * 1000; // 30 minutes
Expand All @@ -165,6 +167,7 @@ export const MIN_DIAL_FREQUENCY_MILLIS = 60 * 60 * 1000;
export const PERMANENT_COMMANDS = [
'otnodeUpdateCommand',
'sendTelemetryCommand',
'shardingTableCheckCommand',
'operationIdCleanerCommand',
'commandsCleanerCommand',
'dialPeersCommand',
Expand Down
1 change: 0 additions & 1 deletion src/migration/pull-sharding-table-migration.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ class PullBlockchainShardingTableMigration extends BaseMigration {
...nodes.slice(sliceIndex).filter((node) => node.nodeId !== '0x'),
);
sliceIndex = 1;
// TODO: Should we fix it here also
startingIdentityId = nodes[nodes.length - 1].identityId;
}

Expand Down

0 comments on commit f802f43

Please sign in to comment.