From 6996950ee56bd2e72b6f43b44323a2c855bfdc8c Mon Sep 17 00:00:00 2001 From: classicalliu Date: Wed, 2 Nov 2022 16:08:32 +0800 Subject: [PATCH 01/16] feat: Add chain_id to transactions table --- crates/indexer/src/insert_l2_block.rs | 7 +++++-- .../20221102080006_add_chain_id_to_transactions.ts | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) create mode 100644 packages/api-server/migrations/20221102080006_add_chain_id_to_transactions.ts diff --git a/crates/indexer/src/insert_l2_block.rs b/crates/indexer/src/insert_l2_block.rs index 77329bb9..60c8dd82 100644 --- a/crates/indexer/src/insert_l2_block.rs +++ b/crates/indexer/src/insert_l2_block.rs @@ -70,6 +70,7 @@ pub struct DbTransaction { gas_used: BigDecimal, contract_address: Option>, exit_code: Decimal, + chain_id: Option, } impl TryFrom for DbTransaction { @@ -98,6 +99,7 @@ impl TryFrom for DbTransaction { gas_used: u128_to_big_decimal(&tx.gas_used)?, contract_address: web3_contract_address, exit_code: tx.exit_code.into(), + chain_id: tx.chain_id.map(|id| id.into()), }; Ok(db_transaction) } @@ -199,7 +201,7 @@ pub async fn insert_web3_txs_and_logs( let mut txs_query_builder: QueryBuilder = QueryBuilder::new( "INSERT INTO transactions - (hash, eth_tx_hash, block_number, block_hash, transaction_index, from_address, to_address, value, nonce, gas_limit, gas_price, input, v, r, s, cumulative_gas_used, gas_used, contract_address, exit_code) " + (hash, eth_tx_hash, block_number, block_hash, transaction_index, from_address, to_address, value, nonce, gas_limit, gas_price, input, v, r, s, cumulative_gas_used, gas_used, contract_address, exit_code, chain_id) " ); txs_query_builder @@ -222,7 +224,8 @@ pub async fn insert_web3_txs_and_logs( .push_bind(tx.cumulative_gas_used) .push_bind(tx.gas_used) .push_bind(tx.contract_address) - .push_bind(tx.exit_code); + .push_bind(tx.exit_code) + .push_bind(tx.chain_id); }) .push(" RETURNING id"); diff --git a/packages/api-server/migrations/20221102080006_add_chain_id_to_transactions.ts b/packages/api-server/migrations/20221102080006_add_chain_id_to_transactions.ts new file mode 100644 index 00000000..b15325d2 --- /dev/null +++ b/packages/api-server/migrations/20221102080006_add_chain_id_to_transactions.ts @@ -0,0 +1,13 @@ +import { Knex } from "knex"; + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable("transactions", (table) => { + table.decimal("chain_id", null, 0).nullable(); + }); +} + +export async function down(knex: Knex): Promise { + await knex.schema.alterTable("transactions", (table) => { + table.dropColumn("chain_id"); + }); +} From 029b307876c18e2fafed702ae25acac9845012f6 Mon Sep 17 00:00:00 2001 From: classicalliu Date: Wed, 2 Nov 2022 17:14:44 +0800 Subject: [PATCH 02/16] fix: Update v in transaction for missing chain_id info --- packages/api-server/src/convert-tx.ts | 2 +- packages/api-server/src/db/helpers.ts | 9 +++++++++ packages/api-server/src/db/types.ts | 5 ++++- packages/api-server/src/filter-web3-tx.ts | 7 ++++++- 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/packages/api-server/src/convert-tx.ts b/packages/api-server/src/convert-tx.ts index 17627d9f..8b5aceb3 100644 --- a/packages/api-server/src/convert-tx.ts +++ b/packages/api-server/src/convert-tx.ts @@ -209,7 +209,7 @@ export function polyjuiceRawTransactionToApiTransaction( input: tx.data, nonce: tx.nonce === "0x" ? "0x0" : tx.nonce, value: tx.value === "0x" ? "0x0" : tx.value, - v: +tx.v % 2 === 0 ? "0x1" : "0x0", + v: "0x" + BigInt(tx.v).toString(16), r: "0x" + BigInt(tx.r).toString(16), s: "0x" + BigInt(tx.s).toString(16), }; diff --git a/packages/api-server/src/db/helpers.ts b/packages/api-server/src/db/helpers.ts index 31ae7d4d..5973e993 100644 --- a/packages/api-server/src/db/helpers.ts +++ b/packages/api-server/src/db/helpers.ts @@ -85,6 +85,7 @@ export function formatTransaction(tx: DBTransaction): Transaction { s: bufferToHex(tx.s), contract_address: bufferToHexOpt(tx.contract_address), logs_bloom: "0x", + chain_id: toBigIntOpt(tx.chain_id), }; } @@ -313,3 +314,11 @@ export function buildQueryLogId( queryBuilder.where("id", ">", lastPollId.toString()); } } + +// v = v(0/1) * 2 + 35 OR v = v(0/1) + 27 +export function getRealV(v: bigint, chainId?: bigint): bigint { + if (![0n, 1n].includes(v)) { + throw new Error("chain id must be 0 / 1"); + } + return v + (chainId == null || chainId === 0n ? 27n : chainId * 2n + 35n); +} diff --git a/packages/api-server/src/db/types.ts b/packages/api-server/src/db/types.ts index cad924ad..ec21de27 100644 --- a/packages/api-server/src/db/types.ts +++ b/packages/api-server/src/db/types.ts @@ -18,6 +18,7 @@ import { POLY_BLOCK_DIFFICULTY, POLY_MAX_BLOCK_GAS_LIMIT, } from "../methods/constant"; +import { getRealV } from "./helpers"; export interface DBBlock { number: string; @@ -63,6 +64,7 @@ export interface DBTransaction { gas_used?: string; contract_address?: Buffer; exit_code: number; + chain_id?: string; } export interface Transaction { @@ -87,6 +89,7 @@ export interface Transaction { logs_bloom: HexString; contract_address?: HexString; exit_code: number; + chain_id?: bigint; } export interface DBLog { @@ -164,7 +167,7 @@ export function toApiTransaction(t: Transaction): EthTransaction { input: t.input || "0x", // TODO: check default value nonce: new Uint64(t.nonce || 0n).toHex(), // TODO: check default value value: new Uint256(t.value).toHex(), - v: new Uint64(t.v).toHex(), + v: new Uint128(getRealV(t.v, t.chain_id)).toHex(), r: "0x" + BigInt(t.r).toString(16), s: "0x" + BigInt(t.s).toString(16), }; diff --git a/packages/api-server/src/filter-web3-tx.ts b/packages/api-server/src/filter-web3-tx.ts index 9e0c9ddc..c8767587 100644 --- a/packages/api-server/src/filter-web3-tx.ts +++ b/packages/api-server/src/filter-web3-tx.ts @@ -25,6 +25,7 @@ import { gwConfig } from "./base/index"; import { logger } from "./base/logger"; import { EthRegistryAddress } from "./base/address"; import { decodePolyjuiceArgs } from "./parse-tx"; +import { getRealV } from "./db"; export const PENDING_TRANSACTION_INDEX = "0x0"; @@ -85,7 +86,11 @@ export async function filterWeb3Transaction( // Remove s left zeros s = "0x" + BigInt(s).toString(16); // signature[65] byte - const v = Uint32.fromHex("0x" + signature.slice(130, 132)); + const signatureV = Uint32.fromHex("0x" + signature.slice(130, 132)); + const chainId = l2Tx.raw.chain_id; + const v = new Uint128( + getRealV(BigInt(signatureV.getValue()), BigInt(chainId)) + ); const nonce: HexU32 = l2Tx.raw.nonce; From fb96ec742fcdf80e2f9a87062cbb258933916552 Mon Sep 17 00:00:00 2001 From: classicalliu Date: Wed, 9 Nov 2022 18:52:07 +0800 Subject: [PATCH 03/16] feat: Add update command --- crates/indexer/Cargo.toml | 2 +- crates/indexer/src/indexer.rs | 43 +++++++++-- crates/indexer/src/insert_l2_block.rs | 104 ++++++++++++++++++++++++++ crates/indexer/src/main.rs | 22 +++++- crates/indexer/src/runner.rs | 94 ++++++++++++++++++++++- crates/rpc-client/Cargo.toml | 2 +- 6 files changed, 254 insertions(+), 13 deletions(-) diff --git a/crates/indexer/Cargo.toml b/crates/indexer/Cargo.toml index e932e642..5c8fcbce 100644 --- a/crates/indexer/Cargo.toml +++ b/crates/indexer/Cargo.toml @@ -2,7 +2,7 @@ name = "gw-web3-indexer" version = "0.1.0" authors = ["Nervos Network"] -edition = "2018" +edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index dd7e9be8..d4939897 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -5,7 +5,9 @@ use std::{ use crate::{ helper::{hex, parse_log, GwLog, PolyjuiceArgs, GW_LOG_POLYJUICE_SYSTEM}, - insert_l2_block::{insert_web3_block, insert_web3_txs_and_logs}, + insert_l2_block::{ + insert_web3_block, insert_web3_txs_and_logs, update_web3_block, update_web3_txs_and_logs, + }, pool::POOL, types::{ Block as Web3Block, Log as Web3Log, Transaction as Web3Transaction, @@ -66,13 +68,27 @@ impl Web3Indexer { } } + pub async fn update_l2_block(&self, l2_block: L2Block) -> Result<(usize, usize)> { + let number: u64 = l2_block.raw().number().unpack(); + // update block + let (txs_len, logs_len) = self.insert_or_update_l2block(l2_block, true).await?; + log::debug!( + "web3 indexer: update block #{}, {} txs, {} logs", + number, + txs_len, + logs_len + ); + Ok((txs_len, logs_len)) + } + pub async fn store_l2_block(&self, l2_block: L2Block) -> Result<(usize, usize)> { let number: u64 = l2_block.raw().number().unpack(); let local_tip_number = self.tip_number().await?.unwrap_or(0); let mut txs_len = 0; let mut logs_len = 0; if number > local_tip_number || self.query_number(number).await?.is_none() { - (txs_len, logs_len) = self.insert_l2block(l2_block).await?; + // insert l2 block + (txs_len, logs_len) = self.insert_or_update_l2block(l2_block, false).await?; log::debug!( "web3 indexer: sync new block #{}, {} txs, {} logs", number, @@ -442,7 +458,11 @@ impl Web3Indexer { Ok(hashmap) } - async fn insert_l2block(&self, l2_block: L2Block) -> Result<(usize, usize)> { + async fn insert_or_update_l2block( + &self, + l2_block: L2Block, + is_update: bool, + ) -> Result<(usize, usize)> { let block_number = l2_block.raw().number().unpack(); let block_hash: gw_common::H256 = blake2b_256(l2_block.raw().as_slice()).into(); // let mut cumulative_gas_used: u128 = 0; @@ -508,19 +528,26 @@ impl Web3Indexer { tx_index_cursor += txs_vec.len() as u32; - // insert to db - let (txs_part_len, logs_part_len) = - insert_web3_txs_and_logs(txs_vec, &mut pg_tx).await?; + // insert to db or update + let (txs_part_len, logs_part_len) = if is_update { + update_web3_txs_and_logs(txs_vec, &mut pg_tx).await? + } else { + insert_web3_txs_and_logs(txs_vec, &mut pg_tx).await? + }; web3_txs_len += txs_part_len; logs_len += logs_part_len; } - // insert block + // insert or update block let web3_block = self .build_web3_block(&l2_block, total_gas_limit, cumulative_gas_used) .await?; - insert_web3_block(web3_block, &mut pg_tx).await?; + if is_update { + update_web3_block(web3_block, &mut pg_tx).await?; + } else { + insert_web3_block(web3_block, &mut pg_tx).await?; + } // commit pg_tx.commit().await?; diff --git a/crates/indexer/src/insert_l2_block.rs b/crates/indexer/src/insert_l2_block.rs index 60c8dd82..0c8e393a 100644 --- a/crates/indexer/src/insert_l2_block.rs +++ b/crates/indexer/src/insert_l2_block.rs @@ -275,6 +275,110 @@ pub async fn insert_web3_txs_and_logs( Ok((txs_len, logs_len)) } +pub async fn update_web3_block( + web3_block: Block, + pg_tx: &mut sqlx::Transaction<'_, Postgres>, +) -> Result<()> { + let block = DbBlock::try_from(&web3_block)?; + + sqlx::query( + "UPDATE blocks SET number=$1, parent_hash=$3, gas_limit=$4, gas_used=$5,timestamp=$6, miner=$7, size=$8 where hash=$2" + ) + .bind(block.number) + .bind(block.hash) + .bind(block.parent_hash) + .bind(block.gas_limit) + .bind(block.gas_used) + .bind(block.timestamp) + .bind(block.miner) + .bind(block.size) + .execute(pg_tx) + .await?; + + Ok(()) +} + +pub async fn update_web3_txs_and_logs( + web3_tx_with_logs_vec: Vec, + pg_tx: &mut sqlx::Transaction<'_, Postgres>, +) -> Result<(usize, usize)> { + if web3_tx_with_logs_vec.is_empty() { + return Ok((0, 0)); + } + + let (txs, logs) = web3_tx_with_logs_vec + .into_par_iter() + .enumerate() + .map(|(i, web3_tx_with_logs)| { + let db_logs: Result> = web3_tx_with_logs + .logs + .into_par_iter() + .map(|l| DbLog::try_from_log(l, i as i64)) + .collect(); + (DbTransaction::try_from(web3_tx_with_logs.tx), db_logs) + }) + .collect::<(Vec<_>, Vec<_>)>(); + let txs = txs.into_iter().collect::>>()?; + let logs = logs.into_iter().collect::>>()?; + let logs = logs.into_iter().flatten().collect::>(); + + let logs_len = logs.len(); + let txs_len = txs.len(); + + let logs_slice = logs + .into_iter() + .chunks(INSERT_LOGS_BATCH_SIZE) + .into_iter() + .map(|chunk| chunk.collect()) + .collect::>>(); + + for tx in txs { + sqlx::query( + "UPDATE transactions SET hash = $1, eth_tx_hash = $2, from_address = $3, to_address = $4, value = $5, nonce = $6, gas_limit = $7, gas_price = $8, input = $9, v = $10, r = $11, s = $12, cumulative_gas_used = $13, gas_used = $14, contract_address = $15, exit_code = $16, chain_id = $17 where block_hash = $18 and transaction_index = $19" + ) + .bind(tx.hash) + .bind(tx.eth_tx_hash) + .bind(tx.from_address) + .bind(tx.to_address) + .bind(tx.value) + .bind(tx.nonce) + .bind(tx.gas_limit) + .bind(tx.gas_price) + .bind(tx.input) + .bind(tx.v) + .bind(tx.r) + .bind(tx.s) + .bind(tx.cumulative_gas_used) + .bind(tx.gas_used) + .bind(tx.contract_address) + .bind(tx.exit_code) + .bind(tx.chain_id) + .bind(tx.block_hash) + .bind(tx.transaction_index) + .execute(&mut *pg_tx) + .await?; + } + + for db_logs in logs_slice { + for log in db_logs { + sqlx::query( + "UPDATE logs SET transaction_hash = $1, address = $2, data = $3, topics = $4 WHERE block_hash = $5 AND transaction_index = $6 AND log_index = $7 " + ) + .bind(log.transaction_hash) + .bind(log.address) + .bind(log.data) + .bind(log.topics) + .bind(log.block_hash) + .bind(log.transaction_index) + .bind(log.log_index) + .execute(&mut *pg_tx) + .await?; + } + } + + Ok((txs_len, logs_len)) +} + fn u128_to_big_decimal(value: &u128) -> Result { let result = BigDecimal::from_str(&value.to_string())?; Ok(result) diff --git a/crates/indexer/src/main.rs b/crates/indexer/src/main.rs index 08a980b7..aa6a40c2 100644 --- a/crates/indexer/src/main.rs +++ b/crates/indexer/src/main.rs @@ -21,7 +21,27 @@ fn main() -> Result<()> { }; let mut runner = Runner::new(indexer_config)?; - smol::block_on(runner.run())?; + + let command_name = std::env::args().nth(1); + + // `cargo run` -> run sync mode + // `cargo run update ` -> run update mode + if let Some(name) = command_name { + if name == "update" { + let start_block_number = std::env::args() + .nth(2) + .map(|num| num.parse::().unwrap()); + let end_block_number = std::env::args() + .nth(3) + .map(|num| num.parse::().unwrap()); + smol::block_on(runner.update(start_block_number, end_block_number))?; + } else { + smol::block_on(runner.run())?; + } + } else { + smol::block_on(runner.run())?; + } + Ok(()) } diff --git a/crates/indexer/src/runner.rs b/crates/indexer/src/runner.rs index 2f08ef98..67c2e9dd 100644 --- a/crates/indexer/src/runner.rs +++ b/crates/indexer/src/runner.rs @@ -4,8 +4,8 @@ use gw_web3_rpc_client::{ }; use rust_decimal::{prelude::ToPrimitive, Decimal}; -use crate::{config::IndexerConfig, pool::POOL, Web3Indexer}; -use anyhow::Result; +use crate::{config::IndexerConfig, helper::hex, pool::POOL, Web3Indexer}; +use anyhow::{anyhow, Result}; pub struct Runner { indexer: Web3Indexer, @@ -113,6 +113,96 @@ impl Runner { Ok(()) } + pub async fn update( + &mut self, + start_block_number: Option, + end_block_number: Option, + ) -> Result { + let start_block_number = start_block_number.unwrap_or(0); + let local_tip = self.tip().await?; + + // end_block_number must be <= local tip + if let Some(end_num) = end_block_number { + if let Some(tip_num) = local_tip { + if end_num > tip_num { + return Err(anyhow!( + "end_block_number {} can't larger than tip number: {}", + end_num, + tip_num + )); + } + } + } + + let end_block_number = end_block_number.unwrap_or_else(|| local_tip.unwrap_or(0)); + + log::info!( + "Update from block {} to block {}", + start_block_number, + end_block_number + ); + + let mut current_block_number = start_block_number; + loop { + if current_block_number >= end_block_number { + log::info!("All blocks have been updated!"); + break; + } + + let start = std::time::Instant::now(); + + let current_block = self + .godwoken_rpc_client + .get_block_by_number(current_block_number)? + .ok_or_else(|| anyhow!("block {} not exist!", current_block_number))?; + + let l2_block = to_l2_block(current_block); + let l2_block_parent_hash = l2_block.raw().parent_block_hash(); + + if current_block_number > 0 { + let prev_block_number = current_block_number - 1; + let db_prev_block_hash = self.get_db_block_hash(prev_block_number).await?; + if let Some(prev_block_hash) = db_prev_block_hash { + // if match, insert a new block + // if not match, sleep and try again + if l2_block_parent_hash.as_slice() == prev_block_hash.as_bytes() { + let (txs_len, logs_len) = self.indexer.update_l2_block(l2_block).await?; + + let duration = start.elapsed(); + log::info!( + "Update block {}, {} txs, {} logs, duration: {:?}", + current_block_number, + txs_len, + logs_len, + duration, + ); + + current_block_number += 1; + } else { + // Sleep and try again, wait for indexer to deal with revert + log::info!("block {}'s parent_block_hash: {} not match prev block's hash {}, sleep and try again", current_block_number, hex(l2_block_parent_hash.as_slice())?, hex(prev_block_hash.as_bytes())?); + let sleep_time = std::time::Duration::from_secs(3); + smol::Timer::after(sleep_time).await; + } + } + } else { + let (txs_len, logs_len) = self.indexer.update_l2_block(l2_block).await?; + + let duration = start.elapsed(); + log::info!( + "Update block {}, {} txs, {} logs, duration: {:?}", + current_block_number, + txs_len, + logs_len, + duration, + ); + + current_block_number += 1; + } + } + Ok(true) + } + pub async fn insert(&mut self) -> Result { let start = std::time::Instant::now(); diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index c072ca31..6bb2f8da 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -2,7 +2,7 @@ name = "gw-web3-rpc-client" version = "0.1.0" authors = ["Nervos Network"] -edition = "2018" +edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From 89f7f8e17b5ea165b0ac54819e5757b231da4f86 Mon Sep 17 00:00:00 2001 From: classicalliu Date: Thu, 10 Nov 2022 19:01:03 +0800 Subject: [PATCH 04/16] perf(indexer): Improve update logs performance --- crates/indexer/src/insert_l2_block.rs | 122 ++++++++++++++++---------- crates/indexer/src/pool.rs | 12 +++ crates/indexer/src/runner.rs | 10 ++- 3 files changed, 95 insertions(+), 49 deletions(-) diff --git a/crates/indexer/src/insert_l2_block.rs b/crates/indexer/src/insert_l2_block.rs index 0c8e393a..7fd737ca 100644 --- a/crates/indexer/src/insert_l2_block.rs +++ b/crates/indexer/src/insert_l2_block.rs @@ -1,6 +1,6 @@ use std::{convert::TryFrom, str::FromStr}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use gw_types::U256; use rust_decimal::Decimal; use sqlx::{ @@ -13,7 +13,10 @@ use sqlx::{ }; use sqlx::{Postgres, QueryBuilder}; -use crate::types::{Block, Log, Transaction, TransactionWithLogs}; +use crate::{ + pool::POOL_FOR_UPDATE, + types::{Block, Log, Transaction, TransactionWithLogs}, +}; use itertools::Itertools; use rayon::prelude::*; @@ -282,9 +285,8 @@ pub async fn update_web3_block( let block = DbBlock::try_from(&web3_block)?; sqlx::query( - "UPDATE blocks SET number=$1, parent_hash=$3, gas_limit=$4, gas_used=$5,timestamp=$6, miner=$7, size=$8 where hash=$2" + "UPDATE blocks SET hash = $1, parent_hash = $2, gas_limit = $3, gas_used = $4, timestamp = $5, miner = $6, size = $7 where number = $8" ) - .bind(block.number) .bind(block.hash) .bind(block.parent_hash) .bind(block.gas_limit) @@ -292,6 +294,7 @@ pub async fn update_web3_block( .bind(block.timestamp) .bind(block.miner) .bind(block.size) + .bind(block.number) .execute(pg_tx) .await?; @@ -300,7 +303,7 @@ pub async fn update_web3_block( pub async fn update_web3_txs_and_logs( web3_tx_with_logs_vec: Vec, - pg_tx: &mut sqlx::Transaction<'_, Postgres>, + _pg_tx: &mut sqlx::Transaction<'_, Postgres>, ) -> Result<(usize, usize)> { if web3_tx_with_logs_vec.is_empty() { return Ok((0, 0)); @@ -325,55 +328,80 @@ pub async fn update_web3_txs_and_logs( let logs_len = logs.len(); let txs_len = txs.len(); + let size = logs_len / 4; + let final_size = if size > INSERT_LOGS_BATCH_SIZE || size == 0 { + INSERT_LOGS_BATCH_SIZE + } else { + size + }; + let logs_slice = logs .into_iter() - .chunks(INSERT_LOGS_BATCH_SIZE) + .chunks(final_size) .into_iter() .map(|chunk| chunk.collect()) .collect::>>(); - for tx in txs { - sqlx::query( - "UPDATE transactions SET hash = $1, eth_tx_hash = $2, from_address = $3, to_address = $4, value = $5, nonce = $6, gas_limit = $7, gas_price = $8, input = $9, v = $10, r = $11, s = $12, cumulative_gas_used = $13, gas_used = $14, contract_address = $15, exit_code = $16, chain_id = $17 where block_hash = $18 and transaction_index = $19" - ) - .bind(tx.hash) - .bind(tx.eth_tx_hash) - .bind(tx.from_address) - .bind(tx.to_address) - .bind(tx.value) - .bind(tx.nonce) - .bind(tx.gas_limit) - .bind(tx.gas_price) - .bind(tx.input) - .bind(tx.v) - .bind(tx.r) - .bind(tx.s) - .bind(tx.cumulative_gas_used) - .bind(tx.gas_used) - .bind(tx.contract_address) - .bind(tx.exit_code) - .bind(tx.chain_id) - .bind(tx.block_hash) - .bind(tx.transaction_index) - .execute(&mut *pg_tx) - .await?; - } + futures::future::join_all( + txs.into_iter().map(|tx| { + sqlx::query( + "UPDATE transactions SET hash = $1, eth_tx_hash = $2, from_address = $3, to_address = $4, value = $5, nonce = $6, gas_limit = $7, gas_price = $8, input = $9, v = $10, r = $11, s = $12, cumulative_gas_used = $13, gas_used = $14, contract_address = $15, exit_code = $16, chain_id = $17 where block_number = $18 and transaction_index = $19" + ) + .bind(tx.hash) + .bind(tx.eth_tx_hash) + .bind(tx.from_address) + .bind(tx.to_address) + .bind(tx.value) + .bind(tx.nonce) + .bind(tx.gas_limit) + .bind(tx.gas_price) + .bind(tx.input) + .bind(tx.v) + .bind(tx.r) + .bind(tx.s) + .bind(tx.cumulative_gas_used) + .bind(tx.gas_used) + .bind(tx.contract_address) + .bind(tx.exit_code) + .bind(tx.chain_id) + .bind(tx.block_number) + .bind(tx.transaction_index) + .execute(&*POOL_FOR_UPDATE) + }) + ) + .await + .into_iter() + .collect::, sqlx::Error>>()?; - for db_logs in logs_slice { - for log in db_logs { - sqlx::query( - "UPDATE logs SET transaction_hash = $1, address = $2, data = $3, topics = $4 WHERE block_hash = $5 AND transaction_index = $6 AND log_index = $7 " - ) - .bind(log.transaction_hash) - .bind(log.address) - .bind(log.data) - .bind(log.topics) - .bind(log.block_hash) - .bind(log.transaction_index) - .bind(log.log_index) - .execute(&mut *pg_tx) - .await?; - } + if logs_len != 0 { + let mut logs_querys = logs_slice + .into_par_iter() + .map(|db_logs| { + let mut logs_query_builder: QueryBuilder = QueryBuilder::new( + "UPDATE logs SET transaction_hash = data_table.transaction_hash, address = data_table.address, data = data_table.data, topics = data_table.topics FROM ( " + ); + + logs_query_builder.push_values(db_logs, |mut b, log| { + b.push_bind(log.transaction_hash) + .push_bind(log.address) + .push_bind(log.data) + .push_bind(log.topics) + .push_bind(log.block_number) + .push_bind(log.log_index); + }) + .push(" ) AS data_table(transaction_hash, address, data, topics, block_number, log_index) WHERE logs.block_number = data_table.block_number AND logs.log_index = data_table.log_index"); + logs_query_builder + }).collect::>(); + + logs_querys + .par_iter_mut() + .map(|query_builder| { + let query = query_builder.build(); + smol::block_on(query.execute(&*POOL_FOR_UPDATE)).map_err(|err| anyhow!(err)) + }) + .collect::>() + .into_iter() + .collect::>>()?; } Ok((txs_len, logs_len)) diff --git a/crates/indexer/src/pool.rs b/crates/indexer/src/pool.rs index 526b2f14..3105c447 100644 --- a/crates/indexer/src/pool.rs +++ b/crates/indexer/src/pool.rs @@ -18,4 +18,16 @@ lazy_static::lazy_static! { .max_connections(5) .connect_lazy_with(opts) }; + + // Adapt slow query duration to 30s, and enlarge max connections + pub static ref POOL_FOR_UPDATE: PgPool = { + let indexer_config = load_indexer_config("./indexer-config.toml").unwrap(); + + let mut opts: PgConnectOptions = indexer_config.pg_url.parse().expect("pg url parse error"); + opts.log_statements(log::LevelFilter::Debug) + .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(30)); + PgPoolOptions::new() + .max_connections(20) + .connect_lazy_with(opts) + }; } diff --git a/crates/indexer/src/runner.rs b/crates/indexer/src/runner.rs index 67c2e9dd..0df355c0 100644 --- a/crates/indexer/src/runner.rs +++ b/crates/indexer/src/runner.rs @@ -142,10 +142,16 @@ impl Runner { end_block_number ); + let loop_start = std::time::Instant::now(); + let mut current_block_number = start_block_number; loop { - if current_block_number >= end_block_number { - log::info!("All blocks have been updated!"); + if current_block_number > end_block_number { + let loop_duration = loop_start.elapsed(); + log::info!( + "All blocks have been updated! Total duration: {:?}", + loop_duration + ); break; } From e5bb9fde4a2b804754bee70212f2283a6caff4df Mon Sep 17 00:00:00 2001 From: classicalliu Date: Thu, 10 Nov 2022 19:17:59 +0800 Subject: [PATCH 05/16] chore: Handle update network error --- crates/indexer/src/main.rs | 2 +- crates/indexer/src/runner.rs | 203 ++++++++++++++++++++--------------- 2 files changed, 115 insertions(+), 90 deletions(-) diff --git a/crates/indexer/src/main.rs b/crates/indexer/src/main.rs index aa6a40c2..22b7e6a7 100644 --- a/crates/indexer/src/main.rs +++ b/crates/indexer/src/main.rs @@ -34,7 +34,7 @@ fn main() -> Result<()> { let end_block_number = std::env::args() .nth(3) .map(|num| num.parse::().unwrap()); - smol::block_on(runner.update(start_block_number, end_block_number))?; + smol::block_on(runner.run_update(start_block_number, end_block_number))?; } else { smol::block_on(runner.run())?; } diff --git a/crates/indexer/src/runner.rs b/crates/indexer/src/runner.rs index 0df355c0..eeb8472f 100644 --- a/crates/indexer/src/runner.rs +++ b/crates/indexer/src/runner.rs @@ -113,100 +113,58 @@ impl Runner { Ok(()) } - pub async fn update( - &mut self, - start_block_number: Option, - end_block_number: Option, - ) -> Result { - let start_block_number = start_block_number.unwrap_or(0); - let local_tip = self.tip().await?; + async fn update(&mut self, current_block_number: u64) -> Result { + let start = std::time::Instant::now(); - // end_block_number must be <= local tip - if let Some(end_num) = end_block_number { - if let Some(tip_num) = local_tip { - if end_num > tip_num { - return Err(anyhow!( - "end_block_number {} can't larger than tip number: {}", - end_num, - tip_num - )); + let current_block = self + .godwoken_rpc_client + .get_block_by_number(current_block_number)? + .ok_or_else(|| anyhow!("block {} not exist!", current_block_number))?; + + let l2_block = to_l2_block(current_block); + let l2_block_parent_hash = l2_block.raw().parent_block_hash(); + + if current_block_number > 0 { + let prev_block_number = current_block_number - 1; + let db_prev_block_hash = self.get_db_block_hash(prev_block_number).await?; + if let Some(prev_block_hash) = db_prev_block_hash { + // if match, insert a new block + // if not match, sleep and try again + if l2_block_parent_hash.as_slice() == prev_block_hash.as_bytes() { + let (txs_len, logs_len) = self.indexer.update_l2_block(l2_block).await?; + + let duration = start.elapsed(); + log::info!( + "Update block {}, {} txs, {} logs, duration: {:?}", + current_block_number, + txs_len, + logs_len, + duration, + ); + + return Ok(true); + } else { + // Sleep and try again, wait for indexer to deal with revert + log::info!("block {}'s parent_block_hash: {} not match prev block's hash {}, sleep and try again", current_block_number, hex(l2_block_parent_hash.as_slice())?, hex(prev_block_hash.as_bytes())?); + let sleep_time = std::time::Duration::from_secs(3); + smol::Timer::after(sleep_time).await; } } - } - - let end_block_number = end_block_number.unwrap_or_else(|| local_tip.unwrap_or(0)); - - log::info!( - "Update from block {} to block {}", - start_block_number, - end_block_number - ); - - let loop_start = std::time::Instant::now(); - - let mut current_block_number = start_block_number; - loop { - if current_block_number > end_block_number { - let loop_duration = loop_start.elapsed(); - log::info!( - "All blocks have been updated! Total duration: {:?}", - loop_duration - ); - break; - } - - let start = std::time::Instant::now(); - - let current_block = self - .godwoken_rpc_client - .get_block_by_number(current_block_number)? - .ok_or_else(|| anyhow!("block {} not exist!", current_block_number))?; - - let l2_block = to_l2_block(current_block); - let l2_block_parent_hash = l2_block.raw().parent_block_hash(); - - if current_block_number > 0 { - let prev_block_number = current_block_number - 1; - let db_prev_block_hash = self.get_db_block_hash(prev_block_number).await?; - if let Some(prev_block_hash) = db_prev_block_hash { - // if match, insert a new block - // if not match, sleep and try again - if l2_block_parent_hash.as_slice() == prev_block_hash.as_bytes() { - let (txs_len, logs_len) = self.indexer.update_l2_block(l2_block).await?; - - let duration = start.elapsed(); - log::info!( - "Update block {}, {} txs, {} logs, duration: {:?}", - current_block_number, - txs_len, - logs_len, - duration, - ); - - current_block_number += 1; - } else { - // Sleep and try again, wait for indexer to deal with revert - log::info!("block {}'s parent_block_hash: {} not match prev block's hash {}, sleep and try again", current_block_number, hex(l2_block_parent_hash.as_slice())?, hex(prev_block_hash.as_bytes())?); - let sleep_time = std::time::Duration::from_secs(3); - smol::Timer::after(sleep_time).await; - } - } - } else { - let (txs_len, logs_len) = self.indexer.update_l2_block(l2_block).await?; - - let duration = start.elapsed(); - log::info!( - "Update block {}, {} txs, {} logs, duration: {:?}", - current_block_number, - txs_len, - logs_len, - duration, - ); + } else { + let (txs_len, logs_len) = self.indexer.update_l2_block(l2_block).await?; + + let duration = start.elapsed(); + log::info!( + "Update block {}, {} txs, {} logs, duration: {:?}", + current_block_number, + txs_len, + logs_len, + duration, + ); - current_block_number += 1; - } + return Ok(true); } - Ok(true) + Ok(false) } pub async fn insert(&mut self) -> Result { @@ -293,4 +251,71 @@ impl Runner { }; } } + + pub async fn run_update( + &mut self, + start_block_number: Option, + end_block_number: Option, + ) -> Result<()> { + let start_block_number = start_block_number.unwrap_or(0); + let local_tip = self.tip().await?; + + // end_block_number must be <= local tip + if let Some(end_num) = end_block_number { + if let Some(tip_num) = local_tip { + if end_num > tip_num { + return Err(anyhow!( + "end_block_number {} can't larger than tip number: {}", + end_num, + tip_num + )); + } + } + } + + let end_block_number = end_block_number.unwrap_or_else(|| local_tip.unwrap_or(0)); + + log::info!( + "Update from block {} to block {}", + start_block_number, + end_block_number + ); + + let loop_start = std::time::Instant::now(); + + let mut current_block_number = start_block_number; + + loop { + if current_block_number > end_block_number { + let loop_duration = loop_start.elapsed(); + log::info!( + "All blocks have been updated! Total duration: {:?}", + loop_duration + ); + break; + } + + match self.update(current_block_number).await { + Ok(result) => { + if result { + current_block_number += 1; + } else { + continue; + } + } + Err(err) => { + let err_ref = err.downcast_ref::(); + if let Some(RpcClientError::ConnectionError(_, _)) = err_ref { + log::error!("{}", err); + // wait for 1s + let sleep_time = std::time::Duration::from_secs(1); + smol::Timer::after(sleep_time).await; + continue; + }; + return Err(err); + } + }; + } + Ok(()) + } } From 4c345fb26f854fd36d4bbb08c24df46dc31147fc Mon Sep 17 00:00:00 2001 From: classicalliu Date: Thu, 10 Nov 2022 23:39:48 +0800 Subject: [PATCH 06/16] docs: Update README for update blocks info --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 68e422ff..66b3cc82 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,14 @@ pg_url= ./target/release/gw-web3-indexer ``` +### Update blocks + +Update blocks / transactions / logs info in database by update command, include start block and end block. + +```bash +./target/release/gw-web3-indexer update +``` + ### Start API server ```bash From 1e532277f1a87153ec9681c106bfed7654e19f3f Mon Sep 17 00:00:00 2001 From: classicalliu Date: Fri, 11 Nov 2022 16:36:03 +0800 Subject: [PATCH 07/16] chore: Update CI ref --- .github/workflows/godwoken-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/godwoken-tests.yml b/.github/workflows/godwoken-tests.yml index 8ae5ee54..1ed3cfea 100644 --- a/.github/workflows/godwoken-tests.yml +++ b/.github/workflows/godwoken-tests.yml @@ -10,7 +10,7 @@ on: jobs: godwoken-tests: - uses: nervosnetwork/godwoken-tests/.github/workflows/reusable-integration-test-v1.yml@develop + uses: godwokenrises/godwoken-tests/.github/workflows/reusable-integration-test-v1.yml@develop with: extra_github_env: | MANUAL_BUILD_WEB3=true From d0485404214e3024c44d2190fd8600d26dfdf8a6 Mon Sep 17 00:00:00 2001 From: classicalliu Date: Fri, 11 Nov 2022 16:39:07 +0800 Subject: [PATCH 08/16] chore: Update CI ref --- .github/workflows/unit-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 78ddcb1e..872aebbc 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -23,7 +23,7 @@ jobs: # Godwoken-Kicker - uses: actions/checkout@v3 with: - repository: RetricSu/godwoken-kicker + repository: godwokenrises/godwoken-kicker ref: 'master' - name: Kicker init run: ./kicker init From 2828ae08c47282aa2bd8b406b3085727d176ccb5 Mon Sep 17 00:00:00 2001 From: classicalliu Date: Fri, 11 Nov 2022 17:40:06 +0800 Subject: [PATCH 09/16] chore: Update CI ref --- .github/workflows/unit-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 872aebbc..b250e430 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -24,7 +24,7 @@ jobs: - uses: actions/checkout@v3 with: repository: godwokenrises/godwoken-kicker - ref: 'master' + ref: 'develop' - name: Kicker init run: ./kicker init - name: Kicker start From db452549a812456fc903583ed6e0e53b941a50c5 Mon Sep 17 00:00:00 2001 From: CL Date: Tue, 15 Nov 2022 14:30:50 +0800 Subject: [PATCH 10/16] Apply suggestions from code review Co-authored-by: RetricSu --- packages/api-server/src/db/helpers.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/api-server/src/db/helpers.ts b/packages/api-server/src/db/helpers.ts index 5973e993..8a7f4105 100644 --- a/packages/api-server/src/db/helpers.ts +++ b/packages/api-server/src/db/helpers.ts @@ -315,10 +315,10 @@ export function buildQueryLogId( } } -// v = v(0/1) * 2 + 35 OR v = v(0/1) + 27 +// v = v(0/1) + chainId * 2 + 35 OR v = v(0/1) + 27 export function getRealV(v: bigint, chainId?: bigint): bigint { if (![0n, 1n].includes(v)) { - throw new Error("chain id must be 0 / 1"); + throw new Error("V value must be 0 / 1"); } return v + (chainId == null || chainId === 0n ? 27n : chainId * 2n + 35n); } From 135b9818ecc74e8fbc610fd121a2d433ba5b7e47 Mon Sep 17 00:00:00 2001 From: classicalliu Date: Tue, 15 Nov 2022 14:36:54 +0800 Subject: [PATCH 11/16] chore: Add comment --- packages/api-server/src/db/helpers.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/api-server/src/db/helpers.ts b/packages/api-server/src/db/helpers.ts index 8a7f4105..80b8792a 100644 --- a/packages/api-server/src/db/helpers.ts +++ b/packages/api-server/src/db/helpers.ts @@ -315,6 +315,7 @@ export function buildQueryLogId( } } +// chainId = 0 means non-EIP155 tx // v = v(0/1) + chainId * 2 + 35 OR v = v(0/1) + 27 export function getRealV(v: bigint, chainId?: bigint): bigint { if (![0n, 1n].includes(v)) { From ba40179e894da78ca07b44014fcc1a74af612920 Mon Sep 17 00:00:00 2001 From: RetricSu Date: Thu, 3 Nov 2022 09:56:42 +0800 Subject: [PATCH 12/16] refactor: instant-finality feature via special RPC url --- README.md | 23 +- docs/addtional-feature.md | 22 ++ packages/api-server/src/methods/index.ts | 5 + .../api-server/src/methods/modules/eth.ts | 211 ++++++++++-------- packages/api-server/src/middlewares/jayson.ts | 11 +- packages/api-server/src/util.ts | 9 + packages/api-server/src/ws/methods.ts | 95 ++++---- 7 files changed, 235 insertions(+), 141 deletions(-) create mode 100644 docs/addtional-feature.md diff --git a/README.md b/README.md index 66b3cc82..c1f1db46 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ A Web3 RPC compatible layer build upon Godwoken/Polyjuice. +Checkout [additional feature](docs/addtional-feature.md). + ## Development ### Config database @@ -121,9 +123,26 @@ docker exec -it /bin/bash $ root@ec562fe2172b:/godwoken-web3# pm2 monit ``` -Http url: http://your-url/ +#### URLs + +```sh +# Http +http://example_web3_rpc_url + +# WebSocket +ws://example_web3_rpc_url/ws +``` -WebSocket url: ws://your-url/ws +With instant-finality feature turn on: + +```sh +# Http +http://example_web3_rpc_url?instant-finality-hack=true +http://example_web3_rpc_url/instant-finality-hack + +# WebSocket +ws://example_web3_rpc_url/ws?instant-finality-hack=true +``` ### Docker Prebuilds diff --git a/docs/addtional-feature.md b/docs/addtional-feature.md new file mode 100644 index 00000000..e274c553 --- /dev/null +++ b/docs/addtional-feature.md @@ -0,0 +1,22 @@ +# Additional Feature + +## Instant Finality + +Ethereum require a transaction to be on-chain(meaning the transaction is included in the latest block) before returning a final status(aka. transaction receipt) to users so they can know whether the transaction is success or not. + +Godwoken provide a quicker way to confirm transaction. Once the transaction is validated in mempool, users can get instant transaction receipt. This feature is called Instant Finality. + +If your want to build a low latency user experience for on-chain interaction in your dapp, you can turn on such feature by using the RPC with additional path or query parameter: + +```sh +# http +https://example_web3_rpc_url?instant-finality-hack=true +https://example_web3_rpc_url/instant-finality-hack + +# websocket +ws://example_web3_rpc_url/ws?instant-finality-hack=true +``` + +Environment like [Hardhat](https://github.com/NomicFoundation/hardhat) will swallow the http url's query parameter, so you might want to use the `/instant-finality-hack` path to overcome that. + +Also notice that under such mode, there might have some [compatibility issue](https://github.com/godwokenrises/godwoken-web3/issues/283) with Ethereum toolchain like `ether.js`. If you care more about the compatibility, please use the bare RPC url `https://example_web3_rpc_url`, which is consider to be most compatible with Ethereum. diff --git a/packages/api-server/src/methods/index.ts b/packages/api-server/src/methods/index.ts index 5424907e..484a458e 100644 --- a/packages/api-server/src/methods/index.ts +++ b/packages/api-server/src/methods/index.ts @@ -75,4 +75,9 @@ function getMethods(argsList: ModConstructorArgs = {}) { return methods; } +const instantFinalityHackMode = true; + export const methods = getMethods(); +export const instantFinalityHackMethods = getMethods({ + eth: [instantFinalityHackMode], +}); diff --git a/packages/api-server/src/methods/modules/eth.ts b/packages/api-server/src/methods/modules/eth.ts index 6e4cdd1e..903a8c77 100644 --- a/packages/api-server/src/methods/modules/eth.ts +++ b/packages/api-server/src/methods/modules/eth.ts @@ -94,16 +94,18 @@ type GodwokenBlockParameter = U64 | undefined; export class Eth { private query: Query; private rpc: GodwokenClient; + private instantFinalityHackMode: boolean; private filterManager: FilterManager; private cacheStore: Store; private ethNormalizer: EthNormalizer; - constructor() { + constructor(instantFinalityHackMode: boolean = false) { this.query = new Query(); this.rpc = new GodwokenClient( envConfig.godwokenJsonRpc, envConfig.godwokenReadonlyJsonRpc ); + this.instantFinalityHackMode = instantFinalityHackMode; this.filterManager = new FilterManager(true); this.cacheStore = new Store(true, CACHE_EXPIRED_TIME_MILSECS); this.ethNormalizer = new EthNormalizer(this.rpc); @@ -718,86 +720,94 @@ export class Eth { async getTransactionByHash(args: [string]): Promise { const ethTxHash: Hash = args[0]; - const cacheKey = autoCreateAccountCacheKey(ethTxHash); + const acaCacheKey = autoCreateAccountCacheKey(ethTxHash); // 1. Find in db const tx = await this.query.getTransactionByEthTxHash(ethTxHash); if (tx != null) { // no need await // delete auto create account tx if already in db - this.cacheStore.delete(cacheKey); + this.cacheStore.delete(acaCacheKey); const apiTx = toApiTransaction(tx); return apiTx; } - // 2. If null, find pending transactions - const ethTxHashKey = ethTxHashCacheKey(ethTxHash); - const gwTxHash: Hash | null = await this.cacheStore.get(ethTxHashKey); - if (gwTxHash != null) { - const godwokenTxWithStatus = await this.rpc.getTransaction(gwTxHash); - if (godwokenTxWithStatus == null) { - return null; - } - const godwokenTxReceipt = await this.rpc.getTransactionReceipt(gwTxHash); - const tipBlock = await this.query.getTipBlock(); - if (tipBlock == null) { - throw new Error("tip block not found!"); - } - let ethTxInfo = undefined; - try { - ethTxInfo = await filterWeb3Transaction( - ethTxHash, - this.rpc, - tipBlock.number, - tipBlock.hash, - godwokenTxWithStatus.transaction, - godwokenTxReceipt - ); - } catch (err) { - logger.error("filterWeb3Transaction:", err); - logger.info("godwoken tx:", godwokenTxWithStatus); - logger.info("godwoken receipt:", godwokenTxReceipt); - throw err; - } - if (ethTxInfo != null) { - const ethTx = ethTxInfo[0]; - return ethTx; - } - } - - // 3. Find by auto create account tx - // TODO: delete cache store if dropped by godwoken - // convert to tx hash mapping store if account id generated ? - const polyjuiceRawTx = await this.cacheStore.get(cacheKey); - if (polyjuiceRawTx != null) { - const tipBlock = await this.query.getTipBlock(); - if (tipBlock == null) { - throw new Error("tip block not found!"); - } - // Convert polyjuice tx to api transaction - const { tx, fromAddress }: AutoCreateAccountCacheValue = - JSON.parse(polyjuiceRawTx); - const isAcaTxExist: boolean = await this.isAcaTxExist( - ethTxHash, - tx, - fromAddress - ); - logger.info( - `aca tx: action: getTransactionByHash, eth_tx_hash: ${ethTxHash}, is_tx_exist: ${isAcaTxExist}` + // 2. If under instant-finality hack mode, find tx from gw mempool block + if (this.instantFinalityHackMode) { + logger.debug( + `[eth_getTransactionByHash] find with instant-finality hack` ); - if (isAcaTxExist) { - const apiTransaction: EthTransaction = - polyjuiceRawTransactionToApiTransaction( - tx, + // A. find pending transactions + const ethTxHashKey = ethTxHashCacheKey(ethTxHash); + const gwTxHash: Hash | null = await this.cacheStore.get(ethTxHashKey); + if (gwTxHash != null) { + const godwokenTxWithStatus = await this.rpc.getTransaction(gwTxHash); + if (godwokenTxWithStatus == null) { + return null; + } + const godwokenTxReceipt = await this.rpc.getTransactionReceipt( + gwTxHash + ); + const tipBlock = await this.query.getTipBlock(); + if (tipBlock == null) { + throw new Error("tip block not found!"); + } + let ethTxInfo = undefined; + try { + ethTxInfo = await filterWeb3Transaction( ethTxHash, - tipBlock.hash, + this.rpc, tipBlock.number, - fromAddress + tipBlock.hash, + godwokenTxWithStatus.transaction, + godwokenTxReceipt ); - return apiTransaction; - } else { - // If not found, means dropped by godwoken, should delete cache - this.cacheStore.delete(cacheKey); + } catch (err) { + logger.error("filterWeb3Transaction:", err); + logger.info("godwoken tx:", godwokenTxWithStatus); + logger.info("godwoken receipt:", godwokenTxReceipt); + throw err; + } + if (ethTxInfo != null) { + const ethTx = ethTxInfo[0]; + return ethTx; + } + } + + // B. Find by auto create account tx + // TODO: delete cache store if dropped by godwoken + // convert to tx hash mapping store if account id generated ? + const polyjuiceRawTx = await this.cacheStore.get(acaCacheKey); + if (polyjuiceRawTx != null) { + const tipBlock = await this.query.getTipBlock(); + if (tipBlock == null) { + throw new Error("tip block not found!"); + } + // Convert polyjuice tx to api transaction + const { tx, fromAddress }: AutoCreateAccountCacheValue = + JSON.parse(polyjuiceRawTx); + const isAcaTxExist: boolean = await this.isAcaTxExist( + ethTxHash, + tx, + fromAddress + ); + logger.info( + `aca tx: action: getTransactionByHash, eth_tx_hash: ${ethTxHash}, is_tx_exist: ${isAcaTxExist}` + ); + if (isAcaTxExist) { + const apiTransaction: EthTransaction = + polyjuiceRawTransactionToApiTransaction( + tx, + ethTxHash, + tipBlock.hash, + tipBlock.number, + fromAddress + ); + return apiTransaction; + } else { + // If not found, means dropped by godwoken, should delete cache + this.cacheStore.delete(acaCacheKey); + } } } @@ -861,6 +871,7 @@ export class Eth { return null; } + // 1. Find in db const data = await this.query.getTransactionAndLogsByHash(gwTxHash); if (data != null) { const [tx, logs] = data; @@ -869,37 +880,43 @@ export class Eth { return transactionReceipt; } - const godwokenTxWithStatus = await this.rpc.getTransaction(gwTxHash); - if (godwokenTxWithStatus == null) { - return null; - } - const godwokenTxReceipt = await this.rpc.getTransactionReceipt(gwTxHash); - if (godwokenTxReceipt == null) { - return null; - } - const tipBlock = await this.query.getTipBlock(); - if (tipBlock == null) { - throw new Error(`tip block not found`); - } - let ethTxInfo = undefined; - try { - ethTxInfo = await filterWeb3Transaction( - ethTxHash, - this.rpc, - tipBlock.number, - tipBlock.hash, - godwokenTxWithStatus.transaction, - godwokenTxReceipt + // 2. If under instant-finality hack mode, build receipt from gw mempool block + if (this.instantFinalityHackMode) { + logger.debug( + `[eth_getTransactionReceipt] find with instant-finality hack` ); - } catch (err) { - logger.error("filterWeb3Transaction:", err); - logger.info("godwoken tx:", godwokenTxWithStatus); - logger.info("godwoken receipt:", godwokenTxReceipt); - throw err; - } - if (ethTxInfo != null) { - const ethTxReceipt = ethTxInfo[1]!; - return ethTxReceipt; + const godwokenTxWithStatus = await this.rpc.getTransaction(gwTxHash); + if (godwokenTxWithStatus == null) { + return null; + } + const godwokenTxReceipt = await this.rpc.getTransactionReceipt(gwTxHash); + if (godwokenTxReceipt == null) { + return null; + } + const tipBlock = await this.query.getTipBlock(); + if (tipBlock == null) { + throw new Error(`tip block not found`); + } + let ethTxInfo = undefined; + try { + ethTxInfo = await filterWeb3Transaction( + ethTxHash, + this.rpc, + tipBlock.number, + tipBlock.hash, + godwokenTxWithStatus.transaction, + godwokenTxReceipt + ); + } catch (err) { + logger.error("filterWeb3Transaction:", err); + logger.info("godwoken tx:", godwokenTxWithStatus); + logger.info("godwoken receipt:", godwokenTxReceipt); + throw err; + } + if (ethTxInfo != null) { + const ethTxReceipt = ethTxInfo[1]!; + return ethTxReceipt; + } } return null; diff --git a/packages/api-server/src/middlewares/jayson.ts b/packages/api-server/src/middlewares/jayson.ts index ea0d28e9..0110679e 100644 --- a/packages/api-server/src/middlewares/jayson.ts +++ b/packages/api-server/src/middlewares/jayson.ts @@ -1,9 +1,11 @@ import jayson from "jayson"; -import { methods } from "../methods/index"; +import { instantFinalityHackMethods, methods } from "../methods/index"; import { Request, Response, NextFunction } from "express"; import createServer from "connect"; +import { isInstantFinalityHackMode } from "../util"; const server = new jayson.Server(methods); +const instantFinalityHackServer = new jayson.Server(instantFinalityHackMethods); export const jaysonMiddleware = ( req: Request, @@ -16,6 +18,13 @@ export const jaysonMiddleware = ( req.body.params = [] as any[]; } + // enable additional feature for special URL + if (isInstantFinalityHackMode(req)) { + const middleware = + instantFinalityHackServer.middleware() as createServer.NextHandleFunction; + return middleware(req, res, next); + } + const middleware = server.middleware() as createServer.NextHandleFunction; return middleware(req, res, next); }; diff --git a/packages/api-server/src/util.ts b/packages/api-server/src/util.ts index fe95f5a4..35cf1e71 100644 --- a/packages/api-server/src/util.ts +++ b/packages/api-server/src/util.ts @@ -1,4 +1,5 @@ import { HexString } from "@ckb-lumos/base"; +import { Request } from "express"; import { TX_DATA_NONE_ZERO_GAS, TX_DATA_ZERO_GAS, @@ -121,6 +122,14 @@ export function calcFee(serializedL2Tx: HexString, feeRate: bigint) { return byteLen * feeRate; } +// WEB3_RPC_URL/instant-finality-hack or WEB3_RPC_URL?instant-finality-hack=true +export function isInstantFinalityHackMode(req: Request): boolean { + return ( + req.url == "/instant-finality-hack" || + (req.query && req.query["instant-finality-hack"] == "true") + ); +} + export async function asyncSleep(ms = 0) { return new Promise((r) => setTimeout(() => r("ok"), ms)); } diff --git a/packages/api-server/src/ws/methods.ts b/packages/api-server/src/ws/methods.ts index 35092d32..b491f8a4 100644 --- a/packages/api-server/src/ws/methods.ts +++ b/packages/api-server/src/ws/methods.ts @@ -1,7 +1,10 @@ import { EthNewHead } from "../base/types/api"; import { BlockEmitter } from "../block-emitter"; import { INVALID_PARAMS, METHOD_NOT_FOUND } from "../methods/error-code"; -import { methods } from "../methods/index"; +import { + instantFinalityHackMethods, + methods as compatibleMethods, +} from "../methods/index"; import { middleware as wsrpc } from "./wss"; import crypto from "crypto"; import { HexNumber } from "@ckb-lumos/base"; @@ -11,6 +14,7 @@ import { Store } from "../cache/store"; import { CACHE_EXPIRED_TIME_MILSECS } from "../cache/constant"; import { wsApplyRateLimitByIp } from "../rate-limit"; import { gwTxHashToEthTxHash } from "../cache/tx-hash"; +import { isInstantFinalityHackMode } from "../util"; const query = new Query(); const cacheStore = new Store(true, CACHE_EXPIRED_TIME_MILSECS); @@ -25,6 +29,13 @@ export function wrapper(ws: any, req: any) { wsrpc(ws); + // check if use most compatible or enable additional feature + let methods = compatibleMethods; + if (isInstantFinalityHackMode(req)) { + methods = instantFinalityHackMethods; + } + + // 1. RPC request for (const [method, methodFunc] of Object.entries(methods)) { ws.on(method, async function (...args: any[]) { const execMethod = async () => { @@ -54,6 +65,48 @@ export function wrapper(ws: any, req: any) { }); } + // 2. RPC batch request + ws.on("@batchRequests", async function (...args: any[]) { + const objs = args.slice(0, args.length - 1); + const cb = args[args.length - 1]; + + const callback = (err: any, result: any) => { + return { err, result }; + }; + const info = await Promise.all( + objs.map(async (obj) => { + // check rate limit + const err = await wsApplyRateLimitByIp(req, obj.method); + if (err != null) { + return { + err, + }; + } + + if (obj.method === "eth_subscribe") { + const r = ethSubscribe(obj.params, callback); + return r; + } else if (obj.method === "eth_unsubscribe") { + const r = ethUnsubscribe(obj.params, callback); + return r; + } + const value = methods[obj.method]; + if (value == null) { + return { + err: { + code: METHOD_NOT_FOUND, + message: `method ${obj.method} not found!`, + }, + }; + } + const r = await (value as any)(obj.params, callback); + return r; + }) + ); + cb(info); + }); + + // 3. RPC Subscribe request const newHeadsIds: Set = new Set(); const syncingIds: Set = new Set(); const logsQueryMaps: Map = new Map(); @@ -203,44 +256,4 @@ export function wrapper(ws: any, req: any) { return {}; } - - ws.on("@batchRequests", async function (...args: any[]) { - const objs = args.slice(0, args.length - 1); - const cb = args[args.length - 1]; - - const callback = (err: any, result: any) => { - return { err, result }; - }; - const info = await Promise.all( - objs.map(async (obj) => { - // check rate limit - const err = await wsApplyRateLimitByIp(req, obj.method); - if (err != null) { - return { - err, - }; - } - - if (obj.method === "eth_subscribe") { - const r = ethSubscribe(obj.params, callback); - return r; - } else if (obj.method === "eth_unsubscribe") { - const r = ethUnsubscribe(obj.params, callback); - return r; - } - const value = methods[obj.method]; - if (value == null) { - return { - err: { - code: METHOD_NOT_FOUND, - message: `method ${obj.method} not found!`, - }, - }; - } - const r = await (value as any)(obj.params, callback); - return r; - }) - ); - cb(info); - }); } From 67707cfe7a0411f3b2fab78d9cbfc3281cc5354c Mon Sep 17 00:00:00 2001 From: RetricSu Date: Fri, 4 Nov 2022 10:47:43 +0800 Subject: [PATCH 13/16] fix: eth_getTransactionByHash support pending --- .../api-server/src/methods/modules/eth.ts | 136 +++++++++--------- 1 file changed, 64 insertions(+), 72 deletions(-) diff --git a/packages/api-server/src/methods/modules/eth.ts b/packages/api-server/src/methods/modules/eth.ts index 903a8c77..10983ff0 100644 --- a/packages/api-server/src/methods/modules/eth.ts +++ b/packages/api-server/src/methods/modules/eth.ts @@ -732,82 +732,74 @@ export class Eth { return apiTx; } - // 2. If under instant-finality hack mode, find tx from gw mempool block - if (this.instantFinalityHackMode) { - logger.debug( - `[eth_getTransactionByHash] find with instant-finality hack` - ); - // A. find pending transactions - const ethTxHashKey = ethTxHashCacheKey(ethTxHash); - const gwTxHash: Hash | null = await this.cacheStore.get(ethTxHashKey); - if (gwTxHash != null) { - const godwokenTxWithStatus = await this.rpc.getTransaction(gwTxHash); - if (godwokenTxWithStatus == null) { - return null; - } - const godwokenTxReceipt = await this.rpc.getTransactionReceipt( - gwTxHash + // 2. Find pending tx from gw mempool block + const ethTxHashKey = ethTxHashCacheKey(ethTxHash); + const gwTxHash: Hash | null = await this.cacheStore.get(ethTxHashKey); + if (gwTxHash != null) { + const godwokenTxWithStatus = await this.rpc.getTransaction(gwTxHash); + if (godwokenTxWithStatus == null) { + return null; + } + const godwokenTxReceipt = await this.rpc.getTransactionReceipt(gwTxHash); + const tipBlock = await this.query.getTipBlock(); + if (tipBlock == null) { + throw new Error("tip block not found!"); + } + let ethTxInfo = undefined; + try { + ethTxInfo = await filterWeb3Transaction( + ethTxHash, + this.rpc, + tipBlock.number, + tipBlock.hash, + godwokenTxWithStatus.transaction, + godwokenTxReceipt ); - const tipBlock = await this.query.getTipBlock(); - if (tipBlock == null) { - throw new Error("tip block not found!"); - } - let ethTxInfo = undefined; - try { - ethTxInfo = await filterWeb3Transaction( + } catch (err) { + logger.error("filterWeb3Transaction:", err); + logger.info("godwoken tx:", godwokenTxWithStatus); + logger.info("godwoken receipt:", godwokenTxReceipt); + throw err; + } + if (ethTxInfo != null) { + const ethTx = ethTxInfo[0]; + return ethTx; + } + } + + // 3. Find by auto create account tx + // TODO: delete cache store if dropped by godwoken + // convert to tx hash mapping store if account id generated ? + const polyjuiceRawTx = await this.cacheStore.get(acaCacheKey); + if (polyjuiceRawTx != null) { + const tipBlock = await this.query.getTipBlock(); + if (tipBlock == null) { + throw new Error("tip block not found!"); + } + // Convert polyjuice tx to api transaction + const { tx, fromAddress }: AutoCreateAccountCacheValue = + JSON.parse(polyjuiceRawTx); + const isAcaTxExist: boolean = await this.isAcaTxExist( + ethTxHash, + tx, + fromAddress + ); + logger.info( + `aca tx: action: getTransactionByHash, eth_tx_hash: ${ethTxHash}, is_tx_exist: ${isAcaTxExist}` + ); + if (isAcaTxExist) { + const apiTransaction: EthTransaction = + polyjuiceRawTransactionToApiTransaction( + tx, ethTxHash, - this.rpc, - tipBlock.number, tipBlock.hash, - godwokenTxWithStatus.transaction, - godwokenTxReceipt + tipBlock.number, + fromAddress ); - } catch (err) { - logger.error("filterWeb3Transaction:", err); - logger.info("godwoken tx:", godwokenTxWithStatus); - logger.info("godwoken receipt:", godwokenTxReceipt); - throw err; - } - if (ethTxInfo != null) { - const ethTx = ethTxInfo[0]; - return ethTx; - } - } - - // B. Find by auto create account tx - // TODO: delete cache store if dropped by godwoken - // convert to tx hash mapping store if account id generated ? - const polyjuiceRawTx = await this.cacheStore.get(acaCacheKey); - if (polyjuiceRawTx != null) { - const tipBlock = await this.query.getTipBlock(); - if (tipBlock == null) { - throw new Error("tip block not found!"); - } - // Convert polyjuice tx to api transaction - const { tx, fromAddress }: AutoCreateAccountCacheValue = - JSON.parse(polyjuiceRawTx); - const isAcaTxExist: boolean = await this.isAcaTxExist( - ethTxHash, - tx, - fromAddress - ); - logger.info( - `aca tx: action: getTransactionByHash, eth_tx_hash: ${ethTxHash}, is_tx_exist: ${isAcaTxExist}` - ); - if (isAcaTxExist) { - const apiTransaction: EthTransaction = - polyjuiceRawTransactionToApiTransaction( - tx, - ethTxHash, - tipBlock.hash, - tipBlock.number, - fromAddress - ); - return apiTransaction; - } else { - // If not found, means dropped by godwoken, should delete cache - this.cacheStore.delete(acaCacheKey); - } + return apiTransaction; + } else { + // If not found, means dropped by godwoken, should delete cache + this.cacheStore.delete(acaCacheKey); } } From d02a9acc6f4bbe8aa955928d4d3353704ee8166a Mon Sep 17 00:00:00 2001 From: RetricSu Date: Mon, 7 Nov 2022 10:47:21 +0800 Subject: [PATCH 14/16] fix: godwoken-test ref to make CI work --- .github/workflows/godwoken-tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/godwoken-tests.yml b/.github/workflows/godwoken-tests.yml index 1ed3cfea..6f282d5a 100644 --- a/.github/workflows/godwoken-tests.yml +++ b/.github/workflows/godwoken-tests.yml @@ -13,6 +13,8 @@ jobs: uses: godwokenrises/godwoken-tests/.github/workflows/reusable-integration-test-v1.yml@develop with: extra_github_env: | + GODWOKEN_TESTS_REPO=RetricSu/godwoken-tests + GODWOKEN_TESTS_REF=sisyhusGamble-block MANUAL_BUILD_WEB3=true MANUAL_BUILD_WEB3_INDEXER=true WEB3_GIT_URL=https://github.com/${{ github.repository }} From a7f54daf5ccaef44e563ff9b1de907b2aaf77c36 Mon Sep 17 00:00:00 2001 From: RetricSu Date: Wed, 9 Nov 2022 14:30:48 +0800 Subject: [PATCH 15/16] chore: treat pending and latest differently in non-instant-finality mode --- packages/api-server/src/methods/modules/eth.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/api-server/src/methods/modules/eth.ts b/packages/api-server/src/methods/modules/eth.ts index 10983ff0..72b832e9 100644 --- a/packages/api-server/src/methods/modules/eth.ts +++ b/packages/api-server/src/methods/modules/eth.ts @@ -1108,7 +1108,11 @@ export class Eth { ): Promise { switch (blockParameter) { case "latest": - return undefined; + if (this.instantFinalityHackMode) { + // under instant-finality hack, we treat latest as pending + return undefined; + } + return await this.getTipNumber(); case "earliest": return 0n; // It's supposed to be filtered in the validator, so throw an error if matched @@ -1152,6 +1156,9 @@ export class Eth { return blockNumber; } + // Some RPCs does not support pending parameter + // eth_getBlockByNumber/eth_getBlockTransactionCountByNumber/eth_getTransactionByBlockNumberAndIndex + // TODO: maybe we should support for those as well? private async blockParameterToBlockNumber( blockParameter: BlockParameter ): Promise { @@ -1477,7 +1484,7 @@ function serializeEthCallParameters( gasPrice: ethCallObj.gasPrice || "0x", data: ethCallObj.data || "0x", value: ethCallObj.value || "0x", - blockNumber: blockNumber ? "0x" + blockNumber?.toString(16) : "0x", // undefined means latest block, the key contains tipBlockHash, so there is no need to diff latest height + blockNumber: blockNumber ? "0x" + blockNumber?.toString(16) : "0x", // undefined means pending block, the key contains tipBlockHash, so there is no need to diff pending height }; return JSON.stringify(toSerializeObj); } @@ -1507,7 +1514,7 @@ function serializeEstimateGasParameters( gasPrice: estimateGasObj.gasPrice || "0x", data: estimateGasObj.data || "0x", value: estimateGasObj.value || "0x", - blockNumber: blockNumber ? "0x" + blockNumber?.toString(16) : "0x", // undefined means latest block, the key contains tipBlockHash, so there is no need to diff latest height + blockNumber: blockNumber ? "0x" + blockNumber?.toString(16) : "0x", // undefined means pending block, the key contains tipBlockHash, so there is no need to diff pending height }; return JSON.stringify(toSerializeObj); } From 9d97119de0da31288a25517a727612c8e135d183 Mon Sep 17 00:00:00 2001 From: RetricSu Date: Tue, 15 Nov 2022 19:31:04 +0800 Subject: [PATCH 16/16] chore(ci): revert the fix to use godwokenrise integration-test-v1 --- .github/workflows/godwoken-tests.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/godwoken-tests.yml b/.github/workflows/godwoken-tests.yml index 6f282d5a..1ed3cfea 100644 --- a/.github/workflows/godwoken-tests.yml +++ b/.github/workflows/godwoken-tests.yml @@ -13,8 +13,6 @@ jobs: uses: godwokenrises/godwoken-tests/.github/workflows/reusable-integration-test-v1.yml@develop with: extra_github_env: | - GODWOKEN_TESTS_REPO=RetricSu/godwoken-tests - GODWOKEN_TESTS_REF=sisyhusGamble-block MANUAL_BUILD_WEB3=true MANUAL_BUILD_WEB3_INDEXER=true WEB3_GIT_URL=https://github.com/${{ github.repository }}