Skip to content

Commit

Permalink
Merge pull request #8037 from zhang2014/interrupt_join_build
Browse files Browse the repository at this point in the history
fix(processor): support interrupt join build side
  • Loading branch information
mergify[bot] authored Oct 10, 2022
2 parents b76ff7f + 63418d5 commit 1b74b8f
Show file tree
Hide file tree
Showing 18 changed files with 238 additions and 137 deletions.
8 changes: 8 additions & 0 deletions src/query/pipeline/core/src/processors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub trait Processor: Send {

fn event(&mut self) -> Result<Event>;

// When the synchronization task needs to run for a long time, the interrupt function needs to be implemented.
fn interrupt(&self) {}

// Synchronous work.
fn process(&mut self) -> Result<()> {
Err(ErrorCode::UnImplement("Unimplemented process."))
Expand Down Expand Up @@ -96,6 +99,11 @@ impl ProcessorPtr {
(*self.inner.get()).event()
}

/// # Safety
pub unsafe fn interrupt(&self) {
(*self.inner.get()).interrupt()
}

/// # Safety
pub unsafe fn process(&self) -> Result<()> {
(*self.inner.get()).process()
Expand Down
6 changes: 6 additions & 0 deletions src/query/pipeline/sinks/src/processors/sinks/sync_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub trait Sink: Send {
Ok(())
}

fn interrupt(&self) {}

fn consume(&mut self, data_block: DataBlock) -> Result<()>;
}

Expand Down Expand Up @@ -94,6 +96,10 @@ impl<T: Sink + 'static> Processor for Sinker<T> {
}
}

fn interrupt(&self) {
self.inner.interrupt()
}

