Skip to content

Commit

Permalink
feat: restore ability to replay transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Aug 3, 2023
1 parent dcdfd16 commit 98e7e9b
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 239 deletions.
2 changes: 1 addition & 1 deletion components/hord-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
RepairCommand::Transfers(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let service = Service::new(config, ctx.clone());
service.replay_transfers(cmd.start_block, cmd.end_block, None)?;
service.replay_transfers(cmd.start_block, cmd.end_block, None).await?;
}
},
Command::Db(HordDbCommand::Check(cmd)) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType, OrdinalOperation,
TransactionIdentifier, OrdinalInscriptionTransferData,
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType,
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
},
utils::Context,
};
Expand Down Expand Up @@ -238,7 +238,7 @@ pub fn re_augment_block_with_ordinals_operations(
// Restore inscriptions data
let mut inscriptions =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);

let mut should_become_cursed = vec![];
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
for (op_index, operation) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
Expand Down Expand Up @@ -296,7 +296,10 @@ pub fn re_augment_block_with_ordinals_operations(
let OrdinalOperation::InscriptionRevealed(inscription) = tx.metadata.ordinal_operations.remove(op_index) else {
continue;
};
tx.metadata.ordinal_operations.insert(op_index, OrdinalOperation::CursedInscriptionRevealed(inscription));
tx.metadata.ordinal_operations.insert(
op_index,
OrdinalOperation::CursedInscriptionRevealed(inscription),
);
}

// TODO: Handle transfers
Expand Down
1 change: 1 addition & 0 deletions components/hord-cli/src/core/pipeline/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod block_ingestion;
pub mod inscription_indexing;
pub mod transfers_recomputing;

pub use inscription_indexing::start_inscription_indexing_processor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use std::{
collections::BTreeMap,
thread::{sleep, JoinHandle},
time::Duration,
};

use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation},
utils::Context,
};
use crossbeam_channel::{Sender, TryRecvError};

use crate::{
core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
db::{
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations,
parse_satpoint_to_watch, remove_entries_from_locations_at_block_height,
},
};

use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::open_readwrite_hord_db_conn,
};

pub fn start_transfers_recomputing_processor(
config: &Config,
ctx: &Context,
post_processor: Option<Sender<BitcoinBlockData>>,
) -> PostProcessorController {
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();

let config = config.clone();
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
.spawn(move || {
let mut inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
let mut empty_cycles = 0;

if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start transfers recomputing runloop");
}

loop {
let mut blocks = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(_, blocks)) => {
empty_cycles = 0;
blocks
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 10 {
empty_cycles = 0;
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
}
sleep(Duration::from_secs(1));
continue;
}
_ => {
break;
}
},
};

info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());

for block in blocks.iter_mut() {
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};

info!(
ctx.expect_logger(),
"Cleaning transfers from block {}", block.block_identifier.index
);
let inscriptions = find_all_inscriptions_in_block(
&block.block_identifier.index,
&inscriptions_db_conn_rw,
&ctx,
);
info!(
ctx.expect_logger(),
"{} inscriptions retrieved at block {}",
inscriptions.len(),
block.block_identifier.index
);
let mut operations = BTreeMap::new();

let transaction = inscriptions_db_conn_rw.transaction().unwrap();

remove_entries_from_locations_at_block_height(
&block.block_identifier.index,
&transaction,
&ctx,
);

for (_, entry) in inscriptions.iter() {
let inscription_id = entry.get_inscription_id();
info!(
ctx.expect_logger(),
"Processing inscription {}", inscription_id
);
insert_entry_in_locations(
&inscription_id,
block.block_identifier.index,
&entry.transfer_data,
&transaction,
&ctx,
);

operations.insert(
entry.transaction_identifier_inscription.clone(),
OrdinalInscriptionTransferData {
inscription_id: entry.get_inscription_id(),
updated_address: None,
satpoint_pre_transfer: format_satpoint_to_watch(
&entry.transaction_identifier_inscription,
entry.inscription_input_index,
0,
),
satpoint_post_transfer: format_satpoint_to_watch(
&entry.transfer_data.transaction_identifier_location,
entry.transfer_data.output_index,
entry.transfer_data.inscription_offset_intra_output,
),
post_transfer_output_value: None,
tx_index: 0,
},
);
}

info!(
ctx.expect_logger(),
"Rewriting transfers for block {}", block.block_identifier.index
);

for tx in block.transactions.iter_mut() {
tx.metadata.ordinal_operations.clear();
if let Some(mut entry) = operations.remove(&tx.transaction_identifier) {
let (_, output_index, _) =
parse_satpoint_to_watch(&entry.satpoint_post_transfer);

let script_pub_key_hex =
tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => {
match Address::from_script(&script, network.clone()) {
Ok(address) => Some(address.to_string()),
Err(_e) => None,
}
}
Err(_e) => None,
};

entry.updated_address = updated_address;
entry.post_transfer_output_value =
Some(tx.metadata.outputs[output_index].value);

tx.metadata
.ordinal_operations
.push(OrdinalOperation::InscriptionTransferred(entry));
}
}

update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
block,
&transaction,
&ctx,
)
.unwrap();

info!(
ctx.expect_logger(),
"Saving supdates for block {}", block.block_identifier.index
);
transaction.commit().unwrap();

info!(
ctx.expect_logger(),
"Transfers in block {} repaired", block.block_identifier.index
);

if let Some(ref post_processor) = post_processor {
let _ = post_processor.send(block.clone());
}
}
}
})
.expect("unable to spawn thread");

PostProcessorController {
commands_tx,
events_rx,
thread_handle: handle,
}
}
36 changes: 18 additions & 18 deletions components/hord-cli/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,6 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
)
});
} else {
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS locations (
inscription_id TEXT NOT NULL,
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
outpoint_to_watch TEXT NOT NULL,
offset INTEGER NOT NULL,
UNIQUE(outpoint_to_watch,offset)
)",
[],
) {
ctx.try_log(|logger| {
warn!(logger, "Unable to create table locations:{}", e.to_string())
});
}

if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);",
[],
Expand All @@ -110,7 +94,22 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
}
}
}
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS locations (
inscription_id TEXT NOT NULL,
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
outpoint_to_watch TEXT NOT NULL,
offset INTEGER NOT NULL
)",
[],
) {
ctx.try_log(|logger| {
warn!(logger, "Unable to create table locations: {}", e.to_string())
});
} else {
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_locations_on_block_height ON locations(block_height);",
[],
Expand All @@ -128,8 +127,9 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
}
}
}

conn
}

Expand Down
10 changes: 2 additions & 8 deletions components/hord-cli/src/service/http_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ fn handle_create_predicate(
let predicate_uuid = predicate.get_uuid().to_string();

if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
let key: String = format!(
"{}",
ChainhookSpecification::bitcoin_key(&predicate_uuid)
);
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
Ok(Some(_)) => {
return Json(json!({
Expand Down Expand Up @@ -172,10 +169,7 @@ fn handle_get_predicate(

match open_readwrite_predicates_db_conn(api_config) {
Ok(mut predicates_db_conn) => {
let key: String = format!(
"{}",
ChainhookSpecification::bitcoin_key(&predicate_uuid)
);
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
let entry = match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
"chain": "stacks",
Expand Down
Loading

0 comments on commit 98e7e9b

Please sign in to comment.