Skip to content

Commit

Permalink
Merge pull request #30 from kanekoshoyu/feature/cloneConfig
Browse files Browse the repository at this point in the history
Feature/clone config
  • Loading branch information
kanekoshoyu authored Oct 14, 2023
2 parents 1c538ca + 24c6f5f commit bf09c23
Show file tree
Hide file tree
Showing 28 changed files with 115 additions and 118 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kucoin_arbitrage"
version = "0.0.12"
version = "0.0.13"
edition = "2021"
authors = ["Sho Kaneko <kanekoshoyu@gmail.com>"]
description = "Event-Driven Kucoin Arbitrage Framework in Async Rust"
Expand All @@ -10,8 +10,8 @@ license = "MIT"

[dependencies]
# async
futures = "0.3"
tokio = { version = "1.27.0", features = ["full"] }
futures = "0.3.28"
tokio = { version = "1.33.0", features = ["full"] }
tokio-signal = "0.2.9"
# log
log = "0.4"
Expand All @@ -22,11 +22,11 @@ toml = "0.8.2"
serde = "1.0.188"
serde_derive = "1.0.188"
# debug
lazy_static = "1.4"
lazy_static = "1.4.0"
failure = "0.1.8"
# exchange
kucoin_api = "1.4.10"
chrono = "0.4"
chrono = "0.4.31"
# number
ordered-float = "3.6.0"
num-traits = "0.2.15"
Expand Down
6 changes: 4 additions & 2 deletions src/bin/event_orderbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use kucoin_arbitrage::broker::orderchange::kucoin::task_pub_orderchange_event;
use kucoin_arbitrage::broker::symbol::filter::symbol_with_quotes;
use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_symbols};
use kucoin_arbitrage::event::{orderbook::OrderbookEvent, orderchange::OrderChangeEvent};
use kucoin_arbitrage::global::task::task_log_mps;
use kucoin_arbitrage::model::{counter::Counter, orderbook::FullOrderbook};
use kucoin_arbitrage::model::orderbook::FullOrderbook;
use kucoin_arbitrage::monitor::counter::Counter;
use kucoin_arbitrage::monitor::task::task_log_mps;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::broadcast::channel;
use tokio::sync::Mutex;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<(), failure::Error> {
// logging format
Expand Down
6 changes: 4 additions & 2 deletions src/bin/event_triangular.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ use kucoin_arbitrage::event::{
chance::ChanceEvent, order::OrderEvent, orderbook::OrderbookEvent,
orderchange::OrderChangeEvent,
};
use kucoin_arbitrage::global::task::task_log_mps;
use kucoin_arbitrage::model::{counter::Counter, orderbook::FullOrderbook};
use kucoin_arbitrage::model::orderbook::FullOrderbook;
use kucoin_arbitrage::monitor::counter::Counter;
use kucoin_arbitrage::monitor::task::task_log_mps;
use kucoin_arbitrage::strategy::all_taker_btc_usd::task_pub_chance_all_taker_btc_usd;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::broadcast::channel;
use tokio::sync::Mutex;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<(), failure::Error> {
// logging format
Expand Down
10 changes: 5 additions & 5 deletions src/bin/sample_orderbook_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use kucoin_api::{
};
use kucoin_arbitrage::broker::symbol::filter::symbol_with_quotes;
use kucoin_arbitrage::broker::symbol::kucoin::get_symbols;
use kucoin_arbitrage::model::counter::Counter;
use kucoin_arbitrage::model::symbol::SymbolInfo;
use kucoin_arbitrage::monitor::counter;
use std::sync::Arc;
use tokio::sync::Mutex;

Expand All @@ -17,7 +17,7 @@ async fn main() -> Result<(), failure::Error> {
// provide logging format
kucoin_arbitrage::logger::log_init();
log::info!("Log setup");
let counter = Arc::new(Mutex::new(Counter::new("api_input")));
let counter = Arc::new(Mutex::new(counter::Counter::new("api_input")));

// config
let config = kucoin_arbitrage::config::from_file("config.toml")?;
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn main() -> Result<(), failure::Error> {
tokio::spawn(sync_tickers(ws, counter.clone()));
log::info!("{i:?}-th session of WS subscription setup");
}
let _res = tokio::join!(kucoin_arbitrage::global::task::task_log_mps(
let _res = tokio::join!(kucoin_arbitrage::monitor::task::task_log_mps(
vec![counter.clone(),],
monitor_interval as u64
));
Expand All @@ -55,7 +55,7 @@ async fn main() -> Result<(), failure::Error> {

async fn sync_tickers(
mut ws: KucoinWebsocket,
counter: Arc<Mutex<Counter>>,
counter: Arc<Mutex<counter::Counter>>,
) -> Result<(), kucoin_api::failure::Error> {
while let Some(msg) = ws.try_next().await? {
// add matches for multi-subscribed sockets handling
Expand All @@ -68,7 +68,7 @@ async fn sync_tickers(
}
KucoinWebsocketMsg::OrderBookMsg(msg) => {
let _ = msg.data;
kucoin_arbitrage::global::counter_helper::increment(counter.clone()).await;
counter::increment(counter.clone()).await;
}
_ => {
panic!("unexpected msgs received: {msg:?}")
Expand Down
9 changes: 4 additions & 5 deletions src/bin/test_kucoin_order_private_latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use kucoin_arbitrage::broker::orderchange::kucoin::task_pub_orderchange_event;
use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_symbols};
use kucoin_arbitrage::event::order::OrderEvent;
use kucoin_arbitrage::event::orderchange::OrderChangeEvent;
use kucoin_arbitrage::model::counter::Counter;
use kucoin_arbitrage::model::order::{LimitOrder, OrderSide, OrderType};
use kucoin_arbitrage::monitor::counter::Counter;
use kucoin_arbitrage::strings::generate_uid;
use kucoin_arbitrage::{broker::symbol::filter::symbol_with_quotes, global};
use kucoin_arbitrage::{broker::symbol::filter::symbol_with_quotes, monitor};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::channel;
Expand Down Expand Up @@ -61,9 +61,8 @@ async fn main() -> Result<(), failure::Error> {
tokio::spawn(task_pub_orderchange_event(api.clone(), tx_orderchange));

log::info!("All application tasks setup");

global::timer::start("order_placement_network".to_string()).await;
global::timer::start("order_placement_broadcast".to_string()).await;
monitor::timer::start("order_placement_network".to_string()).await;
monitor::timer::start("order_placement_broadcast".to_string()).await;
// Sends a post order
let event = OrderEvent::PostOrder(LimitOrder {
id: generate_uid(40),
Expand Down
11 changes: 5 additions & 6 deletions src/bin/test_kucoin_orderbook_rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use kucoin_api::{
model::websocket::{KucoinWebsocketMsg, WSTopic, WSType},
websocket::KucoinWebsocket,
};
use kucoin_arbitrage::global::counter_helper;
use kucoin_arbitrage::model::counter::Counter;
use kucoin_arbitrage::monitor::counter;
use std::sync::Arc;
use tokio::sync::Mutex;

Expand All @@ -17,7 +16,7 @@ use tokio::sync::Mutex;
async fn main() -> Result<(), failure::Error> {
// provide logging format
kucoin_arbitrage::logger::log_init();
let counter = Arc::new(Mutex::new(Counter::new("api_input")));
let counter = Arc::new(Mutex::new(counter::Counter::new("api_input")));
log::info!("Testing Kucoin WS Message Rate");

// config
Expand All @@ -38,7 +37,7 @@ async fn main() -> Result<(), failure::Error> {

log::info!("Async polling");
tokio::spawn(sync_tickers(ws, counter.clone()));
let _res = tokio::join!(kucoin_arbitrage::global::task::task_log_mps(
let _res = tokio::join!(kucoin_arbitrage::monitor::task::task_log_mps(
vec![counter.clone()],
monitor_interval as u64
));
Expand All @@ -47,13 +46,13 @@ async fn main() -> Result<(), failure::Error> {

async fn sync_tickers(
mut ws: KucoinWebsocket,
counter: Arc<Mutex<Counter>>,
counter: Arc<Mutex<counter::Counter>>,
) -> Result<(), failure::Error> {
while let Some(msg) = ws.try_next().await? {
match msg {
KucoinWebsocketMsg::OrderBookMsg(_msg) => {
// TODO make counter more generic
counter_helper::reset(counter.clone()).await;
counter::reset(counter.clone()).await;
}
KucoinWebsocketMsg::PongMsg(_) => continue,
KucoinWebsocketMsg::WelcomeMsg(_) => continue,
Expand Down
7 changes: 3 additions & 4 deletions src/broker/gatekeeper/kucoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use std::sync::Arc;
use crate::event::chance::ChanceEvent;
use crate::event::order::OrderEvent;
use crate::event::orderchange::OrderChangeEvent;
use crate::global::counter_helper;
use crate::model::counter::Counter;
use crate::model::order::{LimitOrder, OrderType};
use crate::monitor::counter;
use crate::strings::generate_uid;
use std::time::SystemTime;
use tokio::sync::broadcast::{Receiver, Sender};
Expand All @@ -23,10 +22,10 @@ pub async fn task_gatekeep_chances(
mut receiver_chance: Receiver<ChanceEvent>,
mut receiver_order_change: Receiver<OrderChangeEvent>,
sender: Sender<OrderEvent>,
counter: Arc<Mutex<Counter>>,
counter: Arc<Mutex<counter::Counter>>,
) -> Result<(), kucoin_api::failure::Error> {
loop {
counter_helper::increment(counter.clone()).await;
counter::increment(counter.clone()).await;
let status = receiver_chance.recv().await;
if let Err(e) = status {
log::error!("gatekeep chance parsing error {e:?}");
Expand Down
11 changes: 6 additions & 5 deletions src/broker/order/kucoin.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::sync::Arc;

use crate::global;
use crate::model::counter::Counter;
use crate::event::order::OrderEvent;
use crate::model::order::Order;
use crate::{event::order::OrderEvent, global::counter_helper};
use crate::monitor;
use crate::monitor::counter;
use crate::monitor::counter::Counter;
use kucoin_api::client::Kucoin;
use tokio::sync::{broadcast, Mutex};

Expand All @@ -14,7 +15,7 @@ pub async fn task_place_order(
counter: Arc<Mutex<Counter>>,
) -> Result<(), kucoin_api::failure::Error> {
loop {
counter_helper::increment(counter.clone()).await;
counter::increment(counter.clone()).await;

let event = receiver.recv().await?;
// println!("Received event: {event:?}");
Expand Down Expand Up @@ -47,7 +48,7 @@ pub async fn task_place_order(
}
OrderEvent::PostOrder(order) => {
// gge the broadcast duration
let time = global::timer::stop("order_placement_broadcast".to_string())
let time = monitor::timer::stop("order_placement_broadcast".to_string())
.await
.unwrap();
log::info!("order_placement_broadcast: {time:?}");
Expand Down
2 changes: 1 addition & 1 deletion src/broker/order/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// Order placement task in KuCoin
// Order placement using KuCoin public REST API
pub mod kucoin;
10 changes: 4 additions & 6 deletions src/broker/orderbook/internal.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use crate::event::orderbook::OrderbookEvent;
use crate::global::counter_helper;
use crate::model::counter::Counter;
use crate::model::orderbook::FullOrderbook;
use crate::monitor::counter;
use crate::monitor::counter::Counter;
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::Mutex;

/// Task to sync local orderbook from API.
/// Subscribes OrderbookEvent.
/// Publishes OrderbookEvent after syncing the local orderbook
/// Subscribe OrderbookEvent, then publish OrderbookEvent after syncing local orderbook
pub async fn task_sync_orderbook(
mut receiver: Receiver<OrderbookEvent>,
sender: Sender<OrderbookEvent>,
local_full_orderbook: Arc<Mutex<FullOrderbook>>,
counter: Arc<Mutex<Counter>>,
) -> Result<(), failure::Error> {
loop {
counter_helper::increment(counter.clone()).await;
counter::increment(counter.clone()).await;
let event = receiver.recv().await?;
let mut full_orderbook = local_full_orderbook.lock().await;
match event {
Expand Down
2 changes: 1 addition & 1 deletion src/broker/orderbook/kucoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio::time::Duration;

/// Subscribe Websocket API, then publish OrderbookEvent directly after conversion to internal model.
/// Subscribe Websocket API, then publish internal OrderbookEvent
pub async fn task_pub_orderbook_event(
api: Kucoin,
topics: Vec<WSTopic>,
Expand Down
4 changes: 2 additions & 2 deletions src/broker/orderbook/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/// Syncing with local copies of orderbook.
/// Syncing local copies of orderbook
pub mod internal;
/// Obtaining orderbook state and changes using Kucoin API
/// Obtaining orderbook state and changes using Kucoin public API
pub mod kucoin;
4 changes: 2 additions & 2 deletions src/broker/orderchange/kucoin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::event::orderchange::OrderChangeEvent;
use crate::global;
use crate::monitor;
use kucoin_api::client::Kucoin;
use kucoin_api::futures::TryStreamExt;
use kucoin_api::model::websocket::{KucoinWebsocketMsg, WSTopic, WSType};
Expand Down Expand Up @@ -31,7 +31,7 @@ pub async fn task_pub_orderchange_event(
// Currently using a more stable TradeOpenMsg, although TradeReceived is always ahead of TradeOpen
log::info!("TradeReceivedMsg: {:?}\n{:#?}", msg.topic, msg.data);
} else if let KucoinWebsocketMsg::TradeOpenMsg(msg) = msg {
let time = global::timer::stop("order_placement_network".to_string())
let time = monitor::timer::stop("order_placement_network".to_string())
.await
.unwrap();
log::info!("order_placement_network: {time:?}");
Expand Down
1 change: 1 addition & 0 deletions src/broker/orderchange/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/// Obtaining order changes using Kucoin private API
pub mod kucoin;
2 changes: 1 addition & 1 deletion src/broker/symbol/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/// Filter functions using internal models
pub mod filter;
/// Uses KuCoin API
/// KuCoin API
pub mod kucoin;
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::error::Error;
use kucoin_api::client::Credentials;
use serde_derive::Deserialize;

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Config {
pub kucoin: KuCoin,
pub behaviour: Behaviour,
Expand All @@ -18,14 +18,14 @@ impl Config {
}
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct KuCoin {
pub api_key: String,
pub secret_key: String,
pub passphrase: String,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Behaviour {
pub monitor_interval_sec: u32,
pub usd_cyclic_arbitrage: u32,
Expand Down
6 changes: 0 additions & 6 deletions src/global/mod.rs

This file was deleted.

Loading

0 comments on commit bf09c23

Please sign in to comment.