Skip to content

Commit

Permalink
Merge pull request #4862 from sundy-li/insert-pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Apr 22, 2022
2 parents 08b89d3 + f6dba13 commit f25c349
Show file tree
Hide file tree
Showing 43 changed files with 1,285 additions and 24 deletions.
3 changes: 3 additions & 0 deletions common/streams/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ mod source_ndjson;
mod source_parquet;

pub use source::Source;
pub use source_csv::CsvSource;
pub use source_csv::CsvSourceBuilder;
pub use source_ndjson::NDJsonSource;
pub use source_ndjson::NDJsonSourceBuilder;
pub use source_parquet::ParquetSource;
pub use source_parquet::ParquetSourceBuilder;
8 changes: 8 additions & 0 deletions query/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_exception::Result;
use common_streams::SendableDataBlockStream;

use crate::pipelines::new::NewPipeline;
use crate::pipelines::new::SourcePipeBuilder;

#[async_trait::async_trait]
/// Interpreter is a trait for different PlanNode
Expand Down Expand Up @@ -65,6 +66,13 @@ pub trait Interpreter: Sync + Send {
self.name()
)))
}

/// Hands the interpreter a builder for externally supplied source pipes
/// (e.g. the input of a streaming INSERT) before pipeline construction.
///
/// Default implementation rejects the call with `UnImplement`: only
/// interpreters that actually consume an external data source are expected
/// to override this.
fn set_source_pipe_builder(&self, _builder: Option<SourcePipeBuilder>) -> Result<()> {
    Err(ErrorCode::UnImplement(format!(
        "UnImplement set_source_pipe_builder method for {:?}",
        self.name()
    )))
}
}

pub type InterpreterPtr = std::sync::Arc<dyn Interpreter>;
22 changes: 19 additions & 3 deletions query/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use regex::Regex;
use crate::interpreters::stream::ProcessorExecutorStream;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::new::executor::PipelineCompleteExecutor;
use crate::pipelines::new::executor::PipelinePullingExecutor;
use crate::pipelines::new::NewPipeline;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -112,15 +113,30 @@ impl CopyInterpreter {
if let Err(e) = res {
return Err(e);
}

let table = ctx
.get_table(&self.plan.db_name, &self.plan.tbl_name)
.await?;

if ctx.get_settings().get_enable_new_processor_framework()? != 0
&& self.ctx.get_cluster().is_empty()
{
table.append2(ctx.clone(), &mut pipeline)?;
pipeline.set_max_threads(settings.get_max_threads()? as usize);

let async_runtime = ctx.get_storage_runtime();
let executor = PipelineCompleteExecutor::try_create(async_runtime, pipeline)?;
executor.execute()?;

return Ok(ctx.consume_precommit_blocks());
}

pipeline.set_max_threads(settings.get_max_threads()? as usize);

let async_runtime = ctx.get_storage_runtime();
let executor = PipelinePullingExecutor::try_create(async_runtime, pipeline)?;
let source_stream = Box::pin(ProcessorExecutorStream::create(executor)?);

let table = ctx
.get_table(&self.plan.db_name, &self.plan.tbl_name)
.await?;
let operations = table
.append_data(ctx.clone(), source_stream)
.await?
Expand Down
13 changes: 13 additions & 0 deletions query/src/interpreters/interpreter_factory_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@ use std::sync::Arc;
use std::time::SystemTime;

use common_exception::Result;
use common_infallible::Mutex;
use common_planners::PlanNode;
use common_streams::ProgressStream;
use common_streams::SendableDataBlockStream;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::interpreters::InterpreterQueryLog;
use crate::pipelines::new::SourcePipeBuilder;
use crate::sessions::QueryContext;

/// Decorator around another interpreter that adds cross-cutting concerns:
/// result-progress tracking and query logging around the inner execution.
pub struct InterceptorInterpreter {
    ctx: Arc<QueryContext>,
    // The interpreter that actually executes the plan.
    inner: InterpreterPtr,
    // Logs query start/finish events for this plan.
    query_log: InterpreterQueryLog,
    // Source pipe builder to forward to `inner` just before execution;
    // stays `None` until `set_source_pipe_builder` is called.
    source_pipe_builder: Mutex<Option<SourcePipeBuilder>>,
}

impl InterceptorInterpreter {
Expand All @@ -37,6 +40,7 @@ impl InterceptorInterpreter {
ctx: ctx.clone(),
inner,
query_log: InterpreterQueryLog::create(ctx, plan),
source_pipe_builder: Mutex::new(None),
}
}
}
Expand All @@ -51,6 +55,9 @@ impl Interpreter for InterceptorInterpreter {
&self,
input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let _ = self
.inner
.set_source_pipe_builder((*self.source_pipe_builder.lock()).clone());
let result_stream = self.inner.execute(input_stream).await?;
let metric_stream =
ProgressStream::try_create(result_stream, self.ctx.get_result_progress())?;
Expand Down Expand Up @@ -83,4 +90,10 @@ impl Interpreter for InterceptorInterpreter {
}
self.query_log.log_finish(now).await
}

