Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature #27

Merged
merged 2 commits into from
Dec 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 127 additions & 5 deletions src/worker/market_helpers/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time;

pub fn market_factory(mut spine: MarketSpine) -> Arc<Mutex<dyn Market + Send>> {
pub fn market_factory(
mut spine: MarketSpine,
exchange_pairs: Vec<ExchangePair>,
) -> Arc<Mutex<dyn Market + Send>> {
let mask_pairs = match spine.name.as_ref() {
"binance" => vec![("IOT", "IOTA"), ("USD", "USDT")],
"bitfinex" => vec![("DASH", "dsh"), ("QTUM", "QTM")],
Expand All @@ -32,6 +35,10 @@ pub fn market_factory(mut spine: MarketSpine) -> Arc<Mutex<dyn Market + Send>> {
.get_spine_mut()
.set_arc(Arc::clone(&market));

for exchange_pair in exchange_pairs {
market.lock().unwrap().add_exchange_pair(exchange_pair);
}

market
}

Expand Down Expand Up @@ -64,10 +71,7 @@ fn subscribe_channel(
}

fn update(market: Arc<Mutex<dyn Market + Send>>) {
market.lock().unwrap().get_spine_mut().socket_enabled = true;

let channels = MarketChannels::get_all();

let exchange_pairs: Vec<String> = market
.lock()
.unwrap()
Expand Down Expand Up @@ -174,7 +178,7 @@ pub trait Market {

let market = Arc::clone(self.get_spine().arc.as_ref().unwrap());

let thread_name = format!("fn: update, market: {}", self.get_spine().name,);
let thread_name = format!("fn: update, market: {}", self.get_spine().name);
let thread = thread::Builder::new()
.name(thread_name)
.spawn(move || update(market))
Expand All @@ -185,3 +189,121 @@ pub trait Market {
fn parse_last_trade_info(&mut self, pair: String, info: String);
fn parse_depth_info(&mut self, pair: String, info: String);
}

#[cfg(test)]
mod test {
use crate::worker::defaults::{COINS, FIATS};
use crate::worker::market_helpers::market::{market_factory, update, Market};
use crate::worker::market_helpers::market_channels::MarketChannels;
use crate::worker::market_helpers::market_spine::MarketSpine;
use crate::worker::worker::test::{check_threads, get_worker};
use crate::worker::worker::Worker;
use chrono::{Duration, Utc};
use ntest::timeout;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

fn get_market(
market_name: Option<&str>,
) -> (Arc<Mutex<dyn Market + Send>>, Receiver<JoinHandle<()>>) {
let market_name = market_name.unwrap_or("binance").to_string();
let fiats = Vec::from(FIATS);
let coins = Vec::from(COINS);
let exchange_pairs = Worker::make_exchange_pairs(coins, fiats);

let (worker, tx, rx) = get_worker();
let market_spine = MarketSpine::new(worker, tx, market_name);
let market = market_factory(market_spine, exchange_pairs);

(market, rx)
}

#[test]
#[should_panic]
fn test_market_factory() {
let (_, _) = get_market(Some("not_existing_market"));
}

#[test]
fn test_market_factory_2() {
let (market, _) = get_market(None);

assert!(market.lock().unwrap().get_spine().arc.is_some());

let fiats = Vec::from(FIATS);
let coins = Vec::from(COINS);
let exchange_pairs = Worker::make_exchange_pairs(coins, fiats);
let exchange_pair_keys: Vec<String> = market
.lock()
.unwrap()
.get_spine()
.get_exchange_pairs()
.keys()
.cloned()
.collect();
assert_eq!(exchange_pair_keys.len(), exchange_pairs.len());

for pair in &exchange_pairs {
let pair_string = market.lock().unwrap().make_pair(pair.get_pair_ref());
assert!(exchange_pair_keys.contains(&pair_string));
}
}

#[test]
#[timeout(3000)]
fn test_perform() {
let (market, rx) = get_market(None);

let thread_names = vec![format!(
"fn: update, market: {}",
market.lock().unwrap().get_spine().name
)];

market.lock().unwrap().perform();

// TODO: Refactor (not always working)
check_threads(thread_names, rx);

let now = Utc::now();
let last_capitalization_refresh = market
.lock()
.unwrap()
.get_spine()
.get_last_capitalization_refresh();
assert!(now - last_capitalization_refresh <= Duration::milliseconds(5000));
}

#[test]
#[timeout(120000)]
/// TODO: Refactor (not always working)
fn test_update() {
let (market, rx) = get_market(None);

let channels = MarketChannels::get_all();
let exchange_pairs: Vec<String> = market
.lock()
.unwrap()
.get_spine()
.get_exchange_pairs()
.keys()
.cloned()
.collect();

let mut thread_names = Vec::new();
for pair in exchange_pairs {
for channel in channels {
let thread_name = format!(
"fn: subscribe_channel, market: {}, pair: {}, channel: {}",
market.lock().unwrap().get_spine().name,
pair,
channel
);
thread_names.push(thread_name);
}
}

update(market);
check_threads(thread_names, rx);
}
}
6 changes: 4 additions & 2 deletions src/worker/market_helpers/market_spine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub struct MarketSpine {
exchange_pairs: HashMap<String, ExchangePairInfo>,
conversions: HashMap<String, ConversionType>,
pairs: HashMap<String, (String, String)>,
pub socket_enabled: bool,
capitalization: HashMap<String, f64>,
last_capitalization_refresh: DateTime<Utc>,
}
Expand All @@ -41,7 +40,6 @@ impl MarketSpine {
exchange_pairs: HashMap::new(),
conversions: HashMap::new(),
pairs: HashMap::new(),
socket_enabled: false,
capitalization: HashMap::new(),
last_capitalization_refresh: MIN_DATETIME,
}
Expand Down Expand Up @@ -281,4 +279,8 @@ impl MarketSpine {
self.update_market_pair(pair, "totalValues", false);
}
}

pub fn get_last_capitalization_refresh(&self) -> DateTime<Utc> {
self.last_capitalization_refresh
}
}
82 changes: 32 additions & 50 deletions src/worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Worker {
.insert(pair, new_avg);
}

fn make_exchange_pairs(coins: Vec<&str>, fiats: Vec<&str>) -> Vec<ExchangePair> {
pub fn make_exchange_pairs(coins: Vec<&str>, fiats: Vec<&str>) -> Vec<ExchangePair> {
let mut exchange_pairs = Vec::new();

for coin in coins {
Expand All @@ -88,14 +88,7 @@ impl Worker {
for market_name in market_names {
let worker_2 = Arc::clone(self.arc.as_ref().unwrap());
let market_spine = MarketSpine::new(worker_2, self.tx.clone(), market_name.to_string());
let market = market_factory(market_spine);

for exchange_pair in &exchange_pairs {
market
.lock()
.unwrap()
.add_exchange_pair(exchange_pair.clone());
}
let market = market_factory(market_spine, exchange_pairs.clone());

self.markets.push(market);
}
Expand All @@ -121,65 +114,65 @@ impl Worker {
}

#[cfg(test)]
mod test {
pub mod test {
use crate::repository::pair_average_trade_price::PairAverageTradePrice;
use crate::worker::defaults::{COINS, FIATS, MARKETS};
use crate::worker::defaults::MARKETS;
use crate::worker::worker::Worker;
use ntest::timeout;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex};
use std::thread::JoinHandle;

fn get_worker() -> (Arc<Mutex<Worker>>, Receiver<JoinHandle<()>>) {
pub fn get_worker() -> (
Arc<Mutex<Worker>>,
Sender<JoinHandle<()>>,
Receiver<JoinHandle<()>>,
) {
let (tx, rx) = mpsc::channel();
let pair_average_trade_price_repository = PairAverageTradePrice::new();
let worker = Worker::new(
tx,
tx.clone(),
Arc::new(Mutex::new(pair_average_trade_price_repository)),
);

(worker, rx)
(worker, tx, rx)
}

pub fn check_threads(mut thread_names: Vec<String>, rx: Receiver<JoinHandle<()>>) {
let mut passed_thread_names = Vec::new();
for received_thread in rx {
let thread_name = received_thread.thread().name().unwrap().to_string();
assert!(!passed_thread_names.contains(&thread_name));

if let Some(index) = thread_names.iter().position(|r| r == &thread_name) {
passed_thread_names.push(thread_names.swap_remove(index));
}
if thread_names.is_empty() {
break;
}
}
}

#[test]
fn test_new() {
let (worker, _) = get_worker();
let (worker, _, _) = get_worker();

assert!(worker.lock().unwrap().arc.is_some());
}

fn test_configure(markets: Option<Vec<&str>>, coins: Option<Vec<&str>>) {
let (worker, _) = get_worker();
let (worker, _, _) = get_worker();
worker
.lock()
.unwrap()
.configure(markets.clone(), coins.clone());

let markets = markets.unwrap_or(Vec::from(MARKETS));
let fiats = Vec::from(FIATS);
let coins = coins.unwrap_or(Vec::from(COINS));
let exchange_pairs = Worker::make_exchange_pairs(coins, fiats);

assert_eq!(markets.len(), worker.lock().unwrap().markets.len());

for (i, market) in worker.lock().unwrap().markets.iter().enumerate() {
let market_name = market.lock().unwrap().get_spine().name.clone();
assert_eq!(market_name, markets[i]);

let exchange_pair_keys: Vec<String> = market
.lock()
.unwrap()
.get_spine()
.get_exchange_pairs()
.keys()
.cloned()
.collect();
assert_eq!(exchange_pair_keys.len(), exchange_pairs.len());

for pair in &exchange_pairs {
let pair_string = market.lock().unwrap().make_pair(pair.get_pair_ref());
assert!(exchange_pair_keys.contains(&pair_string));
}
}
}

Expand All @@ -199,8 +192,7 @@ mod test {
#[test]
#[timeout(1000)]
fn test_start() {
let (worker, rx) = get_worker();
worker.lock().unwrap().start(None, None);
let (worker, _, rx) = get_worker();

let markets = Vec::from(MARKETS);
let mut thread_names = Vec::new();
Expand All @@ -209,17 +201,7 @@ mod test {
thread_names.push(thread_name);
}

let mut passed_thread_names = Vec::new();
for received_thread in rx {
let thread_name = received_thread.thread().name().unwrap().to_string();
assert!(!passed_thread_names.contains(&thread_name));

if let Some(index) = thread_names.iter().position(|r| r == &thread_name) {
passed_thread_names.push(thread_names.swap_remove(index));
}
if thread_names.is_empty() {
break;
}
}
worker.lock().unwrap().start(None, None);
check_threads(thread_names, rx);
}
}