diff --git a/Cargo.toml b/Cargo.toml index 82f59261c..c2a826356 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" } lightning-rapid-gossip-sync = { version = "0.0.110" } #bdk = "0.20.0" -bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch="master", features = ["use-esplora-async", "key-value-db"]} +bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]} bitcoin = "0.28.1" rand = "0.8.5" diff --git a/src/access.rs b/src/access.rs index e2bd74906..1585cb363 100644 --- a/src/access.rs +++ b/src/access.rs @@ -1,36 +1,47 @@ use crate::logger::{ log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger, }; -use crate::Error; +use crate::{Config, Error}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::chain::WatchedOutput; use lightning::chain::{Confirm, Filter}; -use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx}; +use bdk::blockchain::{Blockchain, EsploraBlockchain}; use bdk::database::BatchDatabase; +use bdk::esplora_client; use bdk::wallet::AddressIndex; use bdk::{SignOptions, SyncOptions}; -use bitcoin::{BlockHash, Script, Transaction, Txid}; +use bitcoin::{Script, Transaction, Txid}; use std::collections::HashSet; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; /// The minimum feerate we are allowed to send, as specify by LDK. const MIN_FEERATE: u32 = 253; +// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold +// number of blocks after which BDK stops looking for scripts belonging to the wallet. +const BDK_CLIENT_STOP_GAP: usize = 20; + +// The number of concurrent requests made against the API provider. +const BDK_CLIENT_CONCURRENCY: u8 = 8; + pub struct ChainAccess where D: BatchDatabase, { blockchain: EsploraBlockchain, + _client: Arc, wallet: Mutex>, queued_transactions: Mutex>, watched_transactions: Mutex>, queued_outputs: Mutex>, watched_outputs: Mutex>, last_sync_height: tokio::sync::Mutex>, + tokio_runtime: RwLock>>, + _config: Arc, logger: Arc, } @@ -39,7 +50,7 @@ where D: BatchDatabase, { pub(crate) fn new( - blockchain: EsploraBlockchain, wallet: bdk::Wallet, logger: Arc, + wallet: bdk::Wallet, config: Arc, logger: Arc, ) -> Self { let wallet = Mutex::new(wallet); let watched_transactions = Mutex::new(Vec::new()); @@ -47,22 +58,41 @@ where let watched_outputs = Mutex::new(Vec::new()); let queued_outputs = Mutex::new(Vec::new()); let last_sync_height = tokio::sync::Mutex::new(None); + let tokio_runtime = RwLock::new(None); + // TODO: Check that we can be sure that the Esplora client re-connects in case of failure + // and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime? + let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP) + .with_concurrency(BDK_CLIENT_CONCURRENCY); + let client_builder = + esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url)); + let client = Arc::new(client_builder.build_async().unwrap()); Self { blockchain, + _client: client, wallet, queued_transactions, watched_transactions, queued_outputs, watched_outputs, last_sync_height, + tokio_runtime, + _config: config, logger, } } + pub(crate) fn set_runtime(&self, tokio_runtime: Arc) { + *self.tokio_runtime.write().unwrap() = Some(tokio_runtime); + } + + pub(crate) fn drop_runtime(&self) { + *self.tokio_runtime.write().unwrap() = None; + } + pub(crate) async fn sync_wallet(&self) -> Result<(), Error> { let sync_options = SyncOptions { progress: None }; - self.wallet.lock().unwrap().sync(&self.blockchain, sync_options)?; + self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?; Ok(()) } @@ -237,11 +267,11 @@ where Ok(()) } - pub(crate) fn create_funding_transaction( + pub(crate) async fn create_funding_transaction( &self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget, ) -> Result { let num_blocks = num_blocks_from_conf_target(confirmation_target); - let fee_rate = self.blockchain.estimate_fee(num_blocks)?; + let fee_rate = self.blockchain.estimate_fee(num_blocks).await?; let locked_wallet = self.wallet.lock().unwrap(); let mut tx_builder = locked_wallet.build_tx(); @@ -280,9 +310,18 @@ where fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { let num_blocks = num_blocks_from_conf_target(confirmation_target); let fallback_fee = fallback_fee_from_conf_target(confirmation_target); - self.blockchain - .estimate_fee(num_blocks) - .map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32 + + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return fallback_fee; + } + + locked_runtime.as_ref().unwrap().block_on(async { + self.blockchain + .estimate_fee(num_blocks) + .await + .map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32 + }) } } @@ -291,13 +330,20 @@ where D: BatchDatabase, { fn broadcast_transaction(&self, tx: &Transaction) { - match self.blockchain.broadcast(tx) { - Ok(_) => {} - Err(err) => { - log_error!(self.logger, "Failed to broadcast transaction: {}", err); - panic!("Failed to broadcast transaction: {}", err); - } + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return; } + + locked_runtime.as_ref().unwrap().block_on(async { + match self.blockchain.broadcast(tx).await { + Ok(_) => {} + Err(err) => { + log_error!(self.logger, "Failed to broadcast transaction: {}", err); + panic!("Failed to broadcast transaction: {}", err); + } + } + }) } } @@ -315,33 +361,6 @@ where } } -impl GetHeight for ChainAccess -where - D: BatchDatabase, -{ - fn get_height(&self) -> Result { - self.blockchain.get_height() - } -} - -impl GetBlockHash for ChainAccess -where - D: BatchDatabase, -{ - fn get_block_hash(&self, height: u64) -> Result { - self.blockchain.get_block_hash(height) - } -} - -impl GetTx for ChainAccess -where - D: BatchDatabase, -{ - fn get_tx(&self, txid: &Txid) -> Result, bdk::Error> { - self.blockchain.get_tx(txid) - } -} - fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize { match confirmation_target { ConfirmationTarget::Background => 12, diff --git a/src/event.rs b/src/event.rs index 0ba7fc690..95d6bc9b0 100644 --- a/src/event.rs +++ b/src/event.rs @@ -18,8 +18,7 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use bitcoin::secp256k1::Secp256k1; use rand::{thread_rng, Rng}; use std::collections::{hash_map, VecDeque}; -use std::sync::{Arc, Condvar, Mutex}; -use std::thread; +use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::Duration; /// The event queue will be persisted under this key. @@ -221,6 +220,7 @@ pub(crate) struct EventHandler { keys_manager: Arc, inbound_payments: Arc, outbound_payments: Arc, + tokio_runtime: RwLock>>, logger: Arc, _config: Arc, } @@ -233,6 +233,7 @@ impl EventHandler { inbound_payments: Arc, outbound_payments: Arc, logger: Arc, _config: Arc, ) -> Self { + let tokio_runtime = RwLock::new(None); Self { event_queue, chain_access, @@ -241,10 +242,19 @@ impl EventHandler { keys_manager, inbound_payments, outbound_payments, + tokio_runtime, logger, _config, } } + + pub(crate) fn set_runtime(&self, tokio_runtime: Arc) { + *self.tokio_runtime.write().unwrap() = Some(tokio_runtime); + } + + pub(crate) fn drop_runtime(&self) { + *self.tokio_runtime.write().unwrap() = None; + } } impl LdkEventHandler for EventHandler { @@ -262,29 +272,36 @@ impl LdkEventHandler for EventHandler { let confirmation_target = ConfirmationTarget::Normal; // Sign the final funding transaction and broadcast it. - match self.chain_access.create_funding_transaction( - &output_script, - *channel_value_satoshis, - confirmation_target, - ) { - Ok(final_tx) => { - // Give the funding transaction back to LDK for opening the channel. - if self - .channel_manager - .funding_transaction_generated( - &temporary_channel_id, - counterparty_node_id, - final_tx, - ) - .is_err() - { - log_error!(self.logger, "Channel went away before we could fund it. The peer disconnected or refused the channel"); + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return; + } + + locked_runtime.as_ref().unwrap().block_on(async { + match self.chain_access.create_funding_transaction( + &output_script, + *channel_value_satoshis, + confirmation_target, + ).await { + Ok(final_tx) => { + // Give the funding transaction back to LDK for opening the channel. + if self + .channel_manager + .funding_transaction_generated( + &temporary_channel_id, + counterparty_node_id, + final_tx, + ) + .is_err() + { + log_error!(self.logger, "Channel went away before we could fund it. The peer disconnected or refused the channel"); + } + } + Err(err) => { + log_error!(self.logger, "Failed to create funding transaction: {}", err); } } - Err(err) => { - log_error!(self.logger, "Failed to create funding transaction: {}", err); - } - } + }); } LdkEvent::PaymentReceived { payment_hash, purpose, amount_msat } => { log_info!( @@ -387,11 +404,14 @@ impl LdkEventHandler for EventHandler { let forwarding_channel_manager = self.channel_manager.clone(); let min = time_forwardable.as_millis() as u64; - // TODO: any way we still can use tokio here? - // TODO: stop this thread on shutdown - thread::spawn(move || { + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return; + } + + locked_runtime.as_ref().unwrap().spawn(async move { let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64; - thread::sleep(Duration::from_millis(millis_to_sleep)); + tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; forwarding_channel_manager.process_pending_htlc_forwards(); }); } diff --git a/src/lib.rs b/src/lib.rs index b7a1e194b..2bfa83e40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,6 @@ use lightning_invoice::utils::DefaultRouter; use lightning_invoice::{payment, Currency, Invoice}; use bdk::bitcoin::secp256k1::Secp256k1; -use bdk::blockchain::esplora::EsploraBlockchain; use bdk::sled; use bdk::template::Bip84; @@ -84,13 +83,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime}; -// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold -// number of blocks after which BDK stops looking for scripts belonging to the wallet. -const BDK_CLIENT_STOP_GAP: usize = 20; - -// The number of concurrent requests made against the API provider. -const BDK_CLIENT_CONCURRENCY: u8 = 8; - // The timeout after which we abandon retrying failed payments. const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -221,12 +213,8 @@ impl Builder { ) .expect("Failed to setup on-chain wallet"); - // TODO: Check that we can be sure that the Esplora client re-connects in case of failure - // and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime? - let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP) - .with_concurrency(BDK_CLIENT_CONCURRENCY); - - let chain_access = Arc::new(ChainAccess::new(blockchain, bdk_wallet, Arc::clone(&logger))); + let chain_access = + Arc::new(ChainAccess::new(bdk_wallet, Arc::clone(&config), Arc::clone(&logger))); // Step 3: Initialize Persist let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone())); @@ -354,7 +342,7 @@ impl Builder { Arc::new(EventQueue::new(Arc::clone(&persister))) }; - let event_handler = EventHandler::new( + let event_handler = Arc::new(EventHandler::new( Arc::clone(&chain_access), Arc::clone(&event_queue), Arc::clone(&channel_manager), @@ -364,7 +352,7 @@ impl Builder { Arc::clone(&outbound_payments), Arc::clone(&logger), Arc::clone(&config), - ); + )); //// Step 16: Create Router and InvoicePayer let router = DefaultRouter::new( @@ -378,7 +366,7 @@ impl Builder { router, Arc::clone(&scorer), Arc::clone(&logger), - event_handler, + Arc::clone(&event_handler), payment::Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT), )); @@ -402,6 +390,7 @@ impl Builder { config, chain_access, event_queue, + event_handler, channel_manager, chain_monitor, peer_manager, @@ -421,7 +410,7 @@ impl Builder { /// Wraps all objects that need to be preserved during the run time of [`LdkLite`]. Will be dropped /// upon [`LdkLite::stop()`]. struct Runtime { - tokio_runtime: tokio::runtime::Runtime, + tokio_runtime: Arc, _background_processor: BackgroundProcessor, stop_networking: Arc, stop_wallet_sync: Arc, @@ -435,6 +424,7 @@ pub struct LdkLite { config: Arc, chain_access: Arc>, event_queue: Arc>, + event_handler: Arc, channel_manager: Arc, chain_monitor: Arc, peer_manager: Arc, @@ -443,7 +433,7 @@ pub struct LdkLite { persister: Arc, logger: Arc, scorer: Arc>, - invoice_payer: Arc>, + invoice_payer: Arc>>, inbound_payments: Arc, outbound_payments: Arc, peer_store: Arc>, @@ -482,6 +472,10 @@ impl LdkLite { runtime.stop_networking.store(true, Ordering::Release); self.peer_manager.disconnect_all_peers(); + // Drop the held runtimes. + self.chain_access.drop_runtime(); + self.event_handler.drop_runtime(); + // Drop the runtime, which stops the background processor and any possibly remaining tokio threads. *run_lock = None; Ok(()) @@ -489,7 +483,10 @@ impl LdkLite { fn setup_runtime(&self) -> Result { let tokio_runtime = - tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); + + self.chain_access.set_runtime(Arc::clone(&tokio_runtime)); + self.event_handler.set_runtime(Arc::clone(&tokio_runtime)); // Setup wallet sync let chain_access = Arc::clone(&self.chain_access); @@ -499,7 +496,7 @@ impl LdkLite { let stop_wallet_sync = Arc::new(AtomicBool::new(false)); let stop_sync = Arc::clone(&stop_wallet_sync); - tokio_runtime.spawn(async move { + tokio_runtime.block_on(async move { let mut rounds = 0; loop { if stop_sync.load(Ordering::Acquire) {