Skip to content

Commit

Permalink
Merge pull request #3596 from OriginTrail/v8/data-migration-improvements
Browse files Browse the repository at this point in the history
V8 data migration improvements
  • Loading branch information
Mihajlo-Pavlovic authored Dec 24, 2024
2 parents 3e794b6 + cdde735 commit 0e53242
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 68 deletions.
6 changes: 4 additions & 2 deletions v8-data-migration/blockchain-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ function maskRpcUrl(url) {
export async function initializeRpc(rpcEndpoint) {
// Validation
if (!rpcEndpoint || typeof rpcEndpoint !== 'string') {
throw new Error(
logger.error(
`RPC endpoint is not defined or it is not a string. RPC endpoint: ${rpcEndpoint}`,
);
process.exit(1);
}
// initialize all possible providers
const Provider = ethers.providers.JsonRpcProvider;
Expand All @@ -39,7 +40,8 @@ export async function initializeRpc(rpcEndpoint) {
logger.info(`Connected to the blockchain RPC: ${maskRpcUrl(rpcEndpoint)}.`);
return provider;
} catch (e) {
throw new Error(`Unable to connect to the blockchain RPC: ${maskRpcUrl(rpcEndpoint)}.`);
logger.error(`Unable to connect to the blockchain RPC: ${maskRpcUrl(rpcEndpoint)}.`);
process.exit(1);
}
}

Expand Down
8 changes: 6 additions & 2 deletions v8-data-migration/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ export const VISIBILITY = {
PRIVATE: 'private',
};
export const BATCH_SIZE = 50;

export const DEFAULT_CONFIG_PATH = '/root/ot-node/current/config/config.json';
export const NODERC_CONFIG_PATH = '/root/ot-node/.origintrail_noderc';
export const DATA_MIGRATION_DIR = '/root/ot-node/data/data-migration';
export const LOG_DIR = '/root/ot-node/data/data-migration/logs';
export const ENV_PATH = '/root/ot-node/current/.env';
export const MIGRATION_PROGRESS_FILE = '/root/ot-node/data/migrations/v8DataMigration';
export const MIGRATION_DIR = '/root/ot-node/data/migrations/';
export const MIGRATION_PROGRESS_FILE = 'v8DataMigration';

export const DB_URLS = {
testnet: 'https://hosting.origin-trail.network/csv/testnet.db',
mainnet: '',
mainnet: 'https://hosting.origin-trail.network/csv/mainnet.db',
};

const require = createRequire(import.meta.url);
Expand Down
10 changes: 8 additions & 2 deletions v8-data-migration/logger.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import pino from 'pino';
import fs from 'fs';
import { LOG_DIR } from './constants.js';

// Ensure logs directory exists
const LOG_DIR = './logs';
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR);
fs.mkdirSync(LOG_DIR, { recursive: true });

if (!fs.existsSync(LOG_DIR)) {
throw new Error(
`Something went wrong. Directory: ${LOG_DIR} does not exist after creation.`,
);
}
}

const timers = new Map();
Expand Down
2 changes: 1 addition & 1 deletion v8-data-migration/run-data-migration.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
cd /root/ot-node/current/v8-data-migration/ &&
npm rebuild sqlite3 &&
nohup node v8-data-migration.js > /root/ot-node/current/v8-data-migration/nohup.out 2>&1 &
nohup node v8-data-migration.js > /root/ot-node/data/nohup.out 2>&1 &
9 changes: 6 additions & 3 deletions v8-data-migration/sqlite-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ export class SqliteDatabase {
});

if (!this.db) {
throw new Error('Failed to initialize SQLite database');
logger.error('Failed to initialize SQLite database');
process.exit(1);
}
}

