From 3bc4756688f323c8a02f0c5388bcd5e858770d94 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Thu, 25 Apr 2024 10:10:13 +0200 Subject: [PATCH 1/2] chore: Remove weird error log --- mobile/native/src/trade/order/mod.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/mobile/native/src/trade/order/mod.rs b/mobile/native/src/trade/order/mod.rs index 4b65fc466..3b6c19122 100644 --- a/mobile/native/src/trade/order/mod.rs +++ b/mobile/native/src/trade/order/mod.rs @@ -207,9 +207,6 @@ pub struct Order { impl Order { /// This returns the executed price once known - /// - /// Logs an error if this function is called on a state where the execution price is not know - /// yet. pub fn execution_price(&self) -> Option { match self.state { OrderState::Filling { @@ -222,12 +219,7 @@ impl Order { execution_price: Some(execution_price), .. } => Some(execution_price), - _ => { - // TODO: The caller should decide how to handle this. Always logging an error is - // weird. - tracing::error!("Executed price not known in state {:?}", self.state); - None - } + _ => None, } } From 1ff6dcb436cd96a05883500f36b18596628744ae Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Thu, 25 Apr 2024 15:47:59 +0200 Subject: [PATCH 2/2] refactor!: Wrap DLC messages with TenTenOneMessages This is primarily a refactoring, moving the DLC message handler from the `rust-dlc` dependency to 10101, allowing us in following steps to enrich and adapt protocol messages with 10101 metadata. BREAKING CHANGE: The format of the messages we send over the wire has changed. --- coordinator/src/dlc_handler.rs | 8 +- coordinator/src/emergency_kit.rs | 16 +- coordinator/src/node.rs | 528 +++++++++-------- crates/ln-dlc-node/src/dlc_message.rs | 194 +++---- crates/ln-dlc-node/src/lib.rs | 5 +- crates/ln-dlc-node/src/message_handler.rs | 627 +++++++++++++++++++++ crates/ln-dlc-node/src/node/dlc_channel.rs | 76 +-- crates/ln-dlc-node/src/node/dlc_manager.rs | 22 + crates/ln-dlc-node/src/node/event.rs | 24 +- crates/ln-dlc-node/src/node/mod.rs | 53 +- crates/ln-dlc-node/src/node/sub_channel.rs | 60 -- mobile/native/src/dlc/dlc_handler.rs | 8 +- mobile/native/src/emergency_kit.rs | 16 +- mobile/native/src/ln_dlc/node.rs | 386 ++++++------- 14 files changed, 1289 insertions(+), 734 deletions(-) create mode 100644 crates/ln-dlc-node/src/message_handler.rs delete mode 100644 crates/ln-dlc-node/src/node/sub_channel.rs diff --git a/coordinator/src/dlc_handler.rs b/coordinator/src/dlc_handler.rs index c9c97e85d..22326686f 100644 --- a/coordinator/src/dlc_handler.rs +++ b/coordinator/src/dlc_handler.rs @@ -8,12 +8,12 @@ use diesel::r2d2::Pool; use diesel::PgConnection; use dlc_manager::channel::signed_channel::SignedChannel; use dlc_manager::channel::signed_channel::SignedChannelState; -use dlc_messages::Message; use futures::future::RemoteHandle; use futures::FutureExt; use ln_dlc_node::bitcoin_conversion::to_secp_pk_29; use ln_dlc_node::dlc_message::DlcMessage; use ln_dlc_node::dlc_message::SerializedDlcMessage; +use ln_dlc_node::message_handler::TenTenOneMessage; use ln_dlc_node::node::dlc_channel::send_dlc_message; use ln_dlc_node::node::event::NodeEvent; use ln_dlc_node::node::Node; @@ -103,7 +103,7 @@ pub fn spawn_handling_outbound_dlc_messages( } impl DlcHandler { - pub fn send_dlc_message(&self, peer: PublicKey, msg: Message) -> Result<()> { + pub fn send_dlc_message(&self, peer: PublicKey, msg: TenTenOneMessage) -> Result<()> { self.store_dlc_message(peer, msg.clone())?; send_dlc_message( @@ -116,7 +116,7 @@ impl DlcHandler { Ok(()) } - pub fn store_dlc_message(&self, peer: PublicKey, msg: Message) -> Result<()> { + pub fn store_dlc_message(&self, peer: PublicKey, msg: TenTenOneMessage) -> Result<()> { let mut conn = self.pool.get()?; let serialized_outbound_message = SerializedDlcMessage::try_from(&msg)?; @@ -132,7 +132,7 @@ impl DlcHandler { let last_serialized_message = db::last_outbound_dlc_message::get(&mut conn, &peer)?; if let Some(last_serialized_message) = last_serialized_message { - let message = Message::try_from(&last_serialized_message)?; + let message = TenTenOneMessage::try_from(&last_serialized_message)?; send_dlc_message( &self.node.dlc_message_handler, &self.node.peer_manager, diff --git a/coordinator/src/emergency_kit.rs b/coordinator/src/emergency_kit.rs index b3f9cc730..c5e3345e6 100644 --- a/coordinator/src/emergency_kit.rs +++ b/coordinator/src/emergency_kit.rs @@ -4,9 +4,9 @@ use bitcoin::secp256k1::PublicKey; use bitcoin_old::secp256k1::SecretKey; use dlc_manager::Signer; use dlc_messages::channel::RenewRevoke; -use dlc_messages::ChannelMessage; -use dlc_messages::Message; use lightning::ln::chan_utils::build_commitment_secret; +use ln_dlc_node::message_handler::TenTenOneMessage; +use ln_dlc_node::message_handler::TenTenOneRenewRevoke; use ln_dlc_node::node::event::NodeEvent; impl Node { @@ -25,11 +25,13 @@ impl Node { signed_channel.update_idx + 1, ))?; - let msg = Message::Channel(ChannelMessage::RenewRevoke(RenewRevoke { - channel_id: signed_channel.channel_id, - per_update_secret: prev_per_update_secret, - reference_id: signed_channel.reference_id, - })); + let msg = TenTenOneMessage::RenewRevoke(TenTenOneRenewRevoke { + renew_revoke: RenewRevoke { + channel_id: signed_channel.channel_id, + per_update_secret: prev_per_update_secret, + reference_id: signed_channel.reference_id, + }, + }); self.inner.event_handler.publish(NodeEvent::SendDlcMessage { peer: trader, diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index 30044ef2b..c9d98d7f5 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -20,15 +20,20 @@ use dlc_messages::channel::Reject; use dlc_messages::channel::RenewFinalize; use dlc_messages::channel::SettleFinalize; use dlc_messages::channel::SignChannel; -use dlc_messages::ChannelMessage; -use dlc_messages::Message; use ln_dlc_node::bitcoin_conversion::to_secp_pk_29; use ln_dlc_node::bitcoin_conversion::to_secp_pk_30; use ln_dlc_node::dlc_message::DlcMessage; use ln_dlc_node::dlc_message::SerializedDlcMessage; +use ln_dlc_node::message_handler::TenTenOneAcceptChannel; +use ln_dlc_node::message_handler::TenTenOneCollaborativeCloseOffer; +use ln_dlc_node::message_handler::TenTenOneMessage; +use ln_dlc_node::message_handler::TenTenOneReject; +use ln_dlc_node::message_handler::TenTenOneRenewFinalize; +use ln_dlc_node::message_handler::TenTenOneSettleFinalize; +use ln_dlc_node::message_handler::TenTenOneSignChannel; use ln_dlc_node::node; -use ln_dlc_node::node::dlc_message_name; use ln_dlc_node::node::event::NodeEvent; +use ln_dlc_node::node::tentenone_message_name; use ln_dlc_node::node::RunningNode; use std::sync::Arc; use tokio::sync::broadcast::Sender; @@ -112,7 +117,7 @@ impl Node { .get_and_clear_received_messages(); for (node_id, msg) in messages { - let msg_name = dlc_message_name(&msg); + let msg_name = tentenone_message_name(&msg); if let Err(e) = self.process_dlc_message(to_secp_pk_30(node_id), &msg) { if let Err(e) = self.set_dlc_protocol_to_failed(&msg) { tracing::error!( @@ -130,13 +135,7 @@ impl Node { } } - fn set_dlc_protocol_to_failed(&self, msg: &Message) -> Result<()> { - let msg = match msg { - Message::OnChain(_) => return Ok(()), - Message::Channel(msg) => msg, - Message::SubChannel(_) => return Ok(()), - }; - + fn set_dlc_protocol_to_failed(&self, msg: &TenTenOneMessage) -> Result<()> { if let Some(protocol_id) = msg.get_reference_id() { let protocol_id = ProtocolId::try_from(protocol_id)?; dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()) @@ -146,13 +145,13 @@ impl Node { Ok(()) } - /// Process an incoming [`Message::Channel`] and update the 10101 position accordingly. + /// Process an incoming [`TenTenOneMessage`] and update the 10101 position accordingly. /// /// - Any other kind of message will be ignored. /// - Any message that has already been processed will be skipped. /// - /// Offers such as [`ChannelMessage::Offer`], [`ChannelMessage::SettleOffer`], - /// [`ChannelMessage::CollaborativeCloseOffer`] and [`ChannelMessage::RenewOffer`] are + /// Offers such as [`TenTenOneMessage::Offer`], [`TenTenOneMessage::SettleOffer`], + /// [`TenTenOneMessage::CollaborativeCloseOffer`] and [`TenTenOneMessage::RenewOffer`] are /// automatically accepted. Unless the maturity date of the offer is already outdated. /// /// FIXME(holzeis): This function manipulates different data objects from different data sources @@ -160,304 +159,299 @@ impl Node { /// inconsistent state. One way of fixing that could be to: (1) use a single data source for the /// 10101 data and the `rust-dlc` data; (2) wrap the function into a DB transaction which can be /// atomically rolled back on error or committed on success. - fn process_dlc_message(&self, node_id: PublicKey, msg: &Message) -> Result<()> { + fn process_dlc_message(&self, node_id: PublicKey, msg: &TenTenOneMessage) -> Result<()> { tracing::info!( from = %node_id, - kind = %dlc_message_name(msg), + kind = %tentenone_message_name(msg), "Processing message" ); - let resp = match msg { - Message::OnChain(_) | Message::SubChannel(_) => { - tracing::warn!(from = %node_id, kind = %dlc_message_name(msg),"Ignoring unexpected dlc message."); - None - } - Message::Channel(channel_msg) => { - let protocol_id = match channel_msg.get_reference_id() { - Some(reference_id) => Some(ProtocolId::try_from(reference_id)?), - None => None, - }; + let protocol_id = match msg.get_reference_id() { + Some(reference_id) => Some(ProtocolId::try_from(reference_id)?), + None => None, + }; - tracing::debug!( - from = %node_id, - ?protocol_id, - "Received channel message" - ); + tracing::debug!( + from = %node_id, + ?protocol_id, + "Received message" + ); - self.verify_collab_close_offer(&node_id, channel_msg)?; - - let inbound_msg = { - let mut conn = self.pool.get()?; - let serialized_inbound_message = SerializedDlcMessage::try_from(msg)?; - let inbound_msg = DlcMessage::new(node_id, serialized_inbound_message, true)?; - match db::dlc_messages::get(&mut conn, &inbound_msg.message_hash)? { - Some(_) => { - tracing::debug!(%node_id, kind=%dlc_message_name(msg), "Received message that has already been processed, skipping."); - return Ok(()); - } - None => inbound_msg, - } - }; + self.verify_collab_close_offer(&node_id, msg)?; - let resp = self - .inner - .dlc_manager - .on_dlc_message(msg, to_secp_pk_29(node_id)) - .with_context(|| { - format!( - "Failed to handle {} dlc message from {node_id}", - dlc_message_name(msg) - ) - })?; - - if let Some(resp) = resp.clone() { - // store dlc message immediately so we do not lose the response if something - // goes wrong afterwards. - self.inner - .event_handler - .publish(NodeEvent::StoreDlcMessage { - peer: node_id, - msg: resp, - }); + let inbound_msg = { + let mut conn = self.pool.get()?; + let serialized_inbound_message = SerializedDlcMessage::try_from(msg)?; + let inbound_msg = DlcMessage::new(node_id, serialized_inbound_message, true)?; + match db::dlc_messages::get(&mut conn, &inbound_msg.message_hash)? { + Some(_) => { + tracing::debug!(%node_id, kind=%tentenone_message_name(msg), "Received message that has already been processed, skipping."); + return Ok(()); } + None => inbound_msg, + } + }; - { - let mut conn = self.pool.get()?; - db::dlc_messages::insert(&mut conn, inbound_msg)?; - } + let resp = self + .inner + .process_tentenone_message(msg.clone(), node_id) + .with_context(|| { + format!( + "Failed to handle {} dlc message from {node_id}", + tentenone_message_name(msg) + ) + })?; + + if let Some(msg) = resp.clone() { + // store dlc message immediately so we do not lose the response if something + // goes wrong afterwards. + self.inner + .event_handler + .publish(NodeEvent::StoreDlcMessage { peer: node_id, msg }); + } - match channel_msg { - ChannelMessage::RenewFinalize(RenewFinalize { + { + let mut conn = self.pool.get()?; + db::dlc_messages::insert(&mut conn, inbound_msg)?; + } + + match msg { + TenTenOneMessage::RenewFinalize(TenTenOneRenewFinalize { + renew_finalize: + RenewFinalize { channel_id, reference_id, .. - }) => { - // TODO: Receiving this message used to be specific to rolling over, but we - // now use the renew protocol for all (non-closing) - // trades beyond the first one. - - let channel_id_hex_string = hex::encode(channel_id); - - let reference_id = match reference_id { - Some(reference_id) => *reference_id, - // If the app did not yet update to the latest version, it will not - // send us the reference id in the message. In that case we will - // have to look up the reference id ourselves from the channel. - // TODO(holzeis): Remove this fallback handling once not needed - // anymore. - None => self - .inner - .get_dlc_channel_by_id(channel_id)? - .get_reference_id() - .context("missing reference id")?, - }; - let protocol_id = ProtocolId::try_from(reference_id)?; + }, + }) => { + // TODO: Receiving this message used to be specific to rolling over, but we + // now use the renew protocol for all (non-closing) + // trades beyond the first one. + + let channel_id_hex_string = hex::encode(channel_id); + + let reference_id = match reference_id { + Some(reference_id) => *reference_id, + // If the app did not yet update to the latest version, it will not + // send us the reference id in the message. In that case we will + // have to look up the reference id ourselves from the channel. + // TODO(holzeis): Remove this fallback handling once not needed + // anymore. + None => self + .inner + .get_dlc_channel_by_id(channel_id)? + .get_reference_id() + .context("missing reference id")?, + }; + let protocol_id = ProtocolId::try_from(reference_id)?; - tracing::info!( - channel_id = channel_id_hex_string, - node_id = node_id.to_string(), - %protocol_id, - "DLC channel renew protocol was finalized" - ); + tracing::info!( + channel_id = channel_id_hex_string, + node_id = node_id.to_string(), + %protocol_id, + "DLC channel renew protocol was finalized" + ); - let channel = self.inner.get_dlc_channel_by_id(channel_id)?; + let channel = self.inner.get_dlc_channel_by_id(channel_id)?; - let protocol_executor = - dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - protocol_executor.finish_dlc_protocol( - protocol_id, - &node_id, - channel.get_contract_id(), - channel_id, - self.tx_position_feed.clone(), - )?; - } - ChannelMessage::SettleFinalize(SettleFinalize { + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.finish_dlc_protocol( + protocol_id, + &node_id, + channel.get_contract_id(), + channel_id, + self.tx_position_feed.clone(), + )?; + } + TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { + settle_finalize: + SettleFinalize { channel_id, reference_id, .. - }) => { - let channel_id_hex_string = hex::encode(channel_id); - - let reference_id = match reference_id { - Some(reference_id) => *reference_id, - // If the app did not yet update to the latest version, it will not - // send us the reference id in the message. In that case we will - // have to look up the reference id ourselves from the channel. - // TODO(holzeis): Remove this fallback handling once not needed - // anymore. - None => self - .inner - .get_dlc_channel_by_id(channel_id)? - .get_reference_id() - .context("missing reference id")?, - }; - let protocol_id = ProtocolId::try_from(reference_id)?; + }, + }) => { + let channel_id_hex_string = hex::encode(channel_id); + + let reference_id = match reference_id { + Some(reference_id) => *reference_id, + // If the app did not yet update to the latest version, it will not + // send us the reference id in the message. In that case we will + // have to look up the reference id ourselves from the channel. + // TODO(holzeis): Remove this fallback handling once not needed + // anymore. + None => self + .inner + .get_dlc_channel_by_id(channel_id)? + .get_reference_id() + .context("missing reference id")?, + }; + let protocol_id = ProtocolId::try_from(reference_id)?; - tracing::info!( - channel_id = channel_id_hex_string, - node_id = node_id.to_string(), - %protocol_id, - "DLC channel settle protocol was finalized" - ); + tracing::info!( + channel_id = channel_id_hex_string, + node_id = node_id.to_string(), + %protocol_id, + "DLC channel settle protocol was finalized" + ); - let protocol_executor = - dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - protocol_executor.finish_dlc_protocol( - protocol_id, - &node_id, - // the settled signed channel does not have a contract - None, - channel_id, - self.tx_position_feed.clone(), - )?; - } - ChannelMessage::CollaborativeCloseOffer(close_offer) => { - tracing::info!( - channel_id = hex::encode(close_offer.channel_id), - node_id = node_id.to_string(), - "Received an offer to collaboratively close a channel" - ); + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.finish_dlc_protocol( + protocol_id, + &node_id, + // the settled signed channel does not have a contract + None, + channel_id, + self.tx_position_feed.clone(), + )?; + } + TenTenOneMessage::CollaborativeCloseOffer(TenTenOneCollaborativeCloseOffer { + collaborative_close_offer: close_offer, + }) => { + tracing::info!( + channel_id = hex::encode(close_offer.channel_id), + node_id = node_id.to_string(), + "Received an offer to collaboratively close a channel" + ); - self.inner - .accept_dlc_channel_collaborative_close(&close_offer.channel_id)?; - } - ChannelMessage::Accept(AcceptChannel { + self.inner + .accept_dlc_channel_collaborative_close(&close_offer.channel_id)?; + } + TenTenOneMessage::Accept(TenTenOneAcceptChannel { + accept_channel: + AcceptChannel { temporary_channel_id, reference_id, .. - }) => { - let channel_id = match resp { - Some(Message::Channel(ChannelMessage::Sign(SignChannel { - channel_id, - .. - }))) => channel_id, - _ => *temporary_channel_id, - }; - - let reference_id = match reference_id { - Some(reference_id) => *reference_id, - // If the app did not yet update to the latest version, it will not - // send us the reference id in the message. In that case we will - // have to look up the reference id ourselves from the channel. - // TODO(holzeis): Remove this fallback handling once not needed - // anymore. - None => self - .inner - .get_dlc_channel_by_id(&channel_id)? - .get_reference_id() - .context("missing reference id")?, - }; - let protocol_id = ProtocolId::try_from(reference_id)?; + }, + }) => { + let channel_id = match resp { + Some(TenTenOneMessage::Sign(TenTenOneSignChannel { + sign_channel: SignChannel { channel_id, .. }, + })) => channel_id, + _ => *temporary_channel_id, + }; + + let reference_id = match reference_id { + Some(reference_id) => *reference_id, + // If the app did not yet update to the latest version, it will not + // send us the reference id in the message. In that case we will + // have to look up the reference id ourselves from the channel. + // TODO(holzeis): Remove this fallback handling once not needed + // anymore. + None => self + .inner + .get_dlc_channel_by_id(&channel_id)? + .get_reference_id() + .context("missing reference id")?, + }; + let protocol_id = ProtocolId::try_from(reference_id)?; + + tracing::info!( + channel_id = hex::encode(channel_id), + node_id = node_id.to_string(), + %protocol_id, + "DLC channel open protocol was finalized" + ); + + let channel = self.inner.get_dlc_channel_by_id(&channel_id)?; + + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.finish_dlc_protocol( + protocol_id, + &node_id, + channel.get_contract_id(), + &channel_id, + self.tx_position_feed.clone(), + )?; + } + TenTenOneMessage::Reject(TenTenOneReject { + reject: + Reject { + channel_id, + reference_id, + .. + }, + }) => { + let channel_id_hex_string = hex::encode(channel_id); + + let reference_id = match reference_id { + Some(reference_id) => *reference_id, + // If the app did not yet update to the latest version, it will not + // send us the reference id in the message. In that case we will + // have to look up the reference id ourselves from the channel. + // TODO(holzeis): Remove this fallback handling once not needed + // anymore. + None => self + .inner + .get_dlc_channel_by_id(channel_id)? + .get_reference_id() + .context("missing reference id")?, + }; + let protocol_id = ProtocolId::try_from(reference_id)?; + + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.fail_dlc_protocol(protocol_id)?; + let channel = self.inner.get_dlc_channel_by_id(channel_id)?; + let mut connection = self.pool.get()?; + + match channel { + Channel::Cancelled(_) => { tracing::info!( - channel_id = hex::encode(channel_id), + channel_id = channel_id_hex_string, node_id = node_id.to_string(), - %protocol_id, - "DLC channel open protocol was finalized" + "DLC Channel offer has been rejected. Setting position to failed." ); - let channel = self.inner.get_dlc_channel_by_id(&channel_id)?; - - let protocol_executor = - dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - protocol_executor.finish_dlc_protocol( - protocol_id, - &node_id, - channel.get_contract_id(), - &channel_id, - self.tx_position_feed.clone(), + db::positions::Position::update_position_state( + &mut connection, + node_id.to_string(), + vec![PositionState::Proposed], + PositionState::Failed, )?; } - ChannelMessage::Reject(Reject { - channel_id, - reference_id, + Channel::Signed(SignedChannel { + state: SignedChannelState::Established { .. }, .. }) => { - let channel_id_hex_string = hex::encode(channel_id); - - let reference_id = match reference_id { - Some(reference_id) => *reference_id, - // If the app did not yet update to the latest version, it will not - // send us the reference id in the message. In that case we will - // have to look up the reference id ourselves from the channel. - // TODO(holzeis): Remove this fallback handling once not needed - // anymore. - None => self - .inner - .get_dlc_channel_by_id(channel_id)? - .get_reference_id() - .context("missing reference id")?, - }; - let protocol_id = ProtocolId::try_from(reference_id)?; - - let protocol_executor = - dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - protocol_executor.fail_dlc_protocol(protocol_id)?; - - let channel = self.inner.get_dlc_channel_by_id(channel_id)?; - let mut connection = self.pool.get()?; - - match channel { - Channel::Cancelled(_) => { - tracing::info!( - channel_id = channel_id_hex_string, - node_id = node_id.to_string(), - "DLC Channel offer has been rejected. Setting position to failed." - ); + // TODO(holzeis): Reverting the position state back from `Closing` + // to `Open` only works as long as we do not support resizing. This + // logic needs to be adapted when we implement resize. - db::positions::Position::update_position_state( - &mut connection, - node_id.to_string(), - vec![PositionState::Proposed], - PositionState::Failed, - )?; - } - Channel::Signed(SignedChannel { - state: SignedChannelState::Established { .. }, - .. - }) => { - // TODO(holzeis): Reverting the position state back from `Closing` - // to `Open` only works as long as we do not support resizing. This - // logic needs to be adapted when we implement resize. - - tracing::info!( + tracing::info!( channel_id = channel_id_hex_string, node_id = node_id.to_string(), "DLC Channel settle offer has been rejected. Setting position to back to open." ); - db::positions::Position::update_closing_position( - &mut connection, - node_id.to_string(), - PositionState::Open, - )?; - } - Channel::Signed(SignedChannel { - state: SignedChannelState::Settled { .. }, - .. - }) => { - tracing::info!( + db::positions::Position::update_closing_position( + &mut connection, + node_id.to_string(), + PositionState::Open, + )?; + } + Channel::Signed(SignedChannel { + state: SignedChannelState::Settled { .. }, + .. + }) => { + tracing::info!( channel_id = channel_id_hex_string, node_id = node_id.to_string(), "DLC Channel renew offer has been rejected. Setting position to failed." ); - db::positions::Position::update_position_state( - &mut connection, - node_id.to_string(), - vec![PositionState::Proposed], - PositionState::Failed, - )?; - } - _ => {} - } + db::positions::Position::update_position_state( + &mut connection, + node_id.to_string(), + vec![PositionState::Proposed], + PositionState::Failed, + )?; } _ => {} - }; - - resp + } } + _ => {} }; if let Some(msg) = resp { @@ -465,7 +459,7 @@ impl Node { // that has been stored before. tracing::info!( to = %node_id, - kind = %dlc_message_name(&msg), + kind = %tentenone_message_name(&msg), "Sending message" ); @@ -483,9 +477,11 @@ impl Node { /// /// If the expected own payout amount does not match the offered own payout amount, /// we will simply ignore the offer. - fn verify_collab_close_offer(&self, node_id: &PublicKey, msg: &ChannelMessage) -> Result<()> { + fn verify_collab_close_offer(&self, node_id: &PublicKey, msg: &TenTenOneMessage) -> Result<()> { let close_offer = match msg { - ChannelMessage::CollaborativeCloseOffer(close_offer) => close_offer, + TenTenOneMessage::CollaborativeCloseOffer(TenTenOneCollaborativeCloseOffer { + collaborative_close_offer: close_offer, + }) => close_offer, _ => return Ok(()), }; diff --git a/crates/ln-dlc-node/src/dlc_message.rs b/crates/ln-dlc-node/src/dlc_message.rs index 43065dc11..de1c3a316 100644 --- a/crates/ln-dlc-node/src/dlc_message.rs +++ b/crates/ln-dlc-node/src/dlc_message.rs @@ -1,7 +1,6 @@ +use crate::message_handler::TenTenOneMessage; use anyhow::Result; use bitcoin::secp256k1::PublicKey; -use dlc_messages::ChannelMessage; -use dlc_messages::Message; use sha2::digest::FixedOutput; use sha2::Digest; use sha2::Sha256; @@ -67,119 +66,114 @@ pub enum DlcMessageType { Reject, } -impl TryFrom<&SerializedDlcMessage> for Message { +impl TryFrom<&SerializedDlcMessage> for TenTenOneMessage { type Error = anyhow::Error; fn try_from(serialized_msg: &SerializedDlcMessage) -> Result { let message = match serialized_msg.clone().message_type { - DlcMessageType::Offer => Message::Channel(ChannelMessage::Offer(serde_json::from_str( - &serialized_msg.message, - )?)), - DlcMessageType::Accept => Message::Channel(ChannelMessage::Accept( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::Sign => Message::Channel(ChannelMessage::Sign(serde_json::from_str( - &serialized_msg.message, - )?)), - DlcMessageType::SettleOffer => Message::Channel(ChannelMessage::SettleOffer( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::SettleAccept => Message::Channel(ChannelMessage::SettleAccept( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::SettleConfirm => Message::Channel(ChannelMessage::SettleConfirm( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::SettleFinalize => Message::Channel(ChannelMessage::SettleFinalize( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::RenewOffer => Message::Channel(ChannelMessage::RenewOffer( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::RenewAccept => Message::Channel(ChannelMessage::RenewAccept( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::RenewConfirm => Message::Channel(ChannelMessage::RenewConfirm( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::RenewFinalize => Message::Channel(ChannelMessage::RenewFinalize( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::RenewRevoke => Message::Channel(ChannelMessage::RenewRevoke( - serde_json::from_str(&serialized_msg.message)?, - )), - DlcMessageType::CollaborativeCloseOffer => { - Message::Channel(ChannelMessage::CollaborativeCloseOffer( - serde_json::from_str(&serialized_msg.message)?, - )) + DlcMessageType::Reject => { + TenTenOneMessage::Reject(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::Offer => { + TenTenOneMessage::Offer(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::Accept => { + TenTenOneMessage::Accept(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::Sign => { + TenTenOneMessage::Sign(serde_json::from_str(&serialized_msg.message)?) } - DlcMessageType::Reject => Message::Channel(ChannelMessage::Reject( + DlcMessageType::SettleOffer => { + TenTenOneMessage::SettleOffer(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::SettleAccept => { + TenTenOneMessage::SettleAccept(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::SettleConfirm => { + TenTenOneMessage::SettleConfirm(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::SettleFinalize => { + TenTenOneMessage::SettleFinalize(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::RenewOffer => { + TenTenOneMessage::RenewOffer(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::RenewAccept => { + TenTenOneMessage::RenewAccept(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::RenewConfirm => { + TenTenOneMessage::RenewConfirm(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::RenewFinalize => { + TenTenOneMessage::RenewFinalize(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::RenewRevoke => { + TenTenOneMessage::RenewRevoke(serde_json::from_str(&serialized_msg.message)?) + } + DlcMessageType::CollaborativeCloseOffer => TenTenOneMessage::CollaborativeCloseOffer( serde_json::from_str(&serialized_msg.message)?, - )), + ), }; Ok(message) } } -impl TryFrom<&Message> for SerializedDlcMessage { +impl TryFrom<&TenTenOneMessage> for SerializedDlcMessage { type Error = anyhow::Error; - fn try_from(msg: &Message) -> Result { + fn try_from(msg: &TenTenOneMessage) -> Result { let (message, message_type) = match &msg { - Message::Channel(message) => match message { - ChannelMessage::Offer(offer) => { - (serde_json::to_string(&offer)?, DlcMessageType::Offer) - } - ChannelMessage::Accept(accept) => { - (serde_json::to_string(&accept)?, DlcMessageType::Accept) - } - ChannelMessage::Sign(sign) => (serde_json::to_string(&sign)?, DlcMessageType::Sign), - ChannelMessage::SettleOffer(settle_offer) => ( - serde_json::to_string(&settle_offer)?, - DlcMessageType::SettleOffer, - ), - ChannelMessage::SettleAccept(settle_accept) => ( - serde_json::to_string(&settle_accept)?, - DlcMessageType::SettleAccept, - ), - ChannelMessage::SettleConfirm(settle_confirm) => ( - serde_json::to_string(&settle_confirm)?, - DlcMessageType::SettleConfirm, - ), - ChannelMessage::SettleFinalize(settle_finalize) => ( - serde_json::to_string(&settle_finalize)?, - DlcMessageType::SettleFinalize, - ), - ChannelMessage::RenewOffer(renew_offer) => ( - serde_json::to_string(&renew_offer)?, - DlcMessageType::RenewOffer, - ), - ChannelMessage::RenewAccept(renew_accept) => ( - serde_json::to_string(&renew_accept)?, - DlcMessageType::RenewAccept, - ), - ChannelMessage::RenewConfirm(renew_confirm) => ( - serde_json::to_string(&renew_confirm)?, - DlcMessageType::RenewConfirm, - ), - ChannelMessage::RenewFinalize(renew_finalize) => ( - serde_json::to_string(&renew_finalize)?, - DlcMessageType::RenewFinalize, - ), - ChannelMessage::RenewRevoke(renew_revoke) => ( - serde_json::to_string(&renew_revoke)?, - DlcMessageType::RenewRevoke, - ), - ChannelMessage::CollaborativeCloseOffer(collaborative_close_offer) => ( - serde_json::to_string(&collaborative_close_offer)?, - DlcMessageType::CollaborativeCloseOffer, - ), - ChannelMessage::Reject(reject) => { - (serde_json::to_string(&reject)?, DlcMessageType::Reject) - } - }, - _ => unreachable!(), + TenTenOneMessage::Offer(offer) => { + (serde_json::to_string(&offer)?, DlcMessageType::Offer) + } + TenTenOneMessage::Accept(accept) => { + (serde_json::to_string(&accept)?, DlcMessageType::Accept) + } + TenTenOneMessage::Sign(sign) => (serde_json::to_string(&sign)?, DlcMessageType::Sign), + TenTenOneMessage::SettleOffer(settle_offer) => ( + serde_json::to_string(&settle_offer)?, + DlcMessageType::SettleOffer, + ), + TenTenOneMessage::SettleAccept(settle_accept) => ( + serde_json::to_string(&settle_accept)?, + DlcMessageType::SettleAccept, + ), + TenTenOneMessage::SettleConfirm(settle_confirm) => ( + serde_json::to_string(&settle_confirm)?, + DlcMessageType::SettleConfirm, + ), + TenTenOneMessage::SettleFinalize(settle_finalize) => ( + serde_json::to_string(&settle_finalize)?, + DlcMessageType::SettleFinalize, + ), + TenTenOneMessage::RenewOffer(renew_offer) => ( + serde_json::to_string(&renew_offer)?, + DlcMessageType::RenewOffer, + ), + TenTenOneMessage::RenewAccept(renew_accept) => ( + serde_json::to_string(&renew_accept)?, + DlcMessageType::RenewAccept, + ), + TenTenOneMessage::RenewConfirm(renew_confirm) => ( + serde_json::to_string(&renew_confirm)?, + DlcMessageType::RenewConfirm, + ), + TenTenOneMessage::RenewFinalize(renew_finalize) => ( + serde_json::to_string(&renew_finalize)?, + DlcMessageType::RenewFinalize, + ), + TenTenOneMessage::RenewRevoke(renew_revoke) => ( + serde_json::to_string(&renew_revoke)?, + DlcMessageType::RenewRevoke, + ), + TenTenOneMessage::CollaborativeCloseOffer(collaborative_close_offer) => ( + serde_json::to_string(&collaborative_close_offer)?, + DlcMessageType::CollaborativeCloseOffer, + ), + TenTenOneMessage::Reject(reject) => { + (serde_json::to_string(&reject)?, DlcMessageType::Reject) + } }; Ok(Self { diff --git a/crates/ln-dlc-node/src/lib.rs b/crates/ln-dlc-node/src/lib.rs index 84f1be8d0..0e88396df 100644 --- a/crates/ln-dlc-node/src/lib.rs +++ b/crates/ln-dlc-node/src/lib.rs @@ -4,7 +4,6 @@ use crate::node::SubChannelManager; use crate::node::TenTenOneOnionMessageHandler; use dlc_custom_signer::CustomKeysManager; use dlc_custom_signer::CustomSigner; -use dlc_messages::message_handler::MessageHandler as DlcMessageHandler; use fee_rate_estimator::FeeRateEstimator; use lightning::chain::chainmonitor; use lightning::chain::Filter; @@ -28,12 +27,14 @@ pub mod bitcoin_conversion; pub mod config; pub mod dlc_message; pub mod ln; +pub mod message_handler; pub mod networking; pub mod node; pub mod seed; pub mod storage; pub mod transaction; +use crate::message_handler::TenTenOneMessageHandler; use crate::networking::DynamicSocketDescriptor; pub use config::CONFIRMATION_TARGET; pub use lightning; @@ -72,7 +73,7 @@ pub type PeerManager = lightning::ln::peer_handler::PeerManager< >, Arc, Arc, - Arc, + Arc, Arc>, >; diff --git a/crates/ln-dlc-node/src/message_handler.rs b/crates/ln-dlc-node/src/message_handler.rs new file mode 100644 index 000000000..62cc5d459 --- /dev/null +++ b/crates/ln-dlc-node/src/message_handler.rs @@ -0,0 +1,627 @@ +use anyhow::bail; +use dlc_manager::ReferenceId; +use dlc_messages::channel::AcceptChannel; +use dlc_messages::channel::CollaborativeCloseOffer; +use dlc_messages::channel::OfferChannel; +use dlc_messages::channel::Reject; +use dlc_messages::channel::RenewAccept; +use dlc_messages::channel::RenewConfirm; +use dlc_messages::channel::RenewFinalize; +use dlc_messages::channel::RenewOffer; +use dlc_messages::channel::RenewRevoke; +use dlc_messages::channel::SettleAccept; +use dlc_messages::channel::SettleConfirm; +use dlc_messages::channel::SettleFinalize; +use dlc_messages::channel::SettleOffer; +use dlc_messages::channel::SignChannel; +use dlc_messages::field_read; +use dlc_messages::field_write; +use dlc_messages::impl_dlc_writeable; +use dlc_messages::segmentation; +use dlc_messages::segmentation::get_segments; +use dlc_messages::segmentation::segment_reader::SegmentReader; +use dlc_messages::segmentation::SegmentChunk; +use dlc_messages::segmentation::SegmentStart; +use dlc_messages::ChannelMessage; +use dlc_messages::Message; +use lightning::ln::features::InitFeatures; +use lightning::ln::features::NodeFeatures; +use lightning::ln::msgs::DecodeError; +use lightning::ln::msgs::LightningError; +use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::wire::CustomMessageReader; +use lightning::ln::wire::Type; +use lightning::util::ser::Readable; +use lightning::util::ser::Writeable; +use lightning::util::ser::Writer; +use lightning::util::ser::MAX_BUF_SIZE; +use secp256k1_zkp::PublicKey; +use serde::Deserialize; +use serde::Serialize; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::fmt::Display; +use std::io::Cursor; +use std::sync::Mutex; + +/// TenTenOneMessageHandler is used to send and receive messages through the custom +/// message handling mechanism of the LDK. It also handles message segmentation +/// by splitting large messages when sending and re-constructing them when +/// receiving. +pub struct TenTenOneMessageHandler { + msg_events: Mutex>, + msg_received: Mutex>, + segment_readers: Mutex>, +} + +impl Default for TenTenOneMessageHandler { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum WireMessage { + Message(TenTenOneMessage), + SegmentStart(SegmentStart), + SegmentChunk(SegmentChunk), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[allow(clippy::large_enum_variant)] +pub enum TenTenOneMessage { + Reject(TenTenOneReject), + Offer(TenTenOneOfferChannel), + Accept(TenTenOneAcceptChannel), + Sign(TenTenOneSignChannel), + SettleOffer(TenTenOneSettleOffer), + SettleAccept(TenTenOneSettleAccept), + SettleConfirm(TenTenOneSettleConfirm), + SettleFinalize(TenTenOneSettleFinalize), + RenewOffer(TenTenOneRenewOffer), + RenewAccept(TenTenOneRenewAccept), + RenewConfirm(TenTenOneRenewConfirm), + RenewFinalize(TenTenOneRenewFinalize), + RenewRevoke(TenTenOneRenewRevoke), + CollaborativeCloseOffer(TenTenOneCollaborativeCloseOffer), +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneReject { + pub reject: Reject, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct TenTenOneOfferChannel { + pub offer_channel: OfferChannel, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneAcceptChannel { + pub accept_channel: AcceptChannel, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneSignChannel { + pub sign_channel: SignChannel, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneSettleOffer { + pub settle_offer: SettleOffer, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneSettleAccept { + pub settle_accept: SettleAccept, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneSettleConfirm { + pub settle_confirm: SettleConfirm, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneSettleFinalize { + pub settle_finalize: SettleFinalize, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct TenTenOneRenewOffer { + pub renew_offer: RenewOffer, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneRenewAccept { + pub renew_accept: RenewAccept, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneRenewConfirm { + pub renew_confirm: RenewConfirm, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneRenewFinalize { + pub renew_finalize: RenewFinalize, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneRenewRevoke { + pub renew_revoke: RenewRevoke, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenTenOneCollaborativeCloseOffer { + pub collaborative_close_offer: CollaborativeCloseOffer, +} + +impl TenTenOneMessageHandler { + /// Creates a new instance of a [`TenTenOneMessageHandler`] + pub fn new() -> Self { + TenTenOneMessageHandler { + msg_events: Mutex::new(VecDeque::new()), + msg_received: Mutex::new(Vec::new()), + segment_readers: Mutex::new(HashMap::new()), + } + } + + /// Returns whether there are any new received messages to process. + pub fn has_pending_messages_to_process(&self) -> bool { + !self.msg_received.lock().expect("to get lock").is_empty() + } + + /// Returns the messages received by the message handler and empty the + /// receiving buffer. + pub fn get_and_clear_received_messages(&self) -> Vec<(PublicKey, TenTenOneMessage)> { + let mut ret = Vec::new(); + std::mem::swap( + &mut *self.msg_received.lock().expect("to get lock"), + &mut ret, + ); + ret + } + + /// Send a message to the peer with given node id. Not that the message is not + /// sent right away, but only when the LDK + /// [`lightning::ln::peer_handler::PeerManager::process_events`] is next called. + pub fn send_message(&self, node_id: PublicKey, msg: TenTenOneMessage) { + if msg.serialized_length() > MAX_BUF_SIZE { + let (seg_start, seg_chunks) = get_segments(msg.encode(), msg.type_id()); + let mut msg_events = self.msg_events.lock().expect("to get lock"); + msg_events.push_back((node_id, WireMessage::SegmentStart(seg_start))); + for chunk in seg_chunks { + msg_events.push_back((node_id, WireMessage::SegmentChunk(chunk))); + } + } else { + self.msg_events + .lock() + .expect("to get lock") + .push_back((node_id, WireMessage::Message(msg))); + } + } + + /// Returns whether the message handler has any message to be sent. + pub fn has_pending_messages(&self) -> bool { + !self.msg_events.lock().expect("to get lock").is_empty() + } +} + +impl CustomMessageReader for TenTenOneMessageHandler { + type CustomMessage = WireMessage; + fn read( + &self, + msg_type: u16, + mut buffer: &mut R, + ) -> Result, DecodeError> { + let decoded = match msg_type { + segmentation::SEGMENT_START_TYPE => { + WireMessage::SegmentStart(Readable::read(&mut buffer)?) + } + segmentation::SEGMENT_CHUNK_TYPE => { + WireMessage::SegmentChunk(Readable::read(&mut buffer)?) + } + _ => return read_tentenone_message(msg_type, buffer), + }; + + Ok(Some(decoded)) + } +} + +/// Implementation of the `CustomMessageHandler` trait is required to handle +/// custom messages in the LDK. +impl CustomMessageHandler for TenTenOneMessageHandler { + fn handle_custom_message( + &self, + msg: WireMessage, + org: &PublicKey, + ) -> Result<(), LightningError> { + let mut segment_readers = self.segment_readers.lock().expect("to get lock"); + let segment_reader = segment_readers.entry(*org).or_default(); + + if segment_reader.expecting_chunk() { + match msg { + WireMessage::SegmentChunk(s) => { + if let Some(msg) = segment_reader + .process_segment_chunk(s) + .map_err(|e| to_ln_error(e, "Error processing segment chunk"))? + { + let mut buf = Cursor::new(msg); + let message_type = ::read(&mut buf).map_err(|e| { + to_ln_error(e, "Could not reconstruct message from segments") + })?; + if let WireMessage::Message(m) = self + .read(message_type, &mut buf) + .map_err(|e| { + to_ln_error(e, "Could not reconstruct message from segments") + })? + .expect("to have a message") + { + self.msg_received + .lock() + .expect("to get lock") + .push((*org, m)); + } else { + return Err(to_ln_error( + "Unexpected message type", + &message_type.to_string(), + )); + } + } + return Ok(()); + } + _ => { + // We were expecting a segment chunk but received something + // else, we reset the state. + segment_reader.reset(); + } + } + } + + match msg { + WireMessage::Message(m) => self + .msg_received + .lock() + .expect("to get lock") + .push((*org, m)), + WireMessage::SegmentStart(s) => segment_reader + .process_segment_start(s) + .map_err(|e| to_ln_error(e, "Error processing segment start"))?, + WireMessage::SegmentChunk(_) => { + return Err(LightningError { + err: "Received a SegmentChunk while not expecting one.".to_string(), + action: lightning::ln::msgs::ErrorAction::DisconnectPeer { msg: None }, + }); + } + }; + Ok(()) + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { + self.msg_events + .lock() + .expect("to get lock") + .drain(..) + .collect() + } + + fn provided_node_features(&self) -> NodeFeatures { + NodeFeatures::empty() + } + + fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { + InitFeatures::empty() + } +} + +#[inline] +fn to_ln_error(e: T, msg: &str) -> LightningError { + LightningError { + err: format!("{} :{}", msg, e), + action: lightning::ln::msgs::ErrorAction::DisconnectPeer { msg: None }, + } +} + +pub fn tentenone_message_name(msg: &TenTenOneMessage) -> String { + let name = match msg { + TenTenOneMessage::Offer(_) => "Offer", + TenTenOneMessage::Accept(_) => "Accept", + TenTenOneMessage::Sign(_) => "Sign", + TenTenOneMessage::SettleOffer(_) => "SettleOffer", + TenTenOneMessage::SettleAccept(_) => "SettleAccept", + TenTenOneMessage::SettleConfirm(_) => "SettleConfirm", + TenTenOneMessage::SettleFinalize(_) => "SettleFinalize", + TenTenOneMessage::RenewOffer(_) => "RenewOffer", + TenTenOneMessage::RenewAccept(_) => "RenewAccept", + TenTenOneMessage::RenewConfirm(_) => "RenewConfirm", + TenTenOneMessage::RenewFinalize(_) => "RenewFinalize", + TenTenOneMessage::RenewRevoke(_) => "RenewRevoke", + TenTenOneMessage::CollaborativeCloseOffer(_) => "CollaborativeCloseOffer", + TenTenOneMessage::Reject(_) => "Reject", + }; + + name.to_string() +} + +impl TryFrom for TenTenOneMessage { + type Error = anyhow::Error; + + fn try_from(value: Message) -> Result { + let msg = match value { + Message::Channel(ChannelMessage::Offer(offer_channel)) => { + TenTenOneMessage::Offer(TenTenOneOfferChannel { offer_channel }) + } + Message::Channel(ChannelMessage::Accept(accept_channel)) => { + TenTenOneMessage::Accept(TenTenOneAcceptChannel { accept_channel }) + } + Message::Channel(ChannelMessage::Sign(sign_channel)) => { + TenTenOneMessage::Sign(TenTenOneSignChannel { sign_channel }) + } + Message::Channel(ChannelMessage::SettleOffer(settle_offer)) => { + TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { settle_offer }) + } + Message::Channel(ChannelMessage::SettleAccept(settle_accept)) => { + TenTenOneMessage::SettleAccept(TenTenOneSettleAccept { settle_accept }) + } + Message::Channel(ChannelMessage::SettleConfirm(settle_confirm)) => { + TenTenOneMessage::SettleConfirm(TenTenOneSettleConfirm { settle_confirm }) + } + Message::Channel(ChannelMessage::SettleFinalize(settle_finalize)) => { + TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { settle_finalize }) + } + Message::Channel(ChannelMessage::RenewOffer(renew_offer)) => { + TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer }) + } + Message::Channel(ChannelMessage::RenewAccept(renew_accept)) => { + TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept }) + } + Message::Channel(ChannelMessage::RenewConfirm(renew_confirm)) => { + TenTenOneMessage::RenewConfirm(TenTenOneRenewConfirm { renew_confirm }) + } + Message::Channel(ChannelMessage::RenewFinalize(renew_finalize)) => { + TenTenOneMessage::RenewFinalize(TenTenOneRenewFinalize { renew_finalize }) + } + Message::Channel(ChannelMessage::RenewRevoke(renew_revoke)) => { + TenTenOneMessage::RenewRevoke(TenTenOneRenewRevoke { renew_revoke }) + } + Message::Channel(ChannelMessage::CollaborativeCloseOffer( + collaborative_close_offer, + )) => TenTenOneMessage::CollaborativeCloseOffer(TenTenOneCollaborativeCloseOffer { + collaborative_close_offer, + }), + Message::Channel(ChannelMessage::Reject(reject)) => { + TenTenOneMessage::Reject(TenTenOneReject { reject }) + } + Message::OnChain(_) | Message::SubChannel(_) => bail!("Unexpected dlc message"), + }; + + Ok(msg) + } +} + +impl TenTenOneMessage { + pub fn get_reference_id(&self) -> Option { + match self { + TenTenOneMessage::Offer(TenTenOneOfferChannel { + offer_channel: OfferChannel { reference_id, .. }, + }) + | TenTenOneMessage::Accept(TenTenOneAcceptChannel { + accept_channel: AcceptChannel { reference_id, .. }, + }) + | TenTenOneMessage::Sign(TenTenOneSignChannel { + sign_channel: SignChannel { reference_id, .. }, + }) + | TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { + settle_offer: SettleOffer { reference_id, .. }, + }) + | TenTenOneMessage::SettleAccept(TenTenOneSettleAccept { + settle_accept: SettleAccept { reference_id, .. }, + }) + | TenTenOneMessage::SettleConfirm(TenTenOneSettleConfirm { + settle_confirm: SettleConfirm { reference_id, .. }, + }) + | TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { + settle_finalize: SettleFinalize { reference_id, .. }, + }) + | TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { + renew_offer: RenewOffer { reference_id, .. }, + }) + | TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { + renew_accept: RenewAccept { reference_id, .. }, + }) + | TenTenOneMessage::RenewConfirm(TenTenOneRenewConfirm { + renew_confirm: RenewConfirm { reference_id, .. }, + }) + | TenTenOneMessage::RenewFinalize(TenTenOneRenewFinalize { + renew_finalize: RenewFinalize { reference_id, .. }, + }) + | TenTenOneMessage::RenewRevoke(TenTenOneRenewRevoke { + renew_revoke: RenewRevoke { reference_id, .. }, + }) + | TenTenOneMessage::CollaborativeCloseOffer(TenTenOneCollaborativeCloseOffer { + collaborative_close_offer: CollaborativeCloseOffer { reference_id, .. }, + }) + | TenTenOneMessage::Reject(TenTenOneReject { + reject: Reject { reference_id, .. }, + }) => *reference_id, + } + } +} + +impl From for Message { + fn from(value: TenTenOneMessage) -> Self { + let msg = ChannelMessage::from(value); + Message::Channel(msg) + } +} + +impl From for ChannelMessage { + fn from(value: TenTenOneMessage) -> Self { + match value { + TenTenOneMessage::Offer(TenTenOneOfferChannel { offer_channel }) => { + ChannelMessage::Offer(offer_channel) + } + TenTenOneMessage::Accept(TenTenOneAcceptChannel { accept_channel }) => { + ChannelMessage::Accept(accept_channel) + } + TenTenOneMessage::Sign(TenTenOneSignChannel { sign_channel }) => { + ChannelMessage::Sign(sign_channel) + } + TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { settle_offer }) => { + ChannelMessage::SettleOffer(settle_offer) + } + TenTenOneMessage::SettleAccept(TenTenOneSettleAccept { settle_accept }) => { + ChannelMessage::SettleAccept(settle_accept) + } + TenTenOneMessage::SettleConfirm(TenTenOneSettleConfirm { settle_confirm }) => { + ChannelMessage::SettleConfirm(settle_confirm) + } + TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { settle_finalize }) => { + ChannelMessage::SettleFinalize(settle_finalize) + } + TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer }) => { + ChannelMessage::RenewOffer(renew_offer) + } + TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept }) => { + ChannelMessage::RenewAccept(renew_accept) + } + TenTenOneMessage::RenewConfirm(TenTenOneRenewConfirm { renew_confirm }) => { + ChannelMessage::RenewConfirm(renew_confirm) + } + TenTenOneMessage::RenewFinalize(TenTenOneRenewFinalize { renew_finalize }) => { + ChannelMessage::RenewFinalize(renew_finalize) + } + TenTenOneMessage::RenewRevoke(TenTenOneRenewRevoke { renew_revoke }) => { + ChannelMessage::RenewRevoke(renew_revoke) + } + TenTenOneMessage::CollaborativeCloseOffer(TenTenOneCollaborativeCloseOffer { + collaborative_close_offer, + }) => ChannelMessage::CollaborativeCloseOffer(collaborative_close_offer), + TenTenOneMessage::Reject(TenTenOneReject { reject }) => ChannelMessage::Reject(reject), + } + } +} + +macro_rules! impl_type_writeable_for_enum { + ($type_name: ident, {$($variant_name: ident),*}) => { + impl Type for $type_name { + fn type_id(&self) -> u16 { + match self { + $($type_name::$variant_name(v) => v.type_id(),)* + } + } + } + + impl Writeable for $type_name { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + $($type_name::$variant_name(v) => v.write(writer),)* + } + } + } + }; +} + +macro_rules! impl_type { + ($const_name: ident, $type_name: ident, $type_val: expr) => { + /// The type prefix for an [`$type_name`] message. + pub const $const_name: u16 = $type_val; + + impl Type for $type_name { + fn type_id(&self) -> u16 { + $const_name + } + } + }; +} + +macro_rules! handle_read_tentenone_messages { + ($msg_type:ident, $buffer:ident, $(($type_id:ident, $variant:ident)),*) => {{ + let decoded = match $msg_type { + $( + $type_id => TenTenOneMessage::$variant(Readable::read(&mut $buffer)?), + )* + _ => return Ok(None), + }; + Ok(Some(WireMessage::Message(decoded))) + }}; +} + +impl_type_writeable_for_enum!(WireMessage, { Message, SegmentStart, SegmentChunk }); +impl_type_writeable_for_enum!(TenTenOneMessage, +{ + Reject, + Offer, + Accept, + Sign, + SettleOffer, + SettleAccept, + SettleConfirm, + SettleFinalize, + RenewOffer, + RenewAccept, + RenewConfirm, + RenewFinalize, + RenewRevoke, + CollaborativeCloseOffer +}); + +impl_dlc_writeable!(TenTenOneReject, { (reject, writeable) }); +impl_dlc_writeable!(TenTenOneOfferChannel, { (offer_channel, writeable) }); +impl_dlc_writeable!(TenTenOneAcceptChannel, { (accept_channel, writeable) }); +impl_dlc_writeable!(TenTenOneSignChannel, { (sign_channel, writeable) }); +impl_dlc_writeable!(TenTenOneSettleOffer, { (settle_offer, writeable) }); +impl_dlc_writeable!(TenTenOneSettleAccept, { (settle_accept, writeable) }); +impl_dlc_writeable!(TenTenOneSettleConfirm, { (settle_confirm, writeable) }); +impl_dlc_writeable!(TenTenOneSettleFinalize, { (settle_finalize, writeable) }); +impl_dlc_writeable!(TenTenOneRenewOffer, { (renew_offer, writeable) }); +impl_dlc_writeable!(TenTenOneRenewAccept, { (renew_accept, writeable) }); +impl_dlc_writeable!(TenTenOneRenewConfirm, { (renew_confirm, writeable) }); +impl_dlc_writeable!(TenTenOneRenewFinalize, { (renew_finalize, writeable) }); +impl_dlc_writeable!(TenTenOneRenewRevoke, { (renew_revoke, writeable) }); +impl_dlc_writeable!(TenTenOneCollaborativeCloseOffer, { + (collaborative_close_offer, writeable) +}); + +impl_type!(REJECT, TenTenOneReject, 43024); +impl_type!(OFFER_CHANNEL_TYPE, TenTenOneOfferChannel, 43000); +impl_type!(ACCEPT_CHANNEL_TYPE, TenTenOneAcceptChannel, 43002); +impl_type!(SIGN_CHANNEL_TYPE, TenTenOneSignChannel, 43004); +impl_type!(SETTLE_CHANNEL_OFFER_TYPE, TenTenOneSettleOffer, 43006); +impl_type!(SETTLE_CHANNEL_ACCEPT_TYPE, TenTenOneSettleAccept, 43008); +impl_type!(SETTLE_CHANNEL_CONFIRM_TYPE, TenTenOneSettleConfirm, 43010); +impl_type!(SETTLE_CHANNEL_FINALIZE_TYPE, TenTenOneSettleFinalize, 43012); +impl_type!(RENEW_CHANNEL_OFFER_TYPE, TenTenOneRenewOffer, 43014); +impl_type!(RENEW_CHANNEL_ACCEPT_TYPE, TenTenOneRenewAccept, 43016); +impl_type!(RENEW_CHANNEL_CONFIRM_TYPE, TenTenOneRenewConfirm, 43018); +impl_type!(RENEW_CHANNEL_FINALIZE_TYPE, TenTenOneRenewFinalize, 43020); +impl_type!(RENEW_CHANNEL_REVOKE_TYPE, TenTenOneRenewRevoke, 43026); +impl_type!( + COLLABORATIVE_CLOSE_OFFER_TYPE, + TenTenOneCollaborativeCloseOffer, + 43022 +); + +fn read_tentenone_message( + msg_type: u16, + mut buffer: &mut R, +) -> Result, DecodeError> { + handle_read_tentenone_messages!( + msg_type, + buffer, + (REJECT, Reject), + (OFFER_CHANNEL_TYPE, Offer), + (ACCEPT_CHANNEL_TYPE, Accept), + (SIGN_CHANNEL_TYPE, Sign), + (SETTLE_CHANNEL_OFFER_TYPE, SettleOffer), + (SETTLE_CHANNEL_ACCEPT_TYPE, SettleAccept), + (SETTLE_CHANNEL_CONFIRM_TYPE, SettleConfirm), + (SETTLE_CHANNEL_FINALIZE_TYPE, SettleFinalize), + (RENEW_CHANNEL_OFFER_TYPE, RenewOffer), + (RENEW_CHANNEL_ACCEPT_TYPE, RenewAccept), + (RENEW_CHANNEL_CONFIRM_TYPE, RenewConfirm), + (RENEW_CHANNEL_FINALIZE_TYPE, RenewFinalize), + (RENEW_CHANNEL_REVOKE_TYPE, RenewRevoke), + (COLLABORATIVE_CLOSE_OFFER_TYPE, CollaborativeCloseOffer) + ) +} diff --git a/crates/ln-dlc-node/src/node/dlc_channel.rs b/crates/ln-dlc-node/src/node/dlc_channel.rs index fd5aaef23..da9a8ea81 100644 --- a/crates/ln-dlc-node/src/node/dlc_channel.rs +++ b/crates/ln-dlc-node/src/node/dlc_channel.rs @@ -1,11 +1,17 @@ use crate::bitcoin_conversion::to_secp_pk_29; use crate::bitcoin_conversion::to_secp_pk_30; +use crate::message_handler::TenTenOneCollaborativeCloseOffer; +use crate::message_handler::TenTenOneMessage; +use crate::message_handler::TenTenOneMessageHandler; +use crate::message_handler::TenTenOneOfferChannel; +use crate::message_handler::TenTenOneRenewOffer; +use crate::message_handler::TenTenOneSettleAccept; +use crate::message_handler::TenTenOneSettleOffer; use crate::node::event::NodeEvent; use crate::node::Node; use crate::node::Storage as LnDlcStorage; use crate::on_chain_wallet::BdkStorage; use crate::storage::TenTenOneStorage; -use crate::DlcMessageHandler; use crate::PeerManager; use anyhow::anyhow; use anyhow::bail; @@ -25,8 +31,6 @@ use dlc_manager::DlcChannelId; use dlc_manager::Oracle; use dlc_manager::ReferenceId; use dlc_manager::Storage; -use dlc_messages::ChannelMessage; -use dlc_messages::Message; use time::OffsetDateTime; use tokio::task::spawn_blocking; @@ -89,7 +93,7 @@ impl Result<()> { + use crate::message_handler::TenTenOneAcceptChannel; + let channel_id_hex = hex::encode(channel_id); tracing::info!(channel_id = %channel_id_hex, "Accepting DLC channel offer"); - let (msg, _channel_id, _contract_id, counter_party) = + let (accept_channel, _channel_id, _contract_id, counter_party) = self.dlc_manager.accept_channel(channel_id)?; self.event_handler.publish(NodeEvent::SendDlcMessage { peer: to_secp_pk_30(counter_party), - msg: Message::Channel(ChannelMessage::Accept(msg)), + msg: TenTenOneMessage::Accept(TenTenOneAcceptChannel { accept_channel }), }); Ok(()) @@ -184,9 +190,11 @@ impl Result<()> { + use crate::message_handler::TenTenOneRenewAccept; + let channel_id_hex = hex::encode(channel_id); tracing::info!(channel_id = %channel_id_hex, "Accepting DLC channel update offer"); - let (msg, counter_party) = self.dlc_manager.accept_renew_offer(channel_id)?; + let (renew_accept, counter_party) = self.dlc_manager.accept_renew_offer(channel_id)?; send_dlc_message( &self.dlc_message_handler, &self.peer_manager, to_secp_pk_30(counter_party), - Message::Channel(ChannelMessage::RenewAccept(msg)), + TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept }), ); Ok(()) @@ -740,10 +750,9 @@ impl Result<()> { - use crate::node::dlc_message_name; + use crate::node::tentenone_message_name; let dlc_message_handler = &self.dlc_message_handler; - let dlc_manager = &self.dlc_manager; let peer_manager = &self.peer_manager; let messages = dlc_message_handler.get_and_clear_received_messages(); tracing::debug!("Received and cleared {} messages", messages.len()); @@ -751,27 +760,20 @@ impl { - let resp = dlc_manager.on_dlc_message(&msg, node_id)?; - - if let Some(msg) = resp { - tracing::debug!(to = %to_secp_pk_30(node_id), msg = dlc_message_name(&msg), "Sending DLC-manager message"); - send_dlc_message( - dlc_message_handler, - peer_manager, - to_secp_pk_30(node_id), - msg, - ); - } - } - Message::SubChannel(_) => { - tracing::error!("Not sending subchannel message"); - } + let resp = self.process_tentenone_message(msg, to_secp_pk_30(node_id))?; + + if let Some(msg) = resp { + tracing::debug!(to = %to_secp_pk_30(node_id), msg = tentenone_message_name(&msg), "Sending DLC-manager message"); + send_dlc_message( + dlc_message_handler, + peer_manager, + to_secp_pk_30(node_id), + msg, + ); } } @@ -804,10 +806,10 @@ impl( - dlc_message_handler: &DlcMessageHandler, + dlc_message_handler: &TenTenOneMessageHandler, peer_manager: &PeerManager, node_id: PublicKey, - msg: Message, + msg: TenTenOneMessage, ) { // Enqueue the message. dlc_message_handler.send_message(to_secp_pk_29(node_id), msg); diff --git a/crates/ln-dlc-node/src/node/dlc_manager.rs b/crates/ln-dlc-node/src/node/dlc_manager.rs index 88f2322b4..202da2e2f 100644 --- a/crates/ln-dlc-node/src/node/dlc_manager.rs +++ b/crates/ln-dlc-node/src/node/dlc_manager.rs @@ -1,6 +1,7 @@ use crate::bitcoin_conversion::to_secp_pk_29; use crate::dlc_wallet::DlcWallet; use crate::fee_rate_estimator::FeeRateEstimator; +use crate::message_handler::TenTenOneMessage; use crate::node::Node; use crate::node::Storage; use crate::on_chain_wallet::BdkStorage; @@ -63,6 +64,27 @@ pub fn build( .context("Failed to initialise DlcManager") } +impl + Node +{ + pub fn process_tentenone_message( + &self, + message: TenTenOneMessage, + node_id: PublicKey, + ) -> Result> { + let response = self + .dlc_manager + .on_dlc_message(&message.into(), to_secp_pk_29(node_id))?; + + let response = match response { + Some(response) => Some(TenTenOneMessage::try_from(response)?), + None => None, + }; + + Ok(response) + } +} + pub fn signed_channel_state_name(signed_channel: &SignedChannel) -> String { let name = match signed_channel.state { SignedChannelState::Established { .. } => "Established", diff --git a/crates/ln-dlc-node/src/node/event.rs b/crates/ln-dlc-node/src/node/event.rs index 8ddbc14b3..e92dd176b 100644 --- a/crates/ln-dlc-node/src/node/event.rs +++ b/crates/ln-dlc-node/src/node/event.rs @@ -1,5 +1,5 @@ +use crate::message_handler::TenTenOneMessage; use bitcoin::secp256k1::PublicKey; -use dlc_messages::Message; use ln_dlc_storage::DlcChannelEvent; use std::sync::mpsc; use std::sync::Arc; @@ -9,11 +9,23 @@ use tokio::task::spawn_blocking; #[derive(Clone, Debug)] pub enum NodeEvent { - Connected { peer: PublicKey }, - SendDlcMessage { peer: PublicKey, msg: Message }, - StoreDlcMessage { peer: PublicKey, msg: Message }, - SendLastDlcMessage { peer: PublicKey }, - DlcChannelEvent { dlc_channel_event: DlcChannelEvent }, + Connected { + peer: PublicKey, + }, + SendDlcMessage { + peer: PublicKey, + msg: TenTenOneMessage, + }, + StoreDlcMessage { + peer: PublicKey, + msg: TenTenOneMessage, + }, + SendLastDlcMessage { + peer: PublicKey, + }, + DlcChannelEvent { + dlc_channel_event: DlcChannelEvent, + }, } #[derive(Clone)] diff --git a/crates/ln-dlc-node/src/node/mod.rs b/crates/ln-dlc-node/src/node/mod.rs index 0cf3acdb3..bd557e8e1 100644 --- a/crates/ln-dlc-node/src/node/mod.rs +++ b/crates/ln-dlc-node/src/node/mod.rs @@ -6,9 +6,9 @@ use crate::dlc_wallet::DlcWallet; use crate::fee_rate_estimator::FeeRateEstimator; use crate::ln::manage_spendable_outputs; use crate::ln::TracingLogger; +use crate::message_handler::TenTenOneMessageHandler; use crate::node::event::connect_node_event_handler_to_dlc_channel_events; use crate::node::event::NodeEventHandler; -use crate::node::sub_channel::sub_channel_manager_periodic_check; use crate::on_chain_wallet::BdkStorage; use crate::on_chain_wallet::OnChainWallet; use crate::seed::Bip39Seed; @@ -29,7 +29,6 @@ use bitcoin::secp256k1::XOnlyPublicKey; use bitcoin::Address; use bitcoin::Network; use bitcoin::Txid; -use dlc_messages::message_handler::MessageHandler as DlcMessageHandler; use futures::future::RemoteHandle; use futures::FutureExt; use lightning::chain::chaininterface::ConfirmationTarget; @@ -76,12 +75,11 @@ mod storage; mod sub_channel_manager; mod wallet; -pub(crate) mod sub_channel; - pub mod dlc_channel; pub mod event; pub mod peer_manager; +pub use crate::message_handler::tentenone_message_name; pub use ::dlc_manager as rust_dlc_manager; pub use channel_manager::ChannelManager; pub use connection::TenTenOneOnionMessageHandler; @@ -90,7 +88,6 @@ pub use dlc_manager::DlcManager; pub use oracle::OracleInfo; pub use storage::InMemoryStore; pub use storage::Storage; -pub use sub_channel::dlc_message_name; pub use sub_channel_manager::SubChannelManager; /// The interval at which spendable outputs generated by LDK are considered for spending. @@ -130,7 +127,7 @@ pub struct Node { /// All oracles clients the node is aware of. pub oracles: Vec>, - pub dlc_message_handler: Arc, + pub dlc_message_handler: Arc, pub ldk_config: Arc>, /// The oracle pubkey used for proposing dlc channels @@ -337,7 +334,7 @@ impl( self.electrs_server_url.clone(), self.node_storage.clone(), @@ -470,10 +464,6 @@ impl Result<()> { - sub_channel_manager_periodic_check(self.sub_channel_manager.clone()).await - } - pub fn sync_lightning_wallet(&self) -> Result<()> { lightning_wallet_sync( &self.channel_manager, @@ -735,41 +725,6 @@ async fn manage_spendable_outputs_task( - sub_channel_manager: Arc>, -) -> RemoteHandle<()> { - let (fut, remote_handle) = { - async move { - loop { - tracing::trace!("Started periodic check"); - let now = Instant::now(); - if let Err(e) = - sub_channel_manager_periodic_check(sub_channel_manager.clone()).await - { - tracing::error!("Failed to process pending DLC actions: {e:#}"); - }; - - tracing::trace!( - duration = now.elapsed().as_millis(), - "Finished periodic check" - ); - - tokio::time::sleep(Duration::from_secs(30)).await; - } - } - } - .remote_handle(); - - tokio::spawn(fut); - - remote_handle -} - impl Display for NodeInfo { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let scheme = if self.is_ws { "ws" } else { "tcp" }; diff --git a/crates/ln-dlc-node/src/node/sub_channel.rs b/crates/ln-dlc-node/src/node/sub_channel.rs deleted file mode 100644 index 1b0d708c4..000000000 --- a/crates/ln-dlc-node/src/node/sub_channel.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::node::Storage; -use crate::node::SubChannelManager; -use crate::on_chain_wallet::BdkStorage; -use crate::storage::TenTenOneStorage; -use anyhow::Result; -use dlc_messages::ChannelMessage; -use dlc_messages::Message; -use dlc_messages::OnChainMessage; -use std::sync::Arc; -use tokio::task::spawn_blocking; - -pub(crate) async fn sub_channel_manager_periodic_check< - D: BdkStorage, - S: TenTenOneStorage + 'static, - N: Storage + Sync + Send + 'static, ->( - sub_channel_manager: Arc>, -) -> Result<()> { - let messages = spawn_blocking(move || sub_channel_manager.periodic_check()).await?; - - for (msg, node_id) in messages { - let msg = Message::SubChannel(msg); - let msg_name = dlc_message_name(&msg); - - tracing::debug!( - to = %node_id, - kind = %msg_name, - "Not sending DLC channel message tied to pending action" - ); - } - - Ok(()) -} - -pub fn dlc_message_name(msg: &Message) -> String { - let name = match msg { - Message::OnChain(OnChainMessage::Offer(_)) => "OnChainOffer", - Message::OnChain(OnChainMessage::Accept(_)) => "OnChainAccept", - Message::OnChain(OnChainMessage::Sign(_)) => "OnChainSign", - Message::Channel(ChannelMessage::Offer(_)) => "ChannelOffer", - Message::Channel(ChannelMessage::Accept(_)) => "ChannelAccept", - Message::Channel(ChannelMessage::Sign(_)) => "ChannelSign", - Message::Channel(ChannelMessage::SettleOffer(_)) => "ChannelSettleOffer", - Message::Channel(ChannelMessage::SettleAccept(_)) => "ChannelSettleAccept", - Message::Channel(ChannelMessage::SettleConfirm(_)) => "ChannelSettleConfirm", - Message::Channel(ChannelMessage::SettleFinalize(_)) => "ChannelSettleFinalize", - Message::Channel(ChannelMessage::RenewOffer(_)) => "ChannelRenewOffer", - Message::Channel(ChannelMessage::RenewAccept(_)) => "ChannelRenewAccept", - Message::Channel(ChannelMessage::RenewConfirm(_)) => "ChannelRenewConfirm", - Message::Channel(ChannelMessage::RenewFinalize(_)) => "ChannelRenewFinalize", - Message::Channel(ChannelMessage::RenewRevoke(_)) => "ChannelRenewRevoke", - Message::Channel(ChannelMessage::CollaborativeCloseOffer(_)) => { - "ChannelCollaborativeCloseOffer" - } - Message::Channel(ChannelMessage::Reject(_)) => "ChannelReject", - Message::SubChannel(_) => "SubChannelMessage", - }; - - name.to_string() -} diff --git a/mobile/native/src/dlc/dlc_handler.rs b/mobile/native/src/dlc/dlc_handler.rs index 87f7719b2..0b27b73e6 100644 --- a/mobile/native/src/dlc/dlc_handler.rs +++ b/mobile/native/src/dlc/dlc_handler.rs @@ -7,9 +7,9 @@ use crate::ln_dlc::node::Node; use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; -use dlc_messages::Message; use ln_dlc_node::dlc_message::DlcMessage; use ln_dlc_node::dlc_message::SerializedDlcMessage; +use ln_dlc_node::message_handler::TenTenOneMessage; use ln_dlc_node::node::dlc_channel::send_dlc_message; use ln_dlc_node::node::event::NodeEvent; use ln_dlc_node::node::rust_dlc_manager::channel::signed_channel::SignedChannel; @@ -78,7 +78,7 @@ pub async fn handle_outbound_dlc_messages( } impl DlcHandler { - pub fn send_dlc_message(&self, peer: PublicKey, msg: Message) -> Result<()> { + pub fn send_dlc_message(&self, peer: PublicKey, msg: TenTenOneMessage) -> Result<()> { self.store_dlc_message(peer, msg.clone())?; send_dlc_message( @@ -91,7 +91,7 @@ impl DlcHandler { Ok(()) } - pub fn store_dlc_message(&self, peer: PublicKey, msg: Message) -> Result<()> { + pub fn store_dlc_message(&self, peer: PublicKey, msg: TenTenOneMessage) -> Result<()> { let mut conn = db::connection()?; let serialized_outbound_message = SerializedDlcMessage::try_from(&msg)?; @@ -111,7 +111,7 @@ impl DlcHandler { db::last_outbound_dlc_messages::LastOutboundDlcMessage::get(&mut conn, &peer)?; if let Some(last_serialized_message) = last_serialized_message { - let message = Message::try_from(&last_serialized_message)?; + let message = TenTenOneMessage::try_from(&last_serialized_message)?; send_dlc_message( &self.node.inner.dlc_message_handler, &self.node.inner.peer_manager, diff --git a/mobile/native/src/emergency_kit.rs b/mobile/native/src/emergency_kit.rs index 716013d8e..058d52c40 100644 --- a/mobile/native/src/emergency_kit.rs +++ b/mobile/native/src/emergency_kit.rs @@ -20,11 +20,11 @@ use dlc_manager::contract::Contract; use dlc_manager::DlcChannelId; use dlc_manager::Signer; use dlc_messages::channel::SettleFinalize; -use dlc_messages::ChannelMessage; -use dlc_messages::Message; use hex::FromHex; use lightning::ln::chan_utils::build_commitment_secret; use ln_dlc_node::bitcoin_conversion::to_secp_sk_29; +use ln_dlc_node::message_handler::TenTenOneMessage; +use ln_dlc_node::message_handler::TenTenOneSettleFinalize; use ln_dlc_node::node::event::NodeEvent; use time::OffsetDateTime; use trade::ContractSymbol; @@ -171,11 +171,13 @@ pub fn resend_settle_finalize_message() -> Result<()> { signed_channel.update_idx + 1, ))?; - let msg = Message::Channel(ChannelMessage::SettleFinalize(SettleFinalize { - channel_id: signed_channel.channel_id, - prev_per_update_secret: to_secp_sk_29(prev_per_update_secret), - reference_id: signed_channel.reference_id, - })); + let msg = TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { + settle_finalize: SettleFinalize { + channel_id: signed_channel.channel_id, + prev_per_update_secret: to_secp_sk_29(prev_per_update_secret), + reference_id: signed_channel.reference_id, + }, + }); node.inner.event_handler.publish(NodeEvent::SendDlcMessage { peer: coordinator_pubkey, diff --git a/mobile/native/src/ln_dlc/node.rs b/mobile/native/src/ln_dlc/node.rs index 579336309..02eb54bdc 100644 --- a/mobile/native/src/ln_dlc/node.rs +++ b/mobile/native/src/ln_dlc/node.rs @@ -16,24 +16,30 @@ use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; use dlc_manager::ReferenceId; +use dlc_messages::channel::CollaborativeCloseOffer; use dlc_messages::channel::OfferChannel; use dlc_messages::channel::Reject; use dlc_messages::channel::RenewOffer; use dlc_messages::channel::SettleOffer; -use dlc_messages::ChannelMessage; -use dlc_messages::Message; use lightning::chain::transaction::OutPoint; use lightning::sign::DelayedPaymentOutputDescriptor; use lightning::sign::SpendableOutputDescriptor; use lightning::sign::StaticPaymentOutputDescriptor; -use ln_dlc_node::bitcoin_conversion::to_secp_pk_29; use ln_dlc_node::bitcoin_conversion::to_secp_pk_30; use ln_dlc_node::dlc_message::DlcMessage; use ln_dlc_node::dlc_message::SerializedDlcMessage; +use ln_dlc_node::message_handler::TenTenOneAcceptChannel; +use ln_dlc_node::message_handler::TenTenOneCollaborativeCloseOffer; +use ln_dlc_node::message_handler::TenTenOneMessage; +use ln_dlc_node::message_handler::TenTenOneOfferChannel; +use ln_dlc_node::message_handler::TenTenOneReject; +use ln_dlc_node::message_handler::TenTenOneRenewAccept; +use ln_dlc_node::message_handler::TenTenOneRenewOffer; +use ln_dlc_node::message_handler::TenTenOneSettleOffer; use ln_dlc_node::node; -use ln_dlc_node::node::dlc_message_name; use ln_dlc_node::node::event::NodeEvent; use ln_dlc_node::node::rust_dlc_manager::DlcChannelId; +use ln_dlc_node::node::tentenone_message_name; use ln_dlc_node::node::NodeInfo; use ln_dlc_node::node::RunningNode; use ln_dlc_node::transaction::Transaction; @@ -140,7 +146,7 @@ impl Node { .get_and_clear_received_messages(); for (node_id, msg) in messages { - let msg_name = dlc_message_name(&msg); + let msg_name = tentenone_message_name(&msg); if let Err(e) = self.process_dlc_message(to_secp_pk_30(node_id), msg) { tracing::error!( from = %node_id, @@ -151,13 +157,13 @@ impl Node { } } - /// [`process_dlc_message`] processes incoming dlc channel messages and updates the 10101 + /// [`process_dlc_message`] processes incoming dlc messages and updates the 10101 /// position accordingly. /// - Any other message will be ignored. - /// - Any dlc channel message that has already been processed will be skipped. + /// - Any dlc message that has already been processed will be skipped. /// - /// If an offer is received [`ChannelMessage::Offer`], [`ChannelMessage::SettleOffer`], - /// [`ChannelMessage::CollaborativeCloseOffer`], [`ChannelMessage::RenewOffer`] will get + /// If an offer is received [`TenTenOneMessage::Offer`], [`TenTenOneMessage::SettleOffer`], + /// [`TenTenOneMessage::CollaborativeCloseOffer`], [`TenTenOneMessage::RenewOffer`] will get /// automatically accepted. Unless the maturity date of the offer is already outdated. /// /// FIXME(holzeis): This function manipulates different data objects in different data sources @@ -166,171 +172,135 @@ impl Node { /// (1) use a single data source for the 10101 data and the rust-dlc data. /// (2) wrap the function into a db transaction which can be atomically rolled back on error or /// committed on success. - fn process_dlc_message(&self, node_id: PublicKey, msg: Message) -> Result<()> { + fn process_dlc_message(&self, node_id: PublicKey, msg: TenTenOneMessage) -> Result<()> { tracing::info!( from = %node_id, - kind = %dlc_message_name(&msg), + kind = %tentenone_message_name(&msg), "Processing message" ); - let resp = match &msg { - Message::OnChain(_) | Message::SubChannel(_) => { - tracing::warn!("Ignoring unexpected dlc message."); - None - } - Message::Channel(channel_msg) => { - tracing::debug!( - from = %node_id, - "Received channel message" - ); + tracing::debug!( + from = %node_id, + "Received message" + ); - let inbound_msg = { - let mut conn = db::connection()?; - let serialized_inbound_message = SerializedDlcMessage::try_from(&msg)?; - let inbound_msg = DlcMessage::new(node_id, serialized_inbound_message, true)?; - match db::dlc_messages::DlcMessage::get(&mut conn, &inbound_msg.message_hash)? { - Some(_) => { - tracing::debug!(%node_id, kind=%dlc_message_name(&msg), "Received message that has already been processed, skipping."); - return Ok(()); - } - None => inbound_msg, - } - }; + let inbound_msg = { + let mut conn = db::connection()?; + let serialized_inbound_message = SerializedDlcMessage::try_from(&msg)?; + let inbound_msg = DlcMessage::new(node_id, serialized_inbound_message, true)?; + match db::dlc_messages::DlcMessage::get(&mut conn, &inbound_msg.message_hash)? { + Some(_) => { + tracing::debug!(%node_id, kind=%tentenone_message_name(&msg), "Received message that has already been processed, skipping."); + return Ok(()); + } + None => inbound_msg, + } + }; - let resp = match self - .inner - .dlc_manager - .on_dlc_message(&msg, to_secp_pk_29(node_id)) - .with_context(|| { - format!( - "Failed to handle {} message from {node_id}", - dlc_message_name(&msg) - ) - }) { - Ok(resp) => resp, - Err(e) => { - match &channel_msg { - ChannelMessage::Offer(OfferChannel { + let resp = match self + .inner + .process_tentenone_message(msg.clone(), node_id) + .with_context(|| { + format!( + "Failed to handle {} message from {node_id}", + tentenone_message_name(&msg) + ) + }) { + Ok(resp) => resp, + Err(e) => { + match &msg { + TenTenOneMessage::Offer(TenTenOneOfferChannel { + offer_channel: + OfferChannel { temporary_channel_id: channel_id, reference_id, .. - }) - | ChannelMessage::SettleOffer(SettleOffer { + }, + }) + | TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { + settle_offer: + SettleOffer { channel_id, reference_id, .. - }) - | ChannelMessage::RenewOffer(RenewOffer { + }, + }) + | TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { + renew_offer: + RenewOffer { channel_id, reference_id, .. - }) => { - if let Err(e) = - self.force_reject_offer(node_id, *channel_id, *reference_id) - { - tracing::error!( - channel_id = hex::encode(channel_id), - "Failed to reject offer. Error: {e:#}" - ); - } - } - _ => {} + }, + }) => { + if let Err(e) = self.force_reject_offer(node_id, *channel_id, *reference_id) + { + tracing::error!( + channel_id = hex::encode(channel_id), + "Failed to reject offer. Error: {e:#}" + ); } - - return Err(e); } - }; - - if let Some(resp) = resp.clone() { - // store dlc message immediately so we do not lose the response if something - // goes wrong afterwards. - self.inner - .event_handler - .publish(NodeEvent::StoreDlcMessage { - peer: node_id, - msg: resp, - }); + _ => {} } - { - let mut conn = db::connection()?; - db::dlc_messages::DlcMessage::insert(&mut conn, inbound_msg)?; - } + return Err(e); + } + }; - match channel_msg { - ChannelMessage::Offer(offer) => { - tracing::info!( - channel_id = hex::encode(offer.temporary_channel_id), - "Automatically accepting dlc channel offer" - ); - self.process_dlc_channel_offer(&offer.temporary_channel_id)?; - } - ChannelMessage::SettleOffer(offer) => { - tracing::info!( - channel_id = hex::encode(offer.channel_id), - "Automatically accepting settle offer" - ); - self.process_settle_offer(&offer.channel_id)?; - } - ChannelMessage::RenewOffer(r) => { - tracing::info!( - channel_id = hex::encode(r.channel_id), - "Automatically accepting renew offer" - ); + if let Some(msg) = resp.clone() { + // store dlc message immediately so we do not lose the response if something + // goes wrong afterwards. + self.inner + .event_handler + .publish(NodeEvent::StoreDlcMessage { peer: node_id, msg }); + } - let expiry_timestamp = OffsetDateTime::from_unix_timestamp( - r.contract_info.get_closest_maturity_date() as i64, - )?; - self.process_renew_offer(&r.channel_id, expiry_timestamp)?; - } - ChannelMessage::RenewRevoke(r) => { - let channel_id_hex = hex::encode(r.channel_id); + { + let mut conn = db::connection()?; + db::dlc_messages::DlcMessage::insert(&mut conn, inbound_msg)?; + } - tracing::info!( - channel_id = %channel_id_hex, - "Finished renew protocol" - ); + match msg { + TenTenOneMessage::Offer(offer) => { + tracing::info!( + channel_id = hex::encode(offer.offer_channel.temporary_channel_id), + "Automatically accepting dlc channel offer" + ); + self.process_dlc_channel_offer(&offer.offer_channel.temporary_channel_id)?; + } + TenTenOneMessage::SettleOffer(offer) => { + tracing::info!( + channel_id = hex::encode(offer.settle_offer.channel_id), + "Automatically accepting settle offer" + ); + self.process_settle_offer(&offer.settle_offer.channel_id)?; + } + TenTenOneMessage::RenewOffer(offer) => { + tracing::info!( + channel_id = hex::encode(offer.renew_offer.channel_id), + "Automatically accepting renew offer" + ); - let expiry_timestamp = self - .inner - .get_expiry_for_confirmed_dlc_channel(&r.channel_id)?; - - match db::get_order_in_filling()? { - Some(_) => { - let filled_order = order::handler::order_filled() - .context("Cannot mark order as filled for confirmed DLC")?; - - update_position_after_dlc_channel_creation_or_update( - filled_order, - expiry_timestamp, - ) - .context("Failed to update position after DLC creation")?; - - // In case of a restart. - event::publish(&EventInternal::BackgroundNotification( - BackgroundTask::RecoverDlc(TaskStatus::Success), - )); - } - // If there is no order in `Filling` we must be rolling over. - None => { - tracing::info!( - channel_id = %channel_id_hex, - "Finished rolling over position" - ); - - position::handler::set_position_state(PositionState::Open)?; - - event::publish(&EventInternal::BackgroundNotification( - BackgroundTask::Rollover(TaskStatus::Success), - )); - } - }; - } - ChannelMessage::Sign(signed) => { - let expiry_timestamp = self - .inner - .get_expiry_for_confirmed_dlc_channel(&signed.channel_id)?; + let expiry_timestamp = OffsetDateTime::from_unix_timestamp( + offer.renew_offer.contract_info.get_closest_maturity_date() as i64, + )?; + self.process_renew_offer(&offer.renew_offer.channel_id, expiry_timestamp)?; + } + TenTenOneMessage::RenewRevoke(revoke) => { + let channel_id_hex = hex::encode(revoke.renew_revoke.channel_id); + + tracing::info!( + channel_id = %channel_id_hex, + "Finished renew protocol" + ); + + let expiry_timestamp = self + .inner + .get_expiry_for_confirmed_dlc_channel(&revoke.renew_revoke.channel_id)?; + match db::get_order_in_filling()? { + Some(_) => { let filled_order = order::handler::order_filled() .context("Cannot mark order as filled for confirmed DLC")?; @@ -340,59 +310,91 @@ impl Node { ) .context("Failed to update position after DLC creation")?; - // Sending always a recover dlc background notification success message here - // as we do not know if we might have reached this state after a restart. - // This event is only received by the UI at the moment indicating that the - // dialog can be closed. If the dialog is not open, this event would be - // simply ignored by the UI. - // - // FIXME(holzeis): We should not require that event and align the UI - // handling with waiting for an order execution in the happy case with - // waiting for an order execution after an in between restart. For now it - // was the easiest to go parallel to that implementation so that we don't - // have to touch it. + // In case of a restart. event::publish(&EventInternal::BackgroundNotification( BackgroundTask::RecoverDlc(TaskStatus::Success), )); } - ChannelMessage::SettleConfirm(_) => { - tracing::debug!("Position based on DLC channel is being closed"); - - let filled_order = order::handler::order_filled()?; + // If there is no order in `Filling` we must be rolling over. + None => { + tracing::info!( + channel_id = %channel_id_hex, + "Finished rolling over position" + ); - update_position_after_dlc_closure(Some(filled_order)) - .context("Failed to update position after DLC closure")?; + position::handler::set_position_state(PositionState::Open)?; - // In case of a restart. event::publish(&EventInternal::BackgroundNotification( - BackgroundTask::RecoverDlc(TaskStatus::Success), + BackgroundTask::Rollover(TaskStatus::Success), )); } - ChannelMessage::CollaborativeCloseOffer(close_offer) => { - let channel_id_hex_string = hex::encode(close_offer.channel_id); - tracing::info!( - channel_id = channel_id_hex_string, - node_id = node_id.to_string(), - "Received an offer to collaboratively close a channel" - ); + }; + } + TenTenOneMessage::Sign(signed) => { + let expiry_timestamp = self + .inner + .get_expiry_for_confirmed_dlc_channel(&signed.sign_channel.channel_id)?; - // TODO(bonomat): we should verify that the proposed amount is acceptable - self.inner - .accept_dlc_channel_collaborative_close(&close_offer.channel_id)?; - } - _ => (), - } + let filled_order = order::handler::order_filled() + .context("Cannot mark order as filled for confirmed DLC")?; - resp + update_position_after_dlc_channel_creation_or_update( + filled_order, + expiry_timestamp, + ) + .context("Failed to update position after DLC creation")?; + + // Sending always a recover dlc background notification success message here + // as we do not know if we might have reached this state after a restart. + // This event is only received by the UI at the moment indicating that the + // dialog can be closed. If the dialog is not open, this event would be + // simply ignored by the UI. + // + // FIXME(holzeis): We should not require that event and align the UI + // handling with waiting for an order execution in the happy case with + // waiting for an order execution after an in between restart. For now it + // was the easiest to go parallel to that implementation so that we don't + // have to touch it. + event::publish(&EventInternal::BackgroundNotification( + BackgroundTask::RecoverDlc(TaskStatus::Success), + )); } - }; + TenTenOneMessage::SettleConfirm(_) => { + tracing::debug!("Position based on DLC channel is being closed"); + + let filled_order = order::handler::order_filled()?; + + update_position_after_dlc_closure(Some(filled_order)) + .context("Failed to update position after DLC closure")?; + + // In case of a restart. + event::publish(&EventInternal::BackgroundNotification( + BackgroundTask::RecoverDlc(TaskStatus::Success), + )); + } + TenTenOneMessage::CollaborativeCloseOffer(TenTenOneCollaborativeCloseOffer { + collaborative_close_offer: CollaborativeCloseOffer { channel_id, .. }, + }) => { + let channel_id_hex_string = hex::encode(channel_id); + tracing::info!( + channel_id = channel_id_hex_string, + node_id = node_id.to_string(), + "Received an offer to collaboratively close a channel" + ); + + // TODO(bonomat): we should verify that the proposed amount is acceptable + self.inner + .accept_dlc_channel_collaborative_close(&channel_id)?; + } + _ => (), + } if let Some(msg) = resp { // Everything has been processed successfully, we can safely send the last dlc message, // that has been stored before. tracing::info!( to = %node_id, - kind = %dlc_message_name(&msg), + kind = %tentenone_message_name(&msg), "Sending message" ); @@ -441,7 +443,7 @@ impl Node { self.send_dlc_message( counterparty, - Message::Channel(ChannelMessage::Reject(reject)), + TenTenOneMessage::Reject(TenTenOneReject { reject }), ) } @@ -469,7 +471,7 @@ impl Node { self.send_dlc_message( to_secp_pk_30(counterparty), - Message::Channel(ChannelMessage::Reject(reject)), + TenTenOneMessage::Reject(TenTenOneReject { reject }), ) } @@ -486,7 +488,7 @@ impl Node { Ok((accept_channel, _, _, node_id)) => { self.send_dlc_message( to_secp_pk_30(node_id), - Message::Channel(ChannelMessage::Accept(accept_channel)), + TenTenOneMessage::Accept(TenTenOneAcceptChannel { accept_channel }), )?; } Err(e) => { @@ -511,7 +513,7 @@ impl Node { self.send_dlc_message( to_secp_pk_30(counterparty), - Message::Channel(ChannelMessage::Reject(reject)), + TenTenOneMessage::Reject(TenTenOneReject { reject }), ) } @@ -544,7 +546,7 @@ impl Node { self.send_dlc_message( to_secp_pk_30(counterparty), - Message::Channel(ChannelMessage::Reject(reject)), + TenTenOneMessage::Reject(TenTenOneReject { reject }), ) } @@ -562,7 +564,7 @@ impl Node { self.send_dlc_message( to_secp_pk_30(node_id), - Message::Channel(ChannelMessage::RenewAccept(renew_accept)), + TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept }), )?; } Err(e) => { @@ -575,10 +577,10 @@ impl Node { Ok(()) } - pub fn send_dlc_message(&self, node_id: PublicKey, msg: Message) -> Result<()> { + pub fn send_dlc_message(&self, node_id: PublicKey, msg: TenTenOneMessage) -> Result<()> { tracing::info!( to = %node_id, - kind = %dlc_message_name(&msg), + kind = %tentenone_message_name(&msg), "Sending message" );