Skip to content

Commit

Permalink
clean up order events
Browse files Browse the repository at this point in the history
  • Loading branch information
kanekoshoyu committed Oct 25, 2023
1 parent 4c10244 commit f6a9918
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 70 deletions.
82 changes: 64 additions & 18 deletions src/bin/test_kucoin_order_private_latency.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

/// Test latency between order and private channel order detection
/// Places extreme order in REST, receive extreme order in private channel
/// Please configure the buy price to either the current market price or lower for testing purpose
Expand All @@ -8,10 +10,13 @@ use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_sym
use kucoin_arbitrage::event::order::OrderEvent;
use kucoin_arbitrage::event::orderchange::OrderChangeEvent;
use kucoin_arbitrage::model::order::{LimitOrder, OrderSide, OrderType};
use kucoin_arbitrage::monitor::counter::Counter;
use kucoin_arbitrage::monitor::task::{task_log_mps, task_monitor_channel_mps};
use kucoin_arbitrage::strings::generate_uid;
use kucoin_arbitrage::{broker::symbol::filter::symbol_with_quotes, monitor};
use tokio::sync::broadcast::channel;
use tokio::time::{sleep, Duration};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{broadcast, Mutex};
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<(), failure::Error> {
Expand All @@ -38,35 +43,76 @@ async fn main() -> Result<(), failure::Error> {
log::info!("Total orderbook WS sessions: {:?}", subs.len());

// Creates broadcast channels
// for placing order
let (tx_order, rx_order) = channel::<OrderEvent>(16);
// for getting private order changes
let (tx_orderchange, _rx_orderchange) = channel::<OrderChangeEvent>(128);
let cx_order = Arc::new(Mutex::new(Counter::new("order")));
let tx_order = broadcast::channel::<OrderEvent>(16).0;
let cx_orderchange = Arc::new(Mutex::new(Counter::new("orderchange")));
let tx_orderchange = broadcast::channel::<OrderChangeEvent>(128).0;
log::info!("Broadcast channels setup");

// TODO use the tx_order to send orders
tokio::spawn(task_place_order(rx_order, api.clone()));
// monitor tasks
let mut taskpool_monitor = JoinSet::new();
taskpool_monitor.spawn(task_monitor_channel_mps(
tx_order.subscribe(),
cx_order.clone(),
));
taskpool_monitor.spawn(task_monitor_channel_mps(
tx_orderchange.subscribe(),
cx_orderchange.clone(),
));
taskpool_monitor.spawn(task_log_mps(
vec![cx_order.clone(), cx_orderchange.clone()],
10,
));

tokio::spawn(task_pub_orderchange_event(api.clone(), tx_orderchange));
let mut taskpool_infrastructure: JoinSet<Result<(), failure::Error>> = JoinSet::new();
taskpool_infrastructure.spawn(task_place_order(tx_order.subscribe(), api.clone()));
taskpool_infrastructure.spawn(task_place_order_periodically(tx_order.clone(), 10.0));
taskpool_infrastructure.spawn(task_pub_orderchange_event(
api.clone(),
tx_orderchange.clone(),
));

log::info!("All application tasks setup");
monitor::timer::start("order_placement_network".to_string()).await;
monitor::timer::start("order_placement_broadcast".to_string()).await;
// Sends a post order
let event = OrderEvent::PostOrder(LimitOrder {
tokio::select! {
_ = taskpool_infrastructure.join_next() => println!("taskpool_infrastructure stopped unexpectedly"),
_ = task_signal_handle() => println!("received external signal, terminating program"),
};
Ok(())
}

/// wait for any external terminating signal
async fn task_signal_handle() -> Result<(), failure::Error> {
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut sigint = signal(SignalKind::interrupt()).unwrap();
tokio::select! {
_ = sigterm.recv() => exit_program("SIGTERM").await?,
_ = sigint.recv() => exit_program("SIGINT").await?,
};
Ok(())
}

/// handle external signal
async fn exit_program(signal_alias: &str) -> Result<(), failure::Error> {
log::info!("Received [{signal_alias}] signal");
Ok(())
}

async fn task_place_order_periodically(
tx_order: broadcast::Sender<OrderEvent>,
interval_s: f64,
) -> Result<(), failure::Error> {
let event = OrderEvent::PlaceOrder(LimitOrder {
id: generate_uid(40),
order_type: OrderType::Limit,
side: OrderSide::Buy,
symbol: "BTC-USDT".to_string(),
amount: 0.001.to_string(),
price: 29947.0.to_string(),
price: 35000.0.to_string(),
});
if let Err(e) = tx_order.send(event) {
log::error!("{e}");
}

loop {
// Waits 60 seconds
sleep(Duration::from_secs(60)).await;
tx_order.send(event.clone())?;
tokio::time::sleep(tokio::time::Duration::from_secs_f64(interval_s)).await;
}
}
7 changes: 2 additions & 5 deletions src/broker/gatekeeper/kucoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::SystemTime;
use tokio::sync::broadcast::{Receiver, Sender};

// TODO implement when all_taker_btc_usdt is done
// TODO implement profit maximization

/// Broker that accepts chances, then outputs actual orders based on other limiting factors
/// Gate Keeper
Expand All @@ -29,10 +30,6 @@ pub async fn task_gatekeep_chances(
match event {
ChanceEvent::AllTaker(chance) => {
log::info!("All Taker Chance found!\n{chance:?}");
// TODO conduct profit maximization here
// set up a sized queue here with a timer and a order monitor
// if timeout, close order with market price
// chance.profit
// i is [0, 1, 2]
for i in 0..3 {
let order: LimitOrder = LimitOrder {
Expand All @@ -47,7 +44,7 @@ pub async fn task_gatekeep_chances(
let time_sent = SystemTime::now();
log::info!("time_sent: {time_sent:?}");

sender.send(OrderEvent::PostOrder(order))?;
sender.send(OrderEvent::PlaceOrder(order))?;

let mut amount_untraded = chance.actions[i].price.0;
while amount_untraded > 0.0 {
Expand Down
61 changes: 22 additions & 39 deletions src/broker/order/kucoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,36 @@ pub async fn task_place_order(
match event {
OrderEvent::GetAllOrders => {
// unimplemented!("mossing source of order_id");
let status = kucoin.get_recent_orders().await;
if let Err(e) = status {
log::error!("There was an error with kucoin API cancelling all order ({e})");
} else {
let status_api_data = status.unwrap();
let data = status_api_data.data.unwrap();
for datum in data {
log::info!("get_recent_orders obtained {datum:#?}")
}
}
let status = kucoin.get_recent_orders().await?;
log::info!("{status:?}");
}
OrderEvent::CancelOrder(order) => {
if let Err(e) = kucoin.cancel_order(order.id().to_string().as_str()).await {
log::error!("There was an error with kucoin API cancelling single order ({e})");
}

unimplemented!()
let status = kucoin.cancel_order(order.id().as_ref()).await?;
log::info!("{status:?}");
}
OrderEvent::CancelAllOrders => {
unimplemented!("mossing source of symbol and trade_type");
// if let Err(e) = kucoin.cancel_all_orders(symbol, trade_type).await {
// log::error!("There was an error with kucoin API cancelling all order ({e})");
// }
unimplemented!();
}
OrderEvent::PostOrder(order) => {
// gge the broadcast duration
let time = monitor::timer::stop("order_placement_broadcast".to_string()).await;
if let Err(e) = time {
log::error!("{e:?}");
} else {
log::info!("order_placement_broadcast: {:?}", time.unwrap());
}

log::info!("order placement\n{order:?}");
if let Err(e) = kucoin
OrderEvent::PlaceOrder(order) => {
// get the broadcast duration
let time = monitor::timer::stop("order_placement_broadcast".to_string()).await?;
log::info!("order_placement_broadcast: {:?}", time);
let status = kucoin
.post_limit_order(
order.id().to_string().as_str(),
order.symbol().as_str(),
order.side().to_string().as_str(),
order.price().as_str(),
order.amount().as_str(),
order.id().as_ref(),
order.symbol().as_ref(),
order.side().as_ref(),
order.price().as_ref(),
order.amount().as_ref(),
None,
)
.await
{
log::error!("There was an error with kucoin API placing order ({e})");
}
.await?;
log::info!("{status:?}");
}
OrderEvent::PlaceBorrowOrder(_order) => {
// TODO learn more about the function below
// kucoin.post_borrow_order(currency, trade_type, size, max_rate, term)
unimplemented!();
}
};
}
Expand Down
10 changes: 3 additions & 7 deletions src/broker/orderchange/kucoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@ pub async fn task_pub_orderchange_event(
let mut ws = api.websocket();
let topics = vec![WSTopic::TradeOrders];
ws.subscribe(url_private.clone(), topics).await?;

loop {
// Awaits subscription message
let msg = ws.try_next().await;
if let Err(e) = msg {
log::error!("task_pub_orderchange_event error: {e}");
panic!()
}
let msg = msg?.unwrap();
let msg = ws.try_next().await?;
let msg = msg.unwrap();
log::info!("message: {msg:?}");

if let KucoinWebsocketMsg::TradeReceivedMsg(msg) = msg {
// TradeReceived is only available to V2.
Expand Down
3 changes: 2 additions & 1 deletion src/event/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ pub enum OrderEvent {
GetAllOrders,
CancelOrder(LimitOrder),
CancelAllOrders,
PostOrder(LimitOrder),
PlaceOrder(LimitOrder),
PlaceBorrowOrder(LimitOrder),
}

0 comments on commit f6a9918

Please sign in to comment.