Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove anyhow from PriceTracker #418

Merged
merged 1 commit into from
Mar 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions price/src/price_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::{anyhow, Error, Result};
use chrono::{DateTime, Duration, TimeZone, Utc};
use file_store::{FileInfo, FileStore, FileType};
use futures::stream::{StreamExt, TryStreamExt};
Expand All @@ -7,22 +6,42 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::{mpsc, watch};

#[derive(thiserror::Error, Debug)]
pub enum PriceTrackerError {
#[error("invalid timestamp in price: {0}")]
InvalidTimestamp(u64),
#[error("price is not currently available")]
PriceNotAvailable,
#[error("price too old, price timestamp: {0}")]
PriceTooOld(DateTime<Utc>),
#[error("tokio join error")]
JoinError(#[from] tokio::task::JoinError),
#[error("file store error")]
FileStoreError(#[from] file_store::Error),
#[error("proto decode error")]
DecodeError(#[from] helium_proto::DecodeError),
#[error("killed due to {0}")]
KilledError(String),
#[error("error sending over mpsc channel")]
SendError(#[from] mpsc::error::SendError<String>),
}

#[derive(Clone)]
struct Price {
price: u64,
timestamp: DateTime<Utc>,
}

impl TryFrom<&PriceReportV1> for Price {
type Error = anyhow::Error;
type Error = PriceTrackerError;

fn try_from(value: &PriceReportV1) -> Result<Self, Self::Error> {
Ok(Self {
price: value.price,
timestamp: Utc
.timestamp_opt(value.timestamp as i64, 0)
.single()
.ok_or_else(|| anyhow!("Invalid timestamp: {}", value.timestamp))?,
.ok_or_else(|| PriceTrackerError::InvalidTimestamp(value.timestamp))?,
})
}
}
Expand Down Expand Up @@ -52,7 +71,13 @@ impl PriceTracker {
pub async fn start(
settings: &Settings,
shutdown: triggered::Listener,
) -> Result<(Self, impl std::future::Future<Output = Result<()>>)> {
) -> Result<
(
Self,
impl std::future::Future<Output = Result<(), PriceTrackerError>>,
),
PriceTrackerError,
> {
let file_store = FileStore::from_settings(&settings.file_store).await?;
let (price_sender, price_receiver) = watch::channel(Prices::new());
let (task_kill_sender, task_kill_receiver) = mpsc::channel(1);
Expand Down Expand Up @@ -81,25 +106,25 @@ impl PriceTracker {
match handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
Err(err) => Err(PriceTrackerError::from(err)),
}
}))
}

pub async fn price(&self, token_type: &BlockchainTokenTypeV1) -> Result<u64> {
pub async fn price(
&self,
token_type: &BlockchainTokenTypeV1,
) -> Result<u64, PriceTrackerError> {
let result = self
.price_receiver
.borrow()
.get(token_type)
.ok_or_else(|| anyhow!("price not available"))
.ok_or_else(|| PriceTrackerError::PriceNotAvailable)
.and_then(|price| {
if price.timestamp > Utc::now() - self.price_duration {
Ok(price.price)
} else {
Err(anyhow!(
"price too old, price timestamp: {}",
price.timestamp
))
Err(PriceTrackerError::PriceTooOld(price.timestamp))
}
});

Expand All @@ -117,7 +142,7 @@ async fn run(
price_sender: watch::Sender<Prices>,
mut after: DateTime<Utc>,
shutdown: triggered::Listener,
) -> Result<()> {
) -> Result<(), PriceTrackerError> {
let mut trigger = tokio::time::interval(std::time::Duration::from_secs(30));

loop {
Expand All @@ -132,8 +157,8 @@ async fn run(
let timestamp = process_files(&file_store, &price_sender, after).await?;
after = timestamp.unwrap_or(after);
}
msg = task_killer.recv() => if let Some(string) = msg {
return Err(anyhow!(string));
msg = task_killer.recv() => if let Some(error) = msg {
return Err(PriceTrackerError::KilledError(error));
}
}
}
Expand All @@ -145,21 +170,21 @@ async fn calculate_initial_prices(
file_store: &FileStore,
price_duration: Duration,
sender: &watch::Sender<Prices>,
) -> Result<DateTime<Utc>> {
) -> Result<DateTime<Utc>, PriceTrackerError> {
tracing::debug!("PriceTracker: Updating initial prices");
process_files(file_store, sender, Utc::now() - price_duration)
.await?
.ok_or_else(|| anyhow!("price not available"))
.ok_or_else(|| PriceTrackerError::PriceNotAvailable)
}

async fn process_files(
file_store: &FileStore,
sender: &watch::Sender<Prices>,
after: DateTime<Utc>,
) -> Result<Option<DateTime<Utc>>> {
) -> Result<Option<DateTime<Utc>>, PriceTrackerError> {
file_store
.list(FileType::PriceReport, after, None)
.map_err(Error::from)
.map_err(PriceTrackerError::from)
.and_then(|file| process_file(file_store, file, sender))
.try_fold(None, |_old, ts| async move { Ok(Some(ts)) })
.await
Expand All @@ -169,15 +194,15 @@ async fn process_file(
file_store: &FileStore,
file: FileInfo,
sender: &watch::Sender<Prices>,
) -> Result<DateTime<Utc>> {
) -> Result<DateTime<Utc>, PriceTrackerError> {
tracing::debug!("PriceTracker: processing pricing report file {}", file.key);
let timestamp = file.timestamp;

file_store
.stream_file(file)
.await?
.map_err(Error::from)
.and_then(|buf| async { PriceReportV1::decode(buf).map_err(Error::from) })
.map_err(PriceTrackerError::from)
.and_then(|buf| async { PriceReportV1::decode(buf).map_err(PriceTrackerError::from) })
.and_then(|report| async move {
Price::try_from(&report).map(|price| (report.token_type(), price))
})
Expand Down