Skip to content

Commit

Permalink
currency: refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Login <batazor111@gmail.com>
  • Loading branch information
batazor committed Sep 28, 2024
1 parent b2cf970 commit 4a478c6
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 26 deletions.
1 change: 1 addition & 0 deletions boundaries/billing/currency/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ redis = { version = "0.27.2", features = ["aio", "tokio-comp"] }
deadpool-redis = { version = "0.18.0", features = ["rt_tokio_1"] }
dotenvy = "0.15.7"
thiserror = "1.0.64"
tokio-cron-scheduler = "0.13.0"
99 changes: 82 additions & 17 deletions boundaries/billing/currency/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@ mod infrastructure;
mod repository;
mod usecases;

use std::convert::Infallible;
use infrastructure::http::routes::api;
use repository::exchange_rate::redis_repository::RedisExchangeRateRepository;
use std::sync::Arc;
use tracing::info;
use tracing::{error, info};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;
use usecases::currency_conversion::ICurrencyConversionUseCase;
use usecases::exchange_rate::RateFetcherUseCase;
use utoipa::OpenApi;
use warp::Filter;

// Import dotenvy
use warp::{Filter, Rejection};
use tokio_cron_scheduler::{Job, JobScheduler};
use dotenvy::dotenv;
use std::env;

// Import mock providers
use std::iter::Map;
use tracing_subscriber::filter::combinator::And;
use warp::path::Exact;
use warp::reply::Json;
use crate::cache::CacheService;
use crate::repository::exchange_rate::in_memory_repository::InMemoryExchangeRateRepository;
use crate::repository::exchange_rate::repository::ExchangeRateRepository;
Expand Down Expand Up @@ -47,16 +49,41 @@ struct ApiDoc;

#[tokio::main]
async fn main() {
// Load environment variables from .env file
// Load environment variables
dotenv().ok();
init_tracing();

// Initialize services (repositories, caches, external providers, etc.)
let (rate_fetcher_use_case, currency_conversion_use_case) = init_services().await;

// Initialize tracing subscriber with log level from environment
// Initialize scheduler and job
let mut scheduler = init_scheduler(rate_fetcher_use_case.clone()).await;

// Generate OpenAPI specification
let openapi = ApiDoc::openapi();
let openapi_filter = warp::path!("api-docs" / "openapi.json")
.map(move || warp::reply::json(&openapi));

// Serve the API routes
serve_api(rate_fetcher_use_case, currency_conversion_use_case).await;

// Wait for a shutdown signal and stop the scheduler
tokio::signal::ctrl_c().await.unwrap();
scheduler.shutdown().await.unwrap();
println!("Shutting down");
}

/// Initialize environment logging using tracing.
fn init_tracing() {
let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::new(log_level))
.with_span_events(FmtSpan::CLOSE)
.init();
}

/// Initialize the services (repositories, caches, providers, etc.)
async fn init_services() -> (Arc<RateFetcherUseCase>, Arc<CurrencyConversionUseCase>) {
// Retrieve Redis URL from environment
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set in .env");

Expand All @@ -82,21 +109,60 @@ async fn main() {
3, // max_retries
));

// Initialize InMemoryExchangeRateRepository if needed
let _exchange_rate_repository = Arc::new(InMemoryExchangeRateRepository::new());

// Create CurrencyConversionUseCase
let currency_conversion_use_case = Arc::new(CurrencyConversionUseCase::new(
rate_fetcher_use_case.clone(),
));

// Generate OpenAPI specification
let openapi = ApiDoc::openapi();
(rate_fetcher_use_case, currency_conversion_use_case)
}

/// Initialize the scheduler and add the currency update job.
async fn init_scheduler(rate_fetcher_use_case: Arc<RateFetcherUseCase>) -> JobScheduler {
let scheduler = JobScheduler::new().await.unwrap();

// Serve OpenAPI JSON at `/api-docs/openapi.json`
let openapi_filter =
warp::path!("api-docs" / "openapi.json").map(move || warp::reply::json(&openapi));
// Define a cron job to run every hour (adjust for testing if needed)
let job = Job::new_async("0 * * * * *", move |_uuid, _l| {
let rate_fetcher_use_case = rate_fetcher_use_case.clone();
Box::pin(async move {
run_currency_update_job(rate_fetcher_use_case).await;
})
}).unwrap();

scheduler.add(job).await.unwrap();
scheduler.start().await.unwrap();

scheduler
}