/// Stores the source pipe builder so `execute` can forward it to the
/// wrapped interpreter.
fn set_source_pipe_builder(&self, builder: Option<SourcePipeBuilder>) -> Result<()> {
    *self.source_pipe_builder.lock() = builder;
    Ok(())
}
}
159 changes: 157 additions & 2 deletions query/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_functions::scalars::CastFunction;
use common_functions::scalars::FunctionContext;
use common_infallible::Mutex;
use common_meta_types::GrantObject;
use common_meta_types::UserPrivilegeType;
use common_planners::InsertInputSource;
use common_planners::InsertPlan;
use common_planners::PlanNode;
use common_planners::SelectPlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use futures::TryStreamExt;
Expand All @@ -28,17 +34,148 @@ use crate::interpreters::interpreter_insert_with_stream::InsertWithStream;
use crate::interpreters::plan_schedulers::InsertWithPlan;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::interpreters::SelectInterpreter;
use crate::pipelines::new::executor::PipelineCompleteExecutor;
use crate::pipelines::new::processors::port::OutputPort;
use crate::pipelines::new::processors::BlocksSource;
use crate::pipelines::new::processors::TransformAddOn;
use crate::pipelines::new::processors::TransformCastSchema;
use crate::pipelines::new::NewPipeline;
use crate::pipelines::new::SourcePipeBuilder;
use crate::pipelines::transforms::AddOnStream;
use crate::sessions::QueryContext;

/// Interpreter for INSERT statements. Handles the three input sources of
/// `InsertInputSource`: literal VALUES blocks, a streaming format source,
/// and INSERT ... SELECT.
pub struct InsertInterpreter {
    ctx: Arc<QueryContext>,
    plan: InsertPlan,
    // Builder for an externally supplied source pipe (streaming inserts);
    // `None` unless a caller invoked `set_source_pipe_builder` first.
    source_pipe_builder: Mutex<Option<SourcePipeBuilder>>,
}

