From d04c1c149b106f1cc56bf4b65b889cb862caa989 Mon Sep 17 00:00:00 2001 From: mistakia <1823355+mistakia@users.noreply.github.com> Date: Thu, 7 Mar 2024 10:46:09 -0500 Subject: [PATCH] feat: save `election_info` from websocket confirmation message (closes #7) --- db/schema.postgre.sql | 8 +++++- db/schema.sql | 6 +++++ scripts/import-websocket.mjs | 48 ++++++++++++++++++++++++++++++------ 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/db/schema.postgre.sql b/db/schema.postgre.sql index a228c2e..1514f56 100644 --- a/db/schema.postgre.sql +++ b/db/schema.postgre.sql @@ -50,7 +50,13 @@ CREATE TABLE link_account character varying(65) DEFAULT NULL::character varying, signature character varying(128) DEFAULT NULL::character varying, work character varying(16) DEFAULT NULL::character varying, - subtype integer DEFAULT NULL::integer + subtype integer DEFAULT NULL::integer, + election_duration integer DEFAULT NULL::integer, + election_time bigint DEFAULT NULL::bigint, + election_tally numeric(39) DEFAULT NULL::numeric, + election_request_count integer DEFAULT NULL::integer, + election_blocks integer DEFAULT NULL::integer, + election_voters integer DEFAULT NULL::integer ); CREATE INDEX blocks_account ON blocks USING btree (account); diff --git a/db/schema.sql b/db/schema.sql index b39bfd5..806cf44 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -95,6 +95,12 @@ CREATE TABLE `signature` char(128) NOT NULL, `work` char(16) NOT NULL, `subtype` tinyint (1) DEFAULT NULL, + `election_duration` int DEFAULT NULL, + `election_time` bigint DEFAULT NULL, + `election_tally` decimal(39, 0) DEFAULT NULL, + `election_request_count` int DEFAULT NULL, + `election_blocks` int DEFAULT NULL, + `election_voters` int DEFAULT NULL, UNIQUE KEY `block` (`hash`), UNIQUE KEY `height` (`account`, `height`), INDEX `account` (`account`), diff --git a/scripts/import-websocket.mjs b/scripts/import-websocket.mjs index 6cee914..d811eec 100644 --- a/scripts/import-websocket.mjs +++ b/scripts/import-websocket.mjs @@ -30,6 +30,7 @@ const account_check_queue = new PQueue({ concurrency: 1 }) const account_update_queue = new PQueue({ concurrency: 1 }) let frontiers_queue = {} let blocks_queue = [] +const election_info_queue = {} const update_account = async ({ account, accountInfo, blockCount }) => { let cursor = accountInfo.frontier @@ -123,18 +124,39 @@ const save_blocks = async () => { // clear blocks queue blocks_queue = [] - // get blocks via rpc + // get blocks from rpc and join with election info from websocket const res = await getBlocksInfo({ hashes }) const blockInserts = [] for (const hash in res.blocks) { const block = res.blocks[hash] - blockInserts.push({ hash, ...formatBlockInfo(block) }) + const election_info = election_info_queue[hash] || {} + blockInserts.push({ + hash, + ...formatBlockInfo(block), + election_duration: election_info.duration + ? Number(election_info.duration) + : null, + election_time: election_info.time ? Number(election_info.time) : null, + election_tally: election_info.tally ? Number(election_info.tally) : null, + election_request_count: election_info.request_count + ? Number(election_info.request_count) + : null, + election_blocks: election_info.blocks + ? Number(election_info.blocks) + : null, + election_voters: election_info.voters + ? Number(election_info.voters) + : null + }) + + // remove election_info for block + delete election_info_queue[hash] } if (blockInserts.length) { - logger(`saving ${blockInserts.length} blocks`) + logger(`saving ${blockInserts.length} blocks with election info`) await db.raw( - `INSERT INTO blocks (amount, balance, height, local_timestamp, confirmed, account, previous, representative, link, link_account, signature, work, type, subtype, hash) VALUES ${blockInserts + `INSERT INTO blocks (amount, balance, height, local_timestamp, confirmed, account, previous, representative, link, link_account, signature, work, type, subtype, hash, election_duration, election_time, election_tally, election_request_count, election_blocks, election_voters) VALUES ${blockInserts .map( (block) => `(${block.amount}, ${block.balance}, ${block.height}, ${ @@ -145,10 +167,14 @@ const save_blocks = async () => { block.link_account }', '${block.signature}', '${block.work}', '${block.type}', ${ block.subtype ? `'${block.subtype}'` : null - }, '${block.hash}')` + }, '${block.hash}', ${block.election_duration}, ${ + block.election_time + }, ${block.election_tally}, ${block.election_request_count}, ${ + block.election_blocks + }, ${block.election_voters})` ) .join(', ')} - ON CONFLICT (hash) DO UPDATE SET local_timestamp = LEAST(blocks.local_timestamp, EXCLUDED.local_timestamp), confirmed = EXCLUDED.confirmed, height = EXCLUDED.height, amount = EXCLUDED.amount, balance = EXCLUDED.balance, previous = EXCLUDED.previous, representative = EXCLUDED.representative, link = EXCLUDED.link, link_account = EXCLUDED.link_account, signature = EXCLUDED.signature, work = EXCLUDED.work, type = EXCLUDED.type, subtype = EXCLUDED.subtype` + ON CONFLICT (hash) DO UPDATE SET local_timestamp = LEAST(blocks.local_timestamp, EXCLUDED.local_timestamp), confirmed = EXCLUDED.confirmed, height = EXCLUDED.height, amount = EXCLUDED.amount, balance = EXCLUDED.balance, previous = EXCLUDED.previous, representative = EXCLUDED.representative, link = EXCLUDED.link, link_account = EXCLUDED.link_account, signature = EXCLUDED.signature, work = EXCLUDED.work, type = EXCLUDED.type, subtype = EXCLUDED.subtype, election_duration = EXCLUDED.election_duration, election_time = EXCLUDED.election_time, election_tally = EXCLUDED.election_tally, election_request_count = EXCLUDED.election_request_count, election_blocks = EXCLUDED.election_blocks, election_voters = EXCLUDED.election_voters` ) } @@ -194,7 +220,10 @@ ws.onopen = () => { logger('connected') const subscription = { action: 'subscribe', - topic: 'confirmation' + topic: 'confirmation', + options: { + include_election_info: true + } } ws.send(JSON.stringify(subscription)) @@ -212,7 +241,7 @@ ws.onerror = (err) => { ws.onmessage = (msg) => { const { topic, message } = JSON.parse(msg.data) - const { account, hash } = message + const { account, hash, election_info } = message if (topic === 'confirmation') { logger(`received block: ${hash}`) @@ -220,6 +249,9 @@ ws.onmessage = (msg) => { // queue block for saving blocks_queue.push(hash) + // save election info + election_info_queue[hash] = election_info + // queue frontier for saving frontiers_queue[account] = true