diff --git a/src/balance.rs b/src/balance.rs index f5a52073d..bad2d1d5f 100644 --- a/src/balance.rs +++ b/src/balance.rs @@ -1,11 +1,12 @@ +use crate::sweep::value_satoshis_from_descriptor; + use lightning::chain::channelmonitor::Balance as LdkBalance; use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage}; +use lightning::util::sweep::{OutputSpendStatus, TrackedSpendableOutput}; use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Txid}; -use crate::sweep::SpendableOutputInfo; - /// Details of the known available balances returned by [`Node::list_balances`]. /// /// [`Node::list_balances`]: crate::Node::list_balances @@ -258,46 +259,45 @@ pub enum PendingSweepBalance { } impl PendingSweepBalance { - pub(crate) fn from_tracked_spendable_output(output_info: SpendableOutputInfo) -> Self { - if let Some(confirmation_hash) = output_info.confirmation_hash { - debug_assert!(output_info.confirmation_height.is_some()); - debug_assert!(output_info.latest_spending_tx.is_some()); - let channel_id = output_info.channel_id; - let confirmation_height = output_info - .confirmation_height - .expect("Height must be set if the output is confirmed"); - let latest_spending_txid = output_info - .latest_spending_tx - .as_ref() - .expect("Spending tx must be set if the output is confirmed") - .txid(); - let amount_satoshis = output_info.value_satoshis(); - Self::AwaitingThresholdConfirmations { - channel_id, - latest_spending_txid, - confirmation_hash, - confirmation_height, - amount_satoshis, - } - } else if let Some(latest_broadcast_height) = output_info.latest_broadcast_height { - debug_assert!(output_info.latest_spending_tx.is_some()); - let channel_id = output_info.channel_id; - let latest_spending_txid = output_info - .latest_spending_tx - .as_ref() - .expect("Spending tx must be set if the spend was broadcast") - .txid(); - let amount_satoshis = output_info.value_satoshis(); - Self::BroadcastAwaitingConfirmation { - channel_id, + pub(crate) fn from_tracked_spendable_output(output_info: TrackedSpendableOutput) -> Self { + match output_info.status { + OutputSpendStatus::PendingInitialBroadcast { .. } => { + let channel_id = output_info.channel_id; + let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor); + Self::PendingBroadcast { channel_id, amount_satoshis } + }, + OutputSpendStatus::PendingFirstConfirmation { latest_broadcast_height, - latest_spending_txid, - amount_satoshis, - } - } else { - let channel_id = output_info.channel_id; - let amount_satoshis = output_info.value_satoshis(); - Self::PendingBroadcast { channel_id, amount_satoshis } + latest_spending_tx, + .. + } => { + let channel_id = output_info.channel_id; + let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor); + let latest_spending_txid = latest_spending_tx.txid(); + Self::BroadcastAwaitingConfirmation { + channel_id, + latest_broadcast_height, + latest_spending_txid, + amount_satoshis, + } + }, + OutputSpendStatus::PendingThresholdConfirmations { + latest_spending_tx, + confirmation_height, + confirmation_hash, + .. + } => { + let channel_id = output_info.channel_id; + let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor); + let latest_spending_txid = latest_spending_tx.txid(); + Self::AwaitingThresholdConfirmations { + channel_id, + latest_spending_txid, + confirmation_hash, + confirmation_height, + amount_satoshis, + } + }, } } } diff --git a/src/builder.rs b/src/builder.rs index d2f1c914c..daf3afd47 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -8,11 +8,10 @@ use crate::gossip::GossipSource; use crate::io; use crate::io::sqlite_store::SqliteStore; use crate::liquidity::LiquiditySource; -use crate::logger::{log_error, FilesystemLogger, Logger}; +use crate::logger::{log_error, log_info, FilesystemLogger, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; -use crate::sweep::OutputSweeper; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, GossipSync, KeysManager, MessageRouter, NetworkGraph, @@ -37,6 +36,7 @@ use lightning::util::persist::{ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; +use lightning::util::sweep::OutputSweeper; use lightning_persister::fs_store::FilesystemStore; @@ -895,6 +895,47 @@ fn build_with_store_internal( liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager))); + let output_sweeper = match io::utils::read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&tx_sync), + Arc::clone(&keys_manager), + Arc::clone(&kv_store), + Arc::clone(&logger), + ) { + Ok(output_sweeper) => Arc::new(output_sweeper), + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + Arc::new(OutputSweeper::new( + channel_manager.current_best_block(), + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Some(Arc::clone(&tx_sync)), + Arc::clone(&keys_manager), + Arc::clone(&keys_manager), + Arc::clone(&kv_store), + Arc::clone(&logger), + )) + } else { + return Err(BuildError::ReadFailed); + } + }, + }; + + match io::utils::migrate_deprecated_spendable_outputs( + Arc::clone(&output_sweeper), + Arc::clone(&kv_store), + Arc::clone(&logger), + ) { + Ok(()) => { + log_info!(logger, "Successfully migrated OutputSweeper data."); + }, + Err(e) => { + log_error!(logger, "Failed to migrate OutputSweeper data: {}", e); + return Err(BuildError::ReadFailed); + }, + } + // Init payment info storage let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) { Ok(payments) => { @@ -928,25 +969,6 @@ fn build_with_store_internal( }, }; - let best_block = channel_manager.current_best_block(); - let output_sweeper = - match io::utils::read_spendable_outputs(Arc::clone(&kv_store), Arc::clone(&logger)) { - Ok(outputs) => Arc::new(OutputSweeper::new( - outputs, - Arc::clone(&wallet), - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - best_block, - Some(Arc::clone(&tx_sync)), - Arc::clone(&logger), - )), - Err(_) => { - return Err(BuildError::ReadFailed); - }, - }; - let (stop_sender, _) = tokio::sync::watch::channel(()); let is_listening = Arc::new(AtomicBool::new(false)); diff --git a/src/event.rs b/src/event.rs index c6a8f9b6a..29ebbef43 100644 --- a/src/event.rs +++ b/src/event.rs @@ -708,7 +708,7 @@ where } }, LdkEvent::SpendableOutputs { outputs, channel_id } => { - self.output_sweeper.add_outputs(outputs, channel_id) + self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) }, LdkEvent::OpenChannelRequest { temporary_channel_id, diff --git a/src/io/mod.rs b/src/io/mod.rs index d9dab440c..d545f6b93 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -21,9 +21,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; -/// The spendable output information will be persisted under this prefix. -pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs"; -pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The spendable output information used to persisted under this prefix until LDK Node v0.3.0. +pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "spendable_outputs"; +pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key. pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index f486dda8b..937cc706c 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -1,9 +1,10 @@ use super::*; use crate::config::WALLET_KEYS_SEED_LEN; -use crate::logger::log_error; +use crate::logger::{log_error, FilesystemLogger}; use crate::peer_store::PeerStore; -use crate::sweep::SpendableOutputInfo; +use crate::sweep::DeprecatedSpendableOutputInfo; +use crate::types::{Broadcaster, ChainSource, FeeEstimator, KeysManager, Sweeper}; use crate::{Error, EventQueue, PaymentDetails}; use lightning::routing::gossip::NetworkGraph; @@ -12,13 +13,16 @@ use lightning::util::logger::Logger; use lightning::util::persist::{ KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::{Readable, ReadableArgs, Writeable}; use lightning::util::string::PrintableString; use bip39::Mnemonic; +use lightning::util::sweep::{OutputSpendStatus, OutputSweeper}; use rand::{thread_rng, RngCore}; use std::fs; @@ -200,34 +204,118 @@ where Ok(res) } -/// Read previously persisted spendable output information from the store. -pub(crate) fn read_spendable_outputs( - kv_store: Arc, logger: L, -) -> Result, std::io::Error> +/// Read `OutputSweeper` state from the store. +pub(crate) fn read_output_sweeper( + broadcaster: Arc, fee_estimator: Arc, + chain_data_source: Arc, keys_manager: Arc, kv_store: Arc, + logger: Arc, +) -> Result, std::io::Error> { + let mut reader = Cursor::new(kv_store.read( + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + )?); + let args = ( + broadcaster, + fee_estimator, + Some(chain_data_source), + Arc::clone(&keys_manager), + keys_manager, + kv_store, + logger.clone(), + ); + OutputSweeper::read(&mut reader, args).map_err(|e| { + log_error!(logger, "Failed to deserialize OutputSweeper: {}", e); + std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize OutputSweeper") + }) +} + +/// Read previously persisted spendable output information from the store and migrate to the +/// upstreamed `OutputSweeper`. +/// +/// We first iterate all `DeprecatedSpendableOutputInfo`s and have them tracked by the new +/// `OutputSweeper`. In order to be certain the initial output spends will happen in a single +/// transaction (and safe on-chain fees), we batch them to happen at current height plus two +/// blocks. Lastly, we remove the previously persisted data once we checked they are tracked and +/// awaiting their initial spend at the correct height. +/// +/// Note that this migration will be run in the `Builder`, i.e., at the time when the migration is +/// happening no background sync is ongoing, so we shouldn't have a risk of interleaving block +/// connections during the migration. +pub(crate) fn migrate_deprecated_spendable_outputs( + sweeper: Arc>, kv_store: Arc, logger: L, +) -> Result<(), std::io::Error> where L::Target: Logger, { - let mut res = Vec::new(); + let best_block = sweeper.current_best_block(); for stored_key in kv_store.list( - SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, )? { let mut reader = Cursor::new(kv_store.read( - SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, &stored_key, )?); - let output = SpendableOutputInfo::read(&mut reader).map_err(|e| { + let output = DeprecatedSpendableOutputInfo::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize SpendableOutputInfo: {}", e); std::io::Error::new( std::io::ErrorKind::InvalidData, "Failed to deserialize SpendableOutputInfo", ) })?; - res.push(output); + let descriptors = vec![output.descriptor.clone()]; + let spend_delay = Some(best_block.height + 2); + sweeper.track_spendable_outputs(descriptors, output.channel_id, true, spend_delay); + if let Some(tracked_spendable_output) = + sweeper.tracked_spendable_outputs().iter().find(|o| o.descriptor == output.descriptor) + { + match tracked_spendable_output.status { + OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => { + if delayed_until_height == spend_delay { + kv_store.remove( + DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &stored_key, + false, + )?; + } else { + debug_assert!(false, "Unexpected status in OutputSweeper migration."); + log_error!(logger, "Unexpected status in OutputSweeper migration."); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to migrate OutputSweeper state.", + )); + } + }, + _ => { + debug_assert!(false, "Unexpected status in OutputSweeper migration."); + log_error!(logger, "Unexpected status in OutputSweeper migration."); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to migrate OutputSweeper state.", + )); + }, + } + } else { + debug_assert!( + false, + "OutputSweeper failed to track and persist outputs during migration." + ); + log_error!( + logger, + "OutputSweeper failed to track and persist outputs during migration." + ); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to migrate OutputSweeper state.", + )); + } } - Ok(res) + + Ok(()) } pub(crate) fn read_latest_rgs_sync_timestamp( diff --git a/src/sweep.rs b/src/sweep.rs index 59457944b..1c772d4e9 100644 --- a/src/sweep.rs +++ b/src/sweep.rs @@ -1,34 +1,15 @@ -use crate::hex_utils; -use crate::io::{ - SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, -}; -use crate::logger::{log_error, Logger}; -use crate::wallet::{Wallet, WalletKeysManager}; -use crate::Error; +//! The output sweeper used to live here before we upstreamed it to `rust-lightning` and migrated +//! to the upstreamed version with LDK Node v0.3.0 (May 2024). We should drop this module entirely +//! once sufficient time has passed for us to be confident any users completed the migration. -use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; -use lightning::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput}; use lightning::impl_writeable_tlv_based; use lightning::ln::ChannelId; -use lightning::sign::{EntropySource, SpendableOutputDescriptor}; -use lightning::util::persist::KVStore; -use lightning::util::ser::Writeable; +use lightning::sign::SpendableOutputDescriptor; -use bitcoin::blockdata::block::Header; -use bitcoin::blockdata::locktime::absolute::LockTime; -use bitcoin::secp256k1::Secp256k1; -use bitcoin::{BlockHash, Transaction, Txid}; - -use std::ops::Deref; -use std::sync::{Arc, Mutex}; - -const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6; - -const REGENERATE_SPEND_THRESHOLD: u32 = 144; +use bitcoin::{BlockHash, Transaction}; #[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) struct SpendableOutputInfo { +pub(crate) struct DeprecatedSpendableOutputInfo { pub(crate) id: [u8; 32], pub(crate) descriptor: SpendableOutputDescriptor, pub(crate) channel_id: Option, @@ -39,55 +20,7 @@ pub(crate) struct SpendableOutputInfo { pub(crate) confirmation_hash: Option, } -impl SpendableOutputInfo { - fn to_watched_output(&self) -> WatchedOutput { - match &self.descriptor { - SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => { - WatchedOutput { - block_hash: self.first_broadcast_hash, - outpoint: *outpoint, - script_pubkey: output.script_pubkey.clone(), - } - }, - SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput { - block_hash: self.first_broadcast_hash, - outpoint: output.outpoint, - script_pubkey: output.output.script_pubkey.clone(), - }, - SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput { - block_hash: self.first_broadcast_hash, - outpoint: output.outpoint, - script_pubkey: output.output.script_pubkey.clone(), - }, - } - } - - fn is_spent_in(&self, tx: &Transaction) -> bool { - let prev_outpoint = match &self.descriptor { - SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint, - SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint, - SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint, - }; - - for input in &tx.input { - if input.previous_output == prev_outpoint.into_bitcoin_outpoint() { - return true; - } - } - - false - } - - pub(crate) fn value_satoshis(&self) -> u64 { - match &self.descriptor { - SpendableOutputDescriptor::StaticOutput { output, .. } => output.value, - SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.output.value, - SpendableOutputDescriptor::StaticPaymentOutput(output) => output.output.value, - } - } -} - -impl_writeable_tlv_based!(SpendableOutputInfo, { +impl_writeable_tlv_based!(DeprecatedSpendableOutputInfo, { (0, id, required), (2, descriptor, required), (4, channel_id, option), @@ -98,386 +31,10 @@ impl_writeable_tlv_based!(SpendableOutputInfo, { (14, confirmation_hash, option), }); -pub(crate) struct OutputSweeper -where - B::Target: BroadcasterInterface, - E::Target: FeeEstimator, - F::Target: Filter, - K::Target: KVStore, - L::Target: Logger, -{ - outputs: Mutex>, - wallet: Arc>, - broadcaster: B, - fee_estimator: E, - keys_manager: Arc>, - kv_store: K, - best_block: Mutex, - chain_source: Option, - logger: L, -} - -impl OutputSweeper -where - B::Target: BroadcasterInterface, - E::Target: FeeEstimator, - F::Target: Filter, - K::Target: KVStore, - L::Target: Logger, -{ - pub(crate) fn new( - outputs: Vec, - wallet: Arc>, broadcaster: B, - fee_estimator: E, - keys_manager: Arc>, kv_store: K, - best_block: BestBlock, chain_source: Option, logger: L, - ) -> Self { - if let Some(filter) = chain_source.as_ref() { - for output_info in &outputs { - let watched_output = output_info.to_watched_output(); - filter.register_output(watched_output); - } - } - - let outputs = Mutex::new(outputs); - let best_block = Mutex::new(best_block); - Self { - outputs, - wallet, - broadcaster, - fee_estimator, - keys_manager, - kv_store, - best_block, - chain_source, - logger, - } - } - - pub(crate) fn add_outputs( - &self, mut output_descriptors: Vec, - channel_id: Option, - ) { - let non_static_outputs = output_descriptors - .drain(..) - .filter(|desc| !matches!(desc, SpendableOutputDescriptor::StaticOutput { .. })) - .collect::>(); - - if non_static_outputs.is_empty() { - return; - } - - { - let mut locked_outputs = self.outputs.lock().unwrap(); - for descriptor in non_static_outputs { - let id = self.keys_manager.get_secure_random_bytes(); - let output_info = SpendableOutputInfo { - id, - descriptor, - channel_id, - first_broadcast_hash: None, - latest_broadcast_height: None, - latest_spending_tx: None, - confirmation_height: None, - confirmation_hash: None, - }; - - locked_outputs.push(output_info.clone()); - self.persist_info(&output_info).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) - }); - } - } - - self.rebroadcast_if_necessary(); - } - - pub(crate) fn tracked_spendable_outputs(&self) -> Vec { - self.outputs.lock().unwrap().clone() - } - - fn rebroadcast_if_necessary(&self) { - let (cur_height, cur_hash) = { - let best_block = self.best_block.lock().unwrap(); - (best_block.height, best_block.block_hash) - }; - - let mut respend_descriptors = Vec::new(); - let mut respend_ids = Vec::new(); - - { - let mut locked_outputs = self.outputs.lock().unwrap(); - for output_info in locked_outputs.iter_mut() { - if output_info.confirmation_height.is_some() { - // Don't rebroadcast confirmed txs - debug_assert!(output_info.confirmation_hash.is_some()); - continue; - } - - if let Some(latest_broadcast_height) = output_info.latest_broadcast_height { - // Re-generate spending tx after REGENERATE_SPEND_THRESHOLD, rebroadcast - // after every block - if latest_broadcast_height + REGENERATE_SPEND_THRESHOLD >= cur_height { - respend_descriptors.push(output_info.descriptor.clone()); - respend_ids.push(output_info.id); - } else if latest_broadcast_height < cur_height { - if let Some(latest_spending_tx) = output_info.latest_spending_tx.as_ref() { - self.broadcaster.broadcast_transactions(&[&latest_spending_tx]); - output_info.latest_broadcast_height = Some(cur_height); - self.persist_info(&output_info).unwrap_or_else(|e| { - log_error!( - self.logger, - "Error persisting SpendableOutputInfo: {:?}", - e - ) - }); - } - } - } else { - // Our first broadcast. - respend_descriptors.push(output_info.descriptor.clone()); - respend_ids.push(output_info.id); - output_info.first_broadcast_hash = Some(cur_hash); - self.persist_info(&output_info).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) - }); - } - } - } - - if !respend_descriptors.is_empty() { - match self.get_spending_tx(&respend_descriptors, cur_height) { - Ok(spending_tx) => { - self.broadcaster.broadcast_transactions(&[&spending_tx]); - let mut locked_outputs = self.outputs.lock().unwrap(); - for output_info in locked_outputs.iter_mut() { - if respend_ids.contains(&output_info.id) { - if let Some(filter) = self.chain_source.as_ref() { - let watched_output = output_info.to_watched_output(); - filter.register_output(watched_output); - } - - output_info.latest_spending_tx = Some(spending_tx.clone()); - output_info.latest_broadcast_height = Some(cur_height); - self.persist_info(&output_info).unwrap_or_else(|e| { - log_error!( - self.logger, - "Error persisting SpendableOutputInfo: {:?}", - e - ) - }); - } - } - }, - Err(e) => { - log_error!(self.logger, "Error spending outputs: {:?}", e); - }, - }; - } - } - - fn prune_confirmed_outputs(&self) { - let cur_height = self.best_block.lock().unwrap().height; - let mut locked_outputs = self.outputs.lock().unwrap(); - - // Prune all outputs that have sufficient depth by now. - locked_outputs.retain(|o| { - if let Some(confirmation_height) = o.confirmation_height { - if cur_height >= confirmation_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 { - let key = hex_utils::to_string(&o.id); - match self.kv_store.remove( - SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - false, - ) { - Ok(_) => return false, - Err(e) => { - log_error!( - self.logger, - "Removal of key {}/{}/{} failed due to: {}", - SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - key, - e - ); - return true; - }, - } - } - } - true - }); - } - - fn get_spending_tx( - &self, output_descriptors: &Vec, cur_height: u32, - ) -> Result { - let tx_feerate = - self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee); - - let destination_address = self.wallet.get_new_address().map_err(|e| { - log_error!(self.logger, "Failed to get destination address from wallet: {}", e); - })?; - - let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO); - - let output_descriptors = output_descriptors.iter().collect::>(); - self.keys_manager.spend_spendable_outputs( - &output_descriptors, - Vec::new(), - destination_address.script_pubkey(), - tx_feerate, - Some(locktime), - &Secp256k1::new(), - ) - } - - fn persist_info(&self, output: &SpendableOutputInfo) -> Result<(), Error> { - let key = hex_utils::to_string(&output.id); - let data = output.encode(); - self.kv_store - .write( - SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - &data, - ) - .map_err(|e| { - log_error!( - self.logger, - "Write for key {}/{}/{} failed due to: {}", - SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - key, - e - ); - Error::PersistenceFailed - }) - } -} - -impl Listen for OutputSweeper -where - B::Target: BroadcasterInterface, - E::Target: FeeEstimator, - F::Target: Filter, - K::Target: KVStore, - L::Target: Logger, -{ - fn filtered_block_connected( - &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, - ) { - { - let best_block = self.best_block.lock().unwrap(); - assert_eq!(best_block.block_hash, header.prev_blockhash, - "Blocks must be connected in chain-order - the connected header must build on the last connected header"); - assert_eq!(best_block.height, height - 1, - "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); - } - - self.transactions_confirmed(header, txdata, height); - self.best_block_updated(header, height); - } - - fn block_disconnected(&self, header: &Header, height: u32) { - let new_height = height - 1; - { - let mut best_block = self.best_block.lock().unwrap(); - assert_eq!(best_block.block_hash, header.block_hash(), - "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); - assert_eq!(best_block.height, height, - "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); - *best_block = BestBlock::new(header.prev_blockhash, new_height) - } - - let mut locked_outputs = self.outputs.lock().unwrap(); - for output_info in locked_outputs.iter_mut() { - if output_info.confirmation_hash == Some(header.block_hash()) { - debug_assert_eq!(output_info.confirmation_height, Some(height)); - output_info.confirmation_hash = None; - output_info.confirmation_height = None; - self.persist_info(&output_info).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) - }); - } - } - } -} - -impl Confirm for OutputSweeper -where - B::Target: BroadcasterInterface, - E::Target: FeeEstimator, - F::Target: Filter, - K::Target: KVStore, - L::Target: Logger, -{ - fn transactions_confirmed( - &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, - ) { - let mut locked_outputs = self.outputs.lock().unwrap(); - for (_, tx) in txdata { - for output_info in locked_outputs.iter_mut() { - if output_info.is_spent_in(*tx) { - debug_assert!(Some(height) > output_info.latest_broadcast_height); - output_info.confirmation_hash = Some(header.block_hash()); - output_info.confirmation_height = Some(height); - output_info.latest_spending_tx = Some((*tx).clone()); - self.persist_info(&output_info).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) - }); - } - } - } - } - - fn transaction_unconfirmed(&self, txid: &Txid) { - let mut locked_outputs = self.outputs.lock().unwrap(); - - // Get what height was unconfirmed. - let unconf_height = locked_outputs - .iter() - .find(|o| o.latest_spending_tx.as_ref().map(|tx| tx.txid()) == Some(*txid)) - .and_then(|o| o.confirmation_height); - - // Unconfirm all >= this height. - locked_outputs.iter_mut().filter(|o| o.confirmation_height >= unconf_height).for_each( - |o| { - o.confirmation_hash = None; - o.confirmation_height = None; - self.persist_info(&o).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) - }); - }, - ); - } - - fn best_block_updated(&self, header: &Header, height: u32) { - *self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height); - self.prune_confirmed_outputs(); - self.rebroadcast_if_necessary(); - } - - fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { - let locked_outputs = self.outputs.lock().unwrap(); - locked_outputs - .iter() - .filter_map(|o| { - if let Some(confirmation_hash) = o.confirmation_hash { - if let Some(confirmation_height) = o.confirmation_height { - if let Some(latest_spending_tx) = o.latest_spending_tx.as_ref() { - return Some(( - latest_spending_tx.txid(), - confirmation_height, - Some(confirmation_hash), - )); - } - } - } - - None - }) - .collect::>() +pub(crate) fn value_satoshis_from_descriptor(descriptor: &SpendableOutputDescriptor) -> u64 { + match &descriptor { + SpendableOutputDescriptor::StaticOutput { output, .. } => output.value, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.output.value, + SpendableOutputDescriptor::StaticPaymentOutput(output) => output.output.value, } } diff --git a/src/types.rs b/src/types.rs index d10e2fbf7..afed1320a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,5 @@ use crate::logger::FilesystemLogger; use crate::message_handler::NodeCustomMessageHandler; -use crate::sweep::OutputSweeper; use lightning::chain::chainmonitor; use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails; @@ -15,6 +14,7 @@ use lightning::sign::InMemorySigner; use lightning::util::config::ChannelConfig as LdkChannelConfig; use lightning::util::config::MaxDustHTLCExposure as LdkMaxDustHTLCExposure; use lightning::util::ser::{Readable, Writeable, Writer}; +use lightning::util::sweep::OutputSweeper; use lightning_net_tokio::SocketDescriptor; use lightning_transaction_sync::EsploraSyncClient; @@ -127,10 +127,12 @@ pub(crate) type MessageRouter = lightning::onion_message::messenger::DefaultMess pub(crate) type Sweeper = OutputSweeper< Arc, + Arc, Arc, Arc, Arc, Arc, + Arc, >; /// A local, potentially user-provided, identifier of a channel. diff --git a/src/wallet.rs b/src/wallet.rs index a79bb0078..2b01d1b49 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -7,8 +7,8 @@ use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage}; use lightning::ln::script::ShutdownScript; use lightning::sign::{ - EntropySource, InMemorySigner, KeyMaterial, KeysManager, NodeSigner, OutputSpender, Recipient, - SignerProvider, SpendableOutputDescriptor, + ChangeDestinationSource, EntropySource, InMemorySigner, KeyMaterial, KeysManager, NodeSigner, + OutputSpender, Recipient, SignerProvider, SpendableOutputDescriptor, }; use lightning::util::message_signing; @@ -278,22 +278,6 @@ where Self { inner, wallet, logger } } - /// See [`KeysManager::spend_spendable_outputs`] for documentation on this method. - pub fn spend_spendable_outputs( - &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, - change_destination_script: ScriptBuf, feerate_sat_per_1000_weight: u32, - locktime: Option, secp_ctx: &Secp256k1, - ) -> Result { - self.inner.spend_spendable_outputs( - descriptors, - outputs, - change_destination_script, - feerate_sat_per_1000_weight, - locktime, - secp_ctx, - ) - } - pub fn sign_message(&self, msg: &[u8]) -> Result { message_signing::sign(msg, &self.inner.get_node_secret_key()) .or(Err(Error::MessageSigningFailed)) @@ -352,6 +336,30 @@ where } } +impl OutputSpender for WalletKeysManager +where + D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, + L::Target: Logger, +{ + /// See [`KeysManager::spend_spendable_outputs`] for documentation on this method. + fn spend_spendable_outputs( + &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, + change_destination_script: ScriptBuf, feerate_sat_per_1000_weight: u32, + locktime: Option, secp_ctx: &Secp256k1, + ) -> Result { + self.inner.spend_spendable_outputs( + descriptors, + outputs, + change_destination_script, + feerate_sat_per_1000_weight, + locktime, + secp_ctx, + ) + } +} + impl EntropySource for WalletKeysManager where D: BatchDatabase, @@ -417,3 +425,18 @@ where } } } + +impl ChangeDestinationSource for WalletKeysManager +where + D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, + L::Target: Logger, +{ + fn get_change_destination_script(&self) -> Result { + let address = self.wallet.get_new_address().map_err(|e| { + log_error!(self.logger, "Failed to retrieve new address from wallet: {}", e); + })?; + Ok(address.script_pubkey()) + } +}