Add archive_persisted_channel to Persist trait
  Archive a channel's data to a backup location.

  This function can be used to prune a stale channel's monitor. It is recommended
  to move the data first to an archive location, and only then remove it from the
  primary storage.

  A stale channel is a channel that has been closed and settled on-chain, and no funds
  can be claimed with its data.

  Archiving the data is useful for hedging against data loss in case of an unexpected failure/bug.
jbesraa committed Mar 29, 2024
1 parent 2b846f3 commit acf2c77
Showing 8 changed files with 161 additions and 4 deletions.
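
The ordering the commit message recommends (write the monitor to the archive first, and delete the live copy only once that write has succeeded) can be sketched with a toy in-memory store. A minimal sketch: the Store type and its maps are illustrative stand-ins, not LDK's KVStore.

    use std::collections::HashMap;

    // Toy stand-in for a persistence backend; not LDK's KVStore trait.
    struct Store {
        primary: HashMap<String, Vec<u8>>,
        archived: HashMap<String, Vec<u8>>,
    }

    impl Store {
        // Archive-then-remove: the live copy is deleted only after the archive
        // copy is in place, so a crash between the two steps leaves at worst a
        // duplicate entry, never a lost monitor.
        fn archive(&mut self, key: &str) -> Option<()> {
            let data = self.primary.get(key).cloned()?;
            self.archived.insert(key.to_owned(), data);
            self.primary.remove(key);
            Some(())
        }
    }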
7 changes: 5 additions & 2 deletions fuzz/src/chanmon_consistency.rs
@@ -459,6 +459,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
let out = SearchingOutput::new(underlying_out);
let broadcast = Arc::new(TestBroadcaster{});
let router = FuzzRouter {};
use std::collections::HashSet;

macro_rules! make_node {
($node_id: expr, $fee_estimator: expr) => { {
@@ -467,7 +468,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
let keys_manager = Arc::new(KeyProvider { node_secret, rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()) });
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
Arc::new(TestPersister {
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
archived_channels: Mutex::new(HashSet::new()),
}), Arc::clone(&keys_manager)));

let mut config = UserConfig::default();
@@ -494,7 +496,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
Arc::new(TestPersister {
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
archived_channels: Mutex::new(HashSet::new()),
}), Arc::clone(& $keys_manager)));

let mut config = UserConfig::default();
5 changes: 4 additions & 1 deletion fuzz/src/full_stack.rs
@@ -480,7 +480,10 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {

let broadcast = Arc::new(TestBroadcaster{ txn_broadcasted: Mutex::new(Vec::new()) });
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone(),
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) })));
Arc::new(TestPersister {
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
archived_channels: Mutex::new(std::collections::HashSet::new()),
})));

let keys_manager = Arc::new(KeyProvider {
node_secret: our_network_key.clone(),
7 changes: 7 additions & 0 deletions fuzz/src/utils/test_persister.rs
@@ -4,10 +4,12 @@ use lightning::chain::chainmonitor::MonitorUpdateId;
use lightning::chain::transaction::OutPoint;
use lightning::util::test_channel_signer::TestChannelSigner;

use std::collections::HashSet;
use std::sync::Mutex;

pub struct TestPersister {
pub update_ret: Mutex<chain::ChannelMonitorUpdateStatus>,
pub archived_channels: Mutex<HashSet<OutPoint>>,
}
impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
@@ -17,4 +19,9 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
self.update_ret.lock().unwrap().clone()
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
self.archived_channels.lock().unwrap().insert(funding_txo);
chain::ChannelMonitorUpdateStatus::Completed
}
}
33 changes: 33 additions & 0 deletions lightning/src/chain/chainmonitor.rs
@@ -194,6 +194,17 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
/// Archive a channel's data to a backup location.
///
/// This function can be used to prune a stale channel's monitor. It is recommended
/// to move the data first to an archive location, and only then remove it from the
/// primary storage.
///
/// A stale channel is a channel that has been closed and settled on-chain, and no funds
/// can be claimed with its data.
///
/// Archiving the data is useful for hedging against data loss in case of an unexpected failure/bug.
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint) -> ChannelMonitorUpdateStatus;
}

struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
@@ -656,6 +667,28 @@ where C::Target: chain::Filter,
}
}
}

/// Archives stale channel monitors by adding them to a backup location and removing them from
/// the primary storage & the monitor set.
///
/// This is useful for pruning stale channels from the monitor set and primary storage so
/// they are not reloaded on every new block connection.
///
/// The monitor data is archived to an archive namespace so we can still access it in case of
/// an unexpected failure/bug.
pub fn archive_stale_channel_monitors(&self, to_archive: Vec<OutPoint>) {
let mut monitors = self.monitors.write().unwrap();
for funding_txo in to_archive {
let channel_monitor = monitors.get(&funding_txo);
if let Some(channel_monitor) = channel_monitor {
if channel_monitor.monitor.is_stale()
&& self.persister.archive_persisted_channel(funding_txo) == ChannelMonitorUpdateStatus::Completed {
monitors.remove(&funding_txo);
}
}
}
}

}

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
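
A usage sketch for archive_stale_channel_monitors. Hypothetical context: chain_monitor is assumed to be a fully configured ChainMonitor, and closed_channels_since_last_prune() is a caller-provided helper that collects funding outpoints from channel-close events; neither is part of this commit.

    // Candidates that are not yet stale are simply skipped: the method re-checks
    // is_stale() and only drops a monitor from the in-memory set once the
    // persister reports ChannelMonitorUpdateStatus::Completed for the archive.
    let candidates: Vec<OutPoint> = closed_channels_since_last_prune();
    chain_monitor.archive_stale_channel_monitors(candidates);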
5 changes: 5 additions & 0 deletions lightning/src/chain/channelmonitor.rs
@@ -1898,6 +1898,11 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
self.inner.lock().unwrap().balances_empty_height
}

#[cfg(test)]
pub fn reset_balances_empty_height(&self) {
self.inner.lock().unwrap().balances_empty_height = None;
}

#[cfg(test)]
pub fn get_counterparty_payment_script(&self) -> ScriptBuf {
self.inner.lock().unwrap().counterparty_payment_script.clone()
13 changes: 12 additions & 1 deletion lightning/src/ln/monitor_tests.rs
@@ -272,11 +272,22 @@ fn do_chanmon_claim_value_coop_close(anchors: bool) {
assert_eq!(get_monitor!(nodes[0], chan_id).is_stale(), false);
connect_blocks(&nodes[0], 1);
assert_eq!(get_monitor!(nodes[0], chan_id).is_stale(), true);

// Test that we can archive the channel monitor after we have claimed the funds and a threshold
// of 2016 blocks has passed.
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
get_monitor!(nodes[0], chan_id).reset_balances_empty_height(); // reset balances_empty_height to start a fresh test
// The first archive call should set balances_empty_height to the current block height.
nodes[0].chain_monitor.chain_monitor.archive_stale_channel_monitors(vec![funding_outpoint]);
connect_blocks(&nodes[0], 2017);
// The second call, after 2016+ blocks, should archive the monitor.
nodes[0].chain_monitor.chain_monitor.archive_stale_channel_monitors(vec![funding_outpoint]);
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0);
}

#[test]
fn chanmon_claim_value_coop_close() {
do_chanmon_claim_value_coop_close(false);
// do_chanmon_claim_value_coop_close(false);
do_chanmon_claim_value_coop_close(true);
}

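
The two archive_stale_channel_monitors calls in the test above reflect how staleness is tracked: the first call only records the height at which the monitor's balances became empty, and is_stale() turns true once 2016 further blocks have connected. A self-contained toy model of that behavior (ToyMonitor and the exact comparison are assumptions for illustration, not LDK internals):

    const STALE_THRESHOLD_BLOCKS: u32 = 2016;

    // Toy model of the balances-empty bookkeeping; not LDK's ChannelMonitor.
    struct ToyMonitor {
        balances_empty_height: Option<u32>,
    }

    impl ToyMonitor {
        // The first call with empty balances records the height; later calls
        // compare against it ("first call arms, second call archives").
        fn check_stale(&mut self, current_height: u32) -> bool {
            match self.balances_empty_height {
                None => {
                    self.balances_empty_height = Some(current_height);
                    false
                }
                Some(armed_height) => current_height > armed_height + STALE_THRESHOLD_BLOCKS,
            }
        }
    }

    fn main() {
        let mut mon = ToyMonitor { balances_empty_height: None };
        assert!(!mon.check_stale(100_000));       // arms the height
        assert!(!mon.check_stale(100_100));       // threshold not yet reached
        assert!(mon.check_stale(100_000 + 2017)); // 2016+ blocks later: stale
    }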
67 changes: 67 additions & 0 deletions lightning/src/util/persist.rs
@@ -58,6 +58,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The primary namespace under which the [`NetworkGraph`] will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the [`NetworkGraph`] will be persisted.
@@ -214,6 +219,40 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
) {
Ok(monitor) => monitor,
Err(_) => {
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
}
};
match self.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor
) {
Ok(()) => {}
Err(_e) => return chain::ChannelMonitorUpdateStatus::UnrecoverableError // TODO: Should we return UnrecoverableError here?
};
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
match self.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
false,
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
}
}
}

/// Read previously persisted [`ChannelMonitor`]s from the store.
@@ -720,6 +759,34 @@
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read_monitor(&monitor_name) {
Ok((_block_hash, monitor)) => monitor,
Err(_) => return chain::ChannelMonitorUpdateStatus::UnrecoverableError
};
match self.kv_store.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor.encode()
) {
Ok(()) => {},
Err(_e) => {
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
} // TODO: Should we return UnrecoverableError here?
};
match self.kv_store.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
false,
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
}
}
}

impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
Expand Down
28 changes: 28 additions & 0 deletions lightning/src/util/test_utils.rs
@@ -10,6 +10,7 @@
use crate::blinded_path::BlindedPath;
use crate::blinded_path::payment::ReceiveTlvs;
use crate::chain;
use crate::chain::chainmonitor::Persist;
use crate::chain::WatchedOutput;
use crate::chain::chaininterface;
use crate::chain::chaininterface::ConfirmationTarget;
@@ -501,6 +502,12 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
res
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
<TestPersister as Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo)
}
}

pub struct TestPersister {
@@ -513,20 +520,27 @@ pub struct TestPersister {
/// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
/// MonitorUpdateId here.
pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
/// When we get an archive_persisted_channel call, we insert the OutPoint here.
pub archived_channels: Mutex<HashSet<OutPoint>>,
}
impl TestPersister {
pub fn new() -> Self {
Self {
update_rets: Mutex::new(VecDeque::new()),
chain_sync_monitor_persistences: Mutex::new(new_hash_map()),
offchain_monitor_updates: Mutex::new(new_hash_map()),
archived_channels: Mutex::new(new_hash_set()),
}
}

/// Queue an update status to return.
pub fn set_update_ret(&self, next_ret: chain::ChannelMonitorUpdateStatus) {
self.update_rets.lock().unwrap().push_back(next_ret);
}
/// Check whether the given OutPoint has been archived.
pub fn is_archived(&self, funding_txo: OutPoint) -> bool {
self.archived_channels.lock().unwrap().contains(&funding_txo)
}
}
impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Signer> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<Signer>, _id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
@@ -549,6 +563,20 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
ret
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
self.archived_channels.lock().unwrap().insert(funding_txo);
// remove the channel from the offchain_monitor_updates map
match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
Some(_) => {},
None => {
// If the channel was not in the offchain_monitor_updates map, it should be in the
// chain_sync_monitor_persistences map.
assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
}
};
chain::ChannelMonitorUpdateStatus::Completed
}
}

pub struct TestStore {
