diff --git a/price/src/price_tracker.rs b/price/src/price_tracker.rs index 248460222..e7b5d9e5e 100644 --- a/price/src/price_tracker.rs +++ b/price/src/price_tracker.rs @@ -1,4 +1,3 @@ -use anyhow::{anyhow, Error, Result}; use chrono::{DateTime, Duration, TimeZone, Utc}; use file_store::{FileInfo, FileStore, FileType}; use futures::stream::{StreamExt, TryStreamExt}; @@ -7,6 +6,26 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::sync::{mpsc, watch}; +#[derive(thiserror::Error, Debug)] +pub enum PriceTrackerError { + #[error("invalid timestamp in price: {0}")] + InvalidTimestamp(u64), + #[error("price is not currently available")] + PriceNotAvailable, + #[error("price too old, price timestamp: {0}")] + PriceTooOld(DateTime), + #[error("tokio join error")] + JoinError(#[from] tokio::task::JoinError), + #[error("file store error")] + FileStoreError(#[from] file_store::Error), + #[error("proto decode error")] + DecodeError(#[from] helium_proto::DecodeError), + #[error("killed due to {0}")] + KilledError(String), + #[error("error sending over mpsc channel")] + SendError(#[from] mpsc::error::SendError), +} + #[derive(Clone)] struct Price { price: u64, @@ -14,7 +33,7 @@ struct Price { } impl TryFrom<&PriceReportV1> for Price { - type Error = anyhow::Error; + type Error = PriceTrackerError; fn try_from(value: &PriceReportV1) -> Result { Ok(Self { @@ -22,7 +41,7 @@ impl TryFrom<&PriceReportV1> for Price { timestamp: Utc .timestamp_opt(value.timestamp as i64, 0) .single() - .ok_or_else(|| anyhow!("Invalid timestamp: {}", value.timestamp))?, + .ok_or_else(|| PriceTrackerError::InvalidTimestamp(value.timestamp))?, }) } } @@ -52,7 +71,13 @@ impl PriceTracker { pub async fn start( settings: &Settings, shutdown: triggered::Listener, - ) -> Result<(Self, impl std::future::Future>)> { + ) -> Result< + ( + Self, + impl std::future::Future>, + ), + PriceTrackerError, + > { let file_store = FileStore::from_settings(&settings.file_store).await?; let (price_sender, price_receiver) = watch::channel(Prices::new()); let (task_kill_sender, task_kill_receiver) = mpsc::channel(1); @@ -81,25 +106,25 @@ impl PriceTracker { match handle.await { Ok(Ok(())) => Ok(()), Ok(Err(err)) => Err(err), - Err(err) => Err(Error::from(err)), + Err(err) => Err(PriceTrackerError::from(err)), } })) } - pub async fn price(&self, token_type: &BlockchainTokenTypeV1) -> Result { + pub async fn price( + &self, + token_type: &BlockchainTokenTypeV1, + ) -> Result { let result = self .price_receiver .borrow() .get(token_type) - .ok_or_else(|| anyhow!("price not available")) + .ok_or_else(|| PriceTrackerError::PriceNotAvailable) .and_then(|price| { if price.timestamp > Utc::now() - self.price_duration { Ok(price.price) } else { - Err(anyhow!( - "price too old, price timestamp: {}", - price.timestamp - )) + Err(PriceTrackerError::PriceTooOld(price.timestamp)) } }); @@ -117,7 +142,7 @@ async fn run( price_sender: watch::Sender, mut after: DateTime, shutdown: triggered::Listener, -) -> Result<()> { +) -> Result<(), PriceTrackerError> { let mut trigger = tokio::time::interval(std::time::Duration::from_secs(30)); loop { @@ -132,8 +157,8 @@ async fn run( let timestamp = process_files(&file_store, &price_sender, after).await?; after = timestamp.unwrap_or(after); } - msg = task_killer.recv() => if let Some(string) = msg { - return Err(anyhow!(string)); + msg = task_killer.recv() => if let Some(error) = msg { + return Err(PriceTrackerError::KilledError(error)); } } } @@ -145,21 +170,21 @@ async fn calculate_initial_prices( file_store: &FileStore, price_duration: Duration, sender: &watch::Sender, -) -> Result> { +) -> Result, PriceTrackerError> { tracing::debug!("PriceTracker: Updating initial prices"); process_files(file_store, sender, Utc::now() - price_duration) .await? - .ok_or_else(|| anyhow!("price not available")) + .ok_or_else(|| PriceTrackerError::PriceNotAvailable) } async fn process_files( file_store: &FileStore, sender: &watch::Sender, after: DateTime, -) -> Result>> { +) -> Result>, PriceTrackerError> { file_store .list(FileType::PriceReport, after, None) - .map_err(Error::from) + .map_err(PriceTrackerError::from) .and_then(|file| process_file(file_store, file, sender)) .try_fold(None, |_old, ts| async move { Ok(Some(ts)) }) .await @@ -169,15 +194,15 @@ async fn process_file( file_store: &FileStore, file: FileInfo, sender: &watch::Sender, -) -> Result> { +) -> Result, PriceTrackerError> { tracing::debug!("PriceTracker: processing pricing report file {}", file.key); let timestamp = file.timestamp; file_store .stream_file(file) .await? - .map_err(Error::from) - .and_then(|buf| async { PriceReportV1::decode(buf).map_err(Error::from) }) + .map_err(PriceTrackerError::from) + .and_then(|buf| async { PriceReportV1::decode(buf).map_err(PriceTrackerError::from) }) .and_then(|report| async move { Price::try_from(&report).map(|price| (report.token_type(), price)) })