Skip to content

Commit

Permalink
In-memory engine: evict on flashback (tikv#17664)
Browse files Browse the repository at this point in the history
ref tikv#16141, close tikv#17626

evict on flashback

Signed-off-by: SpadeA-Tang <tangchenjie1210@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
SpadeA-Tang and ti-chi-bot[bot] authored Oct 21, 2024
1 parent 0b1124b commit f520bdf
Show file tree
Hide file tree
Showing 20 changed files with 321 additions and 87 deletions.
1 change: 1 addition & 0 deletions components/engine_traits/src/region_cache_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum EvictReason {
Merge,
Disabled,
ApplySnapshot,
Flashback,
Manual,
}

Expand Down
6 changes: 4 additions & 2 deletions components/engine_traits/src/write_batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{errors::Result, options::WriteOptions, CacheRegion};
use kvproto::metapb;

use crate::{errors::Result, options::WriteOptions};

/// Engines that can create write batches
pub trait WriteBatchExt: Sized {
Expand Down Expand Up @@ -139,5 +141,5 @@ pub trait WriteBatch: Mutable {

/// It declares that the following consecutive write will be within this
/// region.
fn prepare_for_region(&mut self, _: CacheRegion) {}
fn prepare_for_region(&mut self, _: &metapb::Region) {}
}
15 changes: 13 additions & 2 deletions components/hybrid_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use engine_traits::{
CacheRegion, FailedReason, KvEngine, Mutable, Peekable, ReadOptions, RegionCacheEngine, Result,
SnapshotContext, SnapshotMiscExt, SyncMutable, WriteBatch, WriteBatchExt,
};
use keys::DATA_PREFIX_KEY;
use kvproto::metapb::{self, RegionEpoch};

use crate::{
metrics::{
Expand Down Expand Up @@ -126,8 +128,17 @@ where
F: FnOnce(&mut <Self as WriteBatchExt>::WriteBatch) -> Result<()>,
{
let mut batch = self.write_batch();
if let Some(region) = self.region_cache_engine.get_region_for_key(key) {
batch.prepare_for_region(region);
if let Some(cached_region) = self.region_cache_engine.get_region_for_key(key) {
// CacheRegion does not contains enough information for Region so the transfer
// is not accurate but this method is not called in production code.
let mut region = metapb::Region::default();
region.set_id(cached_region.id);
region.set_start_key(cached_region.start[DATA_PREFIX_KEY.len()..].to_vec());
region.set_end_key(cached_region.end[DATA_PREFIX_KEY.len()..].to_vec());
let mut epoch = RegionEpoch::default();
epoch.version = cached_region.epoch_version;
region.set_region_epoch(epoch);
batch.prepare_for_region(&region);
}
f(&mut batch)?;
let _ = batch.write()?;
Expand Down
6 changes: 3 additions & 3 deletions components/hybrid_engine/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ pub mod tests {
let cache_r3 = CacheRegion::from_region(&r3);

let mut write_batch = hybrid_engine.write_batch();
write_batch.prepare_for_region(cache_r1.clone());
write_batch.prepare_for_region(&r1);
write_batch.put(b"zk02", b"val").unwrap();
write_batch.put(b"zk03", b"val").unwrap();
write_batch.prepare_for_region(cache_r2.clone());
write_batch.prepare_for_region(&r2);
write_batch.put(b"zk22", b"val").unwrap();
write_batch.put(b"zk23", b"val").unwrap();
write_batch.prepare_for_region(cache_r3.clone());
write_batch.prepare_for_region(&r3);
write_batch.put(b"zk42", b"val").unwrap();
write_batch.put(b"zk43", b"val").unwrap();
write_batch.write().unwrap();
Expand Down
15 changes: 15 additions & 0 deletions components/hybrid_engine/src/observer/load_eviction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ impl QueryObserver for LoadEvictionObserver {
}

impl AdminObserver for LoadEvictionObserver {
fn pre_exec_admin(
&self,
ctx: &mut ObserverContext<'_>,
req: &kvproto::raft_cmdpb::AdminRequest,
_: u64,
_: u64,
) -> bool {
if req.cmd_type == AdminCmdType::PrepareFlashback {
let cache_region = CacheRegion::from_region(ctx.region());
self.evict_region(cache_region, EvictReason::Flashback);
}

false
}

fn post_exec_admin(
&self,
ctx: &mut ObserverContext<'_>,
Expand Down
5 changes: 3 additions & 2 deletions components/hybrid_engine/src/observer/write_batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{is_data_cf, CacheRegion, KvEngine, Mutable, Result, WriteBatch, WriteOptions};
use engine_traits::{is_data_cf, KvEngine, Mutable, Result, WriteBatch, WriteOptions};
use in_memory_engine::{RegionCacheMemoryEngine, RegionCacheWriteBatch};
use kvproto::metapb;
use raftstore::coprocessor::{
dispatcher::BoxWriteBatchObserver, Coprocessor, CoprocessorHost, ObservableWriteBatch,
WriteBatchObserver,
Expand Down Expand Up @@ -101,7 +102,7 @@ impl WriteBatch for HybridObservableWriteBatch {
fn rollback_to_save_point(&mut self) -> Result<()> {
self.cache_write_batch.rollback_to_save_point()
}
fn prepare_for_region(&mut self, region: CacheRegion) {
fn prepare_for_region(&mut self, region: &metapb::Region) {
self.cache_write_batch.prepare_for_region(region);
}
}
Expand Down
2 changes: 1 addition & 1 deletion components/hybrid_engine/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ mod tests {
assert!(!iter.seek_to_first().unwrap());
}
let mut write_batch = hybrid_engine.write_batch();
write_batch.prepare_for_region(cache_region.clone());
write_batch.prepare_for_region(&region);
write_batch
.cache_write_batch
.set_region_cache_status(RegionCacheStatus::Cached);
Expand Down
15 changes: 8 additions & 7 deletions components/hybrid_engine/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
use std::sync::atomic::{AtomicBool, Ordering};

use engine_traits::{
is_data_cf, CacheRegion, KvEngine, Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions,
is_data_cf, KvEngine, Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions,
};
use in_memory_engine::{RegionCacheMemoryEngine, RegionCacheWriteBatch};
use kvproto::metapb;

use crate::engine::HybridEngine;

Expand Down Expand Up @@ -100,7 +101,7 @@ impl<EK: KvEngine> WriteBatch for HybridEngineWriteBatch<EK> {
self.cache_write_batch.merge(other.cache_write_batch)
}

fn prepare_for_region(&mut self, r: CacheRegion) {
fn prepare_for_region(&mut self, r: &metapb::Region) {
self.cache_write_batch.prepare_for_region(r);
}
}
Expand Down Expand Up @@ -175,7 +176,7 @@ mod tests {
.unwrap();
let cache_region = CacheRegion::from_region(&region);
let mut write_batch = hybrid_engine.write_batch();
write_batch.prepare_for_region(cache_region.clone());
write_batch.prepare_for_region(&region);
write_batch
.cache_write_batch
.set_region_cache_status(RegionCacheStatus::Cached);
Expand Down Expand Up @@ -248,10 +249,10 @@ mod tests {
.unwrap();

let mut wb = hybrid_engine.write_batch();
wb.prepare_for_region(cache_region1.clone());
wb.prepare_for_region(&region1);
wb.put(b"zk05", b"val").unwrap();
wb.put(b"zk08", b"val2").unwrap();
wb.prepare_for_region(cache_region2.clone());
wb.prepare_for_region(&region2);
wb.put(b"zk25", b"val3").unwrap();
wb.put(b"zk27", b"val4").unwrap();
wb.write().unwrap();
Expand All @@ -276,9 +277,9 @@ mod tests {

let mut wb = hybrid_engine.write_batch();
// all ranges overlapped with it will be evicted
wb.prepare_for_region(cache_region1.clone());
wb.prepare_for_region(&region1);
wb.delete_range(b"zk05", b"zk08").unwrap();
wb.prepare_for_region(cache_region2.clone());
wb.prepare_for_region(&region2);
wb.delete_range(b"zk20", b"zk21").unwrap();
wb.write().unwrap();

Expand Down
6 changes: 3 additions & 3 deletions components/hybrid_engine/tests/failpoints/test_write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ fn test_sequence_number_unique() {
// while we block the batch loading of region3, it's new KVs are still directly
// written into the skiplist.
let mut wb = hybrid_engine.write_batch();
wb.prepare_for_region(CacheRegion::from_region(&r));
wb.prepare_for_region(&r);
wb.put(b"zk", b"val").unwrap(); // seq 3
wb.delete(b"zk").unwrap(); // seq 4
wb.put(b"zk2", b"val").unwrap(); // seq 5

wb.prepare_for_region(CacheRegion::from_region(&r2));
wb.prepare_for_region(&r2);
wb.put(b"zk6", b"val").unwrap(); // seq 6
wb.delete(b"zk5").unwrap(); // seq 7
wb.put(b"zk5", b"val2").unwrap(); // seq 8

wb.prepare_for_region(CacheRegion::from_region(&r3));
wb.prepare_for_region(&r3);
wb.put(b"zk8", b"val").unwrap(); // seq 9
wb.put(b"zk7", b"val2").unwrap(); // seq 10

Expand Down
15 changes: 8 additions & 7 deletions components/in_memory_engine/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,7 @@ impl Runnable for BackgroundRunner {
Some(&rocks_engine),
&scheduler,
false,
r.is_in_flashback,
);
}),
},
Expand Down Expand Up @@ -2745,8 +2746,8 @@ pub mod tests {
.region_manager()
.load_region(region2.clone())
.unwrap();
engine.prepare_for_apply(&region1);
engine.prepare_for_apply(&region2);
engine.prepare_for_apply(&region1, false);
engine.prepare_for_apply(&region2, false);

// concurrent write to rocksdb, but the key will not be loaded in the memory
// engine
Expand Down Expand Up @@ -2923,7 +2924,7 @@ pub mod tests {
},
);
let cache_region = CacheRegion::from_region(&region);
engine.prepare_for_apply(&cache_region);
engine.prepare_for_apply(&cache_region, false);

// Wait for the range to be loaded.
test_util::eventually(
Expand Down Expand Up @@ -3037,7 +3038,7 @@ pub mod tests {
for r in [&region1, &region2] {
let cache_region = CacheRegion::from_region(r);
engine.load_region(cache_region.clone()).unwrap();
engine.prepare_for_apply(&cache_region);
engine.prepare_for_apply(&cache_region, false);
}

// ensure all ranges are finshed
Expand Down Expand Up @@ -3114,7 +3115,7 @@ pub mod tests {
for r in [&region1, &region2, &region3] {
let cache_region = CacheRegion::from_region(r);
engine.load_region(cache_region.clone()).unwrap();
engine.prepare_for_apply(&cache_region);
engine.prepare_for_apply(&cache_region, false);
}

// ensure all ranges are finshed
Expand Down Expand Up @@ -3164,7 +3165,7 @@ pub mod tests {
rocks_engine.put_cf(CF_WRITE, &key, b"val").unwrap();
// After loading range1, the memory usage should be 140*6=840
engine.load_region(cache_region1.clone()).unwrap();
engine.prepare_for_apply(&cache_region1);
engine.prepare_for_apply(&cache_region1, false);

let region2 = new_region(2, construct_region_key(3), construct_region_key(5));
let key = construct_key(3, 10);
Expand All @@ -3187,7 +3188,7 @@ pub mod tests {

let cache_region2 = CacheRegion::from_region(&region2);
engine.load_region(cache_region2.clone()).unwrap();
engine.prepare_for_apply(&cache_region2);
engine.prepare_for_apply(&cache_region2, false);

// ensure all ranges are finshed
test_util::eventually(Duration::from_millis(100), Duration::from_secs(2), || {
Expand Down
2 changes: 1 addition & 1 deletion components/in_memory_engine/src/cross_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ mod tests {

{
let mut wb = engine.write_batch();
wb.prepare_for_region(cache_region.clone());
wb.prepare_for_region(&region);
let mut disk_wb = rocks_engine.write_batch();

prepare_data(&mut wb, &mut disk_wb);
Expand Down
24 changes: 16 additions & 8 deletions components/in_memory_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use engine_traits::{
CacheRegion, EvictReason, FailedReason, IterOptions, Iterable, KvEngine, RegionCacheEngine,
RegionCacheEngineExt, RegionEvent, Result, CF_DEFAULT, CF_LOCK, CF_WRITE, DATA_CFS,
};
use fail::fail_point;
use kvproto::metapb::Region;
use pd_client::PdClient;
use raftstore::{coprocessor::RegionInfoProvider, store::CasualRouter};
Expand Down Expand Up @@ -217,6 +218,7 @@ impl RegionCacheMemoryEngineCore {
rocks_engine: Option<&RocksEngine>,
scheduler: &Scheduler<BackgroundTask>,
should_set_in_written: bool,
in_flashback: bool,
) -> RegionCacheStatus {
if !self.region_manager.is_active() {
return RegionCacheStatus::NotInCache;
Expand Down Expand Up @@ -251,11 +253,11 @@ impl RegionCacheMemoryEngineCore {
return RegionCacheStatus::NotInCache;
};

if region_meta.get_region().epoch_version < region.epoch_version {
if region_meta.get_region().epoch_version < region.epoch_version || in_flashback {
let meta = regions_map.remove_region(region.id);
assert_eq!(meta.get_state(), RegionState::Pending);
// try update outdated region.
if meta.can_be_updated_to(region) {
if !in_flashback && meta.can_be_updated_to(region) {
info!("ime update outdated pending region";
"current_meta" => ?meta,
"new_region" => ?region);
Expand All @@ -264,6 +266,7 @@ impl RegionCacheMemoryEngineCore {
regions_map.load_region(region.clone()).unwrap();
region_meta = regions_map.mut_region_meta(region.id).unwrap();
} else {
fail_point!("ime_fail_to_schedule_load");
info!("ime remove outdated pending region";
"pending_region" => ?meta.get_region(),
"new_region" => ?region);
Expand Down Expand Up @@ -429,12 +432,17 @@ impl RegionCacheMemoryEngine {

// It handles the pending region and check whether to buffer write for this
// region.
pub(crate) fn prepare_for_apply(&self, region: &CacheRegion) -> RegionCacheStatus {
pub(crate) fn prepare_for_apply(
&self,
region: &CacheRegion,
in_flashback: bool,
) -> RegionCacheStatus {
self.core.prepare_for_apply(
region,
self.rocks_engine.as_ref(),
self.bg_work_manager.background_scheduler(),
true,
in_flashback,
)
}

Expand Down Expand Up @@ -662,7 +670,7 @@ pub mod tests {

let mut region2 = new_region(1, b"k1", b"k5");
region2.mut_region_epoch().version = 2;
engine.prepare_for_apply(&CacheRegion::from_region(&region2));
engine.prepare_for_apply(&CacheRegion::from_region(&region2), false);
assert_eq!(
count_region(engine.core.region_manager(), |m| {
matches!(m.get_state(), Pending | Loading)
Expand All @@ -676,7 +684,7 @@ pub mod tests {

let mut region2 = new_region(1, b"k2", b"k5");
region2.mut_region_epoch().version = 2;
engine.prepare_for_apply(&CacheRegion::from_region(&region2));
engine.prepare_for_apply(&CacheRegion::from_region(&region2), false);
assert_eq!(
count_region(engine.core.region_manager(), |m| {
matches!(m.get_state(), Pending | Loading)
Expand Down Expand Up @@ -793,7 +801,7 @@ pub mod tests {
assert!(engine.core.region_manager.is_active());

let mut wb = engine.write_batch();
wb.prepare_for_region(cache_region.clone());
wb.prepare_for_region(&region);
wb.put(b"zk00", b"v1").unwrap();
wb.put(b"zk10", b"v1").unwrap();
wb.put(b"zk20", b"v1").unwrap();
Expand All @@ -810,7 +818,7 @@ pub mod tests {
);

let mut wb = engine.write_batch();
wb.prepare_for_region(cache_region.clone());
wb.prepare_for_region(&region);
wb.put(b"zk10", b"v2").unwrap();
wb.set_sequence_number(10).unwrap();

Expand Down Expand Up @@ -877,7 +885,7 @@ pub mod tests {
engine.new_region(region.clone());

let mut wb = engine.write_batch();
wb.prepare_for_region(cache_region.clone());
wb.prepare_for_region(&region);
wb.set_sequence_number(10).unwrap();
wb.put(b"a", b"val1").unwrap();
wb.put(b"b", b"val2").unwrap();
Expand Down
4 changes: 4 additions & 0 deletions components/in_memory_engine/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ make_auto_flush_static_metric! {
memory_limit_reached,
disabled,
apply_snapshot,
flashback,
manual,
}

Expand Down Expand Up @@ -215,6 +216,9 @@ pub(crate) fn observe_eviction_duration(secs: f64, evict_reason: EvictReason) {
EvictReason::ApplySnapshot => IN_MEMORY_ENGINE_EVICTION_DURATION_HISTOGRAM_STATIC
.apply_snapshot
.observe(secs),
EvictReason::Flashback => IN_MEMORY_ENGINE_EVICTION_DURATION_HISTOGRAM_STATIC
.flashback
.observe(secs),
EvictReason::Manual => IN_MEMORY_ENGINE_EVICTION_DURATION_HISTOGRAM_STATIC
.manual
.observe(secs),
Expand Down
Loading

0 comments on commit f520bdf

Please sign in to comment.