avoid queue full block
baojinri committed Jul 14, 2023
1 parent cc290f2 commit 0d86dd1
Showing 2 changed files with 79 additions and 30 deletions.
analytic_engine/src/instance/mod.rs: 2 changes (1 addition & 1 deletion)
@@ -262,7 +262,7 @@ impl Instance {
     }

     #[inline]
-    fn read_runtime(&self) -> &Arc<Runtime> {
+    pub fn read_runtime(&self) -> &Arc<Runtime> {
         &self.runtimes.read_runtime
     }

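The only change to instance/mod.rs is visibility: `read_runtime` becomes `pub` so that the table layer (below) can hand the read runtime to `CancellationSafeFuture` when it detaches the merged write from the calling request. As a rough sketch of that idea only, and not the actual `CancellationSafeFuture` from `common_util`, a cancellation-safe wrapper can be approximated by spawning the work on a runtime and awaiting the join handle; the `run_detached` helper and the tokio `Runtime` below are assumptions made for illustration.

// Sketch only: the work is owned by `runtime`, so dropping the future that
// `run_detached` returns does not abort the underlying write, and writers
// queued behind it are still drained and notified.
use std::sync::Arc;

use tokio::runtime::Runtime;

async fn run_detached<F, T>(work: F, runtime: Arc<Runtime>) -> T
where
    F: std::future::Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    runtime.spawn(work).await.expect("detached task panicked")
}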
analytic_engine/src/table/mod.rs: 107 changes (78 additions & 29 deletions)
@@ -2,15 +2,19 @@

 //! Table implementation

-use std::{collections::HashMap, fmt, sync::Mutex};
+use std::{
+    collections::HashMap,
+    fmt,
+    sync::{Arc, Mutex},
+};

 use async_trait::async_trait;
 use common_types::{
     row::{Row, RowGroupBuilder},
     schema::Schema,
     time::TimeRange,
 };
-use common_util::error::BoxError;
+use common_util::{error::BoxError, future_cancel::CancellationSafeFuture};
 use datafusion::{common::Column, logical_expr::Expr};
 use futures::TryStreamExt;
 use log::{error, warn};
@@ -47,6 +51,29 @@ const GET_METRICS_COLLECTOR_NAME: &str = "get";
 // writes.
 const ADDITIONAL_PENDING_WRITE_CAP_RATIO: usize = 10;

+struct WriteRequests {
+    pub space_table: SpaceAndTable,
+    pub instance: InstanceRef,
+    pub table_data: TableDataRef,
+    pub pending_writes: Arc<Mutex<PendingWriteQueue>>,
+}
+
+impl WriteRequests {
+    pub fn new(
+        space_table: SpaceAndTable,
+        instance: InstanceRef,
+        table_data: TableDataRef,
+        pending_writes: Arc<Mutex<PendingWriteQueue>>,
+    ) -> Self {
+        Self {
+            space_table,
+            instance,
+            table_data,
+            pending_writes,
+        }
+    }
+}
+
 /// Table trait implementation
 pub struct TableImpl {
     space_table: SpaceAndTable,
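`WriteRequests` exists because the future handed to `CancellationSafeFuture` must be `'static`: it cannot borrow `&self`, so the caller clones its `SpaceAndTable`, `InstanceRef`, `TableDataRef`, and the now `Arc`-wrapped pending-write queue into one owned bundle. A minimal sketch of that ownership pattern, using made-up names (`SharedQueue`, `detached_drain`) rather than this crate's real types:

// Sketch of the owned-handles pattern, not code from this commit.
use std::sync::{Arc, Mutex};

struct SharedQueue {
    items: Mutex<Vec<String>>,
}

// Because `queue` is an owned `Arc` rather than `&self`, the returned future
// is `'static` and can be spawned onto another runtime.
fn detached_drain(queue: Arc<SharedQueue>) -> impl std::future::Future<Output = usize> + 'static {
    async move {
        let drained = std::mem::take(&mut *queue.items.lock().unwrap());
        drained.len()
    }
}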
@@ -63,7 +90,7 @@ pub struct TableImpl {
     table_data: TableDataRef,

     /// Buffer for written rows.
-    pending_writes: Mutex<PendingWriteQueue>,
+    pending_writes: Arc<Mutex<PendingWriteQueue>>,
 }

 impl TableImpl {
@@ -78,7 +105,7 @@ impl TableImpl {
             space_id,
             table_id: table_data.id,
             table_data,
-            pending_writes,
+            pending_writes: Arc::new(pending_writes),
         }
     }
 }
@@ -250,35 +277,38 @@ impl TableImpl {
             let mut pending_queue = self.pending_writes.lock().unwrap();
             pending_queue.try_push(request)
         };
-        let (request, mut serial_exec, notifiers) = match queue_res {
+
+        match queue_res {
             QueueResult::First => {
                 // This is the first request in the queue, and we should
                 // take responsibilities for merging and writing the
                 // requests in the queue.
-                let serial_exec = self.table_data.serial_exec.lock().await;
-                // The `serial_exec` is acquired, let's merge the pending requests and write
-                // them all.
-                let pending_writes = {
-                    let mut pending_queue = self.pending_writes.lock().unwrap();
-                    pending_queue.take_pending_writes()
-                };
-                assert!(
-                    !pending_writes.is_empty(),
-                    "The pending writes should contain at least the one just pushed."
+                let write_requests = WriteRequests::new(
+                    self.space_table.clone(),
+                    self.instance.clone(),
+                    self.table_data.clone(),
+                    self.pending_writes.clone(),
                 );
-                let merged_write_request =
-                    merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows);
-                (merged_write_request, serial_exec, pending_writes.notifiers)
+
+                match CancellationSafeFuture::new(
+                    Self::write_requests(write_requests),
+                    self.instance.read_runtime().clone(),
+                )
+                .await
+                {
+                    Ok(_) => Ok(num_rows),
+                    Err(e) => Err(e),
+                }
             }
             QueueResult::Waiter(rx) => {
                 // The request is successfully pushed into the queue, and just wait for the
                 // write result.
                 match rx.await {
                     Ok(res) => {
                         res.box_err().context(Write { table: self.name() })?;
-                        return Ok(num_rows);
+                        Ok(num_rows)
                     }
-                    Err(_) => return WaitForPendingWrites { table: self.name() }.fail(),
+                    Err(_) => WaitForPendingWrites { table: self.name() }.fail(),
                 }
             }
             QueueResult::Reject(_) => {
@@ -288,24 +318,43 @@ impl TableImpl {
                     self.instance.max_rows_in_write_queue,
                     self.name(),
                 );
-                return TooManyPendingWrites { table: self.name() }.fail();
+                TooManyPendingWrites { table: self.name() }.fail()
             }
-        };
+        }
+    }
+
+    async fn write_requests(write_requests: WriteRequests) -> Result<()> {
+        let mut serial_exec = write_requests.table_data.serial_exec.lock().await;
+        // The `serial_exec` is acquired, let's merge the pending requests and write
+        // them all.
+        let pending_writes = {
+            let mut pending_queue = write_requests.pending_writes.lock().unwrap();
+            pending_queue.take_pending_writes()
+        };
+        assert!(
+            !pending_writes.is_empty(),
+            "The pending writes should contain at least the one just pushed."
+        );
+        let merged_write_request =
+            merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows);

         let mut writer = Writer::new(
-            self.instance.clone(),
-            self.space_table.clone(),
+            write_requests.instance,
+            write_requests.space_table,
             &mut serial_exec,
         );
         let write_res = writer
-            .write(request)
+            .write(merged_write_request)
             .await
             .box_err()
-            .context(Write { table: self.name() });
+            .context(Write {
+                table: write_requests.table_data.name.clone(),
+            });

         // There is no waiter for pending writes, return the write result.
+        let notifiers = pending_writes.notifiers;
         if notifiers.is_empty() {
-            return write_res;
+            return Ok(());
         }

         // Notify the waiters for the pending writes.
@@ -315,11 +364,11 @@
                     if notifier.send(Ok(())).is_err() {
                         warn!(
                             "Failed to notify the ok result of pending writes, table:{}",
-                            self.name()
+                            write_requests.table_data.name
                         );
                     }
                 }
-                Ok(num_rows)
+                Ok(())
             }
             Err(e) => {
                 let err_msg = format!("Failed to do merge write, err:{e}");
@@ -328,7 +377,7 @@
                     if notifier.send(err).is_err() {
                         warn!(
                             "Failed to notify the error result of pending writes, table:{}",
-                            self.name()
+                            write_requests.table_data.name
                         );
                     }
                 }
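Taken together, the table-side change splits the old inline path in two: `write` now only classifies the caller against the pending-write queue (first writer, waiter, or reject) and, for the first writer, drives the new `write_requests` through `CancellationSafeFuture` on the read runtime, while `write_requests` locks `serial_exec`, drains and merges the queue, performs the write, and notifies every waiter. The sketch below restates that control flow with simplified placeholder types (`QueueOutcome`, string errors, `do_merged_write`); it is not the crate's real `QueueResult` or error machinery.

// Simplified restatement of the dispatch logic; every type here is a
// placeholder standing in for the crate's PendingWriteQueue machinery.
use tokio::sync::oneshot;

enum QueueOutcome {
    // This caller is the first in the queue and must drain and write the batch.
    First,
    // Another caller is already writing; wait for its result.
    Waiter(oneshot::Receiver<Result<(), String>>),
    // The queue already holds too many rows; push back on the client.
    Reject,
}

async fn dispatch(outcome: QueueOutcome, num_rows: usize) -> Result<usize, String> {
    match outcome {
        QueueOutcome::First => {
            // In the real code this call is wrapped in CancellationSafeFuture and
            // driven on the read runtime so it survives caller cancellation.
            do_merged_write().await?;
            Ok(num_rows)
        }
        QueueOutcome::Waiter(rx) => match rx.await {
            Ok(Ok(())) => Ok(num_rows),
            Ok(Err(e)) => Err(e),
            Err(_) => Err("writer dropped the notifier".to_string()),
        },
        QueueOutcome::Reject => Err("too many pending writes".to_string()),
    }
}

async fn do_merged_write() -> Result<(), String> {
    // Placeholder for: lock serial_exec, take_pending_writes(), merge the rows,
    // write them, then send the result to every waiter's notifier.
    Ok(())
}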
