Skip to content

Commit

Permalink
refactor: cache invalidator (#3611)
Browse files Browse the repository at this point in the history
* chore: remove some alias

* refactor: cache invalidator
  • Loading branch information
fengjiachun authored Mar 29, 2024
1 parent ffbb132 commit f49cd0c
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 73 deletions.
86 changes: 51 additions & 35 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use common_catalog::consts::{
};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator};
use common_meta::instruction::CacheIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
Expand All @@ -44,8 +43,8 @@ use table::TableRef;

use crate::error::Error::{GetTableCache, TableCacheNotGet};
use crate::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu,
InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result,
TableCacheNotGetSnafu, TableMetadataManagerSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::CatalogManager;
Expand All @@ -57,29 +56,31 @@ use crate::CatalogManager;
/// comes from `SystemCatalog`, which is static and read-only.
#[derive(Clone)]
pub struct KvBackendCatalogManager {
// TODO(LFC): Maybe use a real implementation for Standalone mode.
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
// is implemented by RaftEngine. Maybe we need a cache for it?
cache_invalidator: CacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
table_cache: AsyncCache<String, TableRef>,
}

fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
struct TableCacheInvalidator {
table_cache: AsyncCache<String, TableRef>,
}

impl TableCacheInvalidator {
pub fn new(table_cache: AsyncCache<String, TableRef>) -> Self {
Self { table_cache }
}
}

#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> MetaResult<()> {
for cache in &caches {
impl CacheInvalidator for TableCacheInvalidator {
async fn invalidate(
&self,
_ctx: &Context,
caches: Vec<CacheIdent>,
) -> common_meta::error::Result<()> {
for cache in caches {
if let CacheIdent::TableName(table_name) = cache {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
Expand All @@ -89,7 +90,7 @@ impl CacheInvalidator for KvBackendCatalogManager {
self.table_cache.invalidate(&table_cache_key).await;
}
}
self.cache_invalidator.invalidate(ctx, caches).await
Ok(())
}
}

Expand All @@ -99,11 +100,21 @@ const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

impl KvBackendCatalogManager {
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
pub async fn new(
backend: KvBackendRef,
multi_cache_invalidator: Arc<MultiCacheInvalidator>,
) -> Arc<Self> {
let table_cache: AsyncCache<String, TableRef> = CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
.time_to_live(TABLE_CACHE_TTL)
.time_to_idle(TABLE_CACHE_TTI)
.build();
multi_cache_invalidator
.add_invalidator(Arc::new(TableCacheInvalidator::new(table_cache.clone())))
.await;

Arc::new_cyclic(|me| Self {
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
cache_invalidator,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
Expand All @@ -112,10 +123,7 @@ impl KvBackendCatalogManager {
me.clone(),
)),
},
table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
.time_to_live(TABLE_CACHE_TTL)
.time_to_idle(TABLE_CACHE_TTI)
.build(),
table_cache,
})
}

Expand All @@ -134,7 +142,7 @@ impl CatalogManager for KvBackendCatalogManager {
self
}

async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
async fn catalog_names(&self) -> Result<Vec<String>> {
let stream = self
.table_metadata_manager
.catalog_manager()
Expand All @@ -149,7 +157,7 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(keys)
}

async fn schema_names(&self, catalog: &str) -> CatalogResult<Vec<String>> {
async fn schema_names(&self, catalog: &str) -> Result<Vec<String>> {
let stream = self
.table_metadata_manager
.schema_manager()
Expand All @@ -165,7 +173,7 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(keys.into_iter().collect())
}

async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>> {
let stream = self
.table_metadata_manager
.table_name_manager()
Expand All @@ -183,15 +191,15 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(tables.into_iter().collect())
}

async fn catalog_exists(&self, catalog: &str) -> CatalogResult<bool> {
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.table_metadata_manager
.catalog_manager()
.exists(CatalogNameKey::new(catalog))
.await
.context(TableMetadataManagerSnafu)
}

async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
if self.system_catalog.schema_exist(schema) {
return Ok(true);
}
Expand All @@ -203,7 +211,7 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)
}

async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
if self.system_catalog.table_exist(schema, table) {
return Ok(true);
}
Expand All @@ -222,7 +230,7 @@ impl CatalogManager for KvBackendCatalogManager {
catalog: &str,
schema: &str,
table_name: &str,
) -> CatalogResult<Option<TableRef>> {
) -> Result<Option<TableRef>> {
if let Some(table) = self.system_catalog.table(catalog, schema, table_name) {
return Ok(Some(table));
}
Expand Down Expand Up @@ -256,7 +264,7 @@ impl CatalogManager for KvBackendCatalogManager {
}
.fail();
};
make_table(table_info_value)
build_table(table_info_value)
};

