diff --git a/lib/si-layer-cache/src/db.rs b/lib/si-layer-cache/src/db.rs index f14a257ee5..376288f715 100644 --- a/lib/si-layer-cache/src/db.rs +++ b/lib/si-layer-cache/src/db.rs @@ -1,12 +1,13 @@ use serde::Deserialize; use si_data_pg::PgPoolConfig; use si_runtime::DedicatedExecutor; -use std::{future::IntoFuture, io, sync::Arc}; +use std::sync::Arc; +use std::{future::IntoFuture, io}; +use tokio::try_join; use serde::{de::DeserializeOwned, Serialize}; use si_data_nats::{NatsClient, NatsConfig}; use si_data_pg::PgPool; -use si_events::{FuncRun, FuncRunLog}; use telemetry::prelude::*; use tokio::sync::mpsc; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -100,95 +101,75 @@ where let (tx, rx) = mpsc::unbounded_channel(); let persister_client = PersisterClient::new(tx); - let cas_cache: Arc>> = LayerCache::new( - cas::CACHE_NAME, - pg_pool.clone(), - cache_config - .clone() - .with_name(cas::CACHE_NAME) - .memory_usable_max_percent(30) - .disk_usable_max_percent(30) - .with_path_join(cas::CACHE_NAME), - compute_executor.clone(), - tracker.clone(), - token.clone(), - ) - .await?; - - let encrypted_secret_cache: Arc>> = LayerCache::new( - encrypted_secret::CACHE_NAME, - pg_pool.clone(), - cache_config - .clone() - .with_name(encrypted_secret::CACHE_NAME) - .memory_usable_max_percent(5) - .disk_usable_max_percent(5) - .with_path_join(encrypted_secret::CACHE_NAME), - compute_executor.clone(), - tracker.clone(), - token.clone(), - ) - .await?; - - let func_run_cache: Arc>> = LayerCache::new( - func_run::CACHE_NAME, - pg_pool.clone(), - cache_config - .clone() - .with_name(func_run::CACHE_NAME) - .memory_usable_max_percent(5) - .disk_usable_max_percent(5) - .with_path_join(func_run::CACHE_NAME), - compute_executor.clone(), - tracker.clone(), - token.clone(), - ) - .await?; - - let func_run_log_cache: Arc>> = LayerCache::new( - func_run_log::CACHE_NAME, - pg_pool.clone(), - cache_config - .clone() - .with_name(func_run_log::CACHE_NAME) - .memory_usable_max_percent(5) - .disk_usable_max_percent(5) - .with_path_join(func_run_log::CACHE_NAME), - compute_executor.clone(), - tracker.clone(), - token.clone(), - ) - .await?; - - let rebase_batch_cache: Arc>> = LayerCache::new( - rebase_batch::CACHE_NAME, - pg_pool.clone(), - cache_config - .clone() - .with_name(rebase_batch::CACHE_NAME) - .memory_usable_max_percent(5) - .disk_usable_max_percent(5) - .with_path_join(rebase_batch::CACHE_NAME), - compute_executor.clone(), - tracker.clone(), - token.clone(), - ) - .await?; - - let snapshot_cache: Arc>> = LayerCache::new( - workspace_snapshot::CACHE_NAME, - pg_pool.clone(), - cache_config - .clone() - .with_name(workspace_snapshot::CACHE_NAME) - .memory_usable_max_percent(50) - .disk_usable_max_percent(50) - .with_path_join(workspace_snapshot::CACHE_NAME), - compute_executor.clone(), - tracker.clone(), - token.clone(), - ) - .await?; + let ( + cas_cache, + encrypted_secret_cache, + func_run_cache, + func_run_log_cache, + rebase_batch_cache, + snapshot_cache, + ) = try_join!( + create_layer_cache( + cas::CACHE_NAME, + pg_pool.clone(), + cache_config.clone(), + compute_executor.clone(), + tracker.clone(), + token.clone(), + 30, + 30 + ), + create_layer_cache( + encrypted_secret::CACHE_NAME, + pg_pool.clone(), + cache_config.clone(), + compute_executor.clone(), + tracker.clone(), + token.clone(), + 5, + 5 + ), + create_layer_cache( + func_run::CACHE_NAME, + pg_pool.clone(), + cache_config.clone(), + compute_executor.clone(), + tracker.clone(), + token.clone(), + 5, + 5 + ), + create_layer_cache( + func_run_log::CACHE_NAME, + pg_pool.clone(), + cache_config.clone(), + compute_executor.clone(), + tracker.clone(), + token.clone(), + 5, + 5 + ), + create_layer_cache( + rebase_batch::CACHE_NAME, + pg_pool.clone(), + cache_config.clone(), + compute_executor.clone(), + tracker.clone(), + token.clone(), + 5, + 5 + ), + create_layer_cache( + workspace_snapshot::CACHE_NAME, + pg_pool.clone(), + cache_config.clone(), + compute_executor.clone(), + tracker.clone(), + token.clone(), + 50, + 50 + ) + )?; let cache_updates_task = CacheUpdatesTask::create( instance_id, @@ -296,6 +277,35 @@ where } } +#[allow(clippy::too_many_arguments)] +async fn create_layer_cache( + name: &'static str, + pg_pool: PgPool, + cache_config: CacheConfig, + compute_executor: DedicatedExecutor, + tracker: TaskTracker, + token: CancellationToken, + memory_percent: u8, + disk_percent: u8, +) -> LayerDbResult>>> +where + T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, +{ + LayerCache::new( + name, + pg_pool, + cache_config + .with_name(name) + .memory_usable_max_percent(memory_percent) + .disk_usable_max_percent(disk_percent) + .with_path_join(name), + compute_executor, + tracker, + token, + ) + .await +} + #[must_use = "graceful shutdown must be spawned on runtime"] #[derive(Debug, Clone)] pub struct LayerDbGracefulShutdown {