Skip to content

Commit

Permalink
Merge pull request #5154 from systeminit/disable_disk_cache
Browse files Browse the repository at this point in the history
chore: disable the foyer disk cache
  • Loading branch information
sprutton1 authored Dec 18, 2024
2 parents f97f5ff + 02f930b commit 687b89c
Showing 1 changed file with 17 additions and 66 deletions.
83 changes: 17 additions & 66 deletions lib/si-layer-cache/src/hybrid_cache.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
use foyer::opentelemetry_0_26::OpenTelemetryMetricsRegistry;
use foyer::{
DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions,
RateLimitPicker, RecoverMode,
};
use std::cmp::max;
use foyer::{Cache as MemCache, CacheBuilder};
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};
use telemetry::opentelemetry::global;
use telemetry::tracing::{error, info};
use tokio::fs;

use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::db::serialize;
use crate::error::LayerDbResult;
use crate::LayerDbError;

const FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb
const _FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb
const DEFAULT_MEMORY_RESERVED_PERCENT: u8 = 40;
const DEFAULT_MEMORY_USABLE_MAX_PERCENT: u8 = 100;
const DEFAULT_DISK_RESERVED_PERCENT: u8 = 5;
Expand Down Expand Up @@ -47,7 +41,7 @@ pub struct Cache<V>
where
V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
cache: HybridCache<Arc<str>, MaybeDeserialized<V>>,
cache: MemCache<Arc<str>, MaybeDeserialized<V>>,
}

impl<V> Cache<V>
Expand All @@ -70,28 +64,8 @@ where
computed_memory_cache_capacity_bytes.try_into()?
};

fs::create_dir_all(config.disk_path.as_path()).await?;
// Compute total disk which is in use for `disk_path`
let total_disk_bytes = fs4::total_space(config.disk_path.as_path())?;

let disk_cache_capacity_bytes = {
// Subtract reserved disk percentage to determine total usable cache disk
let total_usable_disk_bytes = (total_disk_bytes as f64
* (1.0 - (config.disk_reserved_percent as f64 / 100.0)))
.floor() as u64;
// Compute final usable disk as a percentage of the maximum usable disk
let computed_disk_cache_capacity_bytes = (total_usable_disk_bytes as f64
* (config.disk_usable_max_percent as f64 / 100.0))
.floor() as u64;

// Ensure that the computed value is at least as big as the Foyer minimum
max(computed_disk_cache_capacity_bytes, FOYER_DISK_CACHE_MINUMUM).try_into()?
};

info!(
cache.name = &config.name,
cache.disk.total_bytes = total_disk_bytes,
cache.disk.size_bytes = disk_cache_capacity_bytes,
cache.disk.reserved_percent = config.disk_reserved_percent,
cache.disk.usable_max_percent = config.disk_usable_max_percent,
cache.disk.rate_limit = config.disk_admission_rate_limit,
Expand All @@ -104,44 +78,24 @@ where

let cache_name: &'static str = config.name.leak();

let cache: HybridCache<Arc<str>, MaybeDeserialized<V>> = HybridCacheBuilder::new()
.with_name(cache_name)
.with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name)))
.memory(memory_cache_capacity_bytes)
.with_weighter(
|_key: &Arc<str>, value: &MaybeDeserialized<V>| match value {
MaybeDeserialized::RawBytes(bytes) => bytes.len(),
MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint,
},
)
.storage(Engine::Large)
.with_admission_picker(Arc::new(RateLimitPicker::new(
config.disk_admission_rate_limit,
)))
.with_device_options(
DirectFsDeviceOptions::new(config.disk_path)
.with_capacity(disk_cache_capacity_bytes),
)
.with_large_object_disk_cache_options(
LargeEngineOptions::new()
.with_buffer_pool_size(config.disk_buffer_size)
.with_eviction_pickers(vec![Box::<FifoPicker>::default()])
.with_flushers(config.disk_buffer_flushers)
.with_recover_concurrency(config.disk_recover_concurrency)
.with_indexer_shards(config.disk_indexer_shards)
.with_reclaimers(config.disk_reclaimers),
)
.with_recover_mode(RecoverMode::Quiet)
.build()
.await
.map_err(|e| LayerDbError::Foyer(e.into()))?;
let cache: MemCache<Arc<str>, MaybeDeserialized<V>> =
CacheBuilder::new(memory_cache_capacity_bytes)
.with_name(cache_name)
.with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name)))
.with_weighter(
|_key: &Arc<str>, value: &MaybeDeserialized<V>| match value {
MaybeDeserialized::RawBytes(bytes) => bytes.len(),
MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint,
},
)
.build();

Ok(Self { cache })
}

pub async fn get(&self, key: &str) -> Option<V> {
match self.cache.obtain(key.into()).await {
Ok(Some(entry)) => match entry.value() {
match self.cache.get(key) {
Some(entry) => match entry.value() {
MaybeDeserialized::DeserializedValue { value, .. } => Some(value.clone()),
MaybeDeserialized::RawBytes(bytes) => {
// If we fail to deserialize the raw bytes for some reason, pretend that we never
Expand Down Expand Up @@ -189,10 +143,7 @@ where
}

pub async fn close(&self) -> LayerDbResult<()> {
self.cache
.close()
.await
.map_err(|e| LayerDbError::Foyer(e.into()))?;
self.cache.clear();
Ok(())
}
}
Expand Down

0 comments on commit 687b89c

Please sign in to comment.