Merge #461: Restructure electrum/esplora sync logic
9c57708 Make stop_gap a parameter to EsploraBlockchainConfig::new (LLFourn)
0f0a01a s/vin/vout/ (LLFourn)
1a64fd9 Delete src/blockchain/utils.rs (LLFourn)
d3779fa Fix comments (LLFourn)
d394011 Less intermediary data states in sync (LLFourn)
dfb63d3 s/observed_txs/finished_txs/g (LLFourn)
188d9a4 Make variable names consistent (LLFourn)
5eadf5c Add some logging to script_sync (LLFourn)
aaad560 Always get up to chunk_size heights to request headers for (LLFourn)
e7c1357 Don't request conftime during tx request (LLFourn)
808d7d8 Update changelog (LLFourn)
732166f Fix feerate calculation for esplora (LLFourn)
3f5cb69 Invert dependencies in electrum sync (LLFourn)

Pull request description:

  ## Description

  This PR does dependency inversion on the previous sync logic for electrum and esplora captured in the trait `ElectrumLikeSync`. This means that the sync logic does not reference the blockchain at all. Instead the blockchain asks the sync logic (in `script_sync.rs`) what it needs to continue the sync and tries to retrieve it.
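
The inversion described above can be pictured with a toy state machine. This is only a sketch under simplifying assumptions: the `Request`, `ScriptReq`, `start` and `sync` items below are hypothetical stand-ins (scripts are plain strings and a "history" is just a count) and are not the actual types in `script_sync.rs`. The point is that the sync logic never performs I/O; the caller fetches what is requested and hands it back through `satisfy`.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified request type: what the sync logic needs next.
enum Request {
    /// The sync logic wants the history of the pending scripts.
    Script(ScriptReq),
    /// The sync logic is done and returns the accumulated result.
    Finish(Vec<(String, usize)>),
}

struct ScriptReq {
    pending: VecDeque<String>,
    done: Vec<(String, usize)>,
}

impl ScriptReq {
    /// Scripts still waiting for data; the caller decides how many to fetch.
    fn request(&self) -> impl Iterator<Item = &String> + '_ {
        self.pending.iter()
    }

    /// Accept one answer per requested script (in order) and return the next request.
    fn satisfy(mut self, tx_counts: Vec<usize>) -> Request {
        for count in tx_counts {
            if let Some(script) = self.pending.pop_front() {
                self.done.push((script, count));
            }
        }
        if self.pending.is_empty() {
            Request::Finish(self.done)
        } else {
            Request::Script(self)
        }
    }
}

/// Entry point of the (toy) sync logic. No network access happens here.
fn start(scripts: Vec<String>) -> Request {
    Request::Script(ScriptReq {
        pending: scripts.into(),
        done: Vec::new(),
    })
}

/// Driver loop owned by the blockchain side; `fetch` stands in for a network call.
fn sync(
    scripts: Vec<String>,
    chunk_size: usize,
    fetch: impl Fn(&str) -> usize,
) -> Vec<(String, usize)> {
    let mut request = start(scripts);
    loop {
        request = match request {
            Request::Script(req) => {
                // Fetch at most `chunk_size` items per round (at least one, to make progress).
                let counts: Vec<usize> = req
                    .request()
                    .take(chunk_size.max(1))
                    .map(|script| fetch(script.as_str()))
                    .collect();
                req.satisfy(counts)
            }
            Request::Finish(result) => break result,
        };
    }
}
```

Calling `sync(scripts, 2, |s| s.len())` drives the toy state machine to completion two scripts at a time. Because the driver owns the loop, an async backend could write the same loop with `.await` on its fetches without the sync logic changing.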

  The initial purpose of doing this is to remove invocations of `maybe_await` in the abstract sync logic in preparation for completely removing `maybe_await` in the future. The other major benefit is that it gives the esplora logic a lot more freedom to use the rich data from the responses to complete the sync with fewer HTTP requests than it did previously.

  ## List of changes

  - sync logic moved to `script_sync.rs` and `ElectrumLikeSync` is gone.
  - esplora makes one HTTP request per sync address. This means it makes half the number of HTTP requests for a fully synced wallet and N*M fewer requests for a wallet which has N new transactions with M unique input transactions.
  - electrum and esplora save fewer raw transactions in the database. Electrum still requests input transactions for each of its transactions to calculate the fee, but it no longer saves them to the database.
  - The ureq and reqwest blockchain configuration is now unified into the same struct. This is the only API change. `read_timeout` and `write_timeout` have been removed in favor of a single `timeout` option which is set in both ureq and reqwest.
  - ureq now does concurrent (parallel) requests using threads.
  - A previously unnoticed bug has been fixed: by sending a lot of double-spending transactions to the same address, you could trick a bdk Esplora wallet into thinking it had a lot of unconfirmed coins. This is because esplora doesn't delete double-spent transactions from its indexes immediately (not sure if this is a bug or a feature). A blockchain test is added for this.
  - BONUS: The second commit in this PR fixes the feerate calculation for esplora and adds a test (the previous algorithm didn't work at all). I could have made a separate PR but since I was touching this file a lot I decided to fix it here.
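
As a rough illustration of the "concurrent requests using threads" item in the list above, the sketch below runs a blocking fetch over a list of items with a bounded number of threads per round. The helper `fetch_concurrently` and its parameters are hypothetical and use only the standard library; this is not the code bdk's ureq backend uses, just one simple way to bound concurrency.

```rust
use std::thread;

/// Hypothetical helper: apply `fetch` to every item, running at most
/// `concurrency` worker threads at a time, and return results in input order.
fn fetch_concurrently<T, R, F>(items: Vec<T>, concurrency: usize, fetch: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Copy + Send + 'static,
{
    let mut results = Vec::with_capacity(items.len());
    let mut items = items.into_iter();
    loop {
        // Spawn one thread per item for the next chunk (at least one item per round).
        let handles: Vec<_> = items
            .by_ref()
            .take(concurrency.max(1))
            .map(|item| thread::spawn(move || fetch(item)))
            .collect();
        if handles.is_empty() {
            break;
        }
        // Joining in spawn order keeps the results aligned with the inputs.
        for handle in handles {
            results.push(handle.join().expect("worker thread panicked"));
        }
    }
    results
}
```

With a real HTTP client, `fetch` would be the blocking request for one script or address, and `concurrency` would come from the backend configuration. Waiting for a whole chunk before starting the next one is the simplest design; a thread pool with a work queue would keep all workers busy, at the cost of more code.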

  ## Notes to the reviewers

  - The most important thing to review is that the logic in `script_sync.rs` is sound.
  - Look at the two commits separately.
  - I think CI is failing because of MSRV problems again!
  - It would be cool to measure how much sync time is improved for your existing wallets/projects. For `gun` the speed improvements were modest, but it is at least hammering the esplora server much less.
  - I noticed the performance of reqwest in blocking mode is much worse in this patch than previously. This is because somehow reqwest is not re-using the connection for each request in this new code. I have no idea why. The plan is to get rid of the blocking reqwest implementation in a follow-up PR.

  ### Checklists

  #### All Submissions:

  * [x] I've signed all my commits

ACKs for top commit:
  rajarshimaitra:
    Retested ACK a630685

Tree-SHA512: de74981e9d1f80758a9f20a3314ed7381c6b7c635f7ede80b177651fe2f9e9468064fae26bf80d4254098accfacfe50326ae0968e915186e13313f05bf77990b
notmandatory committed Nov 25, 2021
2 parents afa1ab4 + a630685 commit b2ac4a0
Showing 11 changed files with 1,125 additions and 817 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- BIP39 implementation dependency, in `keys::bip39` changed from tiny-bip39 to rust-bip39.
- Add new method on the `TxBuilder` to embed data in the transaction via `OP_RETURN`. To allow that a fix to check the dust only on spendable output has been introduced.
- Overhauled sync logic for electrum and esplora.
- Unify ureq and reqwest esplora backends to have the same configuration parameters. This means reqwest now has a timeout parameter and ureq has a concurrency parameter.
- Fixed esplora fee estimation.
- Update the `Database` trait to store the last sync timestamp and block height
- Rename `ConfirmationTime` to `BlockTime`

@@ -393,4 +396,4 @@ final transaction is created by calling `finish` on the builder.
[v0.10.0]: https://github.com/bitcoindevkit/bdk/compare/v0.9.0...v0.10.0
[v0.11.0]: https://github.com/bitcoindevkit/bdk/compare/v0.10.0...v0.11.0
[v0.12.0]: https://github.com/bitcoindevkit/bdk/compare/v0.11.0...v0.12.0
[v0.13.0]: https://github.com/bitcoindevkit/bdk/compare/v0.12.0...v0.13.0
[v0.13.0]: https://github.com/bitcoindevkit/bdk/compare/v0.12.0...v0.13.0
216 changes: 175 additions & 41 deletions src/blockchain/electrum.rs
@@ -24,20 +24,20 @@
//! # Ok::<(), bdk::Error>(())
//! ```

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

#[allow(unused_imports)]
use log::{debug, error, info, trace};

use bitcoin::{BlockHeader, Script, Transaction, Txid};
use bitcoin::{Transaction, Txid};

use electrum_client::{Client, ConfigBuilder, ElectrumApi, Socks5Config};

use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
use super::script_sync::Request;
use super::*;
use crate::database::BatchDatabase;
use crate::database::{BatchDatabase, Database};
use crate::error::Error;
use crate::FeeRate;
use crate::{BlockTime, FeeRate};

/// Wrapper over an Electrum Client that implements the required blockchain traits
///
@@ -71,10 +71,139 @@ impl Blockchain for ElectrumBlockchain {
fn setup<D: BatchDatabase, P: Progress>(
&self,
database: &mut D,
progress_update: P,
_progress_update: P,
) -> Result<(), Error> {
self.client
.electrum_like_setup(self.stop_gap, database, progress_update)
let mut request = script_sync::start(database, self.stop_gap)?;
let mut block_times = HashMap::<u32, u32>::new();
let mut txid_to_height = HashMap::<Txid, u32>::new();
let mut tx_cache = TxCache::new(database, &self.client);
let chunk_size = self.stop_gap;
// The electrum server has been inconsistent somehow in its responses during sync. For
// example, we do a batch request of transactions and the response contains fewer
// transactions than in the request. This should never happen but we don't want to panic.
let electrum_goof = || Error::Generic("electrum server misbehaving".to_string());

let batch_update = loop {
request = match request {
Request::Script(script_req) => {
let scripts = script_req.request().take(chunk_size);
let txids_per_script: Vec<Vec<_>> = self
.client
.batch_script_get_history(scripts)
.map_err(Error::Electrum)?
.into_iter()
.map(|txs| {
txs.into_iter()
.map(|tx| {
let tx_height = match tx.height {
none if none <= 0 => None,
height => {
txid_to_height.insert(tx.tx_hash, height as u32);
Some(height as u32)
}
};
(tx.tx_hash, tx_height)
})
.collect()
})
.collect();

script_req.satisfy(txids_per_script)?
}

Request::Conftime(conftime_req) => {
// collect up to chunk_size heights to fetch from electrum
let needs_block_height = {
let mut needs_block_height_iter = conftime_req
.request()
.filter_map(|txid| txid_to_height.get(txid).cloned())
.filter(|height| block_times.get(height).is_none());
let mut needs_block_height = HashSet::new();

while needs_block_height.len() < chunk_size {
match needs_block_height_iter.next() {
Some(height) => needs_block_height.insert(height),
None => break,
};
}
needs_block_height
};

let new_block_headers = self
.client
.batch_block_header(needs_block_height.iter().cloned())?;

for (height, header) in needs_block_height.into_iter().zip(new_block_headers) {
block_times.insert(height, header.time);
}

let conftimes = conftime_req
.request()
.take(chunk_size)
.map(|txid| {
let confirmation_time = txid_to_height
.get(txid)
.map(|height| {
let timestamp =
*block_times.get(height).ok_or_else(electrum_goof)?;
Result::<_, Error>::Ok(BlockTime {
height: *height,
timestamp: timestamp.into(),
})
})
.transpose()?;
Ok(confirmation_time)
})
.collect::<Result<_, Error>>()?;

conftime_req.satisfy(conftimes)?
}
Request::Tx(tx_req) => {
let needs_full = tx_req.request().take(chunk_size);
tx_cache.save_txs(needs_full.clone())?;
let full_transactions = needs_full
.map(|txid| tx_cache.get(*txid).ok_or_else(electrum_goof))
.collect::<Result<Vec<_>, _>>()?;
let input_txs = full_transactions.iter().flat_map(|tx| {
tx.input
.iter()
.filter(|input| !input.previous_output.is_null())
.map(|input| &input.previous_output.txid)
});
tx_cache.save_txs(input_txs)?;

let full_details = full_transactions
.into_iter()
.map(|tx| {
let prev_outputs = tx
.input
.iter()
.map(|input| {
if input.previous_output.is_null() {
return Ok(None);
}
let prev_tx = tx_cache
.get(input.previous_output.txid)
.ok_or_else(electrum_goof)?;
let txout = prev_tx
.output
.get(input.previous_output.vout as usize)
.ok_or_else(electrum_goof)?;
Ok(Some(txout.clone()))
})
.collect::<Result<Vec<_>, Error>>()?;
Ok((prev_outputs, tx))
})
.collect::<Result<Vec<_>, Error>>()?;

tx_req.satisfy(full_details)?
}
Request::Finish(batch_update) => break batch_update,
}
};

database.commit_batch(batch_update)?;
Ok(())
}

fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
@@ -101,43 +230,48 @@ impl Blockchain for ElectrumBlockchain {
}
}

impl ElectrumLikeSync for Client {
fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script> + Clone>(
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
self.batch_script_get_history(scripts)
.map(|v| {
v.into_iter()
.map(|v| {
v.into_iter()
.map(
|electrum_client::GetHistoryRes {
height, tx_hash, ..
}| ElsGetHistoryRes {
height,
tx_hash,
},
)
.collect()
})
.collect()
})
.map_err(Error::Electrum)
struct TxCache<'a, 'b, D> {
db: &'a D,
client: &'b Client,
cache: HashMap<Txid, Transaction>,
}

impl<'a, 'b, D: Database> TxCache<'a, 'b, D> {
fn new(db: &'a D, client: &'b Client) -> Self {
TxCache {
db,
client,
cache: HashMap::default(),
}
}
fn save_txs<'c>(&mut self, txids: impl Iterator<Item = &'c Txid>) -> Result<(), Error> {
let mut need_fetch = vec![];
for txid in txids {
if self.cache.get(txid).is_some() {
continue;
} else if let Some(transaction) = self.db.get_raw_tx(txid)? {
self.cache.insert(*txid, transaction);
} else {
need_fetch.push(txid);
}
}

fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid> + Clone>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error> {
self.batch_transaction_get(txids).map_err(Error::Electrum)
if !need_fetch.is_empty() {
let txs = self
.client
.batch_transaction_get(need_fetch.clone())
.map_err(Error::Electrum)?;
for (tx, _txid) in txs.into_iter().zip(need_fetch) {
debug_assert_eq!(*_txid, tx.txid());
self.cache.insert(tx.txid(), tx);
}
}

Ok(())
}

fn els_batch_block_header<I: IntoIterator<Item = u32> + Clone>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
self.batch_block_header(heights).map_err(Error::Electrum)
fn get(&self, txid: Txid) -> Option<Transaction> {
self.cache.get(&txid).map(Clone::clone)
}
}

117 changes: 117 additions & 0 deletions src/blockchain/esplora/api.rs
@@ -0,0 +1,117 @@
//! structs from the esplora API
//!
//! see: <https://github.com/Blockstream/esplora/blob/master/API.md>
use crate::BlockTime;
use bitcoin::{OutPoint, Script, Transaction, TxIn, TxOut, Txid};

#[derive(serde::Deserialize, Clone, Debug)]
pub struct PrevOut {
pub value: u64,
pub scriptpubkey: Script,
}

#[derive(serde::Deserialize, Clone, Debug)]
pub struct Vin {
pub txid: Txid,
pub vout: u32,
// None if coinbase
pub prevout: Option<PrevOut>,
pub scriptsig: Script,
#[serde(deserialize_with = "deserialize_witness")]
pub witness: Vec<Vec<u8>>,
pub sequence: u32,
pub is_coinbase: bool,
}

#[derive(serde::Deserialize, Clone, Debug)]
pub struct Vout {
pub value: u64,
pub scriptpubkey: Script,
}

#[derive(serde::Deserialize, Clone, Debug)]
pub struct TxStatus {
pub confirmed: bool,
pub block_height: Option<u32>,
pub block_time: Option<u64>,
}

#[derive(serde::Deserialize, Clone, Debug)]
pub struct Tx {
pub txid: Txid,
pub version: i32,
pub locktime: u32,
pub vin: Vec<Vin>,
pub vout: Vec<Vout>,
pub status: TxStatus,
pub fee: u64,
}

impl Tx {
pub fn to_tx(&self) -> Transaction {
Transaction {
version: self.version,
lock_time: self.locktime,
input: self
.vin
.iter()
.cloned()
.map(|vin| TxIn {
previous_output: OutPoint {
txid: vin.txid,
vout: vin.vout,
},
script_sig: vin.scriptsig,
sequence: vin.sequence,
witness: vin.witness,
})
.collect(),
output: self
.vout
.iter()
.cloned()
.map(|vout| TxOut {
value: vout.value,
script_pubkey: vout.scriptpubkey,
})
.collect(),
}
}

pub fn confirmation_time(&self) -> Option<BlockTime> {
match self.status {
TxStatus {
confirmed: true,
block_height: Some(height),
block_time: Some(timestamp),
} => Some(BlockTime { timestamp, height }),
_ => None,
}
}

pub fn previous_outputs(&self) -> Vec<Option<TxOut>> {
self.vin
.iter()
.cloned()
.map(|vin| {
vin.prevout.map(|po| TxOut {
script_pubkey: po.scriptpubkey,
value: po.value,
})
})
.collect()
}
}

fn deserialize_witness<'de, D>(d: D) -> Result<Vec<Vec<u8>>, D::Error>
where
D: serde::de::Deserializer<'de>,
{
use crate::serde::Deserialize;
use bitcoin::hashes::hex::FromHex;
let list = Vec::<String>::deserialize(d)?;
list.into_iter()
.map(|hex_str| Vec::<u8>::from_hex(&hex_str))
.collect::<Result<Vec<Vec<u8>>, _>>()
.map_err(serde::de::Error::custom)
}