/// Periodic currency update job.
async fn run_currency_update_job(rate_fetcher_use_case: Arc<RateFetcherUseCase>) {
info!("Starting exchange rate update job...");

let from_currency = "USD"; // Example currencies
let to_currency = "EUR";

match rate_fetcher_use_case.fetch_rate(from_currency, to_currency).await {
Some(exchange_rate) => {
info!(
"Successfully fetched exchange rate for {} to {}: {}",
from_currency, to_currency, exchange_rate.rate
);
}
None => {
error!(
"Failed to fetch exchange rate for {} to {}",
from_currency, to_currency
);
}
}
}

/// Set up and run the HTTP API server.
async fn serve_api(
rate_fetcher_use_case: Arc<RateFetcherUseCase>,
currency_conversion_use_case: Arc<CurrencyConversionUseCase>,
) {
// Retrieve server host and port from environment
let server_host = env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let server_port: u16 = env::var("SERVER_PORT")
Expand All @@ -106,7 +172,6 @@ async fn main() {

// Set up the HTTP server with the API routes and OpenAPI filter
let routes = api(rate_fetcher_use_case, currency_conversion_use_case)
.or(openapi_filter)
.with(warp::trace::request());

info!("Starting server at http://{}:{}", server_host, server_port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ impl RateFetcherUseCase {
attempt,
rate
);

// Store the rate in cache and database
self.cache.set_rate(&rate).await.ok();

// Step 3: Save using `save_rate` method which returns Result
if let Err(e) = self.save_rate(rate.clone()).await {
error!("Failed to save rate: {}", e);
}

return Some(rate);
}
Err(e) => {
Expand All @@ -99,6 +104,26 @@ impl RateFetcherUseCase {
"Failed to fetch exchange rate for {} to {} after {} attempts",
from, to, self.max_retries
);

None
}

async fn fetch_with_retry(&self, provider: Arc<dyn ExternalRateProvider>, from: &str, to: &str) -> Option<ExchangeRate> {
let mut attempt = 0;
let max_attempts = 3;

while attempt < max_attempts {
match provider.fetch_rate(from, to).await {
Ok(rate) => return Some(rate),
Err(e) => {
attempt += 1;
let backoff_duration = Duration::from_secs(2_u64.pow(attempt));
error!("Attempt {} failed: {}. Retrying in {} seconds...", attempt, e, backoff_duration.as_secs());
sleep(backoff_duration).await;
}
}
}

None
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
# Use Case: Scheduled Updates Rate Data from Subscriptions via Cron Job
# Use Case: Scheduled Updates of Exchange Rate Data via Cron Job

> [!NOTE]
>
> This use case covers the scheduled update process where a cron job runs at predefined intervals to
> refresh exchange rate data from Bloomberg and Yahoo APIs, ensuring that the data is up to date.
> **Note**
> This use case covers the scheduled update process where a cron job runs at predefined intervals to refresh exchange rate data from Bloomberg and Yahoo APIs, ensuring that the data is up to date.
## Flow

1. **Scheduled Trigger**:
A cron job is set up to run at regular intervals (e.g., hourly). This job triggers the Currency Service to refresh exchange rate data from external providers. The purpose is to keep the system's cache and database populated with the latest rates, reducing the need to make API calls for every user request.
A cron job is set up to run at regular intervals (e.g., hourly). This job triggers the **Currency Service** to refresh exchange rate data from external providers. The purpose is to keep the system's cache and database populated with the latest rates, reducing the need to make API calls for every user request.

2. **Fetch Exchange Rates**:
The Currency Service executes the `load_exchange_rates` use case to fetch exchange rates from both **Bloomberg** and **Yahoo** APIs.
The **Currency Service** executes the `load_exchange_rates` use case to fetch exchange rates from both **Bloomberg** and **Yahoo** APIs.
By using this [Load Exchange Rate Data from Subscriptions](#use-case-load-exchange-rate-data-from-subscriptions) use case, it ensures that the system maintains an up-to-date set of exchange rates.

3. **Handle API Responses**:
- **Success**: If the API calls to **Bloomberg** and **Yahoo** are successful, the service retrieves the updated exchange rate data.
- **Failure and Fallback**: If one API fails or is unavailable, the service will attempt to use the alternative provider. Additionally, it implements retries with exponential backoff to handle transient failures and avoid overwhelming the external providers.

4. **Logging and Monitoring**:
4. **Store Rates in Cache and Database**:
Once the service successfully retrieves the updated rates, it stores them in the **Cache** for quick access and in the **Rate Database** for long-term storage and auditing.

5. **Logging and Monitoring**:
Throughout the process, the service logs the activities and any errors encountered, such as API failures or rate limit exceedances. This logging is crucial for monitoring the health of the system and for debugging issues when they occur.

5. **Completion**:
6. **Completion**:
Once the data has been fetched, processed, and stored, the cron job execution is considered complete. The service waits until the next scheduled interval to perform another update.

## Sequence Diagram
Expand Down Expand Up @@ -57,3 +58,37 @@ note over cron, service
end note
@enduml
```

## Error Handling

### 1. API Failure

- **Scenario**: One of the external APIs (Bloomberg or Yahoo) fails to return data or is unavailable.
- **Handling**:
- The service retries the request with exponential backoff.
- If the API continues to fail, the system falls back to the alternative provider.
- The failure is logged, and an alert may be triggered depending on the severity and duration of the outage.

### 2. Rate Limit Exceeded

- **Scenario**: The number of requests sent to the external API exceeds its rate limits.
- **Handling**:
- The service tracks API usage and implements rate limiting to prevent further requests from exceeding the limits.
- Retries are scheduled based on the API's rate limit reset time.
- Errors are logged for monitoring and further analysis.

### 3. Cache or Database Unavailable

- **Scenario**: The cache or database is unavailable when the service attempts to store the updated exchange rates.
- **Handling**:
- If the **Cache** is unavailable, the service will skip updating the cache and log the error.
- If the **Rate Database** is unavailable, the system attempts to retry at a later time.
- All failures are logged, and alerts may be triggered if necessary.

### 4. Invalid or Inconsistent Data

- **Scenario**: The data retrieved from the external provider is invalid or inconsistent.
- **Handling**:
- The service discards invalid data and logs the issue.
- It attempts to fetch the data again during the next scheduled cron job.
- The system will continue serving cached or previously stored data to clients until valid data is retrieved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use tokio_cron_scheduler::{Job, JobScheduler};
use std::sync::Arc;
use crate::usecases::exchange_rate::fetcher::RateFetcherUseCase;
use tracing::{info, error};

async fn run_currency_update_job(
rate_fetcher_use_case: Arc<RateFetcherUseCase>
) {
info!("Starting exchange rate update job...");

let from_currency = "USD"; // Example currencies
let to_currency = "EUR";

match rate_fetcher_use_case.fetch_rate(from_currency, to_currency).await {
Some(exchange_rate) => {
info!(
"Successfully fetched exchange rate for {} to {}: {}",
from_currency, to_currency, exchange_rate.rate
);
// Additional logic to store in cache/database if needed
}
None => {
error!(
"Failed to fetch exchange rate for {} to {}",
from_currency, to_currency
);
}
}
}
Empty file.
1 change: 1 addition & 0 deletions boundaries/billing/currency/src/usecases/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod currency_conversion;
pub mod exchange_rate;
mod load_exchange_rates_cron;
// pub mod discrepancies;
// pub mod load_exchange_rates;
// pub mod load_exchange_rates_cron;

0 comments on commit 4a478c6

Please sign in to comment.