Skip to content

Commit

Permalink
feat: include timestamp in WebSocket swap updates
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Aug 13, 2024
1 parent 69566b5 commit 9a44944
Showing 1 changed file with 45 additions and 21 deletions.
66 changes: 45 additions & 21 deletions boltzr/src/ws/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::error::Error;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -48,6 +48,7 @@ enum WsRequest {
struct UpdateMessage {
channel: SubscriptionChannel,
args: Vec<SwapStatus>,
timestamp: String,
}

#[derive(Deserialize, Serialize, Debug, PartialEq)]
Expand Down Expand Up @@ -197,9 +198,18 @@ where
continue;
}

let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(res) => res,
Err(err) => {
error!("Could not get UNIX time: {}", err);
break;
}
};

let msg = match serde_json::to_string(&WsResponse::Update(UpdateMessage {
channel: SubscriptionChannel::SwapUpdate,
args: relevant_updates,
timestamp: timestamp.as_millis().to_string(),
})) {
Ok(res) => res,
Err(err) => {
Expand Down Expand Up @@ -274,16 +284,14 @@ where

#[cfg(test)]
mod status_test {
use crate::ws::status::{
ErrorResponse, Status, SubscriptionChannel, SwapInfos, UpdateMessage, WsResponse,
};
use crate::ws::status::{ErrorResponse, Status, SubscriptionChannel, SwapInfos, WsResponse};
use crate::ws::types::SwapStatus;
use crate::ws::Config;
use async_trait::async_trait;
use async_tungstenite::tungstenite::Message;
use futures::{SinkExt, StreamExt};
use serde_json::json;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast::Sender;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -425,18 +433,26 @@ mod status_test {
continue;
}

assert_eq!(
serde_json::from_str::<WsResponse>(msg.to_text().unwrap()).unwrap(),
WsResponse::Update {
0: UpdateMessage {
channel: SubscriptionChannel::SwapUpdate,
args: vec![
let res = serde_json::from_str::<WsResponse>(msg.to_text().unwrap()).unwrap();
match res {
WsResponse::Update(res) => {
assert_eq!(res.channel, SubscriptionChannel::SwapUpdate);
assert_eq!(
res.args,
vec![
SwapStatus::default("some".into(), "swap.created".into()),
SwapStatus::default("ids".into(), "swap.created".into()),
]
},
);
assert!(
res.timestamp.parse::<u128>().unwrap()
<= SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
}
);
};
break;
}

Expand Down Expand Up @@ -489,15 +505,23 @@ mod status_test {
continue;
}

assert_eq!(
serde_json::from_str::<WsResponse>(msg.to_text().unwrap()).unwrap(),
WsResponse::Update {
0: UpdateMessage {
channel: SubscriptionChannel::SwapUpdate,
args: vec![SwapStatus::default("ids".into(), "invoice.set".into()),]
},
let res = serde_json::from_str::<WsResponse>(msg.to_text().unwrap()).unwrap();
match res {
WsResponse::Update(res) => {
assert_eq!(res.channel, SubscriptionChannel::SwapUpdate);
assert_eq!(
res.args,
vec![SwapStatus::default("ids".into(), "invoice.set".into())]
);
assert!(
res.timestamp.parse::<u128>().unwrap()
<= SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
}
);
};
break;
}

Expand Down

0 comments on commit 9a44944

Please sign in to comment.