impl InsertInterpreter {
/// Builds an `InsertInterpreter` for the given plan, with no source pipe
/// builder installed yet, and returns it behind an `InterpreterPtr`.
pub fn try_create(ctx: Arc<QueryContext>, plan: InsertPlan) -> Result<InterpreterPtr> {
    let interpreter = InsertInterpreter {
        ctx,
        plan,
        source_pipe_builder: Mutex::new(None),
    };
    Ok(Arc::new(interpreter))
}

/// Executes the INSERT through the new processor framework:
/// builds a source pipe from the plan's input source, optionally adds
/// cast / fill-missing-column transforms, appends into the target table,
/// runs the pipeline to completion, then commits the precommitted blocks.
///
/// The `_input_stream` parameter is unused here — input arrives via the
/// plan source or the installed `source_pipe_builder` instead.
async fn execute_new(
    &self,
    _input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
    let plan = &self.plan;
    let settings = self.ctx.get_settings();
    let table = self
        .ctx
        .get_table(&plan.database_name, &plan.table_name)
        .await?;

    let mut pipeline = self.create_new_pipeline()?;
    let mut builder = SourcePipeBuilder::create();
    let mut need_cast_schema = false;
    match &self.plan.source {
        InsertInputSource::Values(values) => {
            // Single VALUES block shared behind a queue; every source
            // processor pops from the same deque.
            let blocks = Arc::new(Mutex::new(VecDeque::from_iter(vec![values.block.clone()])));

            // One BlocksSource per configured thread, all draining `blocks`.
            for _index in 0..settings.get_max_threads()? {
                let output = OutputPort::create();
                builder.add_source(
                    output.clone(),
                    BlocksSource::create(self.ctx.clone(), output.clone(), blocks.clone())?,
                );
            }
            pipeline.add_pipe(builder.finalize());
        }
        InsertInputSource::StreamingWithFormat(_) => {
            // Streaming input: the source pipe must have been installed via
            // set_source_pipe_builder; otherwise fail with EmptyData.
            pipeline.add_pipe(
                ((*self.source_pipe_builder.lock()).clone())
                    .ok_or_else(|| ErrorCode::EmptyData("empty source pipe builder"))?
                    .finalize(),
            );
        }
        InsertInputSource::SelectPlan(plan) => {
            need_cast_schema = self.check_schema_cast(plan)?;
            let select_interpreter =
                SelectInterpreter::try_create(self.ctx.clone(), SelectPlan {
                    input: Arc::new((**plan).clone()),
                })?;
            // NOTE(review): this replaces the pipeline built above, so the
            // empty `builder` is discarded in this branch — presumably
            // intentional, since the SELECT provides its own sources.
            pipeline = select_interpreter.create_new_pipeline()?;
        }
    };

    // cast schema
    if need_cast_schema {
        // Build one cast function per target field; the cast target name is
        // the Debug rendering of the field's data type.
        let mut functions = Vec::with_capacity(self.plan.schema().fields().len());
        for field in self.plan.schema().fields() {
            let name = format!("{:?}", field.data_type());
            let cast_function = CastFunction::create("cast", &name).unwrap();
            functions.push(cast_function);
        }
        let tz = self.ctx.get_settings().get_timezone()?;
        let tz = String::from_utf8(tz).map_err(|_| {
            ErrorCode::LogicalError("Timezone has been checked and should be valid.")
        })?;
        let func_ctx = FunctionContext { tz };
        pipeline.add_transform(|transform_input_port, transform_output_port| {
            TransformCastSchema::try_create(
                transform_input_port,
                transform_output_port,
                self.plan.schema(),
                functions.clone(),
                func_ctx.clone(),
            )
        })?;
    }

    // If the source schema is narrower than the table schema, add a
    // transform that fills the missing columns (e.g. with defaults).
    let need_fill_missing_columns = table.schema() != plan.schema();
    if need_fill_missing_columns {
        pipeline.add_transform(|transform_input_port, transform_output_port| {
            TransformAddOn::try_create(
                transform_input_port,
                transform_output_port,
                self.plan.schema(),
                table.schema(),
                self.ctx.clone(),
            )
        })?;
    }

    // Attach the table's append sink to the pipeline.
    table.append2(self.ctx.clone(), &mut pipeline)?;

    let async_runtime = self.ctx.get_storage_runtime();

    pipeline.set_max_threads(self.ctx.get_settings().get_max_threads()? as usize);
    let executor = PipelineCompleteExecutor::try_create(async_runtime, pipeline)?;
    executor.execute()?;
    // Drop the executor before committing so the pipeline is fully torn down.
    drop(executor);

    // Commit everything the pipeline precommitted into the table.
    let append_entries = self.ctx.consume_precommit_blocks();
    table
        .commit_insertion(self.ctx.clone(), append_entries, self.plan.overwrite)
        .await?;

    // INSERT produces no result rows; return an empty stream.
    Ok(Box::pin(DataBlockStream::create(
        self.plan.schema(),
        None,
        vec![],
    )))
}

fn check_schema_cast(&self, plan_node: &PlanNode) -> common_exception::Result<bool> {
let output_schema = &self.plan.schema;
let select_schema = plan_node.schema();

// validate schema
if select_schema.fields().len() < output_schema.fields().len() {
return Err(ErrorCode::BadArguments(
"Fields in select statement is less than expected",
));
}

// check if cast needed
let cast_needed = select_schema != *output_schema;
Ok(cast_needed)
}
}

