diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index e3351c6d270f..0cec4817cc7a 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::fmt::Debug; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use std::usize; @@ -190,8 +190,14 @@ impl KvBackend for CachedMetaKvBackend { async fn get(&self, key: &[u8]) -> Result> { let _timer = METRIC_CATALOG_KV_GET.start_timer(); + let pre_version = Arc::new(Mutex::new(None)); + let init = async { + let version_clone = pre_version.clone(); let _timer = METRIC_CATALOG_KV_REMOTE_GET.start_timer(); + + version_clone.lock().unwrap().replace(self.version()); + self.kv_backend.get(key).await.map(|val| { val.with_context(|| CacheNotGetSnafu { key: String::from_utf8_lossy(key), @@ -202,7 +208,7 @@ impl KvBackend for CachedMetaKvBackend { // currently moka doesn't have `optionally_try_get_with_by_ref` // TODO(fys): change to moka method when available // https://github.com/moka-rs/moka/issues/254 - match self.cache.try_get_with_by_ref(key, init).await { + let ret = match self.cache.try_get_with_by_ref(key, init).await { Ok(val) => Ok(Some(val)), Err(e) => match e.as_ref() { CacheNotGet { .. } => Ok(None), @@ -211,7 +217,18 @@ impl KvBackend for CachedMetaKvBackend { } .map_err(|e| GetKvCache { err_msg: e.to_string(), - }) + }); + + if pre_version + .lock() + .unwrap() + .as_ref() + .map_or(false, |v| !self.validate_version(*v)) + { + self.cache.invalidate(key).await; + } + + ret } }