match self
Expand All @@ -279,7 +287,7 @@ impl CatalogManager for KvBackendCatalogManager {
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, CatalogResult<TableRef>> {
) -> BoxStream<'a, Result<TableRef>> {
let sys_tables = try_stream!({
// System tables
let sys_table_names = self.system_catalog.table_names(schema);
Expand All @@ -303,7 +311,7 @@ impl CatalogManager for KvBackendCatalogManager {
while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids
.into_iter()
.collect::<Result<Vec<_>, _>>()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(BoxedError::new)
.context(ListTablesSnafu { catalog, schema })?;

Expand All @@ -315,7 +323,7 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)?;

for table_info_value in table_info_values.into_values() {
yield make_table(table_info_value)?;
yield build_table(table_info_value)?;
}
}
});
Expand All @@ -324,6 +332,14 @@ impl CatalogManager for KvBackendCatalogManager {
}
}

fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
}

// TODO: This struct can hold a static map of all system tables when
// the upper layer (e.g., procedure) can inform the catalog manager
// a new catalog is created.
Expand Down
7 changes: 5 additions & 2 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use catalog::kvbackend::{
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
Expand Down Expand Up @@ -252,9 +253,11 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {

let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());

let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
cached_meta_backend.clone(),
]));
let catalog_list =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend);
KvBackendCatalogManager::new(cached_meta_backend.clone(), multi_cache_invalidator).await;
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
Expand Down
20 changes: 14 additions & 6 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use async_trait::async_trait;
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager};
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::logging;
Expand Down Expand Up @@ -247,13 +248,20 @@ impl StartCommand {
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

let catalog_manager =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone());
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
cached_meta_backend.clone(),
]));
let catalog_manager = KvBackendCatalogManager::new(
cached_meta_backend.clone(),
multi_cache_invalidator.clone(),
)
.await;

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())),
Arc::new(InvalidateTableCacheHandler::new(
multi_cache_invalidator.clone(),
)),
]);

let heartbeat_task = HeartbeatTask::new(
Expand All @@ -264,12 +272,12 @@ impl StartCommand {

let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
catalog_manager.clone(),
catalog_manager,
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_cache_invalidator(catalog_manager.clone())
.with_plugin(plugins.clone())
.with_cache_invalidator(multi_cache_invalidator)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
Expand Down Expand Up @@ -400,8 +400,9 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
let catalog_manager =
KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator));
KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await;

let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
Expand Down Expand Up @@ -432,19 +433,18 @@ impl StartCommand {
table_metadata_manager,
procedure_manager.clone(),
datanode_manager.clone(),
catalog_manager.clone(),
multi_cache_invalidator,
table_meta_allocator,
)
.await?;

let mut frontend = FrontendBuilder::new(
kv_backend,
catalog_manager.clone(),
catalog_manager,
datanode_manager,
ddl_task_executor,
)
.with_plugin(fe_plugins.clone())
.with_cache_invalidator(catalog_manager)
.try_build()
.await
.context(StartFrontendSnafu)?;
Expand Down
30 changes: 30 additions & 0 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::sync::Arc;

use tokio::sync::RwLock;

use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoKey;
Expand Down Expand Up @@ -58,6 +60,34 @@ impl CacheInvalidator for DummyCacheInvalidator {
}
}

#[derive(Default)]
pub struct MultiCacheInvalidator {
invalidators: RwLock<Vec<CacheInvalidatorRef>>,
}

impl MultiCacheInvalidator {
pub fn with_invalidators(invalidators: Vec<CacheInvalidatorRef>) -> Self {
Self {
invalidators: RwLock::new(invalidators),
}
}

pub async fn add_invalidator(&self, invalidator: CacheInvalidatorRef) {
self.invalidators.write().await.push(invalidator);
}
}

#[async_trait::async_trait]
impl CacheInvalidator for MultiCacheInvalidator {
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
let invalidators = self.invalidators.read().await;
for invalidator in invalidators.iter() {
invalidator.invalidate(ctx, caches.clone()).await?;
}
Ok(())
}
}

#[async_trait::async_trait]
impl<T> CacheInvalidator for T
where
Expand Down
Loading

0 comments on commit f49cd0c

Please sign in to comment.