Skip to content

Commit

Permalink
indexer: handle vm status (starcoinorg#2526)
Browse files Browse the repository at this point in the history
* handle vm status

* [indexer] update mapping of txn_info

* indexer: repair es data

Co-authored-by: chengsuoyuan <suoyuan@gmail.com>
  • Loading branch information
2 people authored and naughtyvenom committed Jul 19, 2021
1 parent ba9af0a commit 2b22fac
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 7 deletions.
5 changes: 5 additions & 0 deletions cmd/indexer/mappings/txn_info.mapping.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"mappings": {
"dynamic": "strict",
"properties": {
"timestamp": {
"type": "date"
Expand Down Expand Up @@ -207,6 +208,10 @@
}
}
},
"status_content": {
"type": "object",
"dynamic": true
},
"transaction_hash": {
"type": "text",
"fields": {
Expand Down
4 changes: 2 additions & 2 deletions cmd/indexer/src/block_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl BlockClient {
.get_events_by_txn_hash(txn_info.transaction_hash)
.await?;
txns_data.push(TransactionData {
info: txn_info,
info: txn_info.into(),
block_metadata: txn.block_metadata,
user_transaction: txn.user_transaction,
events,
Expand All @@ -67,7 +67,7 @@ impl BlockClient {
txn_infos.into_iter().zip(events).zip(user_transactions)
{
txns_data.push(TransactionData {
info: txn_info,
info: txn_info.into(),
events,
user_transaction: Some(user_txn),
block_metadata: None,
Expand Down
27 changes: 27 additions & 0 deletions cmd/indexer/src/es_sinker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,33 @@ impl EsSinker {
Ok(())
}

pub async fn repair_block(&self, block: BlockData) -> Result<()> {
let BlockData { block, txns_data } = block;

let block_index = self.config.block_index.as_str();
let txn_info_index = self.config.txn_info_index.as_str();
let mut bulk_operations = BulkOperations::new();
bulk_operations.push(
BulkOperation::index(BlockWithMetadata {
block: block.clone(),
metadata: txns_data[0].block_metadata.clone(),
})
.id(block.header.block_hash.to_string())
.index(block_index),
)?;

for txn_data in txns_data {
bulk_operations.push(
BulkOperation::index(txn_data.clone())
.id(txn_data.info.transaction_hash.to_string())
.index(txn_info_index),
)?;
}

self.bulk(bulk_operations).await?;
Ok(())
}

/// write new block into es.
/// Caller need to make sure the block with right block number.
pub async fn write_next_block(&self, block: BlockData) -> Result<()> {
Expand Down
137 changes: 134 additions & 3 deletions cmd/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,110 @@ pub use block_client::BlockClient;
pub use es_sinker::{EsSinker, IndexConfig, LocalTipInfo};

use serde::{Deserialize, Serialize};
use starcoin_crypto::HashValue;
use starcoin_rpc_api::types::{
BlockMetadataView, BlockView, SignedUserTransactionView, TransactionEventView,
TransactionInfoView,
BlockMetadataView, BlockView, SignedUserTransactionView, StrView, TransactionEventView,
TransactionInfoView, TransactionVMStatus,
};
use starcoin_types::vm_error::AbortLocation;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TransactionData {
#[serde(flatten)]
pub info: TransactionInfoView,
pub info: TransactionInfoEsView,
pub block_metadata: Option<BlockMetadataView>,
pub user_transaction: Option<SignedUserTransactionView>,
pub events: Vec<TransactionEventView>,
pub timestamp: u64,
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct TransactionInfoEsView {
pub block_hash: HashValue,
pub block_number: StrView<u64>,
/// The hash of this transaction.
pub transaction_hash: HashValue,
pub transaction_index: u32,
/// The root hash of Sparse Merkle Tree describing the world state at the end of this
/// transaction.
pub state_root_hash: HashValue,

/// The root hash of Merkle Accumulator storing all events emitted during this transaction.
pub event_root_hash: HashValue,

/// The amount of gas used.
pub gas_used: StrView<u64>,

/// The vm status. If it is not `Executed`, this will provide the general error class. Execution
/// failures and Move abort's receive more detailed information. But other errors are generally
/// categorized with no status code or other information
#[serde(flatten)]
pub status: TransactionVMStatusEsView,
}

impl From<TransactionInfoView> for TransactionInfoEsView {
fn from(info: TransactionInfoView) -> Self {
Self {
block_hash: info.block_hash,
block_number: info.block_number,
transaction_hash: info.transaction_hash,
transaction_index: info.transaction_index,
state_root_hash: info.state_root_hash,
event_root_hash: info.event_root_hash,
gas_used: info.gas_used,
status: info.status.into(),
}
}
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", content = "status_content")]
#[allow(clippy::upper_case_acronyms)]
pub enum TransactionVMStatusEsView {
Executed,
OutOfGas,
MoveAbort {
location: AbortLocation,
abort_code: StrView<u64>,
},
ExecutionFailure {
location: AbortLocation,
function: u16,
code_offset: u16,
},
MiscellaneousError,
Discard {
status_code: StrView<u64>,
},
}

impl From<TransactionVMStatus> for TransactionVMStatusEsView {
fn from(s: TransactionVMStatus) -> Self {
match s {
TransactionVMStatus::Executed => Self::Executed,
TransactionVMStatus::OutOfGas => Self::OutOfGas,
TransactionVMStatus::MoveAbort {
location,
abort_code,
} => Self::MoveAbort {
location,
abort_code,
},
TransactionVMStatus::ExecutionFailure {
location,
function,
code_offset,
} => Self::ExecutionFailure {
location,
function,
code_offset,
},
TransactionVMStatus::MiscellaneousError => Self::MiscellaneousError,
TransactionVMStatus::Discard { status_code } => Self::Discard { status_code },
}
}
}

#[derive(Clone, Debug)]
pub struct BlockData {
pub block: BlockView,
Expand All @@ -30,3 +120,44 @@ struct BlockWithMetadata {
block: BlockView,
metadata: Option<BlockMetadataView>,
}

#[cfg(test)]
mod tests {
use crate::{TransactionInfoEsView, TransactionVMStatusEsView};
use starcoin_rpc_api::types::StrView;

#[test]
fn test_info_view() {
let v = TransactionInfoEsView {
block_hash: Default::default(),
block_number: StrView(1),
transaction_hash: Default::default(),
transaction_index: 0,
state_root_hash: Default::default(),
event_root_hash: Default::default(),
gas_used: StrView(0),
status: TransactionVMStatusEsView::Executed,
};

let expected = r#"
{"block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","block_number":"1","transaction_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","transaction_index":0,"state_root_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","event_root_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","gas_used":"0","status":"Executed"}
"#;
assert_eq!(serde_json::to_string(&v).unwrap().as_str(), expected.trim());
let v = TransactionInfoEsView {
block_hash: Default::default(),
block_number: StrView(1),
transaction_hash: Default::default(),
transaction_index: 0,
state_root_hash: Default::default(),
event_root_hash: Default::default(),
gas_used: StrView(0),
status: TransactionVMStatusEsView::Discard {
status_code: StrView(1000),
},
};
let expected = r#"
{"block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","block_number":"1","transaction_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","transaction_index":0,"state_root_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","event_root_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","gas_used":"0","status":"Discard","status_content":{"status_code":"1000"}}
"#;
assert_eq!(serde_json::to_string(&v).unwrap().as_str(), expected.trim());
}
}
78 changes: 76 additions & 2 deletions cmd/indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ pub struct Options {
default_value = "http://localhost:9850"
)]
node_url: String,

#[clap(subcommand)]
subcmd: Option<SubCommand>,
}

#[derive(Clap, Debug, Clone)]
enum SubCommand {
Repair(Repair),
}

/// repair sub command
#[derive(Clap, Debug, Clone)]
struct Repair {
// block to repair from. default to 0.
#[clap(long = "from-block")]
from_block: Option<u64>,

// block to repair to. default to current end block
#[clap(long = "to-block")]
to_block: Option<u64>,
}

async fn start_loop(block_client: BlockClient, sinker: EsSinker) -> Result<()> {
Expand Down Expand Up @@ -118,6 +138,54 @@ async fn start_loop(block_client: BlockClient, sinker: EsSinker) -> Result<()> {
}
}

async fn repair(block_client: BlockClient, sinker: EsSinker, repair_config: Repair) -> Result<()> {
let latest_block_number = block_client
.get_chain_head()
.await
.map_err(|e| anyhow!("{}", e))?
.number
.0;
let end_block = repair_config.to_block.unwrap_or(latest_block_number);
let from_block = repair_config.from_block.unwrap_or(0);
let mut current_block: u64 = from_block;

while current_block < end_block {
let block_data: BlockData = FutureRetry::new(
|| {
block_client.get_block_whole_by_height(current_block)
//.map_err(|e| e.compat())
},
|e| {
warn!("[Retry]: get chain block data, err: {}", &e);
RetryPolicy::<anyhow::Error>::WaitRetry(Duration::from_secs(1))
},
)
.await
.map(|(d, _)| d)
.map_err(|(e, _)| e)?;

// retry write
FutureRetry::new(
|| sinker.repair_block(block_data.clone()),
|e: anyhow::Error| {
warn!("[Retry]: repair block {}, err: {}", current_block, e);
RetryPolicy::<anyhow::Error>::WaitRetry(Duration::from_secs(1))
},
)
.await
.map(|(d, _)| d)
.map_err(|(e, _)| e)?;

info!(
"Repair block {}, height: {} done",
block_data.block.header.block_hash, block_data.block.header.number
);

current_block += 1;
}
Ok(())
}

fn main() -> anyhow::Result<()> {
let _log_handle = starcoin_logger::init();
let opts: Options = Options::parse();
Expand Down Expand Up @@ -145,7 +213,13 @@ fn main() -> anyhow::Result<()> {
let index_config = IndexConfig::new_with_prefix(opts.es_index_prefix.as_str());
let sinker = EsSinker::new(es, index_config);

rt.block_on(start_loop(block_client, sinker))?;

match &opts.subcmd {
Some(SubCommand::Repair(repair_config)) => {
rt.block_on(repair(block_client, sinker, repair_config.clone()))?;
}
None => {
rt.block_on(start_loop(block_client, sinker))?;
}
}
Ok(())
}

0 comments on commit 2b22fac

Please sign in to comment.