From 12fc590c11a4a2d04337254bd2e115352de6434c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 20 Mar 2024 06:53:48 +0000 Subject: [PATCH 1/2] refactor: refactor InvalidateCache Instruction --- src/common/meta/src/instruction.rs | 19 +++++---- src/datanode/src/heartbeat/handler.rs | 4 +- .../handler/invalidate_table_cache.rs | 42 +++++++++---------- src/frontend/src/heartbeat/handler/tests.rs | 11 +++-- src/meta-srv/src/cache_invalidator.rs | 6 +-- .../region_failover/invalidate_cache.rs | 9 ++-- .../src/procedure/region_migration.rs | 9 ++-- 7 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index d0f5c9a27d1a..4b0055551615 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -124,7 +124,7 @@ impl OpenRegion { } /// The instruction of downgrading leader region. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct DowngradeRegion { /// The [RegionId]. pub region_id: RegionId, @@ -137,7 +137,7 @@ impl Display for DowngradeRegion { } /// Upgrades a follower region to leader region. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct UpgradeRegion { /// The [RegionId]. pub region_id: RegionId, @@ -151,7 +151,14 @@ pub struct UpgradeRegion { pub wait_for_replay_timeout: Option, } -#[derive(Debug, Clone, Serialize, Deserialize, Display)] +#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq, Eq)] +/// The identifier of cache. +pub enum CacheIdent { + TableId(TableId), + TableName(TableName), +} + +#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { /// Opens a region. /// @@ -165,10 +172,8 @@ pub enum Instruction { UpgradeRegion(UpgradeRegion), /// Downgrades a region. DowngradeRegion(DowngradeRegion), - /// Invalidates a specified table cache. - InvalidateTableIdCache(TableId), - /// Invalidates a specified table name index cache. - InvalidateTableNameCache(TableName), + /// Invalidates batch cache. + InvalidateCaches(Vec), } /// The reply of [UpgradeRegion]. diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 5eaa3ad88c93..6b581e89ed83 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -81,9 +81,7 @@ impl RegionHeartbeatResponseHandler { Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| { handler_context.handle_upgrade_region_instruction(upgrade_region) })), - Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { - InvalidHeartbeatResponseSnafu.fail() - } + Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), } } } diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index dfbd40208984..150f53bb43e1 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -18,9 +18,8 @@ use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply}; use common_telemetry::error; -use futures::future::Either; #[derive(Clone)] pub struct InvalidateTableCacheHandler { @@ -32,8 +31,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( ctx.incoming_message.as_ref(), - Some((_, Instruction::InvalidateTableIdCache { .. })) - | Some((_, Instruction::InvalidateTableNameCache { .. })) + Some((_, Instruction::InvalidateCaches(_))) ) } @@ -42,28 +40,28 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { let cache_invalidator = self.cache_invalidator.clone(); let (meta, invalidator) = match ctx.incoming_message.take() { - Some((meta, Instruction::InvalidateTableIdCache(table_id))) => ( - meta, - Either::Left(async move { - cache_invalidator - .invalidate_table_id(&Context::default(), table_id) - .await - }), - ), - Some((meta, Instruction::InvalidateTableNameCache(table_name))) => ( - meta, - Either::Right(async move { - cache_invalidator - .invalidate_table_name(&Context::default(), table_name) - .await - }), - ), + Some((meta, Instruction::InvalidateCaches(caches))) => (meta, async move { + for cache in caches { + // Local cache invalidation always succeeds. + match cache { + CacheIdent::TableId(table_id) => { + let _ = cache_invalidator + .invalidate_table_id(&Context::default(), table_id) + .await; + } + CacheIdent::TableName(table_name) => { + let _ = cache_invalidator + .invalidate_table_name(&Context::default(), table_name) + .await; + } + } + } + }), _ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"), }; let _handle = common_runtime::spawn_bg(async move { - // Local cache invalidation always succeeds. - let _ = invalidator.await; + invalidator.await; if let Err(e) = mailbox .send(( diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index ba9e9541439b..f23558cc7e16 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -22,7 +22,7 @@ use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply}; use common_meta::key::table_info::TableInfoKey; use common_meta::key::TableMetaKey; use partition::manager::TableRouteCacheInvalidator; @@ -74,7 +74,7 @@ async fn test_invalidate_table_cache_handler() { handle_instruction( executor.clone(), mailbox.clone(), - Instruction::InvalidateTableIdCache(table_id), + Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]), ) .await; @@ -90,7 +90,12 @@ async fn test_invalidate_table_cache_handler() { .contains_key(&table_info_key.as_raw_key())); // removes a invalid key - handle_instruction(executor, mailbox, Instruction::InvalidateTableIdCache(0)).await; + handle_instruction( + executor, + mailbox, + Instruction::InvalidateCaches(vec![CacheIdent::TableId(0)]), + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index c830a548962a..bb31781b1263 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context}; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_meta::instruction::Instruction; +use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::table_name::TableName; use snafu::ResultExt; use table::metadata::TableId; @@ -66,12 +66,12 @@ impl MetasrvCacheInvalidator { #[async_trait] impl CacheInvalidator for MetasrvCacheInvalidator { async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { - let instruction = Instruction::InvalidateTableIdCache(table_id); + let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); self.broadcast(ctx, instruction).await } async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { - let instruction = Instruction::InvalidateTableNameCache(table_name); + let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableName(table_name)]); self.broadcast(ctx, instruction).await } } diff --git a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs index a68092763144..d7231abfc834 100644 --- a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs +++ b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs @@ -14,7 +14,7 @@ use api::v1::meta::MailboxMessage; use async_trait::async_trait; -use common_meta::instruction::Instruction; +use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; @@ -35,7 +35,7 @@ impl InvalidateCache { ctx: &RegionFailoverContext, table_id: TableId, ) -> Result<()> { - let instruction = Instruction::InvalidateTableIdCache(table_id); + let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); let msg = &MailboxMessage::json_message( "Invalidate Table Cache", @@ -133,7 +133,10 @@ mod tests { assert_eq!( received.payload, Some(Payload::Json( - serde_json::to_string(&Instruction::InvalidateTableIdCache(table_id)).unwrap(), + serde_json::to_string(&Instruction::InvalidateCaches(vec![ + CacheIdent::TableId(table_id) + ])) + .unwrap(), )) ); } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 9e49d266ccaf..b1c5a22f69e5 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -29,7 +29,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; -use common_meta::instruction::Instruction; +use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; @@ -321,7 +321,7 @@ impl Context { /// Broadcasts the invalidate table cache message. pub async fn invalidate_table_cache(&self) -> Result<()> { let table_id = self.region_id().table_id(); - let instruction = Instruction::InvalidateTableIdCache(table_id); + let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); let msg = &MailboxMessage::json_message( "Invalidate Table Cache", @@ -582,7 +582,10 @@ mod tests { let msg = resp.mailbox_message.unwrap(); let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); - assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024)); + assert_eq!( + instruction, + Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)]) + ); } fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec { From 868ebe8ee87c96ca8e817b03812d849c67dbe7d7 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 20 Mar 2024 08:29:34 +0000 Subject: [PATCH 2/2] refactor: refactor CacheInvalidator --- src/catalog/src/kvbackend/manager.rs | 33 ++++++------- src/common/meta/src/cache_invalidator.rs | 46 ++++++++----------- src/common/meta/src/ddl/alter_table.rs | 11 ++++- .../meta/src/ddl/drop_table/executor.rs | 14 +++--- .../handler/invalidate_table_cache.rs | 23 +++------- src/meta-srv/src/cache_invalidator.rs | 11 +---- src/operator/src/statement/ddl.rs | 26 +++++------ 7 files changed, 70 insertions(+), 94 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 05c2431a4a42..6a6038f1daea 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -25,13 +25,13 @@ use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context}; use common_meta::error::Result as MetaResult; +use common_meta::instruction::CacheIdent; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_name::TableNameKey; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::table_name::TableName; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; use moka::future::{Cache as AsyncCache, CacheBuilder}; @@ -39,7 +39,6 @@ use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; use table::dist_table::DistTable; -use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; @@ -79,24 +78,18 @@ fn make_table(table_info_value: TableInfoValue) -> CatalogResult { #[async_trait::async_trait] impl CacheInvalidator for KvBackendCatalogManager { - async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { - self.cache_invalidator - .invalidate_table_id(ctx, table_id) - .await - } - - async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { - let table_cache_key = format_full_table_name( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ); - self.cache_invalidator - .invalidate_table_name(ctx, table_name) - .await?; - self.table_cache.invalidate(&table_cache_key).await; - - Ok(()) + async fn invalidate(&self, ctx: &Context, caches: Vec) -> MetaResult<()> { + for cache in &caches { + if let CacheIdent::TableName(table_name) = cache { + let table_cache_key = format_full_table_name( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ); + self.table_cache.invalidate(&table_cache_key).await; + } + } + self.cache_invalidator.invalidate(ctx, caches).await } } diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 16b9d3b52733..7eed7f0139c8 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -14,14 +14,12 @@ use std::sync::Arc; -use table::metadata::TableId; - use crate::error::Result; +use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoKey; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteKey; use crate::key::TableMetaKey; -use crate::table_name::TableName; /// KvBackend cache invalidator #[async_trait::async_trait] @@ -46,10 +44,7 @@ pub struct Context { #[async_trait::async_trait] pub trait CacheInvalidator: Send + Sync { - // Invalidates table cache - async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> Result<()>; - - async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> Result<()>; + async fn invalidate(&self, ctx: &Context, caches: Vec) -> Result<()>; } pub type CacheInvalidatorRef = Arc; @@ -58,11 +53,7 @@ pub struct DummyCacheInvalidator; #[async_trait::async_trait] impl CacheInvalidator for DummyCacheInvalidator { - async fn invalidate_table_id(&self, _ctx: &Context, _table_id: TableId) -> Result<()> { - Ok(()) - } - - async fn invalidate_table_name(&self, _ctx: &Context, _table_name: TableName) -> Result<()> { + async fn invalidate(&self, _ctx: &Context, _caches: Vec) -> Result<()> { Ok(()) } } @@ -72,21 +63,22 @@ impl CacheInvalidator for T where T: KvCacheInvalidator, { - async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> { - let key: TableNameKey = (&table_name).into(); - - self.invalidate_key(&key.as_raw_key()).await; - - Ok(()) - } - - async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> { - let key = TableInfoKey::new(table_id); - self.invalidate_key(&key.as_raw_key()).await; - - let key = &TableRouteKey { table_id }; - self.invalidate_key(&key.as_raw_key()).await; - + async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { + for cache in caches { + match cache { + CacheIdent::TableId(table_id) => { + let key = TableInfoKey::new(table_id); + self.invalidate_key(&key.as_raw_key()).await; + + let key = &TableRouteKey { table_id }; + self.invalidate_key(&key.as_raw_key()).await; + } + CacheIdent::TableName(table_name) => { + let key: TableNameKey = (&table_name).into(); + self.invalidate_key(&key.as_raw_key()).await + } + } + } Ok(()) } } diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index adffa5258602..e554256a1b39 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -43,6 +43,7 @@ use crate::cache_invalidator::Context; use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result}; +use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::DeserializedValueWithBytes; @@ -333,11 +334,17 @@ impl AlterTableProcedure { if matches!(alter_kind, Kind::RenameTable { .. }) { cache_invalidator - .invalidate_table_name(&Context::default(), self.data.table_ref().into()) + .invalidate( + &Context::default(), + vec![CacheIdent::TableName(self.data.table_ref().into())], + ) .await?; } else { cache_invalidator - .invalidate_table_id(&Context::default(), self.data.table_id()) + .invalidate( + &Context::default(), + vec![CacheIdent::TableId(self.data.table_id())], + ) .await?; }; diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 591c1cafb3c6..e7e1992b337b 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -28,6 +28,7 @@ use crate::cache_invalidator::Context; use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{self, Result}; +use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; @@ -120,13 +121,14 @@ impl DropTableExecutor { subject: Some("Invalidate table cache by dropping table".to_string()), }; - // TODO(weny): merge these two invalidation instructions. cache_invalidator - .invalidate_table_name(&ctx, self.table.table_ref().into()) - .await?; - - cache_invalidator - .invalidate_table_id(&ctx, self.table_id) + .invalidate( + &ctx, + vec![ + CacheIdent::TableName(self.table.table_ref().into()), + CacheIdent::TableId(self.table_id), + ], + ) .await?; Ok(()) diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 150f53bb43e1..48abd8fadda4 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -18,7 +18,7 @@ use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply}; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_telemetry::error; #[derive(Clone)] @@ -41,27 +41,16 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { let (meta, invalidator) = match ctx.incoming_message.take() { Some((meta, Instruction::InvalidateCaches(caches))) => (meta, async move { - for cache in caches { - // Local cache invalidation always succeeds. - match cache { - CacheIdent::TableId(table_id) => { - let _ = cache_invalidator - .invalidate_table_id(&Context::default(), table_id) - .await; - } - CacheIdent::TableName(table_name) => { - let _ = cache_invalidator - .invalidate_table_name(&Context::default(), table_name) - .await; - } - } - } + cache_invalidator + .invalidate(&Context::default(), caches) + .await }), _ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"), }; let _handle = common_runtime::spawn_bg(async move { - invalidator.await; + // Local cache invalidation always succeeds. + let _ = invalidator.await; if let Err(e) = mailbox .send(( diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index bb31781b1263..f4085b6bf9fd 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -18,9 +18,7 @@ use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::instruction::{CacheIdent, Instruction}; -use common_meta::table_name::TableName; use snafu::ResultExt; -use table::metadata::TableId; use crate::metasrv::MetasrvInfo; use crate::service::mailbox::{BroadcastChannel, MailboxRef}; @@ -65,13 +63,8 @@ impl MetasrvCacheInvalidator { #[async_trait] impl CacheInvalidator for MetasrvCacheInvalidator { - async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { - let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); - self.broadcast(ctx, instruction).await - } - - async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { - let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableName(table_name)]); + async fn invalidate(&self, ctx: &Context, caches: Vec) -> MetaResult<()> { + let instruction = Instruction::InvalidateCaches(caches); self.broadcast(ctx, instruction).await } } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index c338f306b3b8..6500b99c2987 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -24,6 +24,7 @@ use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::cache_invalidator::Context; use common_meta::ddl::ExecutorContext; +use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::key::NAME_PATTERN; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; @@ -329,12 +330,13 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate_table_id(&Context::default(), table_id) - .await - .context(error::InvalidateTableCacheSnafu)?; - - self.cache_invalidator - .invalidate_table_name(&Context::default(), table_name.clone()) + .invalidate( + &Context::default(), + vec![ + CacheIdent::TableId(table_id), + CacheIdent::TableName(table_name.clone()), + ], + ) .await .context(error::InvalidateTableCacheSnafu)?; @@ -459,14 +461,12 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate_table_id(&Context::default(), table_id) - .await - .context(error::InvalidateTableCacheSnafu)?; - - self.cache_invalidator - .invalidate_table_name( + .invalidate( &Context::default(), - TableName::new(catalog_name, schema_name, table_name), + vec![ + CacheIdent::TableId(table_id), + CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)), + ], ) .await .context(error::InvalidateTableCacheSnafu)?;