fn process(&mut self) -> Result<()> {
if !self.called_on_start {
self.called_on_start = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;

use super::Compactor;
use super::TransformCompact;
use crate::processors::transforms::Aborting;

pub struct BlockCompactor {
max_rows_per_block: usize,
Expand All @@ -27,6 +30,7 @@ pub struct BlockCompactor {
// A flag denoting whether it is a recluster operation.
// Will be removed later.
is_recluster: bool,
aborting: Arc<AtomicBool>,
}

impl BlockCompactor {
Expand All @@ -41,6 +45,7 @@ impl BlockCompactor {
min_rows_per_block,
max_bytes_per_block,
is_recluster,
aborting: Arc::new(AtomicBool::new(false)),
}
}
}
Expand All @@ -54,6 +59,10 @@ impl Compactor for BlockCompactor {
true
}

fn interrupt(&self) {
self.aborting.store(true, Ordering::Release);
}

fn compact_partial(&self, blocks: &mut Vec<DataBlock>) -> Result<Vec<DataBlock>> {
if blocks.is_empty() {
return Ok(vec![]);
Expand Down Expand Up @@ -108,13 +117,13 @@ impl Compactor for BlockCompactor {
Ok(res)
}

fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
let mut res = Vec::with_capacity(blocks.len());
let mut temp_blocks = vec![];
let mut accumulated_rows = 0;

for block in blocks.iter() {
if aborting() {
if self.aborting.load(Ordering::Relaxed) {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
Expand Down Expand Up @@ -142,7 +151,7 @@ impl Compactor for BlockCompactor {
temp_blocks.push(block);

while accumulated_rows >= self.max_rows_per_block {
if aborting() {
if self.aborting.load(Ordering::Relaxed) {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
Expand All @@ -164,7 +173,7 @@ impl Compactor for BlockCompactor {
}

if accumulated_rows != 0 {
if aborting() {
if self.aborting.load(Ordering::Relaxed) {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@

use std::any::Any;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -27,14 +25,13 @@ use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::processors::Processor;

pub type Aborting = Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>>;

pub struct TransformCompact<T: Compactor + Send + 'static> {
state: ProcessorState,
compactor: T,
aborting: Aborting,
}

pub type Aborting = Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>>;

/// Compactor is a trait that defines how to compact blocks.
pub trait Compactor {
fn name() -> &'static str;
Expand All @@ -44,35 +41,31 @@ pub trait Compactor {
false
}

fn interrupt(&self) {}

/// `compact_partial` is called when a new block is pushed and `use_partial_compact` is enabled
fn compact_partial(&self, _blocks: &mut Vec<DataBlock>) -> Result<Vec<DataBlock>> {
Ok(vec![])
}

/// `compact_final` is called when all the blocks are pushed to finish the compaction
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>>;
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
}

impl<T: Compactor + Send + 'static> TransformCompact<T> {
pub fn try_create(
ctx: Arc<dyn TableContext>,
input_port: Arc<InputPort>,
output_port: Arc<OutputPort>,
compactor: T,
) -> Result<ProcessorPtr> {
let aborting = ctx.get_aborting();
let state = ProcessorState::Consume(ConsumeState {
input_port,
output_port,
input_data_blocks: vec![],
output_data_blocks: VecDeque::new(),
});

Ok(ProcessorPtr::create(Box::new(Self {
state,
compactor,
aborting: Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))),
})))
Ok(ProcessorPtr::create(Box::new(Self { state, compactor })))
}

#[inline(always)]
Expand Down Expand Up @@ -152,6 +145,10 @@ impl<T: Compactor + Send + 'static> Processor for TransformCompact<T> {
}
}

fn interrupt(&self) {
self.compactor.interrupt();
}

fn process(&mut self) -> Result<()> {
match &mut self.state {
ProcessorState::Consume(state) => {
Expand All @@ -165,8 +162,7 @@ impl<T: Compactor + Send + 'static> Processor for TransformCompact<T> {
Ok(())
}
ProcessorState::Compacting(state) => {
let aborting = self.aborting.clone();
let compacted_blocks = self.compactor.compact_final(&state.blocks, aborting)?;
let compacted_blocks = self.compactor.compact_final(&state.blocks)?;

let mut temp_state = ProcessorState::Finished;
std::mem::swap(&mut self.state, &mut temp_state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datablocks::SortColumnDescription;
use common_exception::Result;
Expand All @@ -23,6 +27,7 @@ use crate::processors::transforms::Aborting;
pub struct SortMergeCompactor {
limit: Option<usize>,
sort_columns_descriptions: Vec<SortColumnDescription>,
aborting: Arc<AtomicBool>,
}

impl SortMergeCompactor {
Expand All @@ -33,6 +38,7 @@ impl SortMergeCompactor {
SortMergeCompactor {
limit,
sort_columns_descriptions,
aborting: Arc::new(AtomicBool::new(false)),
}
}
}
Expand All @@ -42,10 +48,17 @@ impl Compactor for SortMergeCompactor {
"SortMergeTransform"
}

fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
fn interrupt(&self) {
self.aborting.store(true, Ordering::Release);
}

fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
if blocks.is_empty() {
Ok(vec![])
} else {
let aborting = self.aborting.clone();
let aborting: Aborting = Arc::new(Box::new(move || aborting.load(Ordering::Relaxed)));

let block = DataBlock::merge_sort_blocks(
blocks,
&self.sort_columns_descriptions,
Expand Down
11 changes: 4 additions & 7 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,12 @@ impl RunningGraph {
Ok(schedule_queue)
}

pub fn all_node_is_finished(&self) -> bool {
for node_index in self.0.graph.node_indices() {
let state = self.0.graph[node_index].state.lock().unwrap();
if !matches!(&*state, State::Finished) {
return false;
pub fn interrupt_running_nodes(&self) {
unsafe {
for node_index in self.0.graph.node_indices() {
self.0.graph[node_index].processor.interrupt();
}
}

true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl PipelineExecutor {
pub fn finish(&self, cause: Option<ErrorCode>) {
*self.finished_error.lock() = cause;
self.global_tasks_queue.finish(self.workers_condvar.clone());
self.graph.interrupt_running_nodes();
self.finished_notify.notify_waiters();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use super::ProbeState;

Expand All @@ -27,6 +26,8 @@ pub trait HashJoinState: Send + Sync {
/// Probe the hash table and retrieve matched rows as DataBlocks
fn probe(&self, input: &DataBlock, probe_state: &mut ProbeState) -> Result<Vec<DataBlock>>;

fn interrupt(&self);

/// Attach to state
fn attach(&self) -> Result<()>;

Expand All @@ -44,15 +45,11 @@ pub trait HashJoinState: Send + Sync {
async fn wait_finish(&self) -> Result<()>;

/// Get mark join results
fn mark_join_blocks(&self, flag: Aborting) -> Result<Vec<DataBlock>>;
fn mark_join_blocks(&self) -> Result<Vec<DataBlock>>;

/// Get right join results
fn right_join_blocks(&self, blocks: &[DataBlock], flag: Aborting) -> Result<Vec<DataBlock>>;
fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;

/// Get right semi/anti join results
fn right_anti_semi_join_blocks(
&self,
blocks: &[DataBlock],
flag: Aborting,
) -> Result<Vec<DataBlock>>;
fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
}
Loading

1 comment on commit 1b74b8f

@vercel
Copy link

@vercel vercel bot commented on 1b74b8f Oct 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.rs
databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.