diff --git a/components/engine_traits/src/region_cache_engine.rs b/components/engine_traits/src/region_cache_engine.rs
index 0c01a5f21c2..85ba49b0654 100644
--- a/components/engine_traits/src/region_cache_engine.rs
+++ b/components/engine_traits/src/region_cache_engine.rs
@@ -55,6 +55,7 @@ pub enum EvictReason {
     Merge,
     Disabled,
     ApplySnapshot,
+    Flashback,
     Manual,
 }
 
diff --git a/components/engine_traits/src/write_batch.rs b/components/engine_traits/src/write_batch.rs
index a53b2faf9aa..e1160852f93 100644
--- a/components/engine_traits/src/write_batch.rs
+++ b/components/engine_traits/src/write_batch.rs
@@ -1,6 +1,8 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
-use crate::{errors::Result, options::WriteOptions, CacheRegion};
+use kvproto::metapb;
+
+use crate::{errors::Result, options::WriteOptions};
 
 /// Engines that can create write batches
 pub trait WriteBatchExt: Sized {
@@ -139,5 +141,5 @@ pub trait WriteBatch: Mutable {
 
     /// It declares that the following consecutive write will be within this
     /// region.
-    fn prepare_for_region(&mut self, _: CacheRegion) {}
+    fn prepare_for_region(&mut self, _: &metapb::Region) {}
 }
diff --git a/components/hybrid_engine/src/engine.rs b/components/hybrid_engine/src/engine.rs
index de4faf64534..eb2f4afef74 100644
--- a/components/hybrid_engine/src/engine.rs
+++ b/components/hybrid_engine/src/engine.rs
@@ -4,6 +4,8 @@ use engine_traits::{
     CacheRegion, FailedReason, KvEngine, Mutable, Peekable, ReadOptions, RegionCacheEngine,
     Result, SnapshotContext, SnapshotMiscExt, SyncMutable, WriteBatch, WriteBatchExt,
 };
+use keys::DATA_PREFIX_KEY;
+use kvproto::metapb::{self, RegionEpoch};
 
 use crate::{
     metrics::{
@@ -126,8 +128,17 @@ where
         F: FnOnce(&mut <Self as WriteBatchExt>::WriteBatch) -> Result<()>,
     {
         let mut batch = self.write_batch();
-        if let Some(region) = self.region_cache_engine.get_region_for_key(key) {
-            batch.prepare_for_region(region);
+        if let Some(cached_region) = self.region_cache_engine.get_region_for_key(key) {
+            // CacheRegion does not contain enough information to rebuild a full Region, so
+            // the conversion is not exact, but this method is not called in production code.
+            let mut region = metapb::Region::default();
+            region.set_id(cached_region.id);
+            region.set_start_key(cached_region.start[DATA_PREFIX_KEY.len()..].to_vec());
+            region.set_end_key(cached_region.end[DATA_PREFIX_KEY.len()..].to_vec());
+            let mut epoch = RegionEpoch::default();
+            epoch.version = cached_region.epoch_version;
+            region.set_region_epoch(epoch);
+            batch.prepare_for_region(&region);
         }
         f(&mut batch)?;
         let _ = batch.write()?;
diff --git a/components/hybrid_engine/src/misc.rs b/components/hybrid_engine/src/misc.rs
index 1201b07773d..d89a31d8f7a 100644
--- a/components/hybrid_engine/src/misc.rs
+++ b/components/hybrid_engine/src/misc.rs
@@ -186,13 +186,13 @@ pub mod tests {
         let cache_r3 = CacheRegion::from_region(&r3);
 
         let mut write_batch = hybrid_engine.write_batch();
-        write_batch.prepare_for_region(cache_r1.clone());
+        write_batch.prepare_for_region(&r1);
         write_batch.put(b"zk02", b"val").unwrap();
         write_batch.put(b"zk03", b"val").unwrap();
-        write_batch.prepare_for_region(cache_r2.clone());
+        write_batch.prepare_for_region(&r2);
         write_batch.put(b"zk22", b"val").unwrap();
         write_batch.put(b"zk23", b"val").unwrap();
-        write_batch.prepare_for_region(cache_r3.clone());
+        write_batch.prepare_for_region(&r3);
         write_batch.put(b"zk42", b"val").unwrap();
         write_batch.put(b"zk43", b"val").unwrap();
         write_batch.write().unwrap();
diff --git a/components/hybrid_engine/src/observer/load_eviction.rs b/components/hybrid_engine/src/observer/load_eviction.rs
index 0f703ab7fa8..2aab6491461 100644
--- a/components/hybrid_engine/src/observer/load_eviction.rs
+++ b/components/hybrid_engine/src/observer/load_eviction.rs
@@ -174,6 +174,21 @@ impl QueryObserver for LoadEvictionObserver {
 }
 
 impl AdminObserver for LoadEvictionObserver {
+    fn pre_exec_admin(
+        &self,
+        ctx: &mut ObserverContext<'_>,
+        req: &kvproto::raft_cmdpb::AdminRequest,
+        _: u64,
+        _: u64,
+    ) -> bool {
+        if req.cmd_type == AdminCmdType::PrepareFlashback {
+            let cache_region = CacheRegion::from_region(ctx.region());
+            self.evict_region(cache_region, EvictReason::Flashback);
+        }
+
+        false
+    }
+
     fn post_exec_admin(
         &self,
         ctx: &mut ObserverContext<'_>,
diff --git a/components/hybrid_engine/src/observer/write_batch.rs b/components/hybrid_engine/src/observer/write_batch.rs
index 0ee0136d099..efb62f1c2cf 100644
--- a/components/hybrid_engine/src/observer/write_batch.rs
+++ b/components/hybrid_engine/src/observer/write_batch.rs
@@ -1,7 +1,8 @@
 // Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.
 
-use engine_traits::{is_data_cf, CacheRegion, KvEngine, Mutable, Result, WriteBatch, WriteOptions};
+use engine_traits::{is_data_cf, KvEngine, Mutable, Result, WriteBatch, WriteOptions};
 use in_memory_engine::{RegionCacheMemoryEngine, RegionCacheWriteBatch};
+use kvproto::metapb;
 use raftstore::coprocessor::{
     dispatcher::BoxWriteBatchObserver, Coprocessor, CoprocessorHost, ObservableWriteBatch,
     WriteBatchObserver,
@@ -101,7 +102,7 @@ impl WriteBatch for HybridObservableWriteBatch {
     fn rollback_to_save_point(&mut self) -> Result<()> {
         self.cache_write_batch.rollback_to_save_point()
     }
-    fn prepare_for_region(&mut self, region: CacheRegion) {
+    fn prepare_for_region(&mut self, region: &metapb::Region) {
         self.cache_write_batch.prepare_for_region(region);
     }
 }
diff --git a/components/hybrid_engine/src/snapshot.rs b/components/hybrid_engine/src/snapshot.rs
index e81f6a52b65..46032e561d2 100644
--- a/components/hybrid_engine/src/snapshot.rs
+++ b/components/hybrid_engine/src/snapshot.rs
@@ -192,7 +192,7 @@ mod tests {
             assert!(!iter.seek_to_first().unwrap());
         }
         let mut write_batch = hybrid_engine.write_batch();
-        write_batch.prepare_for_region(cache_region.clone());
+        write_batch.prepare_for_region(&region);
         write_batch
             .cache_write_batch
             .set_region_cache_status(RegionCacheStatus::Cached);
diff --git a/components/hybrid_engine/src/write_batch.rs b/components/hybrid_engine/src/write_batch.rs
index 16163336c85..7c2d08b5755 100644
--- a/components/hybrid_engine/src/write_batch.rs
+++ b/components/hybrid_engine/src/write_batch.rs
@@ -3,9 +3,10 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 
 use engine_traits::{
-    is_data_cf, CacheRegion, KvEngine, Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions,
+    is_data_cf, KvEngine, Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions,
 };
 use in_memory_engine::{RegionCacheMemoryEngine, RegionCacheWriteBatch};
+use kvproto::metapb;
 
 use crate::engine::HybridEngine;
 
@@ -100,7 +101,7 @@ impl WriteBatch for HybridEngineWriteBatch {
         self.cache_write_batch.merge(other.cache_write_batch)
     }
 
-    fn prepare_for_region(&mut self, r: CacheRegion) {
+    fn prepare_for_region(&mut self, r: &metapb::Region) {
         self.cache_write_batch.prepare_for_region(r);
     }
 }
@@ -175,7 +176,7 @@ mod tests {
             .unwrap();
         let cache_region = CacheRegion::from_region(&region);
         let mut write_batch = hybrid_engine.write_batch();
-        write_batch.prepare_for_region(cache_region.clone());
+        write_batch.prepare_for_region(&region);
         write_batch
             .cache_write_batch
             .set_region_cache_status(RegionCacheStatus::Cached);
@@ -248,10 +249,10 @@ mod tests {
             .unwrap();
 
         let mut wb = hybrid_engine.write_batch();
-        wb.prepare_for_region(cache_region1.clone());
+        wb.prepare_for_region(&region1);
         wb.put(b"zk05", b"val").unwrap();
         wb.put(b"zk08", b"val2").unwrap();
-        wb.prepare_for_region(cache_region2.clone());
+        wb.prepare_for_region(&region2);
         wb.put(b"zk25", b"val3").unwrap();
         wb.put(b"zk27", b"val4").unwrap();
         wb.write().unwrap();
@@ -276,9 +277,9 @@ mod tests {
 
         let mut wb = hybrid_engine.write_batch();
         // all ranges overlapped with it will be evicted
-        wb.prepare_for_region(cache_region1.clone());
+        wb.prepare_for_region(&region1);
         wb.delete_range(b"zk05", b"zk08").unwrap();
-        wb.prepare_for_region(cache_region2.clone());
+        wb.prepare_for_region(&region2);
         wb.delete_range(b"zk20", b"zk21").unwrap();
         wb.write().unwrap();
 
diff --git a/components/hybrid_engine/tests/failpoints/test_write_batch.rs b/components/hybrid_engine/tests/failpoints/test_write_batch.rs
index 1ffb12159a2..e1a07d094f4 100644
--- a/components/hybrid_engine/tests/failpoints/test_write_batch.rs
+++ b/components/hybrid_engine/tests/failpoints/test_write_batch.rs
@@ -47,17 +47,17 @@ fn test_sequence_number_unique() {
     // while we block the batch loading of region3, it's new KVs are still directly
     // written into the skiplist.
     let mut wb = hybrid_engine.write_batch();
-    wb.prepare_for_region(CacheRegion::from_region(&r));
+    wb.prepare_for_region(&r);
     wb.put(b"zk", b"val").unwrap(); // seq 3
     wb.delete(b"zk").unwrap(); // seq 4
    wb.put(b"zk2", b"val").unwrap(); // seq 5
 
-    wb.prepare_for_region(CacheRegion::from_region(&r2));
+    wb.prepare_for_region(&r2);
     wb.put(b"zk6", b"val").unwrap(); // seq 6
     wb.delete(b"zk5").unwrap(); // seq 7
     wb.put(b"zk5", b"val2").unwrap(); // seq 8
 
-    wb.prepare_for_region(CacheRegion::from_region(&r3));
+    wb.prepare_for_region(&r3);
     wb.put(b"zk8", b"val").unwrap(); // seq 9
     wb.put(b"zk7", b"val2").unwrap(); // seq 10
diff --git a/components/in_memory_engine/src/background.rs b/components/in_memory_engine/src/background.rs
index edfa9997a23..35db969f3f8 100644
--- a/components/in_memory_engine/src/background.rs
+++ b/components/in_memory_engine/src/background.rs
@@ -1311,6 +1311,7 @@ impl Runnable for BackgroundRunner {
                             Some(&rocks_engine),
                             &scheduler,
                             false,
+                            r.is_in_flashback,
                         );
                     }),
                 },
@@ -2745,8 +2746,8 @@ pub mod tests {
            .region_manager()
            .load_region(region2.clone())
            .unwrap();
-        engine.prepare_for_apply(&region1);
-        engine.prepare_for_apply(&region2);
+        engine.prepare_for_apply(&region1, false);
+        engine.prepare_for_apply(&region2, false);
 
         // concurrent write to rocksdb, but the key will not be loaded in the memory
         // engine
@@ -2923,7 +2924,7 @@ pub mod tests {
             },
         );
         let cache_region = CacheRegion::from_region(&region);
-        engine.prepare_for_apply(&cache_region);
+        engine.prepare_for_apply(&cache_region, false);
 
         // Wait for the range to be loaded.
         test_util::eventually(
@@ -3037,7 +3038,7 @@ pub mod tests {
         for r in [&region1, &region2] {
             let cache_region = CacheRegion::from_region(r);
             engine.load_region(cache_region.clone()).unwrap();
-            engine.prepare_for_apply(&cache_region);
+            engine.prepare_for_apply(&cache_region, false);
         }
 
         // ensure all ranges are finshed
@@ -3114,7 +3115,7 @@ pub mod tests {
         for r in [&region1, &region2, &region3] {
             let cache_region = CacheRegion::from_region(r);
             engine.load_region(cache_region.clone()).unwrap();
-            engine.prepare_for_apply(&cache_region);
+            engine.prepare_for_apply(&cache_region, false);
         }
 
         // ensure all ranges are finshed
@@ -3164,7 +3165,7 @@ pub mod tests {
         rocks_engine.put_cf(CF_WRITE, &key, b"val").unwrap();
         // After loading range1, the memory usage should be 140*6=840
         engine.load_region(cache_region1.clone()).unwrap();
-        engine.prepare_for_apply(&cache_region1);
+        engine.prepare_for_apply(&cache_region1, false);
 
         let region2 = new_region(2, construct_region_key(3), construct_region_key(5));
         let key = construct_key(3, 10);
@@ -3187,7 +3188,7 @@ pub mod tests {
 
         let cache_region2 = CacheRegion::from_region(&region2);
         engine.load_region(cache_region2.clone()).unwrap();
-        engine.prepare_for_apply(&cache_region2);
+        engine.prepare_for_apply(&cache_region2, false);
 
         // ensure all ranges are finshed
         test_util::eventually(Duration::from_millis(100), Duration::from_secs(2), || {
diff --git a/components/in_memory_engine/src/cross_check.rs b/components/in_memory_engine/src/cross_check.rs
index 2a8ed254585..778a1b00ae5 100644
--- a/components/in_memory_engine/src/cross_check.rs
+++ b/components/in_memory_engine/src/cross_check.rs
@@ -1054,7 +1054,7 @@ mod tests {
 
         {
             let mut wb = engine.write_batch();
-            wb.prepare_for_region(cache_region.clone());
+            wb.prepare_for_region(&region);
             let mut disk_wb = rocks_engine.write_batch();
 
             prepare_data(&mut wb, &mut disk_wb);
diff --git a/components/in_memory_engine/src/engine.rs b/components/in_memory_engine/src/engine.rs
index 85718646cc3..583b0341ac0 100644
--- a/components/in_memory_engine/src/engine.rs
+++ b/components/in_memory_engine/src/engine.rs
@@ -17,6 +17,7 @@ use engine_traits::{
     CacheRegion, EvictReason, FailedReason, IterOptions, Iterable, KvEngine, RegionCacheEngine,
     RegionCacheEngineExt, RegionEvent, Result, CF_DEFAULT, CF_LOCK, CF_WRITE, DATA_CFS,
 };
+use fail::fail_point;
 use kvproto::metapb::Region;
 use pd_client::PdClient;
 use raftstore::{coprocessor::RegionInfoProvider, store::CasualRouter};
@@ -217,6 +218,7 @@ impl RegionCacheMemoryEngineCore {
         rocks_engine: Option<&RocksEngine>,
         scheduler: &Scheduler<BackgroundTask>,
         should_set_in_written: bool,
+        in_flashback: bool,
     ) -> RegionCacheStatus {
         if !self.region_manager.is_active() {
             return RegionCacheStatus::NotInCache;
         }
@@ -251,11 +253,11 @@ impl RegionCacheMemoryEngineCore {
             return RegionCacheStatus::NotInCache;
         };
 
-        if region_meta.get_region().epoch_version < region.epoch_version {
+        if region_meta.get_region().epoch_version < region.epoch_version || in_flashback {
             let meta = regions_map.remove_region(region.id);
             assert_eq!(meta.get_state(), RegionState::Pending);
             // try update outdated region.
-            if meta.can_be_updated_to(region) {
+            if !in_flashback && meta.can_be_updated_to(region) {
                 info!("ime update outdated pending region";
                     "current_meta" => ?meta,
                     "new_region" => ?region);
@@ -264,6 +266,7 @@ impl RegionCacheMemoryEngineCore {
                 regions_map.load_region(region.clone()).unwrap();
                 region_meta = regions_map.mut_region_meta(region.id).unwrap();
             } else {
+                fail_point!("ime_fail_to_schedule_load");
                 info!("ime remove outdated pending region";
                     "pending_region" => ?meta.get_region(),
                     "new_region" => ?region);
@@ -429,12 +432,17 @@ impl RegionCacheMemoryEngine {
 
     // It handles the pending region and check whether to buffer write for this
     // region.
-    pub(crate) fn prepare_for_apply(&self, region: &CacheRegion) -> RegionCacheStatus {
+    pub(crate) fn prepare_for_apply(
+        &self,
+        region: &CacheRegion,
+        in_flashback: bool,
+    ) -> RegionCacheStatus {
         self.core.prepare_for_apply(
             region,
             self.rocks_engine.as_ref(),
             self.bg_work_manager.background_scheduler(),
             true,
+            in_flashback,
         )
     }
 
@@ -662,7 +670,7 @@ pub mod tests {
 
         let mut region2 = new_region(1, b"k1", b"k5");
         region2.mut_region_epoch().version = 2;
-        engine.prepare_for_apply(&CacheRegion::from_region(&region2));
+        engine.prepare_for_apply(&CacheRegion::from_region(&region2), false);
         assert_eq!(
             count_region(engine.core.region_manager(), |m| {
                 matches!(m.get_state(), Pending | Loading)
@@ -676,7 +684,7 @@ pub mod tests {
 
         let mut region2 = new_region(1, b"k2", b"k5");
         region2.mut_region_epoch().version = 2;
-        engine.prepare_for_apply(&CacheRegion::from_region(&region2));
+        engine.prepare_for_apply(&CacheRegion::from_region(&region2), false);
         assert_eq!(
             count_region(engine.core.region_manager(), |m| {
                 matches!(m.get_state(), Pending | Loading)
@@ -793,7 +801,7 @@ pub mod tests {
         assert!(engine.core.region_manager.is_active());
 
         let mut wb = engine.write_batch();
-        wb.prepare_for_region(cache_region.clone());
+        wb.prepare_for_region(&region);
         wb.put(b"zk00", b"v1").unwrap();
         wb.put(b"zk10", b"v1").unwrap();
         wb.put(b"zk20", b"v1").unwrap();
@@ -810,7 +818,7 @@ pub mod tests {
         );
 
         let mut wb = engine.write_batch();
-        wb.prepare_for_region(cache_region.clone());
+        wb.prepare_for_region(&region);
         wb.put(b"zk10", b"v2").unwrap();
         wb.set_sequence_number(10).unwrap();
 
@@ -877,7 +885,7 @@ pub mod tests {
         engine.new_region(region.clone());
 
         let mut wb = engine.write_batch();
-        wb.prepare_for_region(cache_region.clone());
+        wb.prepare_for_region(&region);
         wb.set_sequence_number(10).unwrap();
         wb.put(b"a", b"val1").unwrap();
         wb.put(b"b", b"val2").unwrap();
diff --git a/components/in_memory_engine/src/metrics.rs b/components/in_memory_engine/src/metrics.rs
index 5e86d8c29a7..a99ffcab5bb 100644
--- a/components/in_memory_engine/src/metrics.rs
+++ b/components/in_memory_engine/src/metrics.rs
@@ -41,6 +41,7 @@ make_auto_flush_static_metric! {
         memory_limit_reached,
         disabled,
         apply_snapshot,
+        flashback,
         manual,
     }
 
@@ -215,6 +216,9 @@ pub(crate) fn observe_eviction_duration(secs: f64, evict_reason: EvictReason) {
         EvictReason::ApplySnapshot => IN_MEMORY_ENGINE_EVICTION_DURATION_HISTOGRAM_STATIC
             .apply_snapshot
             .observe(secs),
+        EvictReason::Flashback => IN_MEMORY_ENGINE_EVICTION_DURATION_HISTOGRAM_STATIC
+            .flashback
+            .observe(secs),
         EvictReason::Manual => IN_MEMORY_ENGINE_EVICTION_DURATION_HISTOGRAM_STATIC
             .manual
             .observe(secs),
diff --git a/components/in_memory_engine/src/read.rs b/components/in_memory_engine/src/read.rs
index 077ceca8daf..44f2d76ef3a 100644
--- a/components/in_memory_engine/src/read.rs
+++ b/components/in_memory_engine/src/read.rs
@@ -2048,7 +2048,7 @@ mod tests {
         engine.new_region(region.clone());
 
         let mut wb = engine.write_batch();
-        wb.prepare_for_region(range.clone());
+        wb.prepare_for_region(&region);
         put_entries(&mut wb);
         wb.set_sequence_number(wb_sequence).unwrap();
         wb.write().unwrap();
@@ -2110,8 +2110,7 @@ mod tests {
 
         let mut wb = engine.write_batch();
         let region = new_region(1, b"", b"z");
-        let cache_region = CacheRegion::from_region(&region);
-        wb.prepare_for_region(cache_region);
+        wb.prepare_for_region(&region);
         wb.put(b"zb", b"f").unwrap();
         wb.set_sequence_number(200).unwrap();
 
diff --git a/components/in_memory_engine/src/write_batch.rs b/components/in_memory_engine/src/write_batch.rs
index 5903059376b..3c0a15c7c2f 100644
--- a/components/in_memory_engine/src/write_batch.rs
+++ b/components/in_memory_engine/src/write_batch.rs
@@ -11,6 +11,7 @@ use engine_traits::{
     CacheRegion, EvictReason, MiscExt, Mutable, RegionCacheEngine, Result, WriteBatch,
     WriteBatchExt, WriteOptions, CF_DEFAULT,
 };
+use kvproto::metapb;
 use tikv_util::{box_err, config::ReadableSize, error, info, time::Instant, warn};
 
 use crate::{
@@ -470,14 +471,18 @@ impl WriteBatch for RegionCacheWriteBatch {
         Ok(())
     }
 
-    fn prepare_for_region(&mut self, region: CacheRegion) {
+    fn prepare_for_region(&mut self, region: &metapb::Region) {
         let time = Instant::now();
         // record last region for clearing region in written flags.
         self.record_last_written_region();
+        let cached_region = CacheRegion::from_region(region);
         // TODO: remote range.
-        self.set_region_cache_status(self.engine.prepare_for_apply(&region));
-        self.current_region = Some(region);
+        self.set_region_cache_status(
+            self.engine
+                .prepare_for_apply(&cached_region, region.is_in_flashback),
+        );
+        self.current_region = Some(cached_region);
         self.current_region_evicted = false;
 
         self.prepare_for_write_duration += time.saturating_elapsed();
     }
@@ -530,9 +535,10 @@ mod tests {
     use engine_traits::{
         CacheRegion, FailedReason, Peekable, RegionCacheEngine, WriteBatch, DATA_CFS,
     };
+    use kvproto::metapb::{Region, RegionEpoch};
     use online_config::{ConfigChange, ConfigManager, ConfigValue};
     use tempfile::Builder;
-    use tikv_util::config::VersionTrack;
+    use tikv_util::{config::VersionTrack, store::new_peer};
 
     use super::*;
     use crate::{
@@ -565,7 +571,7 @@ mod tests {
         engine.core.region_manager().set_safe_point(r.id, 10);
 
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(CacheRegion::from_region(&r));
+        wb.prepare_for_region(&r);
         wb.put(b"aaa", b"bbb").unwrap();
         wb.set_sequence_number(1).unwrap();
         assert_eq!(wb.write().unwrap(), 1);
@@ -585,7 +591,7 @@ mod tests {
         engine.core.region_manager().set_safe_point(r.id, 10);
 
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(CacheRegion::from_region(&r));
+        wb.prepare_for_region(&r);
         wb.put(b"aaa", b"bbb").unwrap();
         wb.set_save_point();
         wb.put(b"aaa", b"ccc").unwrap();
@@ -613,12 +619,12 @@ mod tests {
         engine.core.region_manager().set_safe_point(r.id, 10);
 
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(CacheRegion::from_region(&r));
+        wb.prepare_for_region(&r);
         wb.put(b"zaaa", b"bbb").unwrap();
         wb.set_sequence_number(1).unwrap();
         _ = wb.write();
         wb.clear();
-        wb.prepare_for_region(CacheRegion::from_region(&r));
+        wb.prepare_for_region(&r);
         wb.put(b"zbbb", b"ccc").unwrap();
         wb.delete(b"zaaa").unwrap();
         wb.set_sequence_number(2).unwrap();
@@ -655,7 +661,7 @@ mod tests {
         let mut r1_new = new_region(1, b"k01".to_vec(), b"k06".to_vec());
         r1_new.mut_region_epoch().version = 2;
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(CacheRegion::from_region(&r1_new));
+        wb.prepare_for_region(&r1_new);
         assert!(
             engine
                 .core
@@ -683,7 +689,7 @@ mod tests {
         r1_new.mut_region_epoch().version = 2;
         let cache_r1_new = CacheRegion::from_region(&r1_new);
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(cache_r1_new.clone());
+        wb.prepare_for_region(&r1_new);
         {
             let regions_map = engine.core.region_manager().regions_map.read();
             let region_meta = regions_map.region_meta(1).unwrap();
@@ -715,7 +721,7 @@ mod tests {
         assert_eq!(skip_engine.node_count(), 2);
 
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(cache_r1_new.clone());
+        wb.prepare_for_region(&r1_new);
         wb.put(b"zk01", b"val2").unwrap();
         wb.set_sequence_number(6).unwrap();
         wb.write().unwrap();
@@ -731,7 +737,7 @@ mod tests {
             .set_state(RegionState::PendingEvict);
         }
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(cache_r1_new.clone());
+        wb.prepare_for_region(&r1_new);
         wb.put(b"zk02", b"val2").unwrap();
         wb.set_sequence_number(7).unwrap();
         wb.write().unwrap();
@@ -780,16 +786,16 @@ mod tests {
         let val1: Vec<u8> = vec![0; 150];
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(CacheRegion::from_region(&regions[0]));
+        wb.prepare_for_region(&regions[0]);
         // memory required:
         // 4(key) + 8(sequencen number) + 150(value) + 16(2 Arc = vec![3; 150];
@@ -816,7 +822,7 @@ mod tests {
         // The memory will fail to acquire
         let val4: Vec<u8> = vec![3; 300];
-        wb.prepare_for_region(CacheRegion::from_region(&regions[4]));
+        wb.prepare_for_region(&regions[4]);
         wb.put(b"zk41", &val4).unwrap();
 
         // We should have allocated 740 as calculated above
@@ -896,7 +902,7 @@ mod tests {
         let val1: Vec<u8> = (0..150).map(|_| 0).collect();
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(CacheRegion::from_region(&r2));
+        wb.prepare_for_region(&r2);
         wb.put(b"zkk11", &val1).unwrap();
         let snap1 = engine
             .snapshot(CacheRegion::from_region(&r1), 1000, 1000)
             .unwrap();
@@ -914,7 +920,7 @@ mod tests {
         assert!(snap1.get_value(b"zkk00").unwrap().is_none());
 
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(CacheRegion::from_region(&r1));
+        wb.prepare_for_region(&r1);
         // put should trigger the evict and it won't write into range cache
         wb.put(b"zkk01", &val1).unwrap();
         wb.write_impl(1000).unwrap();
@@ -956,14 +962,22 @@ mod tests {
         ));
         engine.set_disk_engine(rocks_engine.clone());
 
-        let r1 = CacheRegion::new(1, 0, b"k00", b"k10");
+        let r1 = CacheRegion::new(1, 0, b"zk00", b"zk10");
         engine.core().region_manager().load_region(r1).unwrap();
 
         // load a region with a newer epoch and small range, should trigger replace.
-        let r_new = CacheRegion::new(1, 1, b"k00", b"k05");
+        let mut r_new = Region::default();
+        r_new.set_id(1);
+        let mut epoch = RegionEpoch::new();
+        epoch.version = 1;
+        r_new.set_region_epoch(epoch);
+        r_new.set_peers(vec![new_peer(1, 1)].into());
+        r_new.set_start_key(b"k00".to_vec());
+        r_new.set_end_key(b"k05".to_vec());
         let mut wb = RegionCacheWriteBatch::from(&engine);
-        wb.prepare_for_region(r_new.clone());
+        wb.prepare_for_region(&r_new);
+        let r_new = CacheRegion::from_region(&r_new);
 
         {
             let regions_map = engine.core.region_manager.regions_map().read();
diff --git a/components/in_memory_engine/tests/failpoints/test_memory_engine.rs b/components/in_memory_engine/tests/failpoints/test_memory_engine.rs
index c9e7dcd5bd7..8e4f62e2d2c 100644
--- a/components/in_memory_engine/tests/failpoints/test_memory_engine.rs
+++ b/components/in_memory_engine/tests/failpoints/test_memory_engine.rs
@@ -164,7 +164,6 @@ fn test_clean_up_tombstone() {
     let config = Arc::new(VersionTrack::new(InMemoryEngineConfig::config_for_test()));
     let engine =
         RegionCacheMemoryEngine::new(InMemoryEngineContext::new_for_tests(config.clone()));
     let region = new_region(1, b"".to_vec(), b"z".to_vec());
-    let cache_region = CacheRegion::from_region(&region);
     let (tx, rx) = sync_channel(0);
     fail::cfg_callback("clean_lock_tombstone_done", move || {
         tx.send(true).unwrap();
     })
@@ -173,7 +172,7 @@ fn test_clean_up_tombstone() {
     engine.new_region(region.clone());
 
     let mut wb = engine.write_batch();
-    wb.prepare_for_region(cache_region.clone());
+    wb.prepare_for_region(&region);
     wb.put_cf("lock", b"k", b"val").unwrap();
     wb.put_cf("lock", b"k1", b"val").unwrap();
     wb.put_cf("lock", b"k2", b"val").unwrap();
@@ -185,7 +184,7 @@ fn test_clean_up_tombstone() {
     wb.write().unwrap();
 
     let mut wb = engine.write_batch();
-    wb.prepare_for_region(cache_region.clone());
+    wb.prepare_for_region(&region);
     wb.put_cf("lock", b"k", b"val").unwrap(); // seq 120
     wb.put_cf("lock", b"k1", b"val").unwrap(); // seq 121
     wb.put_cf("lock", b"k2", b"val").unwrap(); // seq 122
@@ -278,9 +277,9 @@ fn test_evict_with_loading_range() {
 
     let mut wb = engine.write_batch();
     // prepare range to trigger loading
-    wb.prepare_for_region(cache_region1.clone());
-    wb.prepare_for_region(cache_region2.clone());
-    wb.prepare_for_region(cache_region3.clone());
+    wb.prepare_for_region(&r1);
+    wb.prepare_for_region(&r2);
+    wb.prepare_for_region(&r3);
     wb.set_sequence_number(10).unwrap();
     wb.write().unwrap();
 
@@ -335,12 +334,12 @@ fn test_cached_write_batch_cleared_when_load_failed() {
 
     let mut wb = engine.write_batch();
     // range1 starts to load
-    wb.prepare_for_region(CacheRegion::from_region(&r1));
+    wb.prepare_for_region(&r1);
     rx.recv_timeout(Duration::from_secs(5)).unwrap();
 
     wb.put(b"zk05", b"val").unwrap();
     wb.put(b"zk06", b"val").unwrap();
-    wb.prepare_for_region(CacheRegion::from_region(&r2));
+    wb.prepare_for_region(&r2);
     wb.put(b"zk25", b"val").unwrap();
     wb.set_sequence_number(100).unwrap();
     wb.write().unwrap();
@@ -414,25 +413,25 @@ fn test_concurrency_between_delete_range_and_write_to_memory() {
     let engine_clone = engine.clone();
     let (range_prepared_tx, range_prepared_rx) = sync_channel(0);
-    let region1_clone = cache_region1.clone();
-    let region2_clone = cache_region2.clone();
-    let region3_clone = cache_region3.clone();
+    let region1_clone = r1.clone();
+    let region2_clone = r2.clone();
+    let region3_clone = r3.clone();
     let handle = std::thread::spawn(move || {
         let mut wb = engine_clone.write_batch();
-        wb.prepare_for_region(region1_clone);
+        wb.prepare_for_region(&region1_clone);
         wb.put_cf(CF_LOCK, b"zk02", b"val").unwrap();
         wb.put_cf(CF_LOCK, b"zk03", b"val").unwrap();
         wb.put_cf(CF_LOCK, b"zk04", b"val").unwrap();
         wb.set_sequence_number(100).unwrap();
 
         let mut wb2 = engine_clone.write_batch();
-        wb2.prepare_for_region(region2_clone);
+        wb2.prepare_for_region(&region2_clone);
         wb.put_cf(CF_LOCK, b"zk22", b"val").unwrap();
         wb.put_cf(CF_LOCK, b"zk23", b"val").unwrap();
         wb2.set_sequence_number(200).unwrap();
 
         let mut wb3 = engine_clone.write_batch();
-        wb3.prepare_for_region(region3_clone);
+        wb3.prepare_for_region(&region3_clone);
         wb3.set_sequence_number(300).unwrap();
 
         range_prepared_tx.send(true).unwrap();
@@ -561,7 +560,7 @@ fn test_double_delete_range_schedule() {
 
     let mut wb = engine.write_batch();
     // prepare range to trigger loading
-    wb.prepare_for_region(CacheRegion::from_region(&r3));
+    wb.prepare_for_region(&r3);
     wb.set_sequence_number(10).unwrap();
     wb.write().unwrap();
 
@@ -626,7 +625,7 @@ fn test_load_with_gc() {
     let range = CacheRegion::from_region(&region);
     engine.load_region(range.clone()).unwrap();
     let mut wb = engine.write_batch();
-    wb.prepare_for_region(range.clone());
+    wb.prepare_for_region(&region);
     wb.set_sequence_number(100).unwrap();
     wb.write().unwrap();
 
@@ -697,7 +696,7 @@ fn test_region_split_before_batch_loading_start() {
 
     // use write batch to trigger scheduling pending region loading task.
     let mut wb = engine.write_batch();
-    wb.prepare_for_region(cache_region.clone());
+    wb.prepare_for_region(&region);
     wb.set_sequence_number(10).unwrap();
     wb.put(b"zk00", b"val2").unwrap();
     wb.put(b"zk10", b"val2").unwrap();
@@ -766,7 +765,7 @@ fn test_cb_on_eviction() {
     engine.new_region(region.clone());
 
     let mut wb = engine.write_batch();
-    wb.prepare_for_region(cache_region.clone());
+    wb.prepare_for_region(&region);
     wb.set_sequence_number(10).unwrap();
     wb.put(b"a", b"val1").unwrap();
     wb.put(b"b", b"val2").unwrap();
diff --git a/components/raftstore/src/coprocessor/read_write/write_batch.rs b/components/raftstore/src/coprocessor/read_write/write_batch.rs
index 28bed821825..bc1a334c7b3 100644
--- a/components/raftstore/src/coprocessor/read_write/write_batch.rs
+++ b/components/raftstore/src/coprocessor/read_write/write_batch.rs
@@ -2,7 +2,8 @@
 
 use std::sync::atomic::{AtomicBool, Ordering};
 
-use engine_traits::{CacheRegion, Mutable, Result, WriteBatch, WriteOptions, CF_DEFAULT};
+use engine_traits::{Mutable, Result, WriteBatch, WriteOptions, CF_DEFAULT};
+use kvproto::metapb;
 
 pub trait WriteBatchObserver: Send {
     fn create_observable_write_batch(&self) -> Box<dyn ObservableWriteBatch>;
@@ -113,7 +114,7 @@ impl WriteBatch for WriteBatchWrapper {
         unimplemented!("WriteBatchWrapper does not support merge")
     }
 
-    fn prepare_for_region(&mut self, region: CacheRegion) {
+    fn prepare_for_region(&mut self, region: &metapb::Region) {
         if let Some(w) = self.observable_write_batch.as_mut() {
             w.prepare_for_region(region)
         }
diff --git a/components/raftstore/src/coprocessor/region_info_accessor.rs b/components/raftstore/src/coprocessor/region_info_accessor.rs
index 8c541fad19e..760cf318d95 100644
--- a/components/raftstore/src/coprocessor/region_info_accessor.rs
+++ b/components/raftstore/src/coprocessor/region_info_accessor.rs
@@ -610,6 +610,7 @@ impl RegionCollector {
                 .filter(|ri| {
                     ri.role == StateRole::Leader
                         && ac.region_stat.cop_detail.iterated_count() != 0
+                        && !ri.region.is_in_flashback
                 })
                 .map(|ri| (ri, ac))
             })
@@ -1163,6 +1164,8 @@ impl RegionInfoProvider for MockRegionInfoProvider {
 
 #[cfg(test)]
 mod tests {
+    use kvproto::metapb::RegionEpoch;
+    use pd_client::RegionWriteCfCopDetail;
     use txn_types::Key;
 
     use super::*;
@@ -1808,4 +1811,101 @@ mod tests {
         )
         .unwrap();
     }
+
+    #[test]
+    fn test_get_top_regions() {
+        let mut collector =
+            RegionCollector::new(Arc::new(RwLock::new(HashSet::default())), Box::new(|| 10));
+
+        let test_set = vec![
+            // mvcc amp 5000
+            (1, b"".to_vec(), b"k10".to_vec(), 1_000_000, 0, 200 - 1),
+            // mvcc amp 5
+            (
+                2,
+                b"k10".to_vec(),
+                b"k20".to_vec(),
+                1_000_000,
+                0,
+                2_000_000 - 1,
+            ),
+            // mvcc amp 50, filtered by mvcc amp
+            (3, b"k20".to_vec(), b"k30".to_vec(), 0, 100_000, 2_000 - 1),
+            // mvcc amp 100
+            (
+                4,
+                b"k30".to_vec(),
+                b"k40".to_vec(),
+                100_000,
+                100_000,
+                2_000 - 1,
+            ),
+            // mvcc amp 1000, filtered by next + prev
+            (5, b"k40".to_vec(), b"k50".to_vec(), 1000, 0, 0),
+        ];
+
+        let mut region1 = None;
+        let mut region4 = None;
+        for (id, start, end, next, prev, processed_keys) in test_set {
+            let mut region = Region::default();
+            region.set_id(id);
+            region.set_start_key(start);
+            region.set_end_key(end);
+            let mut epoch = RegionEpoch::new();
+            epoch.set_version(10);
+            region.set_region_epoch(epoch);
+            if id == 1 {
+                region1 = Some(region.clone());
+            } else if id == 4 {
+                region4 = Some(region.clone());
+            }
+
+            collector.handle_raftstore_event(RaftStoreEvent::CreateRegion {
+                region: region.clone(),
+                role: StateRole::Leader,
+            });
+            let mut stat = RegionStat::default();
+            stat.cop_detail = RegionWriteCfCopDetail::new(next, prev, processed_keys);
+            collector.handle_raftstore_event(RaftStoreEvent::UpdateRegionActivity {
+                region,
+                activity: RegionActivity { region_stat: stat },
+            });
+        }
+
+        let (tx, rx) = mpsc::channel();
+        let cb = Box::new(move |regions| {
+            tx.send(regions).unwrap();
+        });
+
+        collector.handle_get_top_regions(4, cb.clone());
+        let regions = rx
+            .recv()
+            .unwrap()
+            .into_iter()
+            .map(|(r, _)| r.id)
+            .collect::<Vec<_>>();
+        assert_eq!(regions, vec![1, 4, 3]);
+
+        let mut region1 = region1.unwrap();
+        region1.set_is_in_flashback(true);
+        collector.handle_raftstore_event(RaftStoreEvent::UpdateRegion {
+            region: region1,
+            role: StateRole::Leader,
+        });
+
+        collector.handle_raftstore_event(RaftStoreEvent::RoleChange {
+            region: region4.unwrap(),
+            role: StateRole::Follower,
+            initialized: true,
+        });
+
+        collector.handle_get_top_regions(4, cb);
+        let regions = rx
+            .recv()
+            .unwrap()
+            .into_iter()
+            .map(|(r, _)| r.id)
+            .collect::<Vec<_>>();
+        assert_eq!(vec![3], regions);
+    }
 }
diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs
index d3b5d0c5812..bceb69acb44 100644
--- a/components/raftstore/src/store/fsm/apply.rs
+++ b/components/raftstore/src/store/fsm/apply.rs
@@ -29,9 +29,9 @@ use batch_system::{
 use collections::{HashMap, HashMapEntry, HashSet};
 use crossbeam::channel::{TryRecvError, TrySendError};
 use engine_traits::{
-    util::SequenceNumber, CacheRegion, DeleteStrategy, KvEngine, Mutable, PerfContext,
-    PerfContextKind, RaftEngine, RaftEngineReadOnly, Range as EngineRange, Snapshot, SstMetaInfo,
-    WriteBatch, WriteOptions, ALL_CFS, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
+    util::SequenceNumber, DeleteStrategy, KvEngine, Mutable, PerfContext, PerfContextKind,
+    RaftEngine, RaftEngineReadOnly, Range as EngineRange, Snapshot, SstMetaInfo, WriteBatch,
+    WriteOptions, ALL_CFS, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
 };
 use fail::fail_point;
 use health_controller::types::LatencyInspector;
@@ -543,8 +543,7 @@ where
     pub fn prepare_for(&mut self, delegate: &mut ApplyDelegate<EK>) {
         self.applied_batch
             .push_batch(&delegate.observe_info, delegate.region.get_id());
-        let cache_region = CacheRegion::from_region(&delegate.region);
-        self.kv_wb.prepare_for_region(cache_region);
+        self.kv_wb.prepare_for_region(&delegate.region);
     }
 
     /// Commits all changes have done for delegate. `persistent` indicates
diff --git a/tests/failpoints/cases/test_region_cache_engine.rs b/tests/failpoints/cases/test_region_cache_engine.rs
index 024d24b132a..37b89b354f2 100644
--- a/tests/failpoints/cases/test_region_cache_engine.rs
+++ b/tests/failpoints/cases/test_region_cache_engine.rs
@@ -16,7 +16,7 @@ use keys::{data_key, DATA_MAX_KEY, DATA_MIN_KEY};
 use kvproto::{
     import_sstpb::SstMeta,
     kvrpcpb::Context,
-    raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request},
+    raft_cmdpb::{AdminCmdType, CmdType, RaftCmdRequest, RaftRequestHeader, Request},
 };
 use protobuf::Message;
 use tempfile::tempdir;
@@ -799,3 +799,81 @@ fn test_delete_range() {
     delete_range(false);
     delete_range(true);
 }
+
+#[test]
+fn test_evict_on_flashback() {
+    let mut cluster = new_server_cluster_with_hybrid_engine_with_no_region_cache(0, 1);
+    cluster.cfg.raft_store.apply_batch_system.pool_size = 1;
+    cluster.run();
+
+    let table = ProductTable::new();
+
+    must_copr_load_data(&mut cluster, &table, 1);
+    must_copr_load_data(&mut cluster, &table, 2);
+    must_copr_load_data(&mut cluster, &table, 3);
+
+    let r = cluster.get_region(b"");
+    {
+        let region_cache_engine = cluster.sim.rl().get_region_cache_engine(1);
+        region_cache_engine
+            .core()
+            .region_manager()
+            .load_region(CacheRegion::from_region(&r))
+            .unwrap();
+    }
+
+    cluster.must_send_wait_flashback_msg(r.id, AdminCmdType::PrepareFlashback);
+    cluster.must_send_wait_flashback_msg(r.id, AdminCmdType::FinishFlashback);
+
+    let (tx, rx) = unbounded();
+    fail::cfg_callback("on_region_cache_iterator_seek", move || {
+        tx.send(true).unwrap();
+    })
+    .unwrap();
+
+    must_copr_point_get(&mut cluster, &table, 1);
+    rx.try_recv().unwrap_err();
+
+    must_copr_point_get(&mut cluster, &table, 2);
+    rx.try_recv().unwrap_err();
+
+    must_copr_point_get(&mut cluster, &table, 3);
+    rx.try_recv().unwrap_err();
+}
+
+#[test]
+fn test_load_during_flashback() {
+    fail::cfg("background_check_load_pending_interval", "return(1000)").unwrap();
+    let mut cluster = new_server_cluster_with_hybrid_engine_with_no_region_cache(0, 1);
+    cluster.cfg.raft_store.apply_batch_system.pool_size = 1;
+    cluster.run();
+
+    let table = ProductTable::new();
+
+    must_copr_load_data(&mut cluster, &table, 1);
+    let r = cluster.get_region(b"");
+    cluster.must_send_wait_flashback_msg(r.id, AdminCmdType::PrepareFlashback);
+
+    let (tx, rx) = unbounded();
+    fail::cfg_callback("ime_fail_to_schedule_load", move || {
+        tx.send(true).unwrap();
+    })
+    .unwrap();
+    {
+        let region_cache_engine = cluster.sim.rl().get_region_cache_engine(1);
+        region_cache_engine
+            .core()
+            .region_manager()
+            .load_region(CacheRegion::from_region(&r))
+            .unwrap();
+    }
+    rx.recv_timeout(Duration::from_secs(5)).unwrap();
+    {
+        let region_cache_engine = cluster.sim.rl().get_region_cache_engine(1);
+        assert!(
+            !region_cache_engine
+                .core()
+                .region_manager()
+                .contains_region(r.id)
+        );
+    }
+}
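Illustrative sketch (not part of the patch): how a caller might construct the metapb::Region that the new prepare_for_region(&metapb::Region) signature takes, mirroring the test setup in the diff. The helper name below is hypothetical; only the engine_traits and kvproto APIs that already appear in the patch are assumed.

use engine_traits::{CacheRegion, WriteBatch};
use kvproto::metapb::{Region, RegionEpoch};

// Hypothetical helper: builds a Region and declares it on a write batch so that
// the following consecutive writes are attributed to that region.
fn declare_region<W: WriteBatch>(wb: &mut W, id: u64, start: &[u8], end: &[u8], version: u64) {
    let mut region = Region::default();
    region.set_id(id);
    region.set_start_key(start.to_vec());
    region.set_end_key(end.to_vec());
    let mut epoch = RegionEpoch::default();
    epoch.set_version(version);
    region.set_region_epoch(epoch);

    // The write batch derives the data-prefixed CacheRegion itself and can also
    // consult region.is_in_flashback, as RegionCacheWriteBatch does in the patch.
    wb.prepare_for_region(&region);

    // The cached range the in-memory engine would track for this region.
    let _cached = CacheRegion::from_region(&region);
}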