Expand All @@ -52,8 +189,15 @@ impl Interpreter for InsertInterpreter {
&self,
mut input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = &self.plan;
let settings = self.ctx.get_settings();

// Use insert in new processor
if settings.get_enable_new_processor_framework()? != 0 && self.ctx.get_cluster().is_empty()
{
return self.execute_new(input_stream).await;
}

let plan = &self.plan;
self.ctx
.get_current_session()
.validate_privilege(
Expand Down Expand Up @@ -128,4 +272,15 @@ impl Interpreter for InsertInterpreter {
vec![],
)))
}

/// Returns a fresh, empty pipeline for this interpreter to populate.
fn create_new_pipeline(&self) -> Result<NewPipeline> {
    Ok(NewPipeline::create())
}

/// Installs the source pipe builder consumed by the
/// `InsertInputSource::StreamingWithFormat` branch of `execute_new`.
fn set_source_pipe_builder(&self, builder: Option<SourcePipeBuilder>) -> Result<()> {
    *self.source_pipe_builder.lock() = builder;
    Ok(())
}
}
1 change: 1 addition & 0 deletions query/src/interpreters/interpreter_query_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub struct LogEvent {
pub extra: String,
}

#[derive(Clone)]
pub struct InterpreterQueryLog {
ctx: Arc<QueryContext>,
plan: PlanNode,
Expand Down
1 change: 0 additions & 1 deletion query/src/pipelines/new/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ impl ScheduleQueue {
while let Some(processor) = self.async_queue.pop_front() {
tasks.push_back(ExecutorTask::Async(processor));
}

global.push_tasks(ctx, tasks)
}

Expand Down
2 changes: 2 additions & 0 deletions query/src/pipelines/new/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ mod pipeline_pulling_executor;
mod pipeline_pushing_executor;

pub use executor_graph::RunningGraph;
pub use pipeline_complete_executor::PipelineCompleteExecutor;
pub use pipeline_executor::PipelineExecutor;
pub use pipeline_pulling_executor::PipelinePullingExecutor;
pub use pipeline_pushing_executor::PipelinePushingExecutor;
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl PipelineCompleteExecutor {
}

let executor = PipelineExecutor::create(async_runtime, pipeline)?;

Ok(PipelineCompleteExecutor { executor })
}

Expand Down
2 changes: 1 addition & 1 deletion query/src/pipelines/new/executor/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ impl PipelineExecutor {
while let Some(task) = init_schedule_queue.pop_task() {
tasks.push_back(task);
}

global_tasks_queue.init_tasks(tasks);

Ok(Arc::new(PipelineExecutor {
graph,
threads_num,
Expand Down
2 changes: 2 additions & 0 deletions query/src/pipelines/new/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub mod processors;
mod unsafe_cell_wrap;

pub use pipe::NewPipe;
pub use pipe::SinkPipeBuilder;
pub use pipe::SourcePipeBuilder;
pub use pipe::TransformPipeBuilder;
pub use pipeline::NewPipeline;
pub use pipeline_builder::QueryPipelineBuilder;
1 change: 1 addition & 0 deletions query/src/pipelines/new/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl NewPipe {
}
}

#[derive(Clone)]
pub struct SourcePipeBuilder {
processors: Vec<ProcessorPtr>,
outputs_port: Vec<Arc<OutputPort>>,
Expand Down
Loading

0 comments on commit f25c349

Please sign in to comment.