Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Checks whether a region should flush periodically #3459

Merged
merged 8 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pin-project.workspace = true
prometheus.workspace = true
prost.workspace = true
puffin.workspace = true
rand.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
Expand All @@ -75,7 +76,6 @@ common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
log-store.workspace = true
rand.workspace = true
toml.workspace = true

[[bench]]
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl MitoEngine {
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
time_provider: crate::time_provider::TimeProviderRef,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;

Expand All @@ -385,6 +386,7 @@ impl MitoEngine {
object_store_manager,
write_buffer_manager,
listener,
time_provider,
)
.await?,
config,
Expand Down
103 changes: 103 additions & 0 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

//! Flush tests for mito engine.

use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use api::v1::Rows;
use common_recordbatch::RecordBatches;
use common_time::util::current_time_millis;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
Expand All @@ -28,6 +31,8 @@ use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, MockWriteBufferManager, TestEnv,
};
use crate::time_provider::TimeProvider;
use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;

#[tokio::test]
async fn test_manual_flush() {
Expand Down Expand Up @@ -272,3 +277,101 @@ async fn test_flush_reopen_region() {
assert_eq!(2, version_data.last_entry_id);
assert_eq!(5, version_data.committed_sequence);
}

#[derive(Debug)]
struct MockTimeProvider {
now: AtomicI64,
elapsed: AtomicI64,
}

impl TimeProvider for MockTimeProvider {
fn current_time_millis(&self) -> i64 {
self.now.load(Ordering::Relaxed)
}

fn elapsed_since(&self, _current_millis: i64) -> i64 {
self.elapsed.load(Ordering::Relaxed)
}

fn wait_duration(&self, _duration: Duration) -> Duration {
Duration::from_millis(20)
}
}

impl MockTimeProvider {
fn new(now: i64) -> Self {
Self {
now: AtomicI64::new(now),
elapsed: AtomicI64::new(0),
}
}

fn set_now(&self, now: i64) {
self.now.store(now, Ordering::Relaxed);
}

fn set_elapsed(&self, elapsed: i64) {
self.elapsed.store(elapsed, Ordering::Relaxed);
}
}

#[tokio::test]
async fn test_auto_flush_engine() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let now = current_time_millis();
let time_provider = Arc::new(MockTimeProvider::new(now));
let engine = env
.create_engine_with_time(
MitoConfig {
auto_flush_interval: Duration::from_secs(60 * 5),
..Default::default()
},
Some(write_buffer_manager.clone()),
Some(listener.clone()),
time_provider.clone(),
)
.await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Prepares rows for flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;

// Sets current time to now + auto_flush_interval * 2.
time_provider.set_now(now + (60 * 5 * 2) * 1000);
// Sets elapsed time to MAX_INITIAL_CHECK_DELAY_SECS + 1.
time_provider.set_elapsed((MAX_INITIAL_CHECK_DELAY_SECS as i64 + 1) * 1000);

// Wait until flush is finished.
tokio::time::timeout(Duration::from_secs(3), listener.wait())
.await
.unwrap();

let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
13 changes: 8 additions & 5 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ pub enum FlushReason {
Manual,
/// Flush to alter table.
Alter,
/// Flush periodically.
Periodically,
}

impl FlushReason {
Expand Down Expand Up @@ -432,18 +434,19 @@ impl FlushScheduler {
) -> Result<()> {
debug_assert_eq!(region_id, task.region_id);

FLUSH_REQUESTS_TOTAL
.with_label_values(&[task.reason.as_str()])
.inc();

let version = version_control.current().version;
if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() {
if version.memtables.is_empty() {
debug_assert!(!self.region_status.contains_key(&region_id));
// The region has nothing to flush.
task.on_success();
return Ok(());
}

// Don't increase the counter if a region has nothing to flush.
FLUSH_REQUESTS_TOTAL
.with_label_values(&[task.reason.as_str()])
.inc();

// Add this region to status map.
let flush_status = self
.region_status
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod request;
pub mod row_converter;
pub(crate) mod schedule;
pub mod sst;
mod time_provider;
pub mod wal;
mod worker;

Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::info;
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
Expand All @@ -37,6 +36,7 @@ use crate::memtable::MemtableId;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::sst::file_purger::FilePurgerRef;
use crate::time_provider::TimeProviderRef;

/// This is the approximate factor to estimate the size of wal.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
Expand Down Expand Up @@ -83,6 +83,9 @@ pub(crate) struct MitoRegion {
last_flush_millis: AtomicI64,
/// Whether the region is writable.
writable: AtomicBool,

/// Provider to get current time.
time_provider: TimeProviderRef,
}

pub(crate) type MitoRegionRef = Arc<MitoRegion>;
Expand Down Expand Up @@ -119,7 +122,7 @@ impl MitoRegion {

/// Update flush time to current time.
pub(crate) fn update_flush_millis(&self) {
let now = current_time_millis();
let now = self.time_provider.current_time_millis();
self.last_flush_millis.store(now, Ordering::Relaxed);
}

Expand Down
17 changes: 14 additions & 3 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
Expand All @@ -46,6 +45,7 @@ use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::{EntryId, Wal};

/// Builder to create a new [MitoRegion] or open an existing one.
Expand All @@ -60,6 +60,7 @@ pub(crate) struct RegionOpener {
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
}

impl RegionOpener {
Expand All @@ -83,6 +84,7 @@ impl RegionOpener {
cache_manager: None,
skip_wal_replay: false,
intermediate_manager,
time_provider: None,
}
}

Expand Down Expand Up @@ -182,6 +184,9 @@ impl RegionOpener {
object_store,
self.intermediate_manager,
));
let time_provider = self
.time_provider
.unwrap_or_else(|| Arc::new(StdTimeProvider));

Ok(MitoRegion {
region_id,
Expand All @@ -194,9 +199,10 @@ impl RegionOpener {
self.cache_manager,
)),
wal_options,
last_flush_millis: AtomicI64::new(current_time_millis()),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is writable after it is created.
writable: AtomicBool::new(true),
time_provider,
})
}

Expand Down Expand Up @@ -294,6 +300,10 @@ impl RegionOpener {
} else {
info!("Skip the WAL replay for region: {}", region_id);
}
let time_provider = self
.time_provider
.clone()
.unwrap_or_else(|| Arc::new(StdTimeProvider));

let region = MitoRegion {
region_id: self.region_id,
Expand All @@ -302,9 +312,10 @@ impl RegionOpener {
manifest_manager,
file_purger,
wal_options,
last_flush_millis: AtomicI64::new(current_time_millis()),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is always opened in read only mode.
writable: AtomicBool::new(false),
time_provider,
};
Ok(Some(region))
}
Expand Down
33 changes: 33 additions & 0 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::worker::WorkerGroup;

#[derive(Debug)]
Expand Down Expand Up @@ -179,6 +180,7 @@ impl TestEnv {
object_store_manager,
manager,
listener,
Arc::new(StdTimeProvider),
)
.await
.unwrap()
Expand Down Expand Up @@ -219,6 +221,37 @@ impl TestEnv {
object_store_manager,
manager,
listener,
Arc::new(StdTimeProvider),
)
.await
.unwrap()
}

/// Creates a new engine with specific config and manager/listener/time provider under this env.
pub async fn create_engine_with_time(
&mut self,
config: MitoConfig,
manager: Option<WriteBufferManagerRef>,
listener: Option<EventListenerRef>,
time_provider: TimeProviderRef,
) -> MitoEngine {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;

let logstore = Arc::new(log_store);
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());

let data_home = self.data_home().display().to_string();

MitoEngine::new_for_test(
&data_home,
config,
logstore,
object_store_manager,
manager,
listener,
time_provider.clone(),
)
.await
.unwrap()
Expand Down
Loading