Skip to content
This repository has been archived by the owner on Mar 24, 2023. It is now read-only.

Fix eip155 v #567

Merged
merged 11 commits into from
Nov 15, 2022
2 changes: 1 addition & 1 deletion .github/workflows/godwoken-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:

jobs:
godwoken-tests:
uses: nervosnetwork/godwoken-tests/.github/workflows/reusable-integration-test-v1.yml@develop
uses: godwokenrises/godwoken-tests/.github/workflows/reusable-integration-test-v1.yml@develop
with:
extra_github_env: |
MANUAL_BUILD_WEB3=true
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ jobs:
# Godwoken-Kicker
- uses: actions/checkout@v3
with:
repository: RetricSu/godwoken-kicker
ref: 'master'
repository: godwokenrises/godwoken-kicker
ref: 'develop'
- name: Kicker init
run: ./kicker init
- name: Kicker start
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ pg_url=<database url, e.g. "postgres://username:password@localhost:5432/dbname">
./target/release/gw-web3-indexer
```

### Update blocks

Update blocks / transactions / logs info in database by update command, include start block and end block.

```bash
./target/release/gw-web3-indexer update <optional start block, default to 0> <optional end block, default to local tip>
```

### Start API server

```bash
Expand Down
2 changes: 1 addition & 1 deletion crates/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "gw-web3-indexer"
version = "0.1.0"
authors = ["Nervos Network"]
edition = "2018"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
43 changes: 35 additions & 8 deletions crates/indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{

use crate::{
helper::{hex, parse_log, GwLog, PolyjuiceArgs, GW_LOG_POLYJUICE_SYSTEM},
insert_l2_block::{insert_web3_block, insert_web3_txs_and_logs},
insert_l2_block::{
insert_web3_block, insert_web3_txs_and_logs, update_web3_block, update_web3_txs_and_logs,
},
pool::POOL,
types::{
Block as Web3Block, Log as Web3Log, Transaction as Web3Transaction,
Expand Down Expand Up @@ -66,13 +68,27 @@ impl Web3Indexer {
}
}

pub async fn update_l2_block(&self, l2_block: L2Block) -> Result<(usize, usize)> {
let number: u64 = l2_block.raw().number().unpack();
// update block
let (txs_len, logs_len) = self.insert_or_update_l2block(l2_block, true).await?;
log::debug!(
"web3 indexer: update block #{}, {} txs, {} logs",
number,
txs_len,
logs_len
);
Ok((txs_len, logs_len))
}

pub async fn store_l2_block(&self, l2_block: L2Block) -> Result<(usize, usize)> {
let number: u64 = l2_block.raw().number().unpack();
let local_tip_number = self.tip_number().await?.unwrap_or(0);
let mut txs_len = 0;
let mut logs_len = 0;
if number > local_tip_number || self.query_number(number).await?.is_none() {
(txs_len, logs_len) = self.insert_l2block(l2_block).await?;
// insert l2 block
(txs_len, logs_len) = self.insert_or_update_l2block(l2_block, false).await?;
log::debug!(
"web3 indexer: sync new block #{}, {} txs, {} logs",
number,
Expand Down Expand Up @@ -442,7 +458,11 @@ impl Web3Indexer {
Ok(hashmap)
}

async fn insert_l2block(&self, l2_block: L2Block) -> Result<(usize, usize)> {
async fn insert_or_update_l2block(
&self,
l2_block: L2Block,
is_update: bool,
) -> Result<(usize, usize)> {
let block_number = l2_block.raw().number().unpack();
let block_hash: gw_common::H256 = blake2b_256(l2_block.raw().as_slice()).into();
// let mut cumulative_gas_used: u128 = 0;
Expand Down Expand Up @@ -508,19 +528,26 @@ impl Web3Indexer {

tx_index_cursor += txs_vec.len() as u32;

// insert to db
let (txs_part_len, logs_part_len) =
insert_web3_txs_and_logs(txs_vec, &mut pg_tx).await?;
// insert to db or update
let (txs_part_len, logs_part_len) = if is_update {
update_web3_txs_and_logs(txs_vec, &mut pg_tx).await?
} else {
insert_web3_txs_and_logs(txs_vec, &mut pg_tx).await?
};

web3_txs_len += txs_part_len;
logs_len += logs_part_len;
}

// insert block
// insert or update block
let web3_block = self
.build_web3_block(&l2_block, total_gas_limit, cumulative_gas_used)
.await?;
insert_web3_block(web3_block, &mut pg_tx).await?;
if is_update {
update_web3_block(web3_block, &mut pg_tx).await?;
} else {
insert_web3_block(web3_block, &mut pg_tx).await?;
}

// commit
pg_tx.commit().await?;
Expand Down
143 changes: 139 additions & 4 deletions crates/indexer/src/insert_l2_block.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{convert::TryFrom, str::FromStr};

use anyhow::Result;
use anyhow::{anyhow, Result};
use gw_types::U256;
use rust_decimal::Decimal;
use sqlx::{
Expand All @@ -13,7 +13,10 @@ use sqlx::{
};
use sqlx::{Postgres, QueryBuilder};

use crate::types::{Block, Log, Transaction, TransactionWithLogs};
use crate::{
pool::POOL_FOR_UPDATE,
types::{Block, Log, Transaction, TransactionWithLogs},
};

use itertools::Itertools;
use rayon::prelude::*;
Expand Down Expand Up @@ -70,6 +73,7 @@ pub struct DbTransaction {
gas_used: BigDecimal,
contract_address: Option<Vec<u8>>,
exit_code: Decimal,
chain_id: Option<Decimal>,
}

impl TryFrom<Transaction> for DbTransaction {
Expand Down Expand Up @@ -98,6 +102,7 @@ impl TryFrom<Transaction> for DbTransaction {
gas_used: u128_to_big_decimal(&tx.gas_used)?,
contract_address: web3_contract_address,
exit_code: tx.exit_code.into(),
chain_id: tx.chain_id.map(|id| id.into()),
};
Ok(db_transaction)
}
Expand Down Expand Up @@ -199,7 +204,7 @@ pub async fn insert_web3_txs_and_logs(

let mut txs_query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
"INSERT INTO transactions
(hash, eth_tx_hash, block_number, block_hash, transaction_index, from_address, to_address, value, nonce, gas_limit, gas_price, input, v, r, s, cumulative_gas_used, gas_used, contract_address, exit_code) "
(hash, eth_tx_hash, block_number, block_hash, transaction_index, from_address, to_address, value, nonce, gas_limit, gas_price, input, v, r, s, cumulative_gas_used, gas_used, contract_address, exit_code, chain_id) "
);

txs_query_builder
Expand All @@ -222,7 +227,8 @@ pub async fn insert_web3_txs_and_logs(
.push_bind(tx.cumulative_gas_used)
.push_bind(tx.gas_used)
.push_bind(tx.contract_address)
.push_bind(tx.exit_code);
.push_bind(tx.exit_code)
.push_bind(tx.chain_id);
})
.push(" RETURNING id");

Expand Down Expand Up @@ -272,6 +278,135 @@ pub async fn insert_web3_txs_and_logs(
Ok((txs_len, logs_len))
}

pub async fn update_web3_block(
web3_block: Block,
pg_tx: &mut sqlx::Transaction<'_, Postgres>,
) -> Result<()> {
let block = DbBlock::try_from(&web3_block)?;

sqlx::query(
"UPDATE blocks SET hash = $1, parent_hash = $2, gas_limit = $3, gas_used = $4, timestamp = $5, miner = $6, size = $7 where number = $8"
)
.bind(block.hash)
.bind(block.parent_hash)
.bind(block.gas_limit)
.bind(block.gas_used)
.bind(block.timestamp)
.bind(block.miner)
.bind(block.size)
.bind(block.number)
.execute(pg_tx)
.await?;

Ok(())
}

pub async fn update_web3_txs_and_logs(
web3_tx_with_logs_vec: Vec<TransactionWithLogs>,
_pg_tx: &mut sqlx::Transaction<'_, Postgres>,
) -> Result<(usize, usize)> {
if web3_tx_with_logs_vec.is_empty() {
return Ok((0, 0));
}

let (txs, logs) = web3_tx_with_logs_vec
.into_par_iter()
.enumerate()
.map(|(i, web3_tx_with_logs)| {
let db_logs: Result<Vec<DbLog>> = web3_tx_with_logs
.logs
.into_par_iter()
.map(|l| DbLog::try_from_log(l, i as i64))
.collect();
(DbTransaction::try_from(web3_tx_with_logs.tx), db_logs)
})
.collect::<(Vec<_>, Vec<_>)>();
let txs = txs.into_iter().collect::<Result<Vec<_>>>()?;
let logs = logs.into_iter().collect::<Result<Vec<_>>>()?;
let logs = logs.into_iter().flatten().collect::<Vec<_>>();

let logs_len = logs.len();
let txs_len = txs.len();

let size = logs_len / 4;
let final_size = if size > INSERT_LOGS_BATCH_SIZE || size == 0 {
INSERT_LOGS_BATCH_SIZE
} else {
size
};

let logs_slice = logs
.into_iter()
.chunks(final_size)
.into_iter()
.map(|chunk| chunk.collect())
.collect::<Vec<Vec<_>>>();

futures::future::join_all(
txs.into_iter().map(|tx| {
sqlx::query(
"UPDATE transactions SET hash = $1, eth_tx_hash = $2, from_address = $3, to_address = $4, value = $5, nonce = $6, gas_limit = $7, gas_price = $8, input = $9, v = $10, r = $11, s = $12, cumulative_gas_used = $13, gas_used = $14, contract_address = $15, exit_code = $16, chain_id = $17 where block_number = $18 and transaction_index = $19"
)
.bind(tx.hash)
.bind(tx.eth_tx_hash)
.bind(tx.from_address)
.bind(tx.to_address)
.bind(tx.value)
.bind(tx.nonce)
.bind(tx.gas_limit)
.bind(tx.gas_price)
.bind(tx.input)
.bind(tx.v)
.bind(tx.r)
.bind(tx.s)
.bind(tx.cumulative_gas_used)
.bind(tx.gas_used)
.bind(tx.contract_address)
.bind(tx.exit_code)
.bind(tx.chain_id)
.bind(tx.block_number)
.bind(tx.transaction_index)
.execute(&*POOL_FOR_UPDATE)
})
)
.await
.into_iter()
.collect::<Result<Vec<_>, sqlx::Error>>()?;

if logs_len != 0 {
let mut logs_querys = logs_slice
.into_par_iter()
.map(|db_logs| {
let mut logs_query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
"UPDATE logs SET transaction_hash = data_table.transaction_hash, address = data_table.address, data = data_table.data, topics = data_table.topics FROM ( "
);

logs_query_builder.push_values(db_logs, |mut b, log| {
b.push_bind(log.transaction_hash)
.push_bind(log.address)
.push_bind(log.data)
.push_bind(log.topics)
.push_bind(log.block_number)
.push_bind(log.log_index);
})
.push(" ) AS data_table(transaction_hash, address, data, topics, block_number, log_index) WHERE logs.block_number = data_table.block_number AND logs.log_index = data_table.log_index");
logs_query_builder
}).collect::<Vec<_>>();

logs_querys
.par_iter_mut()
.map(|query_builder| {
let query = query_builder.build();
smol::block_on(query.execute(&*POOL_FOR_UPDATE)).map_err(|err| anyhow!(err))
})
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;
}

Ok((txs_len, logs_len))
}

fn u128_to_big_decimal(value: &u128) -> Result<BigDecimal> {
let result = BigDecimal::from_str(&value.to_string())?;
Ok(result)
Expand Down
22 changes: 21 additions & 1 deletion crates/indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,27 @@ fn main() -> Result<()> {
};

let mut runner = Runner::new(indexer_config)?;
smol::block_on(runner.run())?;

let command_name = std::env::args().nth(1);

// `cargo run` -> run sync mode
// `cargo run update <optional start number> <optional end number>` -> run update mode
if let Some(name) = command_name {
if name == "update" {
let start_block_number = std::env::args()
.nth(2)
.map(|num| num.parse::<u64>().unwrap());
let end_block_number = std::env::args()
.nth(3)
.map(|num| num.parse::<u64>().unwrap());
smol::block_on(runner.run_update(start_block_number, end_block_number))?;
} else {
smol::block_on(runner.run())?;
}
} else {
smol::block_on(runner.run())?;
}

Ok(())
}

Expand Down
12 changes: 12 additions & 0 deletions crates/indexer/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,16 @@ lazy_static::lazy_static! {
.max_connections(5)
.connect_lazy_with(opts)
};

// Adapt slow query duration to 30s, and enlarge max connections
pub static ref POOL_FOR_UPDATE: PgPool = {
let indexer_config = load_indexer_config("./indexer-config.toml").unwrap();

let mut opts: PgConnectOptions = indexer_config.pg_url.parse().expect("pg url parse error");
opts.log_statements(log::LevelFilter::Debug)
.log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(30));
PgPoolOptions::new()
.max_connections(20)
.connect_lazy_with(opts)
};
}
Loading