Skip to content

Commit

Permalink
Merge 67543cc into a8ed20e
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi authored Apr 9, 2023
2 parents a8ed20e + 67543cc commit 9fa1b8b
Show file tree
Hide file tree
Showing 25 changed files with 892 additions and 2,122 deletions.
10 changes: 2 additions & 8 deletions analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Compaction.

Expand All @@ -11,7 +11,6 @@ use tokio::sync::oneshot;

use crate::{
compaction::picker::{CommonCompactionPicker, CompactionPickerRef},
instance::write_worker::CompactionNotifier,
sst::file::{FileHandle, Level},
table::data::TableDataRef,
table_options::COMPACTION_STRATEGY,
Expand Down Expand Up @@ -429,18 +428,13 @@ impl Drop for WaiterNotifier {
/// Request to compact single table.
pub struct TableCompactionRequest {
pub table_data: TableDataRef,
pub compaction_notifier: Option<CompactionNotifier>,
pub waiter: Option<oneshot::Sender<WaitResult<()>>>,
}

impl TableCompactionRequest {
pub fn no_waiter(
table_data: TableDataRef,
compaction_notifier: Option<CompactionNotifier>,
) -> Self {
pub fn no_waiter(table_data: TableDataRef) -> Self {
TableCompactionRequest {
table_data,
compaction_notifier,
waiter: None,
}
}
Expand Down
44 changes: 17 additions & 27 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// Compaction scheduler.

Expand Down Expand Up @@ -38,7 +38,8 @@ use crate::{
PickerManager, TableCompactionRequest, WaitError, WaiterNotifier,
},
instance::{
flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
flush_compaction::{Flusher, TableFlushOptions},
SpaceStore,
},
sst::factory::{ScanOptions, SstWriteOptions},
table::data::TableDataRef,
Expand Down Expand Up @@ -450,7 +451,6 @@ impl ScheduleWorker {
&self,
table_data: TableDataRef,
compaction_task: CompactionTask,
compaction_notifier: Option<CompactionNotifier>,
waiter_notifier: WaiterNotifier,
token: MemoryUsageToken,
) {
Expand Down Expand Up @@ -510,27 +510,18 @@ impl ScheduleWorker {
// Notify the background compact table result.
match res {
Ok(()) => {
if let Some(notifier) = compaction_notifier.clone() {
notifier.notify_ok();
}
waiter_notifier.notify_wait_result(Ok(()));

if keep_scheduling_compaction {
schedule_table_compaction(
sender,
TableCompactionRequest::no_waiter(
table_data.clone(),
compaction_notifier.clone(),
),
TableCompactionRequest::no_waiter(table_data.clone()),
)
.await;
}
}
Err(e) => {
let e = Arc::new(e);
if let Some(notifier) = compaction_notifier {
notifier.notify_err(e.clone());
}

let wait_err = WaitError::Compaction { source: e };
waiter_notifier.notify_wait_result(Err(wait_err));
Expand Down Expand Up @@ -604,16 +595,9 @@ impl ScheduleWorker {
}
};

let compaction_notifier = compact_req.compaction_notifier;
let waiter_notifier = WaiterNotifier::new(compact_req.waiter);

self.do_table_compaction_task(
table_data,
compaction_task,
compaction_notifier,
waiter_notifier,
token,
);
self.do_table_compaction_task(table_data, compaction_task, waiter_notifier, token);
}

async fn schedule(&mut self) {
Expand All @@ -634,25 +618,31 @@ impl ScheduleWorker {

// This will spawn a background job to purge ssts and avoid schedule thread
// blocked.
self.handle_table_compaction_request(TableCompactionRequest::no_waiter(
table_data, None,
))
.await;
self.handle_table_compaction_request(TableCompactionRequest::no_waiter(table_data))
.await;
}
}

async fn flush_tables(&self) {
let mut tables_buf = Vec::new();
self.space_store.list_all_tables(&mut tables_buf);
let flusher = Flusher {
space_store: self.space_store.clone(),
runtime: self.runtime.clone(),
write_sst_max_buffer_size: self.write_sst_max_buffer_size,
};

for table_data in &tables_buf {
let last_flush_time = table_data.last_flush_time();
if last_flush_time + self.max_unflushed_duration.as_millis_u64()
> common_util::time::current_time_millis()
{
let mut serializer = table_data.serializer.lock().await;
let flush_scheduler = serializer.flush_scheduler();
// Instance flush the table asynchronously.
if let Err(e) =
Instance::flush_table(table_data.clone(), TableFlushOptions::default()).await
if let Err(e) = flusher
.schedule_flush(flush_scheduler, table_data, TableFlushOptions::default())
.await
{
error!("Failed to flush table, err:{}", e);
}
Expand Down
Loading

0 comments on commit 9fa1b8b

Please sign in to comment.