Skip to content

Commit

Permalink
Merge pull request #2502 from get10101/chore/rework-trade-and-recover…
Browse files Browse the repository at this point in the history
…-dialog

feat: Rework task status dialog
  • Loading branch information
holzeis authored May 7, 2024
2 parents 0d77b18 + 67f7592 commit f0f2710
Show file tree
Hide file tree
Showing 59 changed files with 1,634 additions and 1,683 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TYPE "Message_Type_Type"
ADD
VALUE IF NOT EXISTS 'RolloverAccept';
ALTER TYPE "Message_Type_Type"
ADD
VALUE IF NOT EXISTS 'RolloverConfirm';
ALTER TYPE "Message_Type_Type"
ADD
VALUE IF NOT EXISTS 'RolloverFinalize';
ALTER TYPE "Message_Type_Type"
ADD
VALUE IF NOT EXISTS 'RolloverRevoke';
20 changes: 10 additions & 10 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ async fn main() -> Result<()> {

let running = node.start(dlc_event_receiver)?;

let (tx_user_feed, _rx) = broadcast::channel::<NewUserMessage>(100);

let notification_service = NotificationService::new(opts.fcm_api_key.clone(), pool.clone());

let (_handle, auth_users_notifier) = spawn_delivering_messages_to_authenticated_users(
notification_service.get_sender(),
tx_user_feed.clone(),
);

// an internal channel to send updates about our position
let (tx_position_feed, _rx) = broadcast::channel::<InternalPositionUpdateMessage>(100);

Expand All @@ -162,6 +171,7 @@ async fn main() -> Result<()> {
pool.clone(),
settings.to_node_settings(),
tx_position_feed.clone(),
auth_users_notifier.clone(),
);

// TODO: Pass the tokio metrics into Prometheus
Expand Down Expand Up @@ -244,18 +254,8 @@ async fn main() -> Result<()> {
}
});

let (tx_user_feed, _rx) = broadcast::channel::<NewUserMessage>(100);

let (tx_orderbook_feed, _rx) = broadcast::channel(100);

let notification_service =
NotificationService::new(opts.fcm_api_key.clone(), node.pool.clone());

let (_handle, auth_users_notifier) = spawn_delivering_messages_to_authenticated_users(
notification_service.get_sender(),
tx_user_feed.clone(),
);

