Skip to content

Commit

Permalink
No timeout for realtime streams (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
wboayue committed Aug 29, 2023
1 parent a9c5fb4 commit 9952086
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 22 deletions.
16 changes: 9 additions & 7 deletions examples/tick_by_tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,29 @@ fn main() {
}

fn stream_last(client: &mut Client, symbol: &str) -> anyhow::Result<()> {
let contract = contract_es();
let contract = contract_gc();
let ticks = client.tick_by_tick_last(&contract, 0, false)?;

for (i, tick) in ticks.enumerate().take(60) {
println!("tick: {i:?} {tick:?}");
for (i, tick) in ticks.enumerate() {
println!("{}: {i:?} {tick:?}", contract.symbol);
}

Ok(())
}

fn contract_es() -> Contract {
let mut contract = Contract::futures("ES");
contract.exchange = "CME".to_owned();
contract.contract_id = 495512569;
contract.local_symbol = "ESU3".to_string();
contract.trading_class = "ES".into();
contract.exchange = "CME".into();
contract
}

fn contract_gc() -> Contract {
let mut contract = Contract::futures("GC");
contract.exchange = "COMEX".to_owned();
contract.contract_id = 605552438;
contract.local_symbol = "GCZ3".to_string();
contract.trading_class = "GC".into();
contract
}

Expand All @@ -89,7 +91,7 @@ fn stream_bid_ask(client: &mut Client, symbol: &str) -> anyhow::Result<()> {
let contract = contract_es();
let ticks = client.tick_by_tick_bid_ask(&contract, 0, false)?;

for (i, tick) in ticks.enumerate().take(60) {
for (i, tick) in ticks.enumerate() {
println!("tick: {i:?} {tick:?}");
}

Expand Down
5 changes: 5 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,11 @@ impl Client {
self.message_bus.borrow_mut().send_generic_message(request_id, &message)
}

pub(crate) fn send_durable_request(&self, request_id: i32, message: RequestMessage) -> Result<ResponseIterator, Error> {
debug!("send_durable_request({:?}, {:?})", request_id, message);
self.message_bus.borrow_mut().send_durable_message(request_id, &message)
}

pub(crate) fn send_order(&self, order_id: i32, message: RequestMessage) -> Result<ResponseIterator, Error> {
debug!("send_order({:?}, {:?})", order_id, message);
self.message_bus.borrow_mut().send_order_message(order_id, &message)
Expand Down
38 changes: 29 additions & 9 deletions src/client/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub(crate) trait MessageBus {
fn write_message(&mut self, packet: &RequestMessage) -> Result<(), Error>;

fn send_generic_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<ResponseIterator, Error>;
fn send_durable_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<ResponseIterator, Error>;
fn send_order_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<ResponseIterator, Error>;
fn request_next_order_id(&mut self, message: &RequestMessage) -> Result<GlobalResponseIterator, Error>;
fn request_open_orders(&mut self, message: &RequestMessage) -> Result<GlobalResponseIterator, Error>;
Expand Down Expand Up @@ -148,10 +149,19 @@ impl MessageBus for TcpMessageBus {
self.signals_send.clone(),
Some(request_id),
None,
Duration::from_secs(10),
Some(Duration::from_secs(10)),
))
}

fn send_durable_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<ResponseIterator, Error> {
let (sender, receiver) = channel::unbounded();

self.add_request(request_id, sender)?;
self.write_message(packet)?;

Ok(ResponseIterator::new(receiver, self.signals_send.clone(), Some(request_id), None, None))
}

fn send_order_message(&mut self, order_id: i32, message: &RequestMessage) -> Result<ResponseIterator, Error> {
let (sender, receiver) = channel::unbounded();

Expand All @@ -163,7 +173,7 @@ impl MessageBus for TcpMessageBus {
self.signals_send.clone(),
None,
Some(order_id),
Duration::from_secs(10),
Some(Duration::from_secs(10)),
))
}

Expand Down Expand Up @@ -525,7 +535,7 @@ pub(crate) struct ResponseIterator {
signals: Sender<Signal>, // for client to signal termination
request_id: Option<i32>, // initiating request_id
order_id: Option<i32>, // initiating order_id
timeout: Duration, // How long to wait for next message
timeout: Option<Duration>, // How long to wait for next message
}

impl ResponseIterator {
Expand All @@ -534,7 +544,7 @@ impl ResponseIterator {
signals: Sender<Signal>,
request_id: Option<i32>,
order_id: Option<i32>,
timeout: Duration,
timeout: Option<Duration>,
) -> Self {
ResponseIterator {
messages,
Expand All @@ -561,12 +571,22 @@ impl Drop for ResponseIterator {
impl Iterator for ResponseIterator {
type Item = ResponseMessage;
fn next(&mut self) -> Option<Self::Item> {
match self.messages.recv_timeout(self.timeout) {
Err(err) => {
info!("timeout receiving packet: {err}");
None
if let Some(timeout) = self.timeout {
match self.messages.recv_timeout(timeout) {
Ok(message) => Some(message),
Err(err) => {
info!("timeout receiving message: {err}");
None
}
}
} else {
match self.messages.recv() {
Ok(message) => Some(message),
Err(err) => {
error!("error receiving message: {err}");
None
}
}
Ok(message) => Some(message),
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/market_data/realtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub(crate) fn realtime_bars<'a>(
let request_id = client.next_request_id();
let packet = encoders::encode_request_realtime_bars(client.server_version(), request_id, contract, bar_size, what_to_show, use_rth, options)?;

let responses = client.send_request(request_id, packet)?;
let responses = client.send_durable_request(request_id, packet)?;

Ok(RealTimeBarIterator::new(client, request_id, responses))
}
Expand All @@ -164,7 +164,7 @@ pub(crate) fn tick_by_tick_all_last<'a>(
let request_id = client.next_request_id();

let message = encoders::tick_by_tick(server_version, request_id, contract, "AllLast", number_of_ticks, ignore_size)?;
let responses = client.send_request(request_id, message)?;
let responses = client.send_durable_request(request_id, message)?;

Ok(TradeIterator {
client,
Expand Down Expand Up @@ -200,7 +200,7 @@ pub(crate) fn tick_by_tick_last<'a>(
let request_id = client.next_request_id();

let message = encoders::tick_by_tick(server_version, request_id, contract, "Last", number_of_ticks, ignore_size)?;
let responses = client.send_request(request_id, message)?;
let responses = client.send_durable_request(request_id, message)?;

Ok(TradeIterator {
client,
Expand All @@ -222,7 +222,7 @@ pub(crate) fn tick_by_tick_bid_ask<'a>(
let request_id = client.next_request_id();

let message = encoders::tick_by_tick(server_version, request_id, contract, "BidAsk", number_of_ticks, ignore_size)?;
let responses = client.send_request(request_id, message)?;
let responses = client.send_durable_request(request_id, message)?;

Ok(BidAskIterator {
client,
Expand All @@ -244,7 +244,7 @@ pub(crate) fn tick_by_tick_midpoint<'a>(
let request_id = client.next_request_id();

let message = encoders::tick_by_tick(server_version, request_id, contract, "MidPoint", number_of_ticks, ignore_size)?;
let responses = client.send_request(request_id, message)?;
let responses = client.send_durable_request(request_id, message)?;

Ok(MidPointIterator {
client,
Expand Down
6 changes: 5 additions & 1 deletion src/stubs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl MessageBus for MessageBusStub {
mock_request(self, request_id, message)
}

fn send_durable_message(&mut self, request_id: i32, message: &RequestMessage) -> Result<ResponseIterator, Error> {
mock_request(self, request_id, message)
}

fn send_order_message(&mut self, request_id: i32, message: &RequestMessage) -> Result<ResponseIterator, Error> {
mock_request(self, request_id, message)
}
Expand Down Expand Up @@ -77,7 +81,7 @@ fn mock_request(stub: &mut MessageBusStub, _request_id: i32, message: &RequestMe
sender.send(ResponseMessage::from(&message.replace('|', "\0"))).unwrap();
}

Ok(ResponseIterator::new(receiver, s1, None, None, Duration::from_secs(5)))
Ok(ResponseIterator::new(receiver, s1, None, None, Some(Duration::from_secs(5))))
}

fn mock_global_request(stub: &mut MessageBusStub, message: &RequestMessage) -> Result<GlobalResponseIterator, Error> {
Expand Down

0 comments on commit 9952086

Please sign in to comment.