Skip to content

Commit

Permalink
Migrate to the upstreamed OutputSweeper
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Apr 25, 2024
1 parent a624501 commit 4564ec1
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 556 deletions.
82 changes: 41 additions & 41 deletions src/balance.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::sweep::value_satoshis_from_descriptor;

use lightning::chain::channelmonitor::Balance as LdkBalance;
use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage};
use lightning::util::sweep::{OutputSpendStatus, TrackedSpendableOutput};

use bitcoin::secp256k1::PublicKey;
use bitcoin::{BlockHash, Txid};

use crate::sweep::SpendableOutputInfo;

/// Details of the known available balances returned by [`Node::list_balances`].
///
/// [`Node::list_balances`]: crate::Node::list_balances
Expand Down Expand Up @@ -258,46 +259,45 @@ pub enum PendingSweepBalance {
}

impl PendingSweepBalance {
pub(crate) fn from_tracked_spendable_output(output_info: SpendableOutputInfo) -> Self {
if let Some(confirmation_hash) = output_info.confirmation_hash {
debug_assert!(output_info.confirmation_height.is_some());
debug_assert!(output_info.latest_spending_tx.is_some());
let channel_id = output_info.channel_id;
let confirmation_height = output_info
.confirmation_height
.expect("Height must be set if the output is confirmed");
let latest_spending_txid = output_info
.latest_spending_tx
.as_ref()
.expect("Spending tx must be set if the output is confirmed")
.txid();
let amount_satoshis = output_info.value_satoshis();
Self::AwaitingThresholdConfirmations {
channel_id,
latest_spending_txid,
confirmation_hash,
confirmation_height,
amount_satoshis,
}
} else if let Some(latest_broadcast_height) = output_info.latest_broadcast_height {
debug_assert!(output_info.latest_spending_tx.is_some());
let channel_id = output_info.channel_id;
let latest_spending_txid = output_info
.latest_spending_tx
.as_ref()
.expect("Spending tx must be set if the spend was broadcast")
.txid();
let amount_satoshis = output_info.value_satoshis();
Self::BroadcastAwaitingConfirmation {
channel_id,
pub(crate) fn from_tracked_spendable_output(output_info: TrackedSpendableOutput) -> Self {
match output_info.status {
OutputSpendStatus::PendingInitialBroadcast { .. } => {
let channel_id = output_info.channel_id;
let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor);
Self::PendingBroadcast { channel_id, amount_satoshis }
},
OutputSpendStatus::PendingFirstConfirmation {
latest_broadcast_height,
latest_spending_txid,
amount_satoshis,
}
} else {
let channel_id = output_info.channel_id;
let amount_satoshis = output_info.value_satoshis();
Self::PendingBroadcast { channel_id, amount_satoshis }
latest_spending_tx,
..
} => {
let channel_id = output_info.channel_id;
let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor);
let latest_spending_txid = latest_spending_tx.txid();
Self::BroadcastAwaitingConfirmation {
channel_id,
latest_broadcast_height,
latest_spending_txid,
amount_satoshis,
}
},
OutputSpendStatus::PendingThresholdConfirmations {
latest_spending_tx,
confirmation_height,
confirmation_hash,
..
} => {
let channel_id = output_info.channel_id;
let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor);
let latest_spending_txid = latest_spending_tx.txid();
Self::AwaitingThresholdConfirmations {
channel_id,
latest_spending_txid,
confirmation_hash,
confirmation_height,
amount_satoshis,
}
},
}
}
}
64 changes: 43 additions & 21 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use crate::gossip::GossipSource;
use crate::io;
use crate::io::sqlite_store::SqliteStore;
use crate::liquidity::LiquiditySource;
use crate::logger::{log_error, FilesystemLogger, Logger};
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment_store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::sweep::OutputSweeper;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, GossipSync, KeysManager, MessageRouter, NetworkGraph,
Expand All @@ -37,6 +36,7 @@ use lightning::util::persist::{
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::ReadableArgs;
use lightning::util::sweep::OutputSweeper;

use lightning_persister::fs_store::FilesystemStore;

Expand Down Expand Up @@ -895,6 +895,47 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(

liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));

let output_sweeper = match io::utils::read_output_sweeper(
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&tx_sync),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
Arc::clone(&logger),
) {
Ok(output_sweeper) => Arc::new(output_sweeper),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(OutputSweeper::new(
channel_manager.current_best_block(),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Some(Arc::clone(&tx_sync)),
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
Arc::clone(&logger),
))
} else {
return Err(BuildError::ReadFailed);
}
},
};

match io::utils::migrate_deprecated_spendable_outputs(
Arc::clone(&output_sweeper),
Arc::clone(&kv_store),
Arc::clone(&logger),
) {
Ok(()) => {
log_info!(logger, "Successfully migrated OutputSweeper data.");
},
Err(e) => {
log_error!(logger, "Failed to migrate OutputSweeper data: {}", e);
return Err(BuildError::ReadFailed);
},
}

// Init payment info storage
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(payments) => {
Expand Down Expand Up @@ -928,25 +969,6 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
},
};

let best_block = channel_manager.current_best_block();
let output_sweeper =
match io::utils::read_spendable_outputs(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(outputs) => Arc::new(OutputSweeper::new(
outputs,
Arc::clone(&wallet),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
best_block,
Some(Arc::clone(&tx_sync)),
Arc::clone(&logger),
)),
Err(_) => {
return Err(BuildError::ReadFailed);
},
};

