From 4a478c652feff30acdeff2e05ab552aaacee4975 Mon Sep 17 00:00:00 2001 From: Victor Login Date: Sun, 29 Sep 2024 00:08:09 +0300 Subject: [PATCH] currency: refactoring Signed-off-by: Victor Login --- boundaries/billing/currency/Cargo.toml | 1 + boundaries/billing/currency/src/main.rs | 99 +++++++++++++++---- .../exchange_rate/rate_fetcher_use_case.rs | 25 +++++ .../load_exchange_rates_cron/README.md | 53 ++++++++-- .../usecases/load_exchange_rates_cron/cron.rs | 29 ++++++ .../usecases/load_exchange_rates_cron/mod.rs | 0 .../billing/currency/src/usecases/mod.rs | 1 + 7 files changed, 182 insertions(+), 26 deletions(-) create mode 100644 boundaries/billing/currency/src/usecases/load_exchange_rates_cron/cron.rs create mode 100644 boundaries/billing/currency/src/usecases/load_exchange_rates_cron/mod.rs diff --git a/boundaries/billing/currency/Cargo.toml b/boundaries/billing/currency/Cargo.toml index cd4475fc330..d3e0bee1ec6 100644 --- a/boundaries/billing/currency/Cargo.toml +++ b/boundaries/billing/currency/Cargo.toml @@ -18,3 +18,4 @@ redis = { version = "0.27.2", features = ["aio", "tokio-comp"] } deadpool-redis = { version = "0.18.0", features = ["rt_tokio_1"] } dotenvy = "0.15.7" thiserror = "1.0.64" +tokio-cron-scheduler = "0.13.0" diff --git a/boundaries/billing/currency/src/main.rs b/boundaries/billing/currency/src/main.rs index 4b97b56d3b2..7492c9feebf 100644 --- a/boundaries/billing/currency/src/main.rs +++ b/boundaries/billing/currency/src/main.rs @@ -4,22 +4,24 @@ mod infrastructure; mod repository; mod usecases; +use std::convert::Infallible; use infrastructure::http::routes::api; use repository::exchange_rate::redis_repository::RedisExchangeRateRepository; use std::sync::Arc; -use tracing::info; +use tracing::{error, info}; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::EnvFilter; use usecases::currency_conversion::ICurrencyConversionUseCase; use usecases::exchange_rate::RateFetcherUseCase; use utoipa::OpenApi; -use warp::Filter; - -// Import dotenvy +use warp::{Filter, Rejection}; +use tokio_cron_scheduler::{Job, JobScheduler}; use dotenvy::dotenv; use std::env; - -// Import mock providers +use std::iter::Map; +use tracing_subscriber::filter::combinator::And; +use warp::path::Exact; +use warp::reply::Json; use crate::cache::CacheService; use crate::repository::exchange_rate::in_memory_repository::InMemoryExchangeRateRepository; use crate::repository::exchange_rate::repository::ExchangeRateRepository; @@ -47,16 +49,41 @@ struct ApiDoc; #[tokio::main] async fn main() { - // Load environment variables from .env file + // Load environment variables dotenv().ok(); + init_tracing(); + + // Initialize services (repositories, caches, external providers, etc.) + let (rate_fetcher_use_case, currency_conversion_use_case) = init_services().await; - // Initialize tracing subscriber with log level from environment + // Initialize scheduler and job + let mut scheduler = init_scheduler(rate_fetcher_use_case.clone()).await; + + // Generate OpenAPI specification + let openapi = ApiDoc::openapi(); + let openapi_filter = warp::path!("api-docs" / "openapi.json") + .map(move || warp::reply::json(&openapi)); + + // Serve the API routes + serve_api(rate_fetcher_use_case, currency_conversion_use_case).await; + + // Wait for a shutdown signal and stop the scheduler + tokio::signal::ctrl_c().await.unwrap(); + scheduler.shutdown().await.unwrap(); + println!("Shutting down"); +} + +/// Initialize environment logging using tracing. +fn init_tracing() { let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()); tracing_subscriber::fmt() .with_env_filter(EnvFilter::new(log_level)) .with_span_events(FmtSpan::CLOSE) .init(); +} +/// Initialize the services (repositories, caches, providers, etc.) +async fn init_services() -> (Arc, Arc) { // Retrieve Redis URL from environment let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set in .env"); @@ -82,21 +109,60 @@ async fn main() { 3, // max_retries )); - // Initialize InMemoryExchangeRateRepository if needed - let _exchange_rate_repository = Arc::new(InMemoryExchangeRateRepository::new()); - // Create CurrencyConversionUseCase let currency_conversion_use_case = Arc::new(CurrencyConversionUseCase::new( rate_fetcher_use_case.clone(), )); - // Generate OpenAPI specification - let openapi = ApiDoc::openapi(); + (rate_fetcher_use_case, currency_conversion_use_case) +} + +/// Initialize the scheduler and add the currency update job. +async fn init_scheduler(rate_fetcher_use_case: Arc) -> JobScheduler { + let scheduler = JobScheduler::new().await.unwrap(); - // Serve OpenAPI JSON at `/api-docs/openapi.json` - let openapi_filter = - warp::path!("api-docs" / "openapi.json").map(move || warp::reply::json(&openapi)); + // Define a cron job to run every hour (adjust for testing if needed) + let job = Job::new_async("0 * * * * *", move |_uuid, _l| { + let rate_fetcher_use_case = rate_fetcher_use_case.clone(); + Box::pin(async move { + run_currency_update_job(rate_fetcher_use_case).await; + }) + }).unwrap(); + + scheduler.add(job).await.unwrap(); + scheduler.start().await.unwrap(); + + scheduler +} + +/// Periodic currency update job. +async fn run_currency_update_job(rate_fetcher_use_case: Arc) { + info!("Starting exchange rate update job..."); + + let from_currency = "USD"; // Example currencies + let to_currency = "EUR"; + + match rate_fetcher_use_case.fetch_rate(from_currency, to_currency).await { + Some(exchange_rate) => { + info!( + "Successfully fetched exchange rate for {} to {}: {}", + from_currency, to_currency, exchange_rate.rate + ); + } + None => { + error!( + "Failed to fetch exchange rate for {} to {}", + from_currency, to_currency + ); + } + } +} +/// Set up and run the HTTP API server. +async fn serve_api( + rate_fetcher_use_case: Arc, + currency_conversion_use_case: Arc, +) { // Retrieve server host and port from environment let server_host = env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let server_port: u16 = env::var("SERVER_PORT") @@ -106,7 +172,6 @@ async fn main() { // Set up the HTTP server with the API routes and OpenAPI filter let routes = api(rate_fetcher_use_case, currency_conversion_use_case) - .or(openapi_filter) .with(warp::trace::request()); info!("Starting server at http://{}:{}", server_host, server_port); diff --git a/boundaries/billing/currency/src/usecases/exchange_rate/rate_fetcher_use_case.rs b/boundaries/billing/currency/src/usecases/exchange_rate/rate_fetcher_use_case.rs index 951876366dc..9a2c7bae8c9 100644 --- a/boundaries/billing/currency/src/usecases/exchange_rate/rate_fetcher_use_case.rs +++ b/boundaries/billing/currency/src/usecases/exchange_rate/rate_fetcher_use_case.rs @@ -71,10 +71,15 @@ impl RateFetcherUseCase { attempt, rate ); + + // Store the rate in cache and database + self.cache.set_rate(&rate).await.ok(); + // Step 3: Save using `save_rate` method which returns Result if let Err(e) = self.save_rate(rate.clone()).await { error!("Failed to save rate: {}", e); } + return Some(rate); } Err(e) => { @@ -99,6 +104,26 @@ impl RateFetcherUseCase { "Failed to fetch exchange rate for {} to {} after {} attempts", from, to, self.max_retries ); + + None + } + + async fn fetch_with_retry(&self, provider: Arc, from: &str, to: &str) -> Option { + let mut attempt = 0; + let max_attempts = 3; + + while attempt < max_attempts { + match provider.fetch_rate(from, to).await { + Ok(rate) => return Some(rate), + Err(e) => { + attempt += 1; + let backoff_duration = Duration::from_secs(2_u64.pow(attempt)); + error!("Attempt {} failed: {}. Retrying in {} seconds...", attempt, e, backoff_duration.as_secs()); + sleep(backoff_duration).await; + } + } + } + None } diff --git a/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/README.md b/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/README.md index 528a680cfc6..49f8988c1d5 100644 --- a/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/README.md +++ b/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/README.md @@ -1,27 +1,28 @@ -# Use Case: Scheduled Updates Rate Data from Subscriptions via Cron Job +# Use Case: Scheduled Updates of Exchange Rate Data via Cron Job -> [!NOTE] -> -> This use case covers the scheduled update process where a cron job runs at predefined intervals to -> refresh exchange rate data from Bloomberg and Yahoo APIs, ensuring that the data is up to date. +> **Note** +> This use case covers the scheduled update process where a cron job runs at predefined intervals to refresh exchange rate data from Bloomberg and Yahoo APIs, ensuring that the data is up to date. ## Flow 1. **Scheduled Trigger**: - A cron job is set up to run at regular intervals (e.g., hourly). This job triggers the Currency Service to refresh exchange rate data from external providers. The purpose is to keep the system's cache and database populated with the latest rates, reducing the need to make API calls for every user request. + A cron job is set up to run at regular intervals (e.g., hourly). This job triggers the **Currency Service** to refresh exchange rate data from external providers. The purpose is to keep the system's cache and database populated with the latest rates, reducing the need to make API calls for every user request. 2. **Fetch Exchange Rates**: - The Currency Service executes the `load_exchange_rates` use case to fetch exchange rates from both **Bloomberg** and **Yahoo** APIs. + The **Currency Service** executes the `load_exchange_rates` use case to fetch exchange rates from both **Bloomberg** and **Yahoo** APIs. By using this [Load Exchange Rate Data from Subscriptions](#use-case-load-exchange-rate-data-from-subscriptions) use case, it ensures that the system maintains an up-to-date set of exchange rates. 3. **Handle API Responses**: - **Success**: If the API calls to **Bloomberg** and **Yahoo** are successful, the service retrieves the updated exchange rate data. - **Failure and Fallback**: If one API fails or is unavailable, the service will attempt to use the alternative provider. Additionally, it implements retries with exponential backoff to handle transient failures and avoid overwhelming the external providers. -4. **Logging and Monitoring**: +4. **Store Rates in Cache and Database**: + Once the service successfully retrieves the updated rates, it stores them in the **Cache** for quick access and in the **Rate Database** for long-term storage and auditing. + +5. **Logging and Monitoring**: Throughout the process, the service logs the activities and any errors encountered, such as API failures or rate limit exceedances. This logging is crucial for monitoring the health of the system and for debugging issues when they occur. -5. **Completion**: +6. **Completion**: Once the data has been fetched, processed, and stored, the cron job execution is considered complete. The service waits until the next scheduled interval to perform another update. ## Sequence Diagram @@ -57,3 +58,37 @@ note over cron, service end note @enduml ``` + +## Error Handling + +### 1. API Failure + +- **Scenario**: One of the external APIs (Bloomberg or Yahoo) fails to return data or is unavailable. +- **Handling**: + - The service retries the request with exponential backoff. + - If the API continues to fail, the system falls back to the alternative provider. + - The failure is logged, and an alert may be triggered depending on the severity and duration of the outage. + +### 2. Rate Limit Exceeded + +- **Scenario**: The number of requests sent to the external API exceeds its rate limits. +- **Handling**: + - The service tracks API usage and implements rate limiting to prevent further requests from exceeding the limits. + - Retries are scheduled based on the API's rate limit reset time. + - Errors are logged for monitoring and further analysis. + +### 3. Cache or Database Unavailable + +- **Scenario**: The cache or database is unavailable when the service attempts to store the updated exchange rates. +- **Handling**: + - If the **Cache** is unavailable, the service will skip updating the cache and log the error. + - If the **Rate Database** is unavailable, the system attempts to retry at a later time. + - All failures are logged, and alerts may be triggered if necessary. + +### 4. Invalid or Inconsistent Data + +- **Scenario**: The data retrieved from the external provider is invalid or inconsistent. +- **Handling**: + - The service discards invalid data and logs the issue. + - It attempts to fetch the data again during the next scheduled cron job. + - The system will continue serving cached or previously stored data to clients until valid data is retrieved. diff --git a/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/cron.rs b/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/cron.rs new file mode 100644 index 00000000000..9a5830135b3 --- /dev/null +++ b/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/cron.rs @@ -0,0 +1,29 @@ +use tokio_cron_scheduler::{Job, JobScheduler}; +use std::sync::Arc; +use crate::usecases::exchange_rate::fetcher::RateFetcherUseCase; +use tracing::{info, error}; + +async fn run_currency_update_job( + rate_fetcher_use_case: Arc +) { + info!("Starting exchange rate update job..."); + + let from_currency = "USD"; // Example currencies + let to_currency = "EUR"; + + match rate_fetcher_use_case.fetch_rate(from_currency, to_currency).await { + Some(exchange_rate) => { + info!( + "Successfully fetched exchange rate for {} to {}: {}", + from_currency, to_currency, exchange_rate.rate + ); + // Additional logic to store in cache/database if needed + } + None => { + error!( + "Failed to fetch exchange rate for {} to {}", + from_currency, to_currency + ); + } + } +} diff --git a/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/mod.rs b/boundaries/billing/currency/src/usecases/load_exchange_rates_cron/mod.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/boundaries/billing/currency/src/usecases/mod.rs b/boundaries/billing/currency/src/usecases/mod.rs index 6d63b497db1..861c08ab0a0 100644 --- a/boundaries/billing/currency/src/usecases/mod.rs +++ b/boundaries/billing/currency/src/usecases/mod.rs @@ -1,5 +1,6 @@ pub mod currency_conversion; pub mod exchange_rate; +mod load_exchange_rates_cron; // pub mod discrepancies; // pub mod load_exchange_rates; // pub mod load_exchange_rates_cron;