Skip to content

Commit

Permalink
bug: adjust the max io requests with block number not segment number
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Oct 20, 2022
1 parent c15ca52 commit d17890d
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub trait TableContext: Send + Sync {
fn try_get_part(&self) -> Option<PartInfoPtr>;
// Update the context partition pool from the pipeline builder.
fn try_set_partitions(&self, partitions: Partitions) -> Result<()>;
// Get the partition queue size.
fn get_partition_queue_size(&self) -> usize;
fn attach_query_str(&self, kind: String, query: &str);
fn get_fragment_id(&self) -> usize;
fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>>;
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ impl TableContext for QueryContext {
}
Ok(())
}

fn get_partition_queue_size(&self) -> usize {
self.partition_queue.read().len()
}

fn attach_query_str(&self, kind: String, query: &str) {
self.shared.attach_query_str(kind, query);
}
Expand Down
13 changes: 5 additions & 8 deletions src/query/storages/fuse/src/operations/read_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,13 @@ impl FuseTable {
}

// Adjust the max io request.
fn adjust_max_io_requests(
ctx: Arc<dyn TableContext>,
plan: &ReadDataSourcePlan,
) -> Result<usize> {
let parts_len = plan.parts.len();
fn adjust_max_io_requests(ctx: Arc<dyn TableContext>) -> Result<usize> {
let parts_size = ctx.get_partition_queue_size();
let max_storage_io = ctx.get_settings().get_max_storage_io_requests()? as usize;
let max_io_requests = if parts_len > max_storage_io {
let max_io_requests = if parts_size > max_storage_io {
max_storage_io
} else {
parts_len
parts_size
};

Ok(std::cmp::max(1, max_io_requests))
Expand Down Expand Up @@ -204,7 +201,7 @@ impl FuseTable {
let prewhere_filter = self.build_prewhere_filter_executor(ctx.clone(), plan)?;
let remain_reader = self.build_remain_reader(plan)?;

let max_io_requests = Self::adjust_max_io_requests(ctx.clone(), plan)?;
let max_io_requests = Self::adjust_max_io_requests(ctx.clone())?;
info!("read block data adjust max io requests:{}", max_io_requests);

// Add source pipe.
Expand Down

0 comments on commit d17890d

Please sign in to comment.