fix: lazy block approach
Ludo Galabru committed May 10, 2023
1 parent aa5e418 commit b567322
Showing 1 changed file with 154 additions and 31 deletions.
components/chainhook-event-observer/src/hord/db/mod.rs: 185 changes (154 additions & 31 deletions)
@@ -330,23 +330,36 @@ impl CompactedBlock {
Ok(())
}

fn serialize_to_lazy_format<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
pub fn serialize_to_lazy_format<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
// Number of transactions in the block (not including coinbase)
let tx_len = self.0 .1.len() as u16;
fd.write(&tx_len.to_be_bytes())?;
// For each transaction:
for (_, inputs, outputs) in self.0 .1.iter() {
let inputs_len = inputs.len() as u8;
let outputs_len = outputs.len() as u8;
fd.write(&[inputs_len])?;
fd.write(&[outputs_len])?;
let inputs_len = inputs.len() as u16;
let outputs_len = outputs.len() as u16;
// Number of inputs
fd.write(&inputs_len.to_be_bytes())?;
// Number of outputs
fd.write(&outputs_len.to_be_bytes())?;
}
// Coinbase transaction
fd.write_all(&self.0 .0 .0)?;
// Coinbase value
fd.write(&self.0 .0 .1.to_be_bytes())?;
// For each transaction
for (id, inputs, outputs) in self.0 .1.iter() {
// Transaction id
fd.write_all(id)?;
// For each input
for (txid, block, vout, value) in inputs.iter() {
// Txin id
fd.write_all(txid)?;
// Block height
fd.write(&block.to_be_bytes())?;
// Vout
fd.write(&vout.to_be_bytes())?;
// Value
fd.write(&value.to_be_bytes())?;
}
for value in outputs.iter() {
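Taken together, the writes above define a compact wire format: a u16 transaction count, a (u16, u16) input/output count pair per transaction, the 8-byte coinbase txid and u64 coinbase value, then each transaction's 8-byte txid followed by its inputs (8-byte txin id, u32 block height, u16 vout, u64 value) and its u64 output values. The sketch below is illustrative only and not part of this commit; the helper name is hypothetical, and the sizes mirror the constants defined further down in this file.

/// Hypothetical helper (not in this commit): expected byte length of the
/// lazy format written by `serialize_to_lazy_format`, given the
/// (inputs, outputs) shape of each non-coinbase transaction.
fn expected_lazy_len(tx_shapes: &[(usize, usize)]) -> usize {
    const TXID_LEN: usize = 8; // truncated transaction id
    const SATS_LEN: usize = 8; // u64 amount
    const INPUT_SIZE: usize = TXID_LEN + 4 + 2 + SATS_LEN; // txin + height + vout + value
    const OUTPUT_SIZE: usize = SATS_LEN; // output value only

    let header = 2 + tx_shapes.len() * (2 + 2); // u16 tx count + (u16, u16) per tx
    let coinbase = TXID_LEN + SATS_LEN; // coinbase txid + coinbase value
    let body: usize = tx_shapes
        .iter()
        .map(|&(ins, outs)| TXID_LEN + ins * INPUT_SIZE + outs * OUTPUT_SIZE)
        .sum();
    header + coinbase + body
}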
@@ -418,7 +431,10 @@ fn get_default_hord_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf {
fn rocks_db_default_options() -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_disable_auto_compactions(true);
// opts.prepare_for_bulk_load();
// opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
// opts.set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
// opts.increase_parallelism(parallelism)
// Per rocksdb's documentation:
// If cache_index_and_filter_blocks is false (which is default),
// the number of index/filter blocks is controlled by option max_open_files.
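For context, these options are consumed when the blocks database is opened. A minimal sketch of such an open call is shown below; the wrapper name and error handling are assumptions, while `DB::open` is the stock rocksdb crate API and the two helpers come from this module.

// Hypothetical wiring (not part of this commit): open the hord blocks DB
// read-write with the options built above.
fn open_blocks_db_rw(base_dir: &std::path::PathBuf) -> Result<rocksdb::DB, String> {
    let opts = rocks_db_default_options();
    let path = get_default_hord_db_file_path_rocks_db(base_dir);
    rocksdb::DB::open(&opts, path).map_err(|e| format!("unable to open blocks db: {}", e))
}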
@@ -501,6 +517,21 @@ pub fn insert_entry_in_blocks(
.expect("unable to insert metadata");
}

pub fn insert_entry_in_blocks_lazy_block(
block_height: u32,
lazy_block: &LazyBlock,
blocks_db_rw: &DB,
_ctx: &Context,
) {
let block_height_bytes = block_height.to_be_bytes();
blocks_db_rw
.put(&block_height_bytes, &lazy_block.bytes)
.expect("unable to insert blocks");
blocks_db_rw
.put(b"metadata::last_insert", block_height_bytes)
.expect("unable to insert metadata");
}
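A possible call site for this new helper, tying it to the serializer above. The wrapper function is hypothetical and not part of the commit; it assumes `LazyBlock::new` takes the raw byte vector, as it does in `find_lazy_block_at_block_height` below.

// Hypothetical wiring (not part of this commit): serialize a CompactedBlock
// into the lazy wire format, wrap it, and persist it under its block height.
fn store_block_lazily(
    block_height: u32,
    block: &CompactedBlock,
    blocks_db_rw: &DB,
    ctx: &Context,
) -> std::io::Result<()> {
    let mut bytes = Vec::new();
    block.serialize_to_lazy_format(&mut bytes)?;
    insert_entry_in_blocks_lazy_block(block_height, &LazyBlock::new(bytes), blocks_db_rw, ctx);
    Ok(())
}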

pub fn find_last_block_inserted(blocks_db: &DB) -> u32 {
match blocks_db.get(b"metadata::last_insert") {
Ok(Some(bytes)) => u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
@@ -534,7 +565,11 @@ pub fn find_block_at_block_height(
}
}

pub fn find_lazy_block_at_block_height(block_height: u32, blocks_db: &DB) -> Option<LazyBlock> {
pub fn find_lazy_block_at_block_height(
block_height: u32,
retry: u8,
blocks_db: &DB,
) -> Option<LazyBlock> {
match blocks_db.get(block_height.to_be_bytes()) {
Ok(Some(res)) => Some(LazyBlock::new(res)),
_ => None,
@@ -1360,6 +1395,9 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
inscription_number: u64,
traversals_cache: Arc<
DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>,
>,
ctx: &Context,
) -> Result<TraversalResult, String> {
ctx.try_log(|logger| {
@@ -1381,22 +1419,86 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
};
let mut tx_cursor = (txid, 0);
let mut hops: u32 = 0;
let mut local_block_cache = HashMap::new();
loop {
local_block_cache.clear();

hops += 1;
        let lazy_block = match local_block_cache.get(&ordinal_block_number) {
            Some(block) => block,
            None => match find_lazy_block_at_block_height(ordinal_block_number, &blocks_db) {
                Some(block) => {
                    local_block_cache.insert(ordinal_block_number, block);
                    local_block_cache.get(&ordinal_block_number).unwrap()
                }
                None => {
                    return Err(format!("block #{ordinal_block_number} not in database"));
                }
            },
        };
        if hops as u64 > block_identifier.index {
            return Err(format!(
                "Unable to process transaction {}, manual investigation required",
                transaction_identifier.hash
            ));
        }

        if let Some(cached_tx) = traversals_cache.get(&(ordinal_block_number, tx_cursor.0)) {
            let tx = cached_tx.value();
            let mut next_found_in_cache = false;
            let mut sats_out = 0;
            for (index, output_value) in tx.outputs.iter().enumerate() {
                if index == tx_cursor.1 {
                    break;
                }
                // ctx.try_log(|logger| {
                //     slog::info!(logger, "Adding {} from output #{}", output_value, index)
                // });
                sats_out += output_value;
            }
            sats_out += ordinal_offset;
            // ctx.try_log(|logger| {
            //     slog::info!(
            //         logger,
            //         "Adding offset {ordinal_offset} to sats_out {sats_out}"
            //     )
            // });

            let mut sats_in = 0;
            for input in tx.inputs.iter() {
                sats_in += input.txin_value;
                // ctx.try_log(|logger| {
                //     slog::info!(
                //         logger,
                //         "Adding txin_value {txin_value} to sats_in {sats_in} (txin: {})",
                //         hex::encode(&txin)
                //     )
                // });

                if sats_out < sats_in {
                    ordinal_offset = sats_out - (sats_in - input.txin_value);
                    ordinal_block_number = input.block_height;

                    // ctx.try_log(|logger| slog::info!(logger, "Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
                    //     hex::encode(&txid_n),
                    //     hex::encode(&txin)));
                    tx_cursor = (input.txin.clone(), input.vout as usize);
                    next_found_in_cache = true;
                    break;
                }
            }

            if next_found_in_cache {
                continue;
            }

            if sats_in == 0 {
                ctx.try_log(|logger| {
                    slog::error!(
                        logger,
                        "Transaction {} is originating from a non spending transaction",
                        transaction_identifier.hash
                    )
                });
                return Ok(TraversalResult {
                    inscription_number: 0,
                    ordinal_number: 0,
                    transfers: 0,
                });
            }
        }

        let lazy_block = match find_lazy_block_at_block_height(ordinal_block_number, 3, &blocks_db)
        {
            Some(block) => block,
            None => {
                return Err(format!("block #{ordinal_block_number} not in database"));
            }
        };

let coinbase_txid = lazy_block.get_coinbase_txid();
@@ -1472,7 +1574,7 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
sats_out += ordinal_offset;

let mut sats_in = 0;
for input in lazy_tx.inputs.into_iter() {
for input in lazy_tx.inputs.iter() {
sats_in += input.txin_value;
// ctx.try_log(|logger| {
// slog::info!(
@@ -1493,6 +1595,23 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
break;
}
}

traversals_cache.insert((ordinal_block_number, tx_cursor.0), lazy_tx);

if sats_in == 0 {
ctx.try_log(|logger| {
slog::error!(
logger,
"Transaction {} is originating from a non spending transaction",
transaction_identifier.hash
)
});
return Ok(TraversalResult {
inscription_number: 0,
ordinal_number: 0,
transfers: 0,
});
}
}
}
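The cache threaded through the new signature is a DashMap keyed by (block height, truncated txid) and using the Fx hasher. A sketch of how a caller might construct it follows; the commit itself does not show the call site, and the crate names (`dashmap`, `fxhash`) are assumptions.

// Hypothetical construction (not part of this commit); the key and value
// types mirror the new `retrieve_satoshi_point_using_lazy_storage` signature.
use std::hash::BuildHasherDefault;
use std::sync::Arc;

use dashmap::DashMap;
use fxhash::FxHasher;

fn new_traversals_cache(
) -> Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>> {
    Arc::new(DashMap::with_hasher(BuildHasherDefault::<FxHasher>::default()))
}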

@@ -1523,13 +1642,13 @@ pub struct LazyBlockTransaction {
pub struct LazyBlockTransactionInput {
pub txin: [u8; 8],
pub block_height: u32,
pub vout: u8,
pub vout: u16,
pub txin_value: u64,
}

const TXID_LEN: usize = 8;
const SATS_LEN: usize = 8;
const INPUT_SIZE: usize = 8 + 4 + 1 + 8;
const INPUT_SIZE: usize = TXID_LEN + 4 + 2 + SATS_LEN;
const OUTPUT_SIZE: usize = 8;

impl LazyBlock {
@@ -1539,7 +1658,7 @@ impl LazyBlock {
}

pub fn get_coinbase_data_pos(&self) -> usize {
(2 + self.tx_len * 2) as usize
(2 + self.tx_len * 2 * 2) as usize
}

pub fn get_u64_at_pos(&self, pos: usize) -> u64 {
@@ -1569,10 +1688,14 @@ impl LazyBlock {
self.get_coinbase_data_pos() + TXID_LEN + SATS_LEN
}

pub fn get_transaction_format(&self, index: u16) -> (u8, u8, usize) {
let inputs_len_pos = (2 + index * 2) as usize;
let inputs = self.bytes[inputs_len_pos];
let outputs = self.bytes[inputs_len_pos + 1];
pub fn get_transaction_format(&self, index: u16) -> (u16, u16, usize) {
let inputs_len_pos = (2 + index * 2 * 2) as usize;
let inputs =
u16::from_be_bytes([self.bytes[inputs_len_pos], self.bytes[inputs_len_pos + 1]]);
let outputs = u16::from_be_bytes([
self.bytes[inputs_len_pos + 2],
self.bytes[inputs_len_pos + 3],
]);
let size = TXID_LEN + (inputs as usize * INPUT_SIZE) + (outputs as usize * OUTPUT_SIZE);
(inputs, outputs, size)
}
@@ -1581,8 +1704,8 @@ impl LazyBlock {
&self,
cursor: &mut Cursor<&Vec<u8>>,
txid: [u8; 8],
inputs_len: u8,
outputs_len: u8,
inputs_len: u16,
outputs_len: u16,
) -> LazyBlockTransaction {
let mut inputs = Vec::with_capacity(inputs_len as usize);
for _ in 0..inputs_len {
@@ -1592,14 +1715,14 @@ impl LazyBlock {
cursor
.read_exact(&mut block_height)
.expect("data corrupted");
let mut vout = [0u8; 1];
let mut vout = [0u8; 2];
cursor.read_exact(&mut vout).expect("data corrupted");
let mut txin_value = [0u8; 8];
cursor.read_exact(&mut txin_value).expect("data corrupted");
inputs.push(LazyBlockTransactionInput {
txin: txin,
block_height: u32::from_be_bytes(block_height),
vout: vout[0],
vout: u16::from_be_bytes(vout),
txin_value: u64::from_be_bytes(txin_value),
});
}