chore: define QueryEngine and wrap all things into it #1160

Merged 9 commits on Aug 19, 2023
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -94,7 +94,7 @@ arena = { path = "components/arena" }
async-stream = "0.3.4"
async-trait = "0.1.72"
base64 = "0.13"
bytes = "1.1.0"
bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
@@ -188,6 +188,7 @@ catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
+datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
64 changes: 60 additions & 4 deletions df_operator/src/registry.rs
@@ -14,8 +14,18 @@

//! Function registry.

-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
+
+use datafusion::{
+    error::{DataFusionError, Result as DfResult},
+    execution::FunctionRegistry as DfFunctionRegistry,
+    logical_expr::{
+        AggregateUDF as DfAggregateUDF, ScalarUDF as DfScalarUDF, WindowUDF as DfWindowUDF,
+    },
+};
use macros::define_result;
use snafu::{ensure, Backtrace, Snafu};

@@ -31,6 +41,7 @@ define_result!(Error);

/// A registry knows how to build logical expressions out of user-defined
/// function' names
+// TODO: maybe unnecessary to define inner trait rather than using datafusion's?
pub trait FunctionRegistry {
    fn register_udf(&mut self, udf: ScalarUdf) -> Result<()>;

@@ -41,6 +52,9 @@ pub trait FunctionRegistry {
    fn find_udaf(&self, name: &str) -> Result<Option<AggregateUdf>>;

    fn list_udfs(&self) -> Result<Vec<ScalarUdf>>;
+
+    // TODO: can we remove restriction about `Send` and `Sync`?
+    fn to_df_function_registry(self: Arc<Self>) -> Arc<dyn DfFunctionRegistry + Send + Sync>;
}

/// Default function registry.
@@ -96,8 +110,50 @@ impl FunctionRegistry for FunctionRegistryImpl {
    }

    fn list_udfs(&self) -> Result<Vec<ScalarUdf>> {
-        let udfs = self.scalar_functions.values().cloned().collect();
-        Ok(udfs)
+        Ok(self.scalar_functions.values().cloned().collect())
    }
+
+    fn to_df_function_registry(self: Arc<Self>) -> Arc<dyn DfFunctionRegistry + Send + Sync> {
+        Arc::new(DfFunctionRegistryAdapter(self))
+    }
}
+
+struct DfFunctionRegistryAdapter(FunctionRegistryRef);
+
+impl DfFunctionRegistry for DfFunctionRegistryAdapter {
+    fn udfs(&self) -> HashSet<String> {
+        self.0
+            .list_udfs()
+            .expect("failed to list udfs")
+            .into_iter()
+            .map(|f| f.name().to_string())
+            .collect()
+    }
+
+    fn udf(&self, name: &str) -> DfResult<Arc<DfScalarUDF>> {
+        self.0
+            .find_udf(name)
+            .map_err(|e| DataFusionError::Internal(format!("failed to find udf, err:{e}")))?
+            .ok_or(DataFusionError::Internal(format!(
+                "udf not found, name:{name}"
+            )))
+            .map(|f| f.to_datafusion_udf())
+    }
+
+    fn udaf(&self, name: &str) -> DfResult<Arc<DfAggregateUDF>> {
+        self.0
+            .find_udaf(name)
+            .map_err(|e| DataFusionError::Internal(format!("failed to find udaf, err:{e}")))?
+            .ok_or(DataFusionError::Internal(format!(
+                "udaf not found, name:{name}"
+            )))
+            .map(|f| f.to_datafusion_udaf())
+    }
+
+    fn udwf(&self, _name: &str) -> DfResult<Arc<DfWindowUDF>> {
+        Err(DataFusionError::Internal(
+            "no udwfs defined now".to_string(),
+        ))
+    }
+}
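The adapter above is the core of the UDF wiring in this PR: the engine keeps its own `FunctionRegistry`, and a newtype wrapper re-exposes it through DataFusion's `FunctionRegistry` trait. Below is a minimal, self-contained sketch of the same newtype-adapter pattern; the `InnerRegistry`/`OuterRegistry` traits and every name in it are hypothetical stand-ins, not the real `df_operator` or DataFusion types.

use std::{collections::HashSet, sync::Arc};

// Hypothetical stand-in for the engine-side trait (`FunctionRegistry` above).
trait InnerRegistry {
    fn list_names(&self) -> Vec<String>;
}

// Hypothetical stand-in for DataFusion's `FunctionRegistry`.
trait OuterRegistry {
    fn udfs(&self) -> HashSet<String>;
}

// Newtype adapter, mirroring `DfFunctionRegistryAdapter(FunctionRegistryRef)`.
struct Adapter(Arc<dyn InnerRegistry + Send + Sync>);

impl OuterRegistry for Adapter {
    fn udfs(&self) -> HashSet<String> {
        // Delegate to the wrapped registry, converting Vec to HashSet,
        // just as the real adapter converts between the two traits' shapes.
        self.0.list_names().into_iter().collect()
    }
}

struct Registry;

impl InnerRegistry for Registry {
    fn list_names(&self) -> Vec<String> {
        vec!["my_udf".to_string()]
    }
}

fn main() {
    let inner: Arc<dyn InnerRegistry + Send + Sync> = Arc::new(Registry);
    let outer: Arc<dyn OuterRegistry> = Arc::new(Adapter(inner));
    assert!(outer.udfs().contains("my_udf"));
}

The real adapter does the same delegation, plus mapping lookup failures into `DataFusionError::Internal`.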

1 change: 1 addition & 0 deletions interpreters/Cargo.toml
@@ -34,6 +34,7 @@ common_types = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
df_operator = { workspace = true }
+futures = { workspace = true }
generic_error = { workspace = true }
hash_ext = { workspace = true }
log = { workspace = true }
6 changes: 3 additions & 3 deletions interpreters/src/describe.rs
@@ -21,13 +21,13 @@ use arrow::{
};
use async_trait::async_trait;
use macros::define_result;
-use query_engine::executor::RecordBatchVec;
use query_frontend::plan::DescribeTablePlan;
use snafu::{ResultExt, Snafu};
use table_engine::table::TableRef;

-use crate::interpreter::{
-    Describe, Interpreter, InterpreterPtr, Output, Result as InterpreterResult,
+use crate::{
+    interpreter::{Describe, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
+    RecordBatchVec,
};

#[derive(Debug, Snafu)]
6 changes: 3 additions & 3 deletions interpreters/src/exists.rs
@@ -21,12 +21,12 @@ use arrow::{
};
use async_trait::async_trait;
use macros::define_result;
-use query_engine::executor::RecordBatchVec;
use query_frontend::plan::ExistsTablePlan;
use snafu::{ResultExt, Snafu};

-use crate::interpreter::{
-    Exists, Interpreter, InterpreterPtr, Output, Result as InterpreterResult,
+use crate::{
+    interpreter::{Exists, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
+    RecordBatchVec,
};

#[derive(Debug, Snafu)]
14 changes: 7 additions & 7 deletions interpreters/src/factory.rs
@@ -15,7 +15,7 @@
//! Interpreter factory

use catalog::manager::ManagerRef;
-use query_engine::{executor::Executor, physical_planner::PhysicalPlanner};
+use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef};
use query_frontend::plan::Plan;
use table_engine::engine::TableEngineRef;

@@ -35,18 +35,18 @@ use crate::{
};

/// A factory to create interpreters
-pub struct Factory<Q, P> {
-    query_executor: Q,
-    physical_planner: P,
+pub struct Factory {
+    query_executor: ExecutorRef,
+    physical_planner: PhysicalPlannerRef,
    catalog_manager: ManagerRef,
    table_engine: TableEngineRef,
    table_manipulator: TableManipulatorRef,
}

-impl<Q: Executor + 'static, P: PhysicalPlanner> Factory<Q, P> {
+impl Factory {
    pub fn new(
-        query_executor: Q,
-        physical_planner: P,
+        query_executor: ExecutorRef,
+        physical_planner: PhysicalPlannerRef,
        catalog_manager: ManagerRef,
        table_engine: TableEngineRef,
        table_manipulator: TableManipulatorRef,
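This is the central design shift of the PR: `Factory` drops its `<Q, P>` type parameters in favor of `ExecutorRef`/`PhysicalPlannerRef` trait objects, so one concrete `Factory` type works with any executor/planner pair chosen at runtime. A hedged sketch of the before/after follows; the `Executor` and `PhysicalPlanner` traits here are hypothetical stand-ins for the real ones in `query_engine`, whose definitions are not shown in this diff.

use std::sync::Arc;

// Hypothetical stand-ins for the `query_engine` traits referenced above.
trait Executor: Send + Sync {
    fn name(&self) -> &'static str;
}
trait PhysicalPlanner: Send + Sync {
    fn name(&self) -> &'static str;
}

// Presumed shape of the `ExecutorRef`/`PhysicalPlannerRef` aliases.
type ExecutorRef = Arc<dyn Executor>;
type PhysicalPlannerRef = Arc<dyn PhysicalPlanner>;

// Before: generic over Q and P, so every holder of Factory<Q, P> must
// carry the type parameters through its own signature.
#[allow(dead_code)]
struct GenericFactory<Q: Executor, P: PhysicalPlanner> {
    query_executor: Q,
    physical_planner: P,
}

// After: a single concrete type; implementations are picked at runtime.
struct Factory {
    query_executor: ExecutorRef,
    physical_planner: PhysicalPlannerRef,
}

struct DfExecutor;
impl Executor for DfExecutor {
    fn name(&self) -> &'static str {
        "datafusion-executor"
    }
}

struct DfPlanner;
impl PhysicalPlanner for DfPlanner {
    fn name(&self) -> &'static str {
        "datafusion-planner"
    }
}

fn main() {
    let factory = Factory {
        query_executor: Arc::new(DfExecutor),
        physical_planner: Arc::new(DfPlanner),
    };
    println!(
        "{} + {}",
        factory.query_executor.name(),
        factory.physical_planner.name()
    );
}

The cost is dynamic dispatch per call; the gain is that signatures such as `SelectInterpreter::create` no longer have to propagate generic parameters.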
3 changes: 2 additions & 1 deletion interpreters/src/interpreter.rs
@@ -16,9 +16,10 @@

use async_trait::async_trait;
use macros::define_result;
-use query_engine::executor::RecordBatchVec;
use snafu::Snafu;

+use crate::RecordBatchVec;

// Make the variant closer to actual error code like invalid arguments.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
5 changes: 5 additions & 0 deletions interpreters/src/lib.rs
@@ -18,6 +18,8 @@

#![feature(string_remove_matches)]

+use common_types::record_batch::RecordBatch;

pub mod alter_table;
pub mod context;
pub mod create;
@@ -36,3 +38,6 @@ mod show_create;

#[cfg(test)]
mod tests;

+// Use a type alias so that we are able to replace the implementation
+pub type RecordBatchVec = Vec<RecordBatch>;
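Moving the alias here reverses the old dependency: interpreter modules previously imported `RecordBatchVec` from `query_engine::executor`, and now the crate root owns it. A small sketch of why an alias is used instead of writing `Vec<RecordBatch>` at every call site; the `RecordBatch` below is a hypothetical stand-in for `common_types::record_batch::RecordBatch`.

// Hypothetical stand-in for common_types::record_batch::RecordBatch.
struct RecordBatch;

// Call sites name the alias, not Vec<RecordBatch>, so re-pointing the
// alias at another container later is a one-line change.
type RecordBatchVec = Vec<RecordBatch>;

fn emit(batches: RecordBatchVec) -> usize {
    batches.len()
}

fn main() {
    assert_eq!(emit(vec![RecordBatch, RecordBatch]), 2);
}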
49 changes: 36 additions & 13 deletions interpreters/src/select.rs
@@ -15,42 +15,46 @@
//! Interpreter for select statement

use async_trait::async_trait;
+use futures::TryStreamExt;
+use generic_error::{BoxError, GenericError};
use log::debug;
use macros::define_result;
-use query_engine::{executor::Executor, physical_planner::PhysicalPlanner};
+use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef};
use query_frontend::plan::QueryPlan;
use snafu::{ResultExt, Snafu};
+use table_engine::stream::SendableRecordBatchStream;

use crate::{
    context::Context,
    interpreter::{Interpreter, InterpreterPtr, Output, Result as InterpreterResult, Select},
+    RecordBatchVec,
};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Failed to create query context, err:{}", source))]
    CreateQueryContext { source: crate::context::Error },

-    #[snafu(display("Failed to execute logical plan, err:{}", source))]
-    ExecutePlan { source: query_engine::error::Error },
+    #[snafu(display("Failed to execute physical plan, msg:{}, err:{}", msg, source))]
+    ExecutePlan { msg: String, source: GenericError },
}

define_result!(Error);

/// Select interpreter
-pub struct SelectInterpreter<T, P> {
+pub struct SelectInterpreter {
    ctx: Context,
    plan: QueryPlan,
-    executor: T,
-    physical_planner: P,
+    executor: ExecutorRef,
+    physical_planner: PhysicalPlannerRef,
}

-impl<T: Executor + 'static, P: PhysicalPlanner> SelectInterpreter<T, P> {
+impl SelectInterpreter {
    pub fn create(
        ctx: Context,
        plan: QueryPlan,
-        executor: T,
-        physical_planner: P,
+        executor: ExecutorRef,
+        physical_planner: PhysicalPlannerRef,
    ) -> InterpreterPtr {
        Box::new(Self {
            ctx,
@@ -62,7 +66,7 @@ impl<T: Executor + 'static, P: PhysicalPlanner> SelectInterpreter<T, P> {
}

#[async_trait]
-impl<T: Executor, P: PhysicalPlanner> Interpreter for SelectInterpreter<T, P> {
+impl Interpreter for SelectInterpreter {
    async fn execute(self: Box<Self>) -> InterpreterResult<Output> {
        let request_id = self.ctx.request_id();
        debug!(
@@ -81,21 +85,40 @@ impl<T: Executor, P: PhysicalPlanner> Interpreter for SelectInterpreter<T, P> {
            .physical_planner
            .plan(&query_ctx, self.plan)
            .await
-            .context(ExecutePlan)
+            .box_err()
+            .context(ExecutePlan {
+                msg: "failed to build physical plan",
+            })
            .context(Select)?;

-        let record_batches = self
+        let record_batch_stream = self
            .executor
            .execute(&query_ctx, physical_plan)
            .await
-            .context(ExecutePlan)
+            .box_err()
+            .context(ExecutePlan {
+                msg: "failed to execute physical plan",
+            })
            .context(Select)?;

        debug!(
            "Interpreter execute select finish, request_id:{}",
            request_id
        );

+        let record_batches = collect(record_batch_stream).await?;
+
        Ok(Output::Records(record_batches))
    }
}
+
+async fn collect(stream: SendableRecordBatchStream) -> InterpreterResult<RecordBatchVec> {
+    stream
+        .try_collect()
+        .await
+        .box_err()
+        .context(ExecutePlan {
+            msg: "failed to collect execution results",
+        })
+        .context(Select)
+}
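With the executor now returning a stream instead of ready batches, the new `collect` helper drains it via `futures::TryStreamExt::try_collect`. A minimal runnable sketch of that step, using a fallible stream of integers as a stand-in for `SendableRecordBatchStream`:

use futures::{executor::block_on, stream, TryStreamExt};

fn main() {
    // Fallible stream standing in for a SendableRecordBatchStream.
    let batches: Vec<Result<i32, String>> = vec![Ok(1), Ok(2), Ok(3)];
    let stream = stream::iter(batches);

    // try_collect drains the stream and stops at the first Err, which is
    // the same short-circuit behaviour the collect helper relies on.
    let collected: Result<Vec<i32>, String> = block_on(stream.try_collect());
    assert_eq!(collected, Ok(vec![1, 2, 3]));
}

As in `collect` above, the first `Err` aborts the drain and surfaces as the overall result.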
2 changes: 1 addition & 1 deletion interpreters/src/show_create.rs
@@ -22,14 +22,14 @@ use arrow::{
use datafusion::logical_expr::Expr;
use datafusion_proto::bytes::Serializeable;
use log::error;
-use query_engine::executor::RecordBatchVec;
use query_frontend::{ast::ShowCreateObject, plan::ShowCreatePlan};
use snafu::ensure;
use table_engine::{partition::PartitionInfo, table::TableRef};

use crate::{
    interpreter::Output,
    show::{Result, UnsupportedType},
+    RecordBatchVec,
};

pub struct ShowCreateInterpreter {