feat: start all layer_caches in parallel
This helps improve application startup speed when the disk caches are relatively full.
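
For context, here is a minimal standalone sketch of the pattern this commit adopts (the warm_cache helper and the timings are made up for illustration, not part of the codebase): tokio::try_join! polls several fallible futures concurrently on one task and returns early on the first error, so total startup time tracks the slowest cache rather than the sum of all of them.

use std::time::Duration;

// Stand-in for LayerCache::new(...): pretend to warm a disk-backed cache.
async fn warm_cache(name: &'static str, millis: u64) -> Result<&'static str, String> {
    tokio::time::sleep(Duration::from_millis(millis)).await;
    Ok(name)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // All three futures are polled concurrently; wall time is roughly
    // max(80, 120, 50) ms instead of 80 + 120 + 50 ms.
    let (cas, snapshot, func_run) = tokio::try_join!(
        warm_cache("cas", 80),
        warm_cache("workspace_snapshot", 120),
        warm_cache("func_run", 50)
    )?;
    println!("caches ready: {cas}, {snapshot}, {func_run}");
    Ok(())
}

The diff below applies the same idea by extracting a create_layer_cache helper and joining six calls to it.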
sprutton1 committed Dec 13, 2024
1 parent 6cf877a commit f35d62c
Showing 1 changed file with 101 additions and 91 deletions.
192 changes: 101 additions & 91 deletions lib/si-layer-cache/src/db.rs
@@ -1,12 +1,13 @@
use serde::Deserialize;
use si_data_pg::PgPoolConfig;
use si_runtime::DedicatedExecutor;
use std::{future::IntoFuture, io, sync::Arc};
use std::sync::Arc;
use std::{future::IntoFuture, io};
use tokio::try_join;

use serde::{de::DeserializeOwned, Serialize};
use si_data_nats::{NatsClient, NatsConfig};
use si_data_pg::PgPool;
use si_events::{FuncRun, FuncRunLog};
use telemetry::prelude::*;
use tokio::sync::mpsc;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -100,95 +101,75 @@ where
        let (tx, rx) = mpsc::unbounded_channel();
        let persister_client = PersisterClient::new(tx);

        let cas_cache: Arc<LayerCache<Arc<CasValue>>> = LayerCache::new(
            cas::CACHE_NAME,
            pg_pool.clone(),
            cache_config
                .clone()
                .with_name(cas::CACHE_NAME)
                .memory_usable_max_percent(30)
                .disk_usable_max_percent(30)
                .with_path_join(cas::CACHE_NAME),
            compute_executor.clone(),
            tracker.clone(),
            token.clone(),
        )
        .await?;

        let encrypted_secret_cache: Arc<LayerCache<Arc<EncryptedSecretValue>>> = LayerCache::new(
            encrypted_secret::CACHE_NAME,
            pg_pool.clone(),
            cache_config
                .clone()
                .with_name(encrypted_secret::CACHE_NAME)
                .memory_usable_max_percent(5)
                .disk_usable_max_percent(5)
                .with_path_join(encrypted_secret::CACHE_NAME),
            compute_executor.clone(),
            tracker.clone(),
            token.clone(),
        )
        .await?;

        let func_run_cache: Arc<LayerCache<Arc<FuncRun>>> = LayerCache::new(
            func_run::CACHE_NAME,
            pg_pool.clone(),
            cache_config
                .clone()
                .with_name(func_run::CACHE_NAME)
                .memory_usable_max_percent(5)
                .disk_usable_max_percent(5)
                .with_path_join(func_run::CACHE_NAME),
            compute_executor.clone(),
            tracker.clone(),
            token.clone(),
        )
        .await?;

        let func_run_log_cache: Arc<LayerCache<Arc<FuncRunLog>>> = LayerCache::new(
            func_run_log::CACHE_NAME,
            pg_pool.clone(),
            cache_config
                .clone()
                .with_name(func_run_log::CACHE_NAME)
                .memory_usable_max_percent(5)
                .disk_usable_max_percent(5)
                .with_path_join(func_run_log::CACHE_NAME),
            compute_executor.clone(),
            tracker.clone(),
            token.clone(),
        )
        .await?;

        let rebase_batch_cache: Arc<LayerCache<Arc<RebaseBatchValue>>> = LayerCache::new(
            rebase_batch::CACHE_NAME,
            pg_pool.clone(),
            cache_config
                .clone()
                .with_name(rebase_batch::CACHE_NAME)
                .memory_usable_max_percent(5)
                .disk_usable_max_percent(5)
                .with_path_join(rebase_batch::CACHE_NAME),
            compute_executor.clone(),
            tracker.clone(),
            token.clone(),
        )
        .await?;

        let snapshot_cache: Arc<LayerCache<Arc<WorkspaceSnapshotValue>>> = LayerCache::new(
            workspace_snapshot::CACHE_NAME,
            pg_pool.clone(),
            cache_config
                .clone()
                .with_name(workspace_snapshot::CACHE_NAME)
                .memory_usable_max_percent(50)
                .disk_usable_max_percent(50)
                .with_path_join(workspace_snapshot::CACHE_NAME),
            compute_executor.clone(),
            tracker.clone(),
            token.clone(),
        )
        .await?;
        let (
            cas_cache,
            encrypted_secret_cache,
            func_run_cache,
            func_run_log_cache,
            rebase_batch_cache,
            snapshot_cache,
        ) = try_join!(
            create_layer_cache(
                cas::CACHE_NAME,
                pg_pool.clone(),
                cache_config.clone(),
                compute_executor.clone(),
                tracker.clone(),
                token.clone(),
                30,
                30
            ),
            create_layer_cache(
                encrypted_secret::CACHE_NAME,
                pg_pool.clone(),
                cache_config.clone(),
                compute_executor.clone(),
                tracker.clone(),
                token.clone(),
                5,
                5
            ),
            create_layer_cache(
                func_run::CACHE_NAME,
                pg_pool.clone(),
                cache_config.clone(),
                compute_executor.clone(),
                tracker.clone(),
                token.clone(),
                5,
                5
            ),
            create_layer_cache(
                func_run_log::CACHE_NAME,
                pg_pool.clone(),
                cache_config.clone(),
                compute_executor.clone(),
                tracker.clone(),
                token.clone(),
                5,
                5
            ),
            create_layer_cache(
                rebase_batch::CACHE_NAME,
                pg_pool.clone(),
                cache_config.clone(),
                compute_executor.clone(),
                tracker.clone(),
                token.clone(),
                5,
                5
            ),
            create_layer_cache(
                workspace_snapshot::CACHE_NAME,
                pg_pool.clone(),
                cache_config.clone(),
                compute_executor.clone(),
                tracker.clone(),
                token.clone(),
                50,
                50
            )
        )?;

        let cache_updates_task = CacheUpdatesTask::create(
            instance_id,
@@ -296,6 +277,35 @@ where
    }
}

#[allow(clippy::too_many_arguments)]
async fn create_layer_cache<T>(
    name: &'static str,
    pg_pool: PgPool,
    cache_config: CacheConfig,
    compute_executor: DedicatedExecutor,
    tracker: TaskTracker,
    token: CancellationToken,
    memory_percent: u8,
    disk_percent: u8,
) -> LayerDbResult<Arc<LayerCache<Arc<T>>>>
where
    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
    LayerCache::new(
        name,
        pg_pool,
        cache_config
            .with_name(name)
            .memory_usable_max_percent(memory_percent)
            .disk_usable_max_percent(disk_percent)
            .with_path_join(name),
        compute_executor,
        tracker,
        token,
    )
    .await
}

#[must_use = "graceful shutdown must be spawned on runtime"]
#[derive(Debug, Clone)]
pub struct LayerDbGracefulShutdown {