Expand Down Expand Up @@ -124,13 +125,15 @@ export class SqliteDatabase {

_validateConnection() {
if (!this.db) {
throw new Error('Database not initialized. Call initialize() first.');
logger.error('Database not initialized. Call initialize() first.');
process.exit(1);
}
}

_validateBlockchainName(blockchainName) {
if (!blockchainName) {
throw new Error('Blockchain name is required');
logger.error('Blockchain name is required');
process.exit(1);
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions v8-data-migration/triple-store-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ export async function _executeQuery(
validateQuery(query);

if (!mediaType) {
throw new Error(`[VALIDATION ERROR] Media type is not defined. Media type: ${mediaType}`);
logger.error(`[VALIDATION ERROR] Media type is not defined. Media type: ${mediaType}`);
process.exit(1);
}

const response = await axios.post(
Expand Down Expand Up @@ -396,9 +397,10 @@ export async function getAssertionFromV6TripleStore(
!ualAssertionIdData.assertionId ||
!ualAssertionIdData.ual
) {
throw new Error(
logger.error(
`[VALIDATION ERROR] Ual assertion ID data is not properly defined or it is not an object. Ual assertion ID data: ${ualAssertionIdData}`,
);
process.exit(1);
}

const { assertionId, ual } = ualAssertionIdData;
Expand All @@ -423,7 +425,7 @@ export async function getAssertionFromV6TripleStore(
// Extract the private assertionId from the publicAssertion if it exists
const privateAssertionId = extractPrivateAssertionId(publicAssertion);
if (!privateAssertionId) {
logger.error(
logger.warn(
`There was a problem while extracting the private assertionId from public assertion: ${publicAssertion}. Extracted privateAssertionId: ${privateAssertionId}`,
);
success = false;
Expand Down
32 changes: 22 additions & 10 deletions v8-data-migration/v8-data-migration-utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import fs from 'fs';
import path from 'path';
import { NODERC_CONFIG_PATH, MIGRATION_PROGRESS_FILE, DEFAULT_CONFIG_PATH } from './constants.js';
import {
NODERC_CONFIG_PATH,
MIGRATION_PROGRESS_FILE,
DEFAULT_CONFIG_PATH,
MIGRATION_DIR,
} from './constants.js';
import { validateConfig } from './validation.js';
import logger from './logger.js';

Expand All @@ -24,26 +29,30 @@ export function ensureDirectoryExists(dirPath) {
logger.info(`Created directory: ${dirPath}`);

if (!fs.existsSync(dirPath)) {
throw new Error(
logger.error(
`Something went wrong. Directory: ${dirPath} does not exist after creation.`,
);
process.exit(1);
}
}
}

export function ensureMigrationProgressFileExists() {
if (!fs.existsSync(MIGRATION_PROGRESS_FILE)) {
fs.writeFileSync(MIGRATION_PROGRESS_FILE, '');
logger.info(`Created migration progress file: ${MIGRATION_PROGRESS_FILE}`);
if (!fs.existsSync(MIGRATION_PROGRESS_FILE)) {
ensureDirectoryExists(MIGRATION_DIR);
const migrationProgressFilePath = path.join(MIGRATION_DIR, MIGRATION_PROGRESS_FILE);

if (!fs.existsSync(migrationProgressFilePath)) {
fs.writeFileSync(migrationProgressFilePath, '');
logger.info(`Created migration progress file: ${migrationProgressFilePath}`);
if (!fs.existsSync(migrationProgressFilePath)) {
throw new Error(
`Something went wrong. Progress file: ${MIGRATION_PROGRESS_FILE} does not exist after creation.`,
`Something went wrong. Progress file: ${migrationProgressFilePath} does not exist after creation.`,
);
}
} else {
logger.info(`Migration progress file already exists: ${MIGRATION_PROGRESS_FILE}.`);
logger.info(`Migration progress file already exists: ${migrationProgressFilePath}.`);
logger.info('Checking if migration is already successful...');
const fileContent = fs.readFileSync(MIGRATION_PROGRESS_FILE, 'utf8');
const fileContent = fs.readFileSync(migrationProgressFilePath, 'utf8');
if (fileContent === 'MIGRATED') {
logger.info('Migration is already successful. Exiting...');
process.exit(0);
Expand All @@ -52,8 +61,11 @@ export function ensureMigrationProgressFileExists() {
}

export function markMigrationAsSuccessfull() {
// Construct the full path to the migration progress file
const migrationProgressFilePath = path.join(MIGRATION_DIR, MIGRATION_PROGRESS_FILE);

// open file
const file = fs.openSync(MIGRATION_PROGRESS_FILE, 'w');
const file = fs.openSync(migrationProgressFilePath, 'w');

// write MIGRATED
fs.writeSync(file, 'MIGRATED');
Expand Down
73 changes: 45 additions & 28 deletions v8-data-migration/v8-data-migration.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,8 @@ async function getAssertionsInBatch(
) {
// Validation
if (!batchKeys || !Array.isArray(batchKeys)) {
throw new Error(
`Batch keys is not defined or it is not an array. Batch keys: ${batchKeys}`,
);
logger.error(`Batch keys is not defined or it is not an array. Batch keys: ${batchKeys}`);
process.exit(1);
}
validateBatchData(batchData);
validateTripleStoreRepositories(tripleStoreRepositories);
Expand Down Expand Up @@ -279,15 +278,17 @@ async function main() {
// Initialize blockchain config
const blockchainConfig = config.modules.blockchain;
if (!blockchainConfig || !blockchainConfig.implementation) {
throw new Error('Invalid configuration for blockchain.');
logger.error('Invalid configuration for blockchain.');
process.exit(1);
}

logger.info('TRIPLE STORE INITIALIZATION START');

// Initialize triple store config
const tripleStoreConfig = config.modules.tripleStore;
if (!tripleStoreConfig || !tripleStoreConfig.implementation) {
throw new Error('Invalid configuration for triple store.');
logger.error('Invalid configuration for triple store.');
process.exit(1);
}

const tripleStoreData = getTripleStoreData(tripleStoreConfig);
Expand All @@ -297,11 +298,12 @@ async function main() {
let tripleStoreRepositories = tripleStoreData.tripleStoreRepositories;

if (Object.keys(tripleStoreRepositories).length !== 3) {
throw new Error(
logger.error(
`Triple store repositories are not initialized correctly. Expected 3 repositories, got: ${
Object.keys(tripleStoreRepositories).length
}`,
);
process.exit(1);
}

// Initialize repositories
Expand Down Expand Up @@ -339,24 +341,32 @@ async function main() {
// Pipe the response stream to the file
response.data.pipe(writer);
// Wait for the file to finish downloading
await new Promise((resolve, reject) => {
let downloadComplete = false;
try {
await new Promise((resolve, reject) => {
let downloadComplete = false;

response.data.on('end', () => {
downloadComplete = true;
});
response.data.on('end', () => {
downloadComplete = true;
});

writer.on('finish', resolve);
writer.on('error', (err) => reject(new Error(`Write stream error: ${err.message}`)));
response.data.on('error', (err) =>
reject(new Error(`Download stream error: ${err.message}`)),
);
response.data.on('close', () => {
if (!downloadComplete) {
reject(new Error('Download stream closed before completing'));
}
writer.on('finish', resolve);
writer.on('error', (err) =>
reject(new Error(`Write stream error: ${err.message}`)),
);
response.data.on('error', (err) =>
reject(new Error(`Download stream error: ${err.message}`)),
);
response.data.on('close', () => {
if (!downloadComplete) {
reject(new Error('Download stream closed before completing'));
}
});
});
});
} catch (error) {
logger.error(`Critical error during download: ${error.message}`);
logger.error('Terminating process to prevent data corruption');
process.exit(1);
}
logger.timeEnd(`Database file downloading time`);

if (!fs.existsSync(dbFilePath)) {
Expand All @@ -369,8 +379,10 @@ async function main() {
await sqliteDb.initialize();

try {
// make sure blockchains are always migrated in this order - base, gnosis, neuroweb
const sortedBlockchains = Object.keys(blockchainConfig.implementation).sort();
// Iterate through all chains
for (const blockchain in blockchainConfig.implementation) {
for (const blockchain of sortedBlockchains) {
logger.time(`PROCESSING TIME FOR ${blockchain}`);
let processed = 0;
const blockchainImplementation = blockchainConfig.implementation[blockchain];
Expand All @@ -383,7 +395,8 @@ async function main() {
: defaultConfig[process.env.NODE_ENV].modules.blockchain.implementation[blockchain]
.config.rpcEndpoints;
if (!Array.isArray(rpcEndpoints) || rpcEndpoints.length === 0) {
throw new Error(`RPC endpoints are not defined for blockchain ${blockchain}.`);
logger.error(`RPC endpoints are not defined for blockchain ${blockchain}.`);
process.exit(1);
}

let blockchainName;
Expand All @@ -397,24 +410,25 @@ async function main() {
}

if (!blockchainName) {
throw new Error(
logger.error(
`Blockchain ${blockchain} not found. Make sure you have the correct blockchain ID and correct NODE_ENV in .env file.`,
);
process.exit(1);
}

const tableExists = await sqliteDb.getTableExists(blockchainName);

if (!tableExists) {
throw new Error(
`Required table "${blockchainName}" does not exist in the database`,
);
logger.error(`Required table "${blockchainName}" does not exist in the database`);
process.exit(1);
}

const highestTokenId = await sqliteDb.getHighestTokenId(blockchainName);
if (!highestTokenId) {
throw new Error(
logger.error(
`Something went wrong. Could not fetch highest tokenId for ${blockchainName}.`,
);
process.exit(1);
}
logger.info(`Total amount of tokenIds: ${highestTokenId}`);

Expand Down Expand Up @@ -482,6 +496,9 @@ async function main() {
100
).toFixed(2)}%. Total processed: ${processed}/${tokenIdsToProcessCount}`,
);

// Pause for 500ms to deload the triple store
await setTimeout(500);
} catch (error) {
logger.error(`Error processing batch: ${error}. Pausing for 5 seconds...`);
await setTimeout(5000);
Expand Down
Loading

0 comments on commit 0e53242

Please sign in to comment.