let (_handle, trading_sender) = trading::start(
node.clone(),
tx_orderbook_feed.clone(),
Expand Down
12 changes: 10 additions & 2 deletions coordinator/src/db/custom_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,15 @@ impl ToSql<MessageTypeType, Pg> for MessageType {
MessageType::SettleConfirm => out.write_all(b"SettleConfirm")?,
MessageType::SettleFinalize => out.write_all(b"SettleFinalize")?,
MessageType::RenewOffer => out.write_all(b"RenewOffer")?,
MessageType::RolloverOffer => out.write_all(b"RolloverOffer")?,
MessageType::RenewAccept => out.write_all(b"RenewAccept")?,
MessageType::RenewConfirm => out.write_all(b"RenewConfirm")?,
MessageType::RenewFinalize => out.write_all(b"RenewFinalize")?,
MessageType::RenewRevoke => out.write_all(b"RenewRevoke")?,
MessageType::RolloverOffer => out.write_all(b"RolloverOffer")?,
MessageType::RolloverAccept => out.write_all(b"RolloverAccept")?,
MessageType::RolloverConfirm => out.write_all(b"RolloverConfirm")?,
MessageType::RolloverFinalize => out.write_all(b"RolloverFinalize")?,
MessageType::RolloverRevoke => out.write_all(b"RolloverRevoke")?,
MessageType::CollaborativeCloseOffer => out.write_all(b"CollaborativeCloseOffer")?,
MessageType::Reject => out.write_all(b"Reject")?,
}
Expand All @@ -128,11 +132,15 @@ impl FromSql<MessageTypeType, Pg> for MessageType {
b"SettleConfirm" => Ok(MessageType::SettleConfirm),
b"SettleFinalize" => Ok(MessageType::SettleFinalize),
b"RenewOffer" => Ok(MessageType::RenewOffer),
b"RolloverOffer" => Ok(MessageType::RolloverOffer),
b"RenewAccept" => Ok(MessageType::RenewAccept),
b"RenewConfirm" => Ok(MessageType::RenewConfirm),
b"RenewFinalize" => Ok(MessageType::RenewFinalize),
b"RenewRevoke" => Ok(MessageType::RenewRevoke),
b"RolloverOffer" => Ok(MessageType::RolloverOffer),
b"RolloverAccept" => Ok(MessageType::RolloverAccept),
b"RolloverConfirm" => Ok(MessageType::RolloverConfirm),
b"RolloverFinalize" => Ok(MessageType::RolloverFinalize),
b"RolloverRevoke" => Ok(MessageType::RolloverRevoke),
b"CollaborativeCloseOffer" => Ok(MessageType::CollaborativeCloseOffer),
b"Reject" => Ok(MessageType::Reject),
_ => Err("Unrecognized enum variant".into()),
Expand Down
20 changes: 17 additions & 3 deletions coordinator/src/db/dlc_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ pub(crate) enum MessageType {
SettleConfirm,
SettleFinalize,
RenewOffer,
RolloverOffer,
RenewAccept,
RenewConfirm,
RenewFinalize,
RenewRevoke,
RolloverOffer,
RolloverAccept,
RolloverConfirm,
RolloverFinalize,
RolloverRevoke,
CollaborativeCloseOffer,
Reject,
}
Expand Down Expand Up @@ -103,11 +107,15 @@ impl From<xxi_node::dlc_message::DlcMessageType> for MessageType {
xxi_node::dlc_message::DlcMessageType::SettleConfirm => Self::SettleConfirm,
xxi_node::dlc_message::DlcMessageType::SettleFinalize => Self::SettleFinalize,
xxi_node::dlc_message::DlcMessageType::RenewOffer => Self::RenewOffer,
xxi_node::dlc_message::DlcMessageType::RolloverOffer => Self::RolloverOffer,
xxi_node::dlc_message::DlcMessageType::RenewAccept => Self::RenewAccept,
xxi_node::dlc_message::DlcMessageType::RenewConfirm => Self::RenewConfirm,
xxi_node::dlc_message::DlcMessageType::RenewFinalize => Self::RenewFinalize,
xxi_node::dlc_message::DlcMessageType::RenewRevoke => Self::RenewRevoke,
xxi_node::dlc_message::DlcMessageType::RolloverOffer => Self::RolloverOffer,
xxi_node::dlc_message::DlcMessageType::RolloverAccept => Self::RolloverAccept,
xxi_node::dlc_message::DlcMessageType::RolloverConfirm => Self::RolloverConfirm,
xxi_node::dlc_message::DlcMessageType::RolloverFinalize => Self::RolloverFinalize,
xxi_node::dlc_message::DlcMessageType::RolloverRevoke => Self::RolloverRevoke,
xxi_node::dlc_message::DlcMessageType::CollaborativeCloseOffer => {
Self::CollaborativeCloseOffer
}
Expand Down Expand Up @@ -139,11 +147,17 @@ impl From<MessageType> for xxi_node::dlc_message::DlcMessageType {
MessageType::SettleConfirm => xxi_node::dlc_message::DlcMessageType::SettleConfirm,
MessageType::SettleFinalize => xxi_node::dlc_message::DlcMessageType::SettleFinalize,
MessageType::RenewOffer => xxi_node::dlc_message::DlcMessageType::RenewOffer,
MessageType::RolloverOffer => xxi_node::dlc_message::DlcMessageType::RolloverOffer,
MessageType::RenewAccept => xxi_node::dlc_message::DlcMessageType::RenewAccept,
MessageType::RenewConfirm => xxi_node::dlc_message::DlcMessageType::RenewConfirm,
MessageType::RenewFinalize => xxi_node::dlc_message::DlcMessageType::RenewFinalize,
MessageType::RenewRevoke => xxi_node::dlc_message::DlcMessageType::RenewRevoke,
MessageType::RolloverOffer => xxi_node::dlc_message::DlcMessageType::RolloverOffer,
MessageType::RolloverAccept => xxi_node::dlc_message::DlcMessageType::RolloverAccept,
MessageType::RolloverConfirm => xxi_node::dlc_message::DlcMessageType::RolloverConfirm,
MessageType::RolloverFinalize => {
xxi_node::dlc_message::DlcMessageType::RolloverFinalize
}
MessageType::RolloverRevoke => xxi_node::dlc_message::DlcMessageType::RolloverRevoke,
MessageType::CollaborativeCloseOffer => {
xxi_node::dlc_message::DlcMessageType::CollaborativeCloseOffer
}
Expand Down
11 changes: 11 additions & 0 deletions coordinator/src/emergency_kit.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::node::Node;
use crate::orderbook::db;
use anyhow::Context;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use bitcoin_old::secp256k1::SecretKey;
use dlc_manager::Signer;
use dlc_messages::channel::RenewRevoke;
use lightning::ln::chan_utils::build_commitment_secret;
use xxi_node::commons::OrderState;
use xxi_node::message_handler::TenTenOneMessage;
use xxi_node::message_handler::TenTenOneRenewRevoke;
use xxi_node::node::event::NodeEvent;
Expand All @@ -25,7 +28,15 @@ impl Node {
signed_channel.update_idx + 1,
))?;

let mut conn = self.pool.clone().get()?;
// We assume the last taken order to be the relevant order.
let order = db::orders::get_by_trader_id_and_state(&mut conn, trader, OrderState::Taken)?
.with_context(|| {
format!("Couldn't find last order in state taken. trader_id={trader}")
})?;

let msg = TenTenOneMessage::RenewRevoke(TenTenOneRenewRevoke {
order_id: order.id,
renew_revoke: RenewRevoke {
channel_id: signed_channel.channel_id,
per_update_secret: prev_per_update_secret,
Expand Down
67 changes: 61 additions & 6 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::db;
use crate::dlc_protocol;
use crate::dlc_protocol::ProtocolId;
use crate::message::OrderbookMessage;
use crate::node::storage::NodeStorage;
use crate::position::models::PositionState;
use crate::storage::CoordinatorTenTenOneStorage;
Expand All @@ -22,16 +23,22 @@ use dlc_messages::channel::SettleFinalize;
use dlc_messages::channel::SignChannel;
use std::sync::Arc;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use xxi_node::bitcoin_conversion::to_secp_pk_29;
use xxi_node::bitcoin_conversion::to_secp_pk_30;
use xxi_node::commons::Message::RolloverError;
use xxi_node::commons::Message::TradeError;
use xxi_node::commons::TradingError;
use xxi_node::dlc_message::DlcMessage;
use xxi_node::dlc_message::SerializedDlcMessage;
use xxi_node::message_handler::TenTenOneAcceptChannel;
use xxi_node::message_handler::TenTenOneCollaborativeCloseOffer;
use xxi_node::message_handler::TenTenOneMessage;
use xxi_node::message_handler::TenTenOneMessageType;
use xxi_node::message_handler::TenTenOneReject;
use xxi_node::message_handler::TenTenOneRenewFinalize;
use xxi_node::message_handler::TenTenOneRolloverFinalize;
use xxi_node::message_handler::TenTenOneSettleFinalize;
use xxi_node::message_handler::TenTenOneSignChannel;
use xxi_node::node;
Expand Down Expand Up @@ -68,6 +75,7 @@ pub struct Node {
pub pool: Pool<ConnectionManager<PgConnection>>,
pub settings: Arc<RwLock<NodeSettings>>,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
trade_notifier: mpsc::Sender<OrderbookMessage>,
}

impl Node {
Expand All @@ -83,13 +91,15 @@ impl Node {
pool: Pool<ConnectionManager<PgConnection>>,
settings: NodeSettings,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
trade_notifier: mpsc::Sender<OrderbookMessage>,
) -> Self {
Self {
inner,
pool,
settings: Arc::new(RwLock::new(settings)),
_running: Arc::new(running),
tx_position_feed,
trade_notifier,
}
}

Expand Down Expand Up @@ -126,6 +136,42 @@ impl Node {
);
}

tokio::spawn({
let trade_notifier = self.trade_notifier.clone();
let error = TradingError::Other(format!("{e:#}"));
async move {
let message = match msg.get_tentenone_message_type() {
TenTenOneMessageType::Trade
| TenTenOneMessageType::Expire
| TenTenOneMessageType::Liquidate => {
if let Some(order_id) = msg.get_order_id() {
OrderbookMessage::TraderMessage {
trader_id: to_secp_pk_30(node_id),
message: TradeError { order_id, error },
notification: None,
}
} else {
tracing::warn!("Could not send trade error to user due to missing order id");
return;
}
}
TenTenOneMessageType::Rollover => OrderbookMessage::TraderMessage {
trader_id: to_secp_pk_30(node_id),
message: RolloverError { error },
notification: None,
},
TenTenOneMessageType::Other => {
tracing::debug!("Not sending errors to the app unrelated to a trade or rollover.");
return;
}
};

if let Err(e) = trade_notifier.send(message).await {
tracing::error!("Failed to send trade error to user. Error: {e:#}");
}
}
});

tracing::error!(
from = %node_id,
kind = %msg_name,
Expand All @@ -151,8 +197,9 @@ impl Node {
/// - Any message that has already been processed will be skipped.
///
/// Offers such as [`TenTenOneMessage::Offer`], [`TenTenOneMessage::SettleOffer`],
/// [`TenTenOneMessage::CollaborativeCloseOffer`] and [`TenTenOneMessage::RenewOffer`] are
/// automatically accepted. Unless the maturity date of the offer is already outdated.
/// [`TenTenOneMessage::RolloverOffer`], [`TenTenOneMessage::CollaborativeCloseOffer`] and
/// [`TenTenOneMessage::RenewOffer`] are automatically accepted. Unless the maturity date of
/// the offer is already outdated.
///
/// FIXME(holzeis): This function manipulates different data objects from different data sources
/// and should use a transaction to make all changes atomic. Not doing so risks ending up in an
Expand Down Expand Up @@ -223,11 +270,16 @@ impl Node {
reference_id,
..
},
..
})
| TenTenOneMessage::RolloverFinalize(TenTenOneRolloverFinalize {
renew_finalize:
RenewFinalize {
channel_id,
reference_id,
..
},
}) => {
// TODO: Receiving this message used to be specific to rolling over, but we
// now use the renew protocol for all (non-closing)
// trades beyond the first one.

let channel_id_hex_string = hex::encode(channel_id);

let reference_id = match reference_id {
Expand Down Expand Up @@ -270,6 +322,7 @@ impl Node {
reference_id,
..
},
..
}) => {
let channel_id_hex_string = hex::encode(channel_id);

Expand Down Expand Up @@ -324,10 +377,12 @@ impl Node {
reference_id,
..
},
..
}) => {
let channel_id = match resp {
Some(TenTenOneMessage::Sign(TenTenOneSignChannel {
sign_channel: SignChannel { channel_id, .. },
..
})) => channel_id,
_ => *temporary_channel_id,
};
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/orderbook/db/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ pub fn get_by_trader_id_and_state(
orders::table
.filter(orders::trader_id.eq(trader_id.to_string()))
.filter(orders::order_state.eq(OrderState::from(order_state)))
.order_by(orders::timestamp.desc())
.first::<Order>(conn)
.map(OrderbookOrder::from)
.optional()
Expand Down
Loading

0 comments on commit f0f2710

Please sign in to comment.