Skip to content

Commit

Permalink
feat(query): fix event
Browse files (browse the repository at this point in the history)
  • Loading branch information
sundy-li committed Mar 27, 2024
1 parent be48753 commit 00d79b3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ struct OutputsBuffer {

impl OutputsBuffer {
pub fn create(capacity: usize, outputs: usize) -> OutputsBuffer {
let capacity = capacity * 1000;
OutputsBuffer {
inner: vec![capacity; outputs]
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,6 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
continue;
}

// We pull all the data that are not the max_partition_count and all spill data
if self.inputs[index].max_partition_count == MAX_PARTITION_COUNT
&& self.inputs[index].bucket > SINGLE_LEVEL_BUCKET_NUM
{
self.inputs[index].port.set_not_need_data();
continue;
}

if !self.inputs[index].port.has_data() {
self.inputs[index].port.set_need_data();
self.initialized_all_inputs = false;
Expand All @@ -133,6 +125,14 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
self.inputs[index].max_partition_count,
) = self.add_bucket(data_block)?;

// We pull all the data that are not the max_partition_count and all spill data
if self.inputs[index].max_partition_count == MAX_PARTITION_COUNT
&& self.inputs[index].bucket > SINGLE_LEVEL_BUCKET_NUM
{
self.inputs[index].port.set_not_need_data();
continue;
}

// we need pull all spill data in init, and data less than max partition
if self.inputs[index].bucket <= SINGLE_LEVEL_BUCKET_NUM
|| self.inputs[index].max_partition_count < MAX_PARTITION_COUNT
Expand All @@ -150,15 +150,6 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
continue;
}

// We pull all the data that are not the max_partition_count
if self.inputs[index].max_partition_count > 0
&& self.inputs[index].bucket > SINGLE_LEVEL_BUCKET_NUM
&& self.inputs[index].max_partition_count == self.max_partition_count
{
self.inputs[index].port.set_not_need_data();
continue;
}

if !self.inputs[index].port.has_data() {
self.inputs[index].port.set_need_data();
self.initialized_all_inputs = false;
Expand All @@ -173,6 +164,15 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
self.inputs[index].max_partition_count,
) = self.add_bucket(data_block)?;

// We pull all the data that are not the max_partition_count
if self.inputs[index].max_partition_count > 0
&& self.inputs[index].bucket > SINGLE_LEVEL_BUCKET_NUM
&& self.inputs[index].max_partition_count == self.max_partition_count
{
self.inputs[index].port.set_not_need_data();
continue;
}

// we need pull all spill data in init, and data less than max partition
if self.inputs[index].bucket <= SINGLE_LEVEL_BUCKET_NUM
|| self.inputs[index].max_partition_count < self.max_partition_count
Expand Down

0 comments on commit 00d79b3

Please sign in to comment.