Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: save election_info from websocket confirmation message #9

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion db/schema.postgre.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ CREATE TABLE
link_account character varying(65) DEFAULT NULL::character varying,
signature character varying(128) DEFAULT NULL::character varying,
work character varying(16) DEFAULT NULL::character varying,
subtype integer DEFAULT NULL::integer
subtype integer DEFAULT NULL::integer,
election_duration integer DEFAULT NULL::integer,
election_time bigint DEFAULT NULL::bigint,
election_tally numeric(39) DEFAULT NULL::numeric,
election_request_count integer DEFAULT NULL::integer,
election_blocks integer DEFAULT NULL::integer,
election_voters integer DEFAULT NULL::integer
);

CREATE INDEX blocks_account ON blocks USING btree (account);
Expand Down
6 changes: 6 additions & 0 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ CREATE TABLE
`signature` char(128) NOT NULL,
`work` char(16) NOT NULL,
`subtype` tinyint (1) DEFAULT NULL,
`election_duration` int DEFAULT NULL,
`election_time` bigint DEFAULT NULL,
`election_tally` decimal(39, 0) DEFAULT NULL,
`election_request_count` int DEFAULT NULL,
`election_blocks` int DEFAULT NULL,
`election_voters` int DEFAULT NULL,
UNIQUE KEY `block` (`hash`),
UNIQUE KEY `height` (`account`, `height`),
INDEX `account` (`account`),
Expand Down
48 changes: 40 additions & 8 deletions scripts/import-websocket.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const account_check_queue = new PQueue({ concurrency: 1 })
const account_update_queue = new PQueue({ concurrency: 1 })
let frontiers_queue = {}
let blocks_queue = []
const election_info_queue = {}

const update_account = async ({ account, accountInfo, blockCount }) => {
let cursor = accountInfo.frontier
Expand Down Expand Up @@ -123,18 +124,39 @@ const save_blocks = async () => {
// clear blocks queue
blocks_queue = []

// get blocks via rpc
// get blocks from rpc and join with election info from websocket
const res = await getBlocksInfo({ hashes })
const blockInserts = []
for (const hash in res.blocks) {
const block = res.blocks[hash]
blockInserts.push({ hash, ...formatBlockInfo(block) })
const election_info = election_info_queue[hash] || {}
blockInserts.push({
hash,
...formatBlockInfo(block),
election_duration: election_info.duration
? Number(election_info.duration)
: null,
election_time: election_info.time ? Number(election_info.time) : null,
election_tally: election_info.tally ? Number(election_info.tally) : null,
election_request_count: election_info.request_count
? Number(election_info.request_count)
: null,
election_blocks: election_info.blocks
? Number(election_info.blocks)
: null,
election_voters: election_info.voters
? Number(election_info.voters)
: null
})

// remove election_info for block
delete election_info_queue[hash]
}

if (blockInserts.length) {
logger(`saving ${blockInserts.length} blocks`)
logger(`saving ${blockInserts.length} blocks with election info`)
await db.raw(
`INSERT INTO blocks (amount, balance, height, local_timestamp, confirmed, account, previous, representative, link, link_account, signature, work, type, subtype, hash) VALUES ${blockInserts
`INSERT INTO blocks (amount, balance, height, local_timestamp, confirmed, account, previous, representative, link, link_account, signature, work, type, subtype, hash, election_duration, election_time, election_tally, election_request_count, election_blocks, election_voters) VALUES ${blockInserts
.map(
(block) =>
`(${block.amount}, ${block.balance}, ${block.height}, ${
Expand All @@ -145,10 +167,14 @@ const save_blocks = async () => {
block.link_account
}', '${block.signature}', '${block.work}', '${block.type}', ${
block.subtype ? `'${block.subtype}'` : null
}, '${block.hash}')`
}, '${block.hash}', ${block.election_duration}, ${
block.election_time
}, ${block.election_tally}, ${block.election_request_count}, ${
block.election_blocks
}, ${block.election_voters})`
)
.join(', ')}
ON CONFLICT (hash) DO UPDATE SET local_timestamp = LEAST(blocks.local_timestamp, EXCLUDED.local_timestamp), confirmed = EXCLUDED.confirmed, height = EXCLUDED.height, amount = EXCLUDED.amount, balance = EXCLUDED.balance, previous = EXCLUDED.previous, representative = EXCLUDED.representative, link = EXCLUDED.link, link_account = EXCLUDED.link_account, signature = EXCLUDED.signature, work = EXCLUDED.work, type = EXCLUDED.type, subtype = EXCLUDED.subtype`
ON CONFLICT (hash) DO UPDATE SET local_timestamp = LEAST(blocks.local_timestamp, EXCLUDED.local_timestamp), confirmed = EXCLUDED.confirmed, height = EXCLUDED.height, amount = EXCLUDED.amount, balance = EXCLUDED.balance, previous = EXCLUDED.previous, representative = EXCLUDED.representative, link = EXCLUDED.link, link_account = EXCLUDED.link_account, signature = EXCLUDED.signature, work = EXCLUDED.work, type = EXCLUDED.type, subtype = EXCLUDED.subtype, election_duration = EXCLUDED.election_duration, election_time = EXCLUDED.election_time, election_tally = EXCLUDED.election_tally, election_request_count = EXCLUDED.election_request_count, election_blocks = EXCLUDED.election_blocks, election_voters = EXCLUDED.election_voters`
)
}

Expand Down Expand Up @@ -194,7 +220,10 @@ ws.onopen = () => {
logger('connected')
const subscription = {
action: 'subscribe',
topic: 'confirmation'
topic: 'confirmation',
options: {
include_election_info: true
}
}
ws.send(JSON.stringify(subscription))

Expand All @@ -212,14 +241,17 @@ ws.onerror = (err) => {

ws.onmessage = (msg) => {
const { topic, message } = JSON.parse(msg.data)
const { account, hash } = message
const { account, hash, election_info } = message

if (topic === 'confirmation') {
logger(`received block: ${hash}`)

// queue block for saving
blocks_queue.push(hash)

// save election info
election_info_queue[hash] = election_info

// queue frontier for saving
frontiers_queue[account] = true

Expand Down