Skip to content

Commit

Permalink
Remove anyhow from PriceTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Mar 22, 2023
1 parent 2288ce4 commit 7d35456
Showing 1 changed file with 46 additions and 21 deletions.
67 changes: 46 additions & 21 deletions price/src/price_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::{anyhow, Error, Result};
use chrono::{DateTime, Duration, TimeZone, Utc};
use file_store::{FileInfo, FileStore, FileType};
use futures::stream::{StreamExt, TryStreamExt};
Expand All @@ -7,22 +6,42 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::{mpsc, watch};

#[derive(thiserror::Error, Debug)]
pub enum PriceTrackerError {
#[error("invalid timestamp in price: {0}")]
InvalidTimestamp(u64),
#[error("price is not currently available")]
PriceNotAvailable,
#[error("price too old, price timestamp: {0}")]
PriceTooOld(DateTime<Utc>),
#[error("tokio join error")]
JoinError(#[from] tokio::task::JoinError),
#[error("file store error")]
FileStoreError(#[from] file_store::Error),
#[error("proto decode error")]
DecodeError(#[from] helium_proto::DecodeError),
#[error("killed due to {0}")]
KilledError(String),
#[error("error sending over mpsc channel")]
SendError(#[from] mpsc::error::SendError<String>),
}

#[derive(Clone)]
struct Price {
price: u64,
timestamp: DateTime<Utc>,
}

impl TryFrom<&PriceReportV1> for Price {
type Error = anyhow::Error;
type Error = PriceTrackerError;

fn try_from(value: &PriceReportV1) -> Result<Self, Self::Error> {
Ok(Self {
price: value.price,
timestamp: Utc
.timestamp_opt(value.timestamp as i64, 0)
.single()
.ok_or_else(|| anyhow!("Invalid timestamp: {}", value.timestamp))?,
.ok_or_else(|| PriceTrackerError::InvalidTimestamp(value.timestamp))?,
})
}
}
Expand Down Expand Up @@ -52,7 +71,13 @@ impl PriceTracker {
pub async fn start(
settings: &Settings,
shutdown: triggered::Listener,
) -> Result<(Self, impl std::future::Future<Output = Result<()>>)> {
) -> Result<
(
Self,
impl std::future::Future<Output = Result<(), PriceTrackerError>>,
),
PriceTrackerError,
> {
let file_store = FileStore::from_settings(&settings.file_store).await?;
let (price_sender, price_receiver) = watch::channel(Prices::new());
let (task_kill_sender, task_kill_receiver) = mpsc::channel(1);
Expand Down Expand Up @@ -81,25 +106,25 @@ impl PriceTracker {
match handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
Err(err) => Err(PriceTrackerError::from(err)),
}
}))
}

pub async fn price(&self, token_type: &BlockchainTokenTypeV1) -> Result<u64> {
pub async fn price(
&self,
token_type: &BlockchainTokenTypeV1,
) -> Result<u64, PriceTrackerError> {
let result = self
.price_receiver
.borrow()
.get(token_type)
.ok_or_else(|| anyhow!("price not available"))
.ok_or_else(|| PriceTrackerError::PriceNotAvailable)
.and_then(|price| {
if price.timestamp > Utc::now() - self.price_duration {
Ok(price.price)
} else {
Err(anyhow!(
"price too old, price timestamp: {}",
price.timestamp
))
Err(PriceTrackerError::PriceTooOld(price.timestamp))
}
});

Expand All @@ -117,7 +142,7 @@ async fn run(
price_sender: watch::Sender<Prices>,
mut after: DateTime<Utc>,
shutdown: triggered::Listener,
) -> Result<()> {
) -> Result<(), PriceTrackerError> {
let mut trigger = tokio::time::interval(std::time::Duration::from_secs(30));

loop {
Expand All @@ -132,8 +157,8 @@ async fn run(
let timestamp = process_files(&file_store, &price_sender, after).await?;
after = timestamp.unwrap_or(after);
}
msg = task_killer.recv() => if let Some(string) = msg {
return Err(anyhow!(string));
msg = task_killer.recv() => if let Some(error) = msg {
return Err(PriceTrackerError::KilledError(error));
}
}
}
Expand All @@ -145,21 +170,21 @@ async fn calculate_initial_prices(
file_store: &FileStore,
price_duration: Duration,
sender: &watch::Sender<Prices>,
) -> Result<DateTime<Utc>> {
) -> Result<DateTime<Utc>, PriceTrackerError> {
tracing::debug!("PriceTracker: Updating initial prices");
process_files(file_store, sender, Utc::now() - price_duration)
.await?
.ok_or_else(|| anyhow!("price not available"))
.ok_or_else(|| PriceTrackerError::PriceNotAvailable)
}

async fn process_files(
file_store: &FileStore,
sender: &watch::Sender<Prices>,
after: DateTime<Utc>,
) -> Result<Option<DateTime<Utc>>> {
) -> Result<Option<DateTime<Utc>>, PriceTrackerError> {
file_store
.list(FileType::PriceReport, after, None)
.map_err(Error::from)
.map_err(PriceTrackerError::from)
.and_then(|file| process_file(file_store, file, sender))
.try_fold(None, |_old, ts| async move { Ok(Some(ts)) })
.await
Expand All @@ -169,15 +194,15 @@ async fn process_file(
file_store: &FileStore,
file: FileInfo,
sender: &watch::Sender<Prices>,
) -> Result<DateTime<Utc>> {
) -> Result<DateTime<Utc>, PriceTrackerError> {
tracing::debug!("PriceTracker: processing pricing report file {}", file.key);
let timestamp = file.timestamp;

file_store
.stream_file(file)
.await?
.map_err(Error::from)
.and_then(|buf| async { PriceReportV1::decode(buf).map_err(Error::from) })
.map_err(PriceTrackerError::from)
.and_then(|buf| async { PriceReportV1::decode(buf).map_err(PriceTrackerError::from) })
.and_then(|report| async move {
Price::try_from(&report).map(|price| (report.token_type(), price))
})
Expand Down

0 comments on commit 7d35456

Please sign in to comment.