Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tableref cache #3420

Merged
merged 5 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ pub enum Error {
source: common_meta::error::Error,
location: Location,
},

#[snafu(display("Get null from table cache, key: {}", key))]
TableCacheNotGet { key: String, location: Location },

#[snafu(display("Failed to get table cache, err: {}", err_msg))]
GetTableCache { err_msg: String },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -311,6 +317,7 @@ impl ErrorExt for Error {
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal,
}
}

Expand Down
26 changes: 11 additions & 15 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,10 @@ impl CachedMetaKvBackendBuilder {
let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);

let cache = Arc::new(
CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build(),
);
let cache = CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();

let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
Expand All @@ -104,7 +102,7 @@ impl CachedMetaKvBackendBuilder {
}
}

pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
pub type CacheBackend = Cache<Vec<u8>, KeyValue>;

/// A wrapper of `MetaKvBackend` with cache support.
///
Expand All @@ -117,7 +115,7 @@ pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}
Expand Down Expand Up @@ -317,12 +315,10 @@ impl CachedMetaKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
let cache = Arc::new(
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build(),
);
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();

let name = format!("CachedKvBackend({})", kv_backend.name());
Self {
Expand All @@ -333,7 +329,7 @@ impl CachedMetaKvBackend {
}
}

pub fn cache(&self) -> &CacheBackendRef {
pub fn cache(&self) -> &CacheBackend {
&self.cache
}

Expand Down
91 changes: 67 additions & 24 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};
use std::time::Duration;

use async_stream::try_stream;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
Expand All @@ -32,6 +34,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::future::{Cache as AsyncCache, CacheBuilder};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
Expand All @@ -40,9 +43,10 @@ use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;

use crate::error::Error::{GetTableCache, TableCacheNotGet};
use crate::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
Result as CatalogResult, TableMetadataManagerSnafu,
Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::CatalogManager;
Expand All @@ -62,6 +66,7 @@ pub struct KvBackendCatalogManager {
table_metadata_manager: TableMetadataManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
table_cache: AsyncCache<String, TableRef>,
}

fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
Expand All @@ -81,13 +86,24 @@ impl CacheInvalidator for KvBackendCatalogManager {
}

async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.await
.await?;
self.table_cache.invalidate(&table_cache_key).await;

Ok(())
}
}

const DEFAULT_CACHED_CATALOG: u64 = 128;
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
const TABLE_CACHE_MAX_CAPACITY: u64 = 65536;
const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

impl KvBackendCatalogManager {
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
Expand All @@ -97,12 +113,16 @@ impl KvBackendCatalogManager {
cache_invalidator,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
},
table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
.time_to_live(TABLE_CACHE_TTL)
.time_to_idle(TABLE_CACHE_TTI)
.build(),
})
}

Expand Down Expand Up @@ -217,29 +237,52 @@ impl CatalogManager for KvBackendCatalogManager {
return Ok(Some(table));
}

let key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self
.table_metadata_manager
.table_name_manager()
.get(key)
.await
.context(TableMetadataManagerSnafu)?
else {
return Ok(None);
let init = async {
let table_name_key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self
.table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await
.context(TableMetadataManagerSnafu)?
else {
return TableCacheNotGetSnafu {
key: table_name_key.to_string(),
}
.fail();
};
let table_id = table_name_value.table_id();

let Some(table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return TableCacheNotGetSnafu {
key: table_name_key.to_string(),
}
.fail();
};
make_table(table_info_value)
};
let table_id = table_name_value.table_id();

let Some(table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
match self
.table_cache
.try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init)
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return Ok(None);
};
make_table(table_info_value).map(Some)
{
Ok(table) => Ok(Some(table)),
Err(err) => match err.as_ref() {
TableCacheNotGet { .. } => Ok(None),
_ => Err(err),
},
}
.map_err(|err| GetTableCache {
err_msg: err.to_string(),
})
}

async fn tables<'a>(
Expand Down
Loading