Skip to content

Commit

Permalink
feat: save election_info from websocket confirmation message (closes
Browse files Browse the repository at this point in the history
  • Loading branch information
mistakia authored Mar 8, 2024
1 parent 52b200c commit 542bbf2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
8 changes: 7 additions & 1 deletion db/schema.postgre.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ CREATE TABLE
link_account character varying(65) DEFAULT NULL::character varying,
signature character varying(128) DEFAULT NULL::character varying,
work character varying(16) DEFAULT NULL::character varying,
subtype integer DEFAULT NULL::integer
subtype integer DEFAULT NULL::integer,
election_duration integer DEFAULT NULL::integer,
election_time bigint DEFAULT NULL::bigint,
election_tally numeric(39) DEFAULT NULL::numeric,
election_request_count integer DEFAULT NULL::integer,
election_blocks integer DEFAULT NULL::integer,
election_voters integer DEFAULT NULL::integer
);

CREATE INDEX blocks_account ON blocks USING btree (account);
Expand Down
6 changes: 6 additions & 0 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ CREATE TABLE
`signature` char(128) NOT NULL,
`work` char(16) NOT NULL,
`subtype` tinyint (1) DEFAULT NULL,
`election_duration` int DEFAULT NULL,
`election_time` bigint DEFAULT NULL,
`election_tally` decimal(39, 0) DEFAULT NULL,
`election_request_count` int DEFAULT NULL,
`election_blocks` int DEFAULT NULL,
`election_voters` int DEFAULT NULL,
UNIQUE KEY `block` (`hash`),
UNIQUE KEY `height` (`account`, `height`),
INDEX `account` (`account`),
Expand Down
48 changes: 40 additions & 8 deletions scripts/import-websocket.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const account_check_queue = new PQueue({ concurrency: 1 })
const account_update_queue = new PQueue({ concurrency: 1 })
let frontiers_queue = {}
let blocks_queue = []
const election_info_queue = {}

const update_account = async ({ account, accountInfo, blockCount }) => {
let cursor = accountInfo.frontier
Expand Down Expand Up @@ -123,18 +124,39 @@ const save_blocks = async () => {
// clear blocks queue
blocks_queue = []

// get blocks via rpc
// get blocks from rpc and join with election info from websocket
const res = await getBlocksInfo({ hashes })
const blockInserts = []
for (const hash in res.blocks) {
const block = res.blocks[hash]
blockInserts.push({ hash, ...formatBlockInfo(block) })
const election_info = election_info_queue[hash] || {}
blockInserts.push({
hash,
...formatBlockInfo(block),
election_duration: election_info.duration
? Number(election_info.duration)
: null,
election_time: election_info.time ? Number(election_info.time) : null,
election_tally: election_info.tally ? Number(election_info.tally) : null,
election_request_count: election_info.request_count
? Number(election_info.request_count)
: null,
election_blocks: election_info.blocks
? Number(election_info.blocks)
: null,
election_voters: election_info.voters
? Number(election_info.voters)
: null
})

// remove election_info for block
delete election_info_queue[hash]
}

if (blockInserts.length) {
logger(`saving ${blockInserts.length} blocks`)
logger(`saving ${blockInserts.length} blocks with election info`)
await db.raw(
`INSERT INTO blocks (amount, balance, height, local_timestamp, confirmed, account, previous, representative, link, link_account, signature, work, type, subtype, hash) VALUES ${blockInserts
`INSERT INTO blocks (amount, balance, height, local_timestamp, confirmed, account, previous, representative, link, link_account, signature, work, type, subtype, hash, election_duration, election_time, election_tally, election_request_count, election_blocks, election_voters) VALUES ${blockInserts
.map(
(block) =>
`(${block.amount}, ${block.balance}, ${block.height}, ${
Expand All @@ -145,10 +167,14 @@ const save_blocks = async () => {
block.link_account
}', '${block.signature}', '${block.work}', '${block.type}', ${
block.subtype ? `'${block.subtype}'` : null
}, '${block.hash}')`
}, '${block.hash}', ${block.election_duration}, ${
block.election_time
}, ${block.election_tally}, ${block.election_request_count}, ${
block.election_blocks
}, ${block.election_voters})`
)
.join(', ')}
ON CONFLICT (hash) DO UPDATE SET local_timestamp = LEAST(blocks.local_timestamp, EXCLUDED.local_timestamp), confirmed = EXCLUDED.confirmed, height = EXCLUDED.height, amount = EXCLUDED.amount, balance = EXCLUDED.balance, previous = EXCLUDED.previous, representative = EXCLUDED.representative, link = EXCLUDED.link, link_account = EXCLUDED.link_account, signature = EXCLUDED.signature, work = EXCLUDED.work, type = EXCLUDED.type, subtype = EXCLUDED.subtype`
ON CONFLICT (hash) DO UPDATE SET local_timestamp = LEAST(blocks.local_timestamp, EXCLUDED.local_timestamp), confirmed = EXCLUDED.confirmed, height = EXCLUDED.height, amount = EXCLUDED.amount, balance = EXCLUDED.balance, previous = EXCLUDED.previous, representative = EXCLUDED.representative, link = EXCLUDED.link, link_account = EXCLUDED.link_account, signature = EXCLUDED.signature, work = EXCLUDED.work, type = EXCLUDED.type, subtype = EXCLUDED.subtype, election_duration = EXCLUDED.election_duration, election_time = EXCLUDED.election_time, election_tally = EXCLUDED.election_tally, election_request_count = EXCLUDED.election_request_count, election_blocks = EXCLUDED.election_blocks, election_voters = EXCLUDED.election_voters`
)
}

Expand Down Expand Up @@ -194,7 +220,10 @@ ws.onopen = () => {
logger('connected')
const subscription = {
action: 'subscribe',
topic: 'confirmation'
topic: 'confirmation',
options: {
include_election_info: true
}
}
ws.send(JSON.stringify(subscription))

Expand All @@ -212,14 +241,17 @@ ws.onerror = (err) => {

ws.onmessage = (msg) => {
const { topic, message } = JSON.parse(msg.data)
const { account, hash } = message
const { account, hash, election_info } = message

if (topic === 'confirmation') {
logger(`received block: ${hash}`)

// queue block for saving
blocks_queue.push(hash)

// save election info
election_info_queue[hash] = election_info

// queue frontier for saving
frontiers_queue[account] = true

Expand Down

0 comments on commit 542bbf2

Please sign in to comment.