diff --git a/src/bin/test_kucoin_order_private_latency.rs b/src/bin/test_kucoin_order_private_latency.rs index f79180c..6e0dd4c 100644 --- a/src/bin/test_kucoin_order_private_latency.rs +++ b/src/bin/test_kucoin_order_private_latency.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + /// Test latency between order and private channel order detection /// Places extreme order in REST, receive extreme order in private channel /// Please configure the buy price to either the current market price or lower for testing purpose @@ -8,10 +10,13 @@ use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_sym use kucoin_arbitrage::event::order::OrderEvent; use kucoin_arbitrage::event::orderchange::OrderChangeEvent; use kucoin_arbitrage::model::order::{LimitOrder, OrderSide, OrderType}; +use kucoin_arbitrage::monitor::counter::Counter; +use kucoin_arbitrage::monitor::task::{task_log_mps, task_monitor_channel_mps}; use kucoin_arbitrage::strings::generate_uid; use kucoin_arbitrage::{broker::symbol::filter::symbol_with_quotes, monitor}; -use tokio::sync::broadcast::channel; -use tokio::time::{sleep, Duration}; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::{broadcast, Mutex}; +use tokio::task::JoinSet; #[tokio::main] async fn main() -> Result<(), failure::Error> { @@ -38,35 +43,76 @@ async fn main() -> Result<(), failure::Error> { log::info!("Total orderbook WS sessions: {:?}", subs.len()); // Creates broadcast channels - // for placing order - let (tx_order, rx_order) = channel::(16); - // for getting private order changes - let (tx_orderchange, _rx_orderchange) = channel::(128); + let cx_order = Arc::new(Mutex::new(Counter::new("order"))); + let tx_order = broadcast::channel::(16).0; + let cx_orderchange = Arc::new(Mutex::new(Counter::new("orderchange"))); + let tx_orderchange = broadcast::channel::(128).0; log::info!("Broadcast channels setup"); - // TODO use the tx_order to send orders - tokio::spawn(task_place_order(rx_order, api.clone())); + // monitor tasks + let mut taskpool_monitor = JoinSet::new(); + taskpool_monitor.spawn(task_monitor_channel_mps( + tx_order.subscribe(), + cx_order.clone(), + )); + taskpool_monitor.spawn(task_monitor_channel_mps( + tx_orderchange.subscribe(), + cx_orderchange.clone(), + )); + taskpool_monitor.spawn(task_log_mps( + vec![cx_order.clone(), cx_orderchange.clone()], + 10, + )); - tokio::spawn(task_pub_orderchange_event(api.clone(), tx_orderchange)); + let mut taskpool_infrastructure: JoinSet> = JoinSet::new(); + taskpool_infrastructure.spawn(task_place_order(tx_order.subscribe(), api.clone())); + taskpool_infrastructure.spawn(task_place_order_periodically(tx_order.clone(), 10.0)); + taskpool_infrastructure.spawn(task_pub_orderchange_event( + api.clone(), + tx_orderchange.clone(), + )); log::info!("All application tasks setup"); monitor::timer::start("order_placement_network".to_string()).await; monitor::timer::start("order_placement_broadcast".to_string()).await; - // Sends a post order - let event = OrderEvent::PostOrder(LimitOrder { + tokio::select! { + _ = taskpool_infrastructure.join_next() => println!("taskpool_infrastructure stopped unexpectedly"), + _ = task_signal_handle() => println!("received external signal, terminating program"), + }; + Ok(()) +} + +/// wait for any external terminating signal +async fn task_signal_handle() -> Result<(), failure::Error> { + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + tokio::select! { + _ = sigterm.recv() => exit_program("SIGTERM").await?, + _ = sigint.recv() => exit_program("SIGINT").await?, + }; + Ok(()) +} + +/// handle external signal +async fn exit_program(signal_alias: &str) -> Result<(), failure::Error> { + log::info!("Received [{signal_alias}] signal"); + Ok(()) +} + +async fn task_place_order_periodically( + tx_order: broadcast::Sender, + interval_s: f64, +) -> Result<(), failure::Error> { + let event = OrderEvent::PlaceOrder(LimitOrder { id: generate_uid(40), order_type: OrderType::Limit, side: OrderSide::Buy, symbol: "BTC-USDT".to_string(), amount: 0.001.to_string(), - price: 29947.0.to_string(), + price: 35000.0.to_string(), }); - if let Err(e) = tx_order.send(event) { - log::error!("{e}"); - } - loop { - // Waits 60 seconds - sleep(Duration::from_secs(60)).await; + tx_order.send(event.clone())?; + tokio::time::sleep(tokio::time::Duration::from_secs_f64(interval_s)).await; } } diff --git a/src/broker/gatekeeper/kucoin.rs b/src/broker/gatekeeper/kucoin.rs index 10484bd..dbe6511 100644 --- a/src/broker/gatekeeper/kucoin.rs +++ b/src/broker/gatekeeper/kucoin.rs @@ -7,6 +7,7 @@ use std::time::SystemTime; use tokio::sync::broadcast::{Receiver, Sender}; // TODO implement when all_taker_btc_usdt is done +// TODO implement profit maximization /// Broker that accepts chances, then outputs actual orders based on other limiting factors /// Gate Keeper @@ -29,10 +30,6 @@ pub async fn task_gatekeep_chances( match event { ChanceEvent::AllTaker(chance) => { log::info!("All Taker Chance found!\n{chance:?}"); - // TODO conduct profit maximization here - // set up a sized queue here with a timer and a order monitor - // if timeout, close order with market price - // chance.profit // i is [0, 1, 2] for i in 0..3 { let order: LimitOrder = LimitOrder { @@ -47,7 +44,7 @@ pub async fn task_gatekeep_chances( let time_sent = SystemTime::now(); log::info!("time_sent: {time_sent:?}"); - sender.send(OrderEvent::PostOrder(order))?; + sender.send(OrderEvent::PlaceOrder(order))?; let mut amount_untraded = chance.actions[i].price.0; while amount_untraded > 0.0 { diff --git a/src/broker/order/kucoin.rs b/src/broker/order/kucoin.rs index e4e7c0c..220dfba 100644 --- a/src/broker/order/kucoin.rs +++ b/src/broker/order/kucoin.rs @@ -15,53 +15,36 @@ pub async fn task_place_order( match event { OrderEvent::GetAllOrders => { // unimplemented!("mossing source of order_id"); - let status = kucoin.get_recent_orders().await; - if let Err(e) = status { - log::error!("There was an error with kucoin API cancelling all order ({e})"); - } else { - let status_api_data = status.unwrap(); - let data = status_api_data.data.unwrap(); - for datum in data { - log::info!("get_recent_orders obtained {datum:#?}") - } - } + let status = kucoin.get_recent_orders().await?; + log::info!("{status:?}"); } OrderEvent::CancelOrder(order) => { - if let Err(e) = kucoin.cancel_order(order.id().to_string().as_str()).await { - log::error!("There was an error with kucoin API cancelling single order ({e})"); - } - - unimplemented!() + let status = kucoin.cancel_order(order.id().as_ref()).await?; + log::info!("{status:?}"); } OrderEvent::CancelAllOrders => { - unimplemented!("mossing source of symbol and trade_type"); - // if let Err(e) = kucoin.cancel_all_orders(symbol, trade_type).await { - // log::error!("There was an error with kucoin API cancelling all order ({e})"); - // } + unimplemented!(); } - OrderEvent::PostOrder(order) => { - // gge the broadcast duration - let time = monitor::timer::stop("order_placement_broadcast".to_string()).await; - if let Err(e) = time { - log::error!("{e:?}"); - } else { - log::info!("order_placement_broadcast: {:?}", time.unwrap()); - } - - log::info!("order placement\n{order:?}"); - if let Err(e) = kucoin + OrderEvent::PlaceOrder(order) => { + // get the broadcast duration + let time = monitor::timer::stop("order_placement_broadcast".to_string()).await?; + log::info!("order_placement_broadcast: {:?}", time); + let status = kucoin .post_limit_order( - order.id().to_string().as_str(), - order.symbol().as_str(), - order.side().to_string().as_str(), - order.price().as_str(), - order.amount().as_str(), + order.id().as_ref(), + order.symbol().as_ref(), + order.side().as_ref(), + order.price().as_ref(), + order.amount().as_ref(), None, ) - .await - { - log::error!("There was an error with kucoin API placing order ({e})"); - } + .await?; + log::info!("{status:?}"); + } + OrderEvent::PlaceBorrowOrder(_order) => { + // TODO learn more about the function below + // kucoin.post_borrow_order(currency, trade_type, size, max_rate, term) + unimplemented!(); } }; } diff --git a/src/broker/orderchange/kucoin.rs b/src/broker/orderchange/kucoin.rs index b4ecb6d..e95897b 100644 --- a/src/broker/orderchange/kucoin.rs +++ b/src/broker/orderchange/kucoin.rs @@ -15,15 +15,11 @@ pub async fn task_pub_orderchange_event( let mut ws = api.websocket(); let topics = vec![WSTopic::TradeOrders]; ws.subscribe(url_private.clone(), topics).await?; - loop { // Awaits subscription message - let msg = ws.try_next().await; - if let Err(e) = msg { - log::error!("task_pub_orderchange_event error: {e}"); - panic!() - } - let msg = msg?.unwrap(); + let msg = ws.try_next().await?; + let msg = msg.unwrap(); + log::info!("message: {msg:?}"); if let KucoinWebsocketMsg::TradeReceivedMsg(msg) = msg { // TradeReceived is only available to V2. diff --git a/src/event/order.rs b/src/event/order.rs index 540e918..719a476 100644 --- a/src/event/order.rs +++ b/src/event/order.rs @@ -5,5 +5,6 @@ pub enum OrderEvent { GetAllOrders, CancelOrder(LimitOrder), CancelAllOrders, - PostOrder(LimitOrder), + PlaceOrder(LimitOrder), + PlaceBorrowOrder(LimitOrder), }