Skip to content

Commit

Permalink
fix: remove compaction retry when memory limit (#739)
Browse files Browse the repository at this point in the history
* support auto create config

* remove compaction retry

* add table name in log

* modify log

* continue to open shard when open table failed

* fmt

* romove unused lines

* Update server/src/grpc/meta_event_service/mod.rs

Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com>

* add finsih log after open shard

* clippy

* return error when some tables failed to open

* add cost time

* log detail table error

* blank

* chore

* format log

* remove compaction retry when memory limit

---------

Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com>
  • Loading branch information
MichaelLeeHZ and Rachelint authored Mar 15, 2023
1 parent 509c2c3 commit c59d9c3
Showing 1 changed file with 5 additions and 31 deletions.
36 changes: 5 additions & 31 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use snafu::{ResultExt, Snafu};
use table_engine::table::TableId;
use tokio::{
sync::{
mpsc::{self, error::SendError, Receiver, Sender},
mpsc::{self, Receiver, Sender},
Mutex,
},
time,
Expand All @@ -38,9 +38,7 @@ use crate::{
PickerManager, TableCompactionRequest, WaitError, WaiterNotifier,
},
instance::{
flush_compaction::{self, TableFlushOptions},
write_worker::CompactionNotifier,
Instance, SpaceStore,
flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
},
table::data::TableDataRef,
TableOptions,
Expand Down Expand Up @@ -574,12 +572,11 @@ impl ScheduleWorker {
None => {
// Memory usage exceeds the threshold, let's put pack the
// request.
debug!(
"Compaction task is ignored, because of high memory usage:{}, task:{:?}",
warn!(
"Compaction task is ignored, because of high memory usage:{}, task:{:?}, table:{}",
self.memory_limit.usage.load(Ordering::Relaxed),
compaction_task,
compaction_task, table_data.name
);
self.put_back_compaction_request(compact_req).await;
return;
}
};
Expand All @@ -596,29 +593,6 @@ impl ScheduleWorker {
);
}

async fn put_back_compaction_request(&self, req: TableCompactionRequest) {
if let Err(SendError(ScheduleTask::Request(TableCompactionRequest {
compaction_notifier,
waiter,
..
}))) = self.sender.send(ScheduleTask::Request(req)).await
{
let e = Arc::new(
flush_compaction::Other {
msg: "Failed to put back the compaction request for memory usage exceeds",
}
.build(),
);
if let Some(notifier) = compaction_notifier {
notifier.notify_err(e.clone());
}

let waiter_notifier = WaiterNotifier::new(waiter);
let wait_err = WaitError::Compaction { source: e };
waiter_notifier.notify_wait_result(Err(wait_err));
}
}

async fn schedule(&mut self) {
self.compact_tables().await;
self.flush_tables().await;
Expand Down

0 comments on commit c59d9c3

Please sign in to comment.