let (stop_sender, _) = tokio::sync::watch::channel(());

let is_listening = Arc::new(AtomicBool::new(false));
Expand Down
2 changes: 1 addition & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ where
}
},
LdkEvent::SpendableOutputs { outputs, channel_id } => {
self.output_sweeper.add_outputs(outputs, channel_id)
self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None)
},
LdkEvent::OpenChannelRequest {
temporary_channel_id,
Expand Down
7 changes: 4 additions & 3 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments";
pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The spendable output information will be persisted under this prefix.
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs";
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The spendable output information used to be persisted under this prefix until LDK Node v0.3.0.
pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str =
"spendable_outputs";
pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = "";
Expand Down
120 changes: 104 additions & 16 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::*;
use crate::config::WALLET_KEYS_SEED_LEN;

use crate::logger::log_error;
use crate::logger::{log_error, FilesystemLogger};
use crate::peer_store::PeerStore;
use crate::sweep::SpendableOutputInfo;
use crate::sweep::DeprecatedSpendableOutputInfo;
use crate::types::{Broadcaster, ChainSource, FeeEstimator, KeysManager, Sweeper};
use crate::{Error, EventQueue, PaymentDetails};

use lightning::routing::gossip::NetworkGraph;
Expand All @@ -12,13 +13,16 @@ use lightning::util::logger::Logger;
use lightning::util::persist::{
KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY,
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::{Readable, ReadableArgs, Writeable};
use lightning::util::string::PrintableString;

use bip39::Mnemonic;
use lightning::util::sweep::{OutputSpendStatus, OutputSweeper};
use rand::{thread_rng, RngCore};

use std::fs;
Expand Down Expand Up @@ -200,34 +204,118 @@ where
Ok(res)
}

/// Read previously persisted spendable output information from the store.
pub(crate) fn read_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
) -> Result<Vec<SpendableOutputInfo>, std::io::Error>
/// Read `OutputSweeper` state from the store.
pub(crate) fn read_output_sweeper<K: KVStore + Send + Sync>(
broadcaster: Arc<Broadcaster>, fee_estimator: Arc<FeeEstimator>,
chain_data_source: Arc<ChainSource>, keys_manager: Arc<KeysManager>, kv_store: Arc<K>,
logger: Arc<FilesystemLogger>,
) -> Result<Sweeper<K>, std::io::Error> {
let mut reader = Cursor::new(kv_store.read(
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_KEY,
)?);
let args = (
broadcaster,
fee_estimator,
Some(chain_data_source),
Arc::clone(&keys_manager),
keys_manager,
kv_store,
logger.clone(),
);
OutputSweeper::read(&mut reader, args).map_err(|e| {
log_error!(logger, "Failed to deserialize OutputSweeper: {}", e);
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize OutputSweeper")
})
}

/// Read previously persisted spendable output information from the store and migrate to the
/// upstreamed `OutputSweeper`.
///
/// We first iterate all `DeprecatedSpendableOutputInfo`s and have them tracked by the new
/// `OutputSweeper`. In order to be certain the initial output spends will happen in a single
/// transaction (and save on-chain fees), we batch them to happen at current height plus two
/// blocks. Lastly, we remove the previously persisted data once we checked they are tracked and
/// awaiting their initial spend at the correct height.
///
/// Note that this migration will be run in the `Builder`, i.e., at the time when the migration is
/// happening no background sync is ongoing, so we shouldn't have a risk of interleaving block
/// connections during the migration.
pub(crate) fn migrate_deprecated_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
sweeper: Arc<Sweeper<K>>, kv_store: Arc<K>, logger: L,
) -> Result<(), std::io::Error>
where
L::Target: Logger,
{
let mut res = Vec::new();
let best_block = sweeper.current_best_block();

for stored_key in kv_store.list(
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)? {
let mut reader = Cursor::new(kv_store.read(
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
)?);
let output = SpendableOutputInfo::read(&mut reader).map_err(|e| {
let output = DeprecatedSpendableOutputInfo::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize SpendableOutputInfo: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize SpendableOutputInfo",
)
})?;
res.push(output);
let descriptors = vec![output.descriptor.clone()];
let spend_delay = Some(best_block.height + 2);
sweeper.track_spendable_outputs(descriptors, output.channel_id, true, spend_delay);
if let Some(tracked_spendable_output) =
sweeper.tracked_spendable_outputs().iter().find(|o| o.descriptor == output.descriptor)
{
match tracked_spendable_output.status {
OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
if delayed_until_height == spend_delay {
kv_store.remove(
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
false,
)?;
} else {
debug_assert!(false, "Unexpected status in OutputSweeper migration.");
log_error!(logger, "Unexpected status in OutputSweeper migration.");
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to migrate OutputSweeper state.",
));
}
},
_ => {
debug_assert!(false, "Unexpected status in OutputSweeper migration.");
log_error!(logger, "Unexpected status in OutputSweeper migration.");
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to migrate OutputSweeper state.",
));
},
}
} else {
debug_assert!(
false,
"OutputSweeper failed to track and persist outputs during migration."
);
log_error!(
logger,
"OutputSweeper failed to track and persist outputs during migration."
);
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to migrate OutputSweeper state.",
));
}
}
Ok(res)

Ok(())
}

pub(crate) fn read_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(
Expand Down
Loading

0 comments on commit 4564ec1

Please sign in to comment.