Skip to content

Commit

Permalink
fix: add catalog schema cache
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 20, 2024
1 parent 440cd00 commit 07db1ca
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
Expand All @@ -30,6 +32,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
Expand Down Expand Up @@ -84,6 +87,8 @@ impl CacheInvalidator for KvBackendCatalogManager {
}
}

const DEFAULT_CACHED_CATALOG: u64 = 128;

impl KvBackendCatalogManager {
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
Expand All @@ -92,9 +97,9 @@ impl KvBackendCatalogManager {
cache_invalidator,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
// The catalog name is not used in system_catalog, so let it empty
String::default(),
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
},
Expand Down Expand Up @@ -296,6 +301,7 @@ impl CatalogManager for KvBackendCatalogManager {
#[derive(Clone)]
struct SystemCatalog {
catalog_manager: Weak<KvBackendCatalogManager>,
catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
information_schema_provider: Arc<InformationSchemaProvider>,
}

Expand Down Expand Up @@ -331,7 +337,12 @@ impl SystemCatalog {
fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Option<TableRef> {
if schema == INFORMATION_SCHEMA_NAME {
let information_schema_provider =
InformationSchemaProvider::new(catalog.to_string(), self.catalog_manager.clone());
self.catalog_cache.get_with_by_ref(catalog, move || {
Arc::new(InformationSchemaProvider::new(
catalog.to_string(),
self.catalog_manager.clone(),
))
});
information_schema_provider.table(table_name)
} else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
Expand Down

0 comments on commit 07db1ca

Please sign in to comment.