diff --git a/Cargo.lock b/Cargo.lock index d4fe5fb37865..b4dedd6d30c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1525,6 +1525,7 @@ dependencies = [ "common-functions", "common-infallible", "common-meta-types", + "common-streams", "futures", "lazy_static", "pretty_assertions", @@ -1535,16 +1536,21 @@ dependencies = [ name = "common-streams" version = "0.1.0" dependencies = [ + "async-stream", + "async-trait", "common-arrow", "common-base", + "common-dal", "common-datablocks", "common-datavalues", "common-exception", "common-io", "crossbeam 0.8.1", + "csv-async", "futures", "pin-project-lite", "pretty_assertions", + "tokio-stream", ] [[package]] @@ -1887,6 +1893,23 @@ dependencies = [ "serde", ] +[[package]] +name = "csv-async" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7240eee21f34a5a649dbba0dc52e79c71870dd8350263bdd3e7c95d642ebda03" +dependencies = [ + "bstr", + "cfg-if 1.0.0", + "csv-core", + "futures", + "itoa", + "ryu", + "serde", + "tokio", + "tokio-stream", +] + [[package]] name = "csv-core" version = "0.1.10" @@ -2044,6 +2067,7 @@ version = "0.1.0" dependencies = [ "ahash", "async-compat", + "async-stream", "async-trait", "bumpalo", "byteorder", diff --git a/cli/src/cmds/status.rs b/cli/src/cmds/status.rs index 7495c178a4a2..2df0ca8fa4a0 100644 --- a/cli/src/cmds/status.rs +++ b/cli/src/cmds/status.rs @@ -504,6 +504,10 @@ impl LocalRuntime for LocalQueryConfig { databend_query::configs::config_storage::DISK_STORAGE_DATA_PATH, conf.storage.disk.data_path, ) + .env( + databend_query::configs::config_storage::DISK_STORAGE_TEMP_DATA_PATH, + conf.storage.disk.temp_data_path, + ) .env( databend_query::configs::config_query::QUERY_HTTP_HANDLER_HOST, conf.query.http_handler_host, diff --git a/common/dal/src/impls/local.rs b/common/dal/src/impls/local.rs index 594650e61e13..59335a426b1e 100644 --- a/common/dal/src/impls/local.rs +++ b/common/dal/src/impls/local.rs @@ -45,10 +45,8 @@ impl Local { pub fn with_path(root_path: PathBuf) -> Local { Local { root: root_path } } -} -impl Local { - fn prefix_with_root(&self, path: &str) -> Result { + pub fn prefix_with_root(&self, path: &str) -> Result { let path = normalize_path(&self.root.join(&path)); if path.starts_with(&self.root) { Ok(path) @@ -56,7 +54,10 @@ impl Local { // TODO customize error code Err(ErrorCode::from(Error::new( ErrorKind::Other, - format!("please dont play with me, malicious path {:?}", path), + format!( + "please dont play with me, malicious path {:?}, root path {:?}", + path, self.root + ), ))) } } @@ -120,7 +121,7 @@ impl DataAccessor for Local { } // from cargo::util::path -fn normalize_path(path: &Path) -> PathBuf { +pub fn normalize_path(path: &Path) -> PathBuf { let mut components = path.components().peekable(); let mut ret = if let Some(c @ Component::Prefix(..)) = components.peek().cloned() { components.next(); diff --git a/common/datavalues/src/data_schema.rs b/common/datavalues/src/data_schema.rs index 5e9ee4bf9032..fe32e5780e31 100644 --- a/common/datavalues/src/data_schema.rs +++ b/common/datavalues/src/data_schema.rs @@ -116,7 +116,16 @@ impl DataSchema { } /// project will do column pruning. - pub fn project(&self, fields: Vec) -> Self { + pub fn project(&self, projection: Vec) -> Self { + let fields = projection + .iter() + .map(|idx| self.fields()[*idx].clone()) + .collect(); + Self::new_from(fields, self.meta().clone()) + } + + /// project will do column pruning. + pub fn project_by_fields(&self, fields: Vec) -> Self { Self::new_from(fields, self.meta().clone()) } diff --git a/common/datavalues/src/types/serializations/mod.rs b/common/datavalues/src/types/serializations/mod.rs index 615a7729cc44..fc3ae2fa6ede 100644 --- a/common/datavalues/src/types/serializations/mod.rs +++ b/common/datavalues/src/types/serializations/mod.rs @@ -32,7 +32,7 @@ pub use number::*; pub use string::*; // capacity. -pub trait TypeSerializer { +pub trait TypeSerializer: Send + Sync { fn serialize_strings(&self, column: &DataColumn) -> Result>; fn de(&mut self, reader: &mut &[u8]) -> Result<()>; diff --git a/common/planners/Cargo.toml b/common/planners/Cargo.toml index aa8ad18f9924..1d401302326f 100644 --- a/common/planners/Cargo.toml +++ b/common/planners/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" [dependencies] # In alphabetical order # Workspace dependencies common-datavalues = {path = "../datavalues"} +common-streams = {path = "../streams"} common-functions = {path = "../functions"} common-exception = {path = "../exception"} common-datablocks = {path = "../datablocks"} diff --git a/common/planners/src/plan_insert_into.rs b/common/planners/src/plan_insert_into.rs index 282de642bf5c..24241f0e2b00 100644 --- a/common/planners/src/plan_insert_into.rs +++ b/common/planners/src/plan_insert_into.rs @@ -12,16 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; -use common_infallible::Mutex; use common_meta_types::MetaId; -type BlockStream = - std::pin::Pin + Sync + Send + 'static>>; - #[derive(serde::Serialize, serde::Deserialize, Clone)] pub struct InsertIntoPlan { pub db_name: String, @@ -29,8 +22,7 @@ pub struct InsertIntoPlan { pub tbl_id: MetaId, pub schema: DataSchemaRef, - #[serde(skip, default = "InsertIntoPlan::empty_stream")] - pub input_stream: Arc>>, + pub values_opt: Option, } impl PartialEq for InsertIntoPlan { @@ -42,14 +34,7 @@ impl PartialEq for InsertIntoPlan { } impl InsertIntoPlan { - pub fn empty_stream() -> Arc>> { - Arc::new(Mutex::new(None)) - } pub fn schema(&self) -> DataSchemaRef { self.schema.clone() } - pub fn set_input_stream(&self, input_stream: BlockStream) { - let mut writer = self.input_stream.lock(); - *writer = Some(input_stream); - } } diff --git a/common/planners/src/plan_read_datasource.rs b/common/planners/src/plan_read_datasource.rs index 430ba10d5d35..e6b14de823a1 100644 --- a/common/planners/src/plan_read_datasource.rs +++ b/common/planners/src/plan_read_datasource.rs @@ -52,7 +52,7 @@ impl ReadDataSourcePlan { .clone() .map(|x| { let fields: Vec<_> = x.iter().map(|(_, f)| f.clone()).collect(); - Arc::new(self.table_info.schema().project(fields)) + Arc::new(self.table_info.schema().project_by_fields(fields)) }) .unwrap_or_else(|| self.table_info.schema()) } @@ -63,4 +63,22 @@ impl ReadDataSourcePlan { .clone() .unwrap_or_else(|| self.table_info.schema().fields_map()) } + + pub fn projections(&self) -> Vec { + let default_proj = || { + (0..self.table_info.schema().fields().len()) + .into_iter() + .collect::>() + }; + + if let Some(Extras { + projection: Some(prj), + .. + }) = &self.push_downs + { + prj.clone() + } else { + default_proj() + } + } } diff --git a/common/streams/Cargo.toml b/common/streams/Cargo.toml index 3b0403353ba2..4ac20b4f41c2 100644 --- a/common/streams/Cargo.toml +++ b/common/streams/Cargo.toml @@ -14,6 +14,7 @@ common-datablocks = {path = "../datablocks"} common-datavalues = {path = "../datavalues"} common-exception = {path = "../exception"} common-io = {path = "../io"} +common-dal = {path = "../dal"} # Github dependencies @@ -21,6 +22,10 @@ common-io = {path = "../io"} crossbeam = "0.8" futures = "0.3" pin-project-lite = "^0.2" +async-trait = "0.1" +async-stream = "0.3.2" +csv-async = {version = "1.1", features = ["tokio"] } +tokio-stream = { version = "0.1", features = ["net"] } [dev-dependencies] pretty_assertions = "1.0" diff --git a/common/streams/src/lib.rs b/common/streams/src/lib.rs index e0335d5d61fd..db901a21664c 100644 --- a/common/streams/src/lib.rs +++ b/common/streams/src/lib.rs @@ -30,7 +30,6 @@ mod stream_abort; mod stream_correct_with_schema; mod stream_datablock; mod stream_limit_by; -mod stream_parquet; mod stream_progress; mod stream_skip; mod stream_sort; @@ -39,12 +38,11 @@ mod stream_sub_queries; mod stream_take; pub use sources::*; -pub use stream::SendableDataBlockStream; +pub use stream::*; pub use stream_abort::AbortStream; pub use stream_correct_with_schema::CorrectWithSchemaStream; pub use stream_datablock::DataBlockStream; pub use stream_limit_by::LimitByStream; -pub use stream_parquet::ParquetStream; pub use stream_progress::ProgressStream; pub use stream_skip::SkipStream; pub use stream_sort::SortStream; diff --git a/common/streams/src/sources/mod.rs b/common/streams/src/sources/mod.rs index 3410f1780e24..02214156b32d 100644 --- a/common/streams/src/sources/mod.rs +++ b/common/streams/src/sources/mod.rs @@ -14,6 +14,7 @@ mod source; mod source_csv; +mod source_parquet; mod source_values; #[cfg(test)] @@ -22,4 +23,5 @@ mod source_test; pub use source::FormatSettings; pub use source::Source; pub use source_csv::CsvSource; +pub use source_parquet::ParquetSource; pub use source_values::ValueSource; diff --git a/common/streams/src/sources/source.rs b/common/streams/src/sources/source.rs index ecbe36392918..0cfca794e7eb 100644 --- a/common/streams/src/sources/source.rs +++ b/common/streams/src/sources/source.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; use common_datablocks::DataBlock; use common_exception::Result; -pub trait Source: Sync + Send { - fn read(&mut self) -> Result>; +#[async_trait] +pub trait Source: Send { + async fn read(&mut self) -> Result>; } #[allow(dead_code)] diff --git a/common/streams/src/sources/source_csv.rs b/common/streams/src/sources/source_csv.rs index 3f80636e884f..6a7d5e366b67 100644 --- a/common/streams/src/sources/source_csv.rs +++ b/common/streams/src/sources/source_csv.rs @@ -12,31 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io; - -use common_arrow::arrow::io::csv::read::ByteRecord; -use common_arrow::arrow::io::csv::read::Reader; -use common_arrow::arrow::io::csv::read::ReaderBuilder; +use async_trait::async_trait; +use common_base::tokio; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; use common_exception::ToErrorCode; +use csv_async::AsyncReader; +use csv_async::AsyncReaderBuilder; +use tokio_stream::StreamExt; use crate::Source; pub struct CsvSource { - reader: Reader, + reader: AsyncReader, schema: DataSchemaRef, block_size: usize, rows: usize, } impl CsvSource -where R: io::Read + Sync + Send +where R: tokio::io::AsyncRead + Unpin + Send + Sync { - pub fn new(reader: R, schema: DataSchemaRef, block_size: usize) -> Self { - let reader = ReaderBuilder::new().has_headers(false).from_reader(reader); + pub fn new(reader: R, schema: DataSchemaRef, header: bool, block_size: usize) -> Self { + let reader = AsyncReaderBuilder::new() + .has_headers(header) + .create_reader(reader); Self { reader, @@ -47,11 +49,11 @@ where R: io::Read + Sync + Send } } +#[async_trait] impl Source for CsvSource -where R: io::Read + Sync + Send +where R: tokio::io::AsyncRead + Unpin + Send + Sync { - fn read(&mut self) -> Result> { - let mut record = ByteRecord::new(); + async fn read(&mut self) -> Result> { let mut desers = self .schema .fields() @@ -59,29 +61,33 @@ where R: io::Read + Sync + Send .map(|f| f.data_type().create_serializer(self.block_size)) .collect::>>()?; - for row in 0..self.block_size { - let v = self - .reader - .read_byte_record(&mut record) - .map_err_to_code(ErrorCode::BadBytes, || { - format!("Parse csv error at line {}", self.rows) - })?; + let mut rows = 0; + let mut records = self.reader.byte_records(); - if !v { - if row == 0 { - return Ok(None); - } + while let Some(record) = records.next().await { + let record = record.map_err_to_code(ErrorCode::BadBytes, || { + format!("Parse csv error at line {}", self.rows) + })?; + + if record.is_empty() { break; } - desers - .iter_mut() - .enumerate() - .for_each(|(col, deser)| match record.get(col) { + for (col, deser) in desers.iter_mut().enumerate() { + match record.get(col) { Some(bytes) => deser.de_text(bytes).unwrap(), None => deser.de_null(), - }); - + } + } + rows += 1; self.rows += 1; + + if rows >= self.block_size { + break; + } + } + + if rows == 0 { + return Ok(None); } let series = desers diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs new file mode 100644 index 000000000000..5ca6d04f5284 --- /dev/null +++ b/common/streams/src/sources/source_parquet.rs @@ -0,0 +1,132 @@ +// Copyright 2020 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common_arrow::arrow::datatypes::Schema as ArrowSchema; +use common_arrow::arrow::io::parquet::read::decompress; +use common_arrow::arrow::io::parquet::read::page_stream_to_array; +use common_arrow::arrow::io::parquet::read::read_metadata_async; +use common_arrow::arrow::io::parquet::read::schema::FileMetaData; +use common_arrow::parquet::read::get_page_stream; +use common_dal::DataAccessor; +use common_datablocks::DataBlock; +use common_datavalues::prelude::DataColumn; +use common_datavalues::series::IntoSeries; +use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; +use common_exception::Result; +use futures::StreamExt; +use futures::TryStreamExt; + +use crate::Source; + +pub struct ParquetSource { + data_accessor: Arc, + path: String, + + block_schema: DataSchemaRef, + arrow_table_schema: ArrowSchema, + projection: Vec, + row_group: usize, + row_groups: usize, + metadata: Option, +} + +impl ParquetSource { + pub fn new( + data_accessor: Arc, + path: String, + table_schema: DataSchemaRef, + projection: Vec, + ) -> Self { + let block_schema = Arc::new(table_schema.project(projection.clone())); + Self { + data_accessor, + path, + block_schema, + arrow_table_schema: table_schema.to_arrow(), + projection, + row_group: 0, + row_groups: 0, + metadata: None, + } + } +} + +#[async_trait] +impl Source for ParquetSource { + async fn read(&mut self) -> Result> { + let metadata = match self.metadata.clone() { + Some(m) => m, + None => { + let mut reader = self + .data_accessor + .get_input_stream(self.path.as_str(), None)?; + let m = read_metadata_async(&mut reader) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + self.metadata = Some(m.clone()); + self.row_groups = m.row_groups.len(); + self.row_group = 0; + m + } + }; + + if self.row_group >= self.row_groups { + return Ok(None); + } + let col_num = self.projection.len(); + let row_group = self.row_group; + let cols = self + .projection + .clone() + .into_iter() + .map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx)); + + let fields = self.arrow_table_schema.fields(); + + let stream = futures::stream::iter(cols).map(|(col_meta, idx)| { + let column_chunk_meta = (metadata.row_groups[row_group].columns()[idx]).clone(); + let data_accessor = self.data_accessor.clone(); + let path = self.path.clone(); + + async move { + let mut reader = data_accessor.get_input_stream(path.as_str(), None)?; + // TODO cache block column + let col_pages = + get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true)) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + let pages = + col_pages.map(|compressed_page| decompress(compressed_page?, &mut vec![])); + let array = + page_stream_to_array(pages, &column_chunk_meta, fields[idx].data_type.clone()) + .await?; + let array: Arc = array.into(); + Ok::<_, ErrorCode>(DataColumn::Array(array.into_series())) + } + }); + + // TODO configuration of the buffer size + let buffer_size = 10; + let n = std::cmp::min(buffer_size, col_num); + let data_cols = stream.buffered(n).try_collect().await?; + + self.row_group += 1; + let block = DataBlock::create(self.block_schema.clone(), data_cols); + Ok(Some(block)) + } +} diff --git a/common/streams/src/sources/source_test.rs b/common/streams/src/sources/source_test.rs index c97af785a566..18f3bcd55c37 100644 --- a/common/streams/src/sources/source_test.rs +++ b/common/streams/src/sources/source_test.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_base::tokio; use common_datablocks::assert_blocks_eq; use common_datavalues::DataField; use common_datavalues::DataSchemaRefExt; @@ -21,8 +22,8 @@ use crate::CsvSource; use crate::Source; use crate::ValueSource; -#[test] -fn test_parse_values() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_parse_values() { let buffer = "(1, 'str', 1) , (-1, ' str ' , 1.1) , ( 2, 'aa aa', 2.2), (3, \"33'33\", 3.3) "; @@ -32,7 +33,7 @@ fn test_parse_values() { DataField::new("c", DataType::Float64, false), ]); let mut values_source = ValueSource::new(buffer.as_bytes(), schema, 10); - let block = values_source.read().unwrap().unwrap(); + let block = values_source.read().await.unwrap().unwrap(); assert_blocks_eq( vec![ "+----+-------+-----+", @@ -47,12 +48,12 @@ fn test_parse_values() { &[block], ); - let block = values_source.read().unwrap(); + let block = values_source.read().await.unwrap(); assert!(block.is_none()); } -#[test] -fn test_parse_csvs() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_parse_csvs() { let buffer = "1,\"1\",1.11\n2,\"2\",2\n3,\"3-'3'-3\",3\n"; let schema = DataSchemaRefExt::create(vec![ @@ -60,8 +61,8 @@ fn test_parse_csvs() { DataField::new("b", DataType::String, false), DataField::new("c", DataType::Float64, false), ]); - let mut values_source = CsvSource::new(buffer.as_bytes(), schema, 10); - let block = values_source.read().unwrap().unwrap(); + let mut csv_source = CsvSource::new(buffer.as_bytes(), schema, false, 10); + let block = csv_source.read().await.unwrap().unwrap(); assert_blocks_eq( vec![ "+---+---------+------+", @@ -75,6 +76,6 @@ fn test_parse_csvs() { &[block], ); - let block = values_source.read().unwrap(); + let block = csv_source.read().await.unwrap(); assert!(block.is_none()); } diff --git a/common/streams/src/sources/source_values.rs b/common/streams/src/sources/source_values.rs index c30db0caf0cb..aaabb2743642 100644 --- a/common/streams/src/sources/source_values.rs +++ b/common/streams/src/sources/source_values.rs @@ -15,6 +15,7 @@ use std::io; use std::io::BufReader; +use async_trait::async_trait; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::Result; @@ -43,10 +44,11 @@ where R: io::Read + Send + Sync } } +#[async_trait] impl Source for ValueSource where R: io::Read + Send + Sync { - fn read(&mut self) -> Result> { + async fn read(&mut self) -> Result> { let reader = &mut self.reader; let mut buf = Vec::new(); let mut temp = Vec::new(); diff --git a/common/streams/src/stream_parquet.rs b/common/streams/src/stream_parquet.rs deleted file mode 100644 index 368bc6010b7b..000000000000 --- a/common/streams/src/stream_parquet.rs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2020 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::task::Context; -use std::task::Poll; - -use common_datablocks::DataBlock; -use common_exception::Result; -use crossbeam::channel::Receiver; -use futures::Stream; - -pub struct ParquetStream { - response_rx: Receiver>>, -} - -impl ParquetStream { - pub fn try_create(response_rx: Receiver>>) -> Result { - Ok(ParquetStream { response_rx }) - } -} - -impl Stream for ParquetStream { - type Item = Result; - - fn poll_next(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - match self.response_rx.recv() { - Ok(block) => Poll::Ready(block), - // RecvError means receiver has exited and closed the channel - Err(_) => Poll::Ready(None), - } - } -} diff --git a/common/streams/src/stream_source.rs b/common/streams/src/stream_source.rs index 3baaf8e3d938..b026a82195ea 100644 --- a/common/streams/src/stream_source.rs +++ b/common/streams/src/stream_source.rs @@ -12,33 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::task::Context; -use std::task::Poll; - -use common_datablocks::DataBlock; +use async_stream::stream; use common_exception::Result; -use pin_project_lite::pin_project; +use crate::SendableDataBlockStream; use crate::Source; -pin_project! { - pub struct SourceStream { - source: Box, - } +pub struct SourceStream { + source: Box, } impl SourceStream { - pub fn create(source: Box) -> Self { + pub fn new(source: Box) -> Self { SourceStream { source } } -} - -impl futures::Stream for SourceStream { - type Item = Result; - fn poll_next(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - let this = self.project(); - let block = this.source.read()?; - Poll::Ready(block.map(Ok)) + pub async fn execute(self) -> Result { + let mut source = self.source; + let s = stream! { + loop { + let block = source.read().await; + match block { + Ok(None) => break, + Ok(Some(b)) => yield(Ok(b)), + Err(e) => yield(Err(e)), + } + } + }; + Ok(Box::pin(s)) } } diff --git a/query/Cargo.toml b/query/Cargo.toml index 0233c67ac760..72f8e80ea9d4 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -56,6 +56,7 @@ sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "ec ahash = "0.7.6" async-compat = "0.2.1" async-trait = "0.1" +async-stream = "0.3.2" poem = { version = "1.0.23", features = ["rustls"] } bumpalo = "3.8.0" byteorder = "1" diff --git a/query/benches/suites/mod.rs b/query/benches/suites/mod.rs index da2dcf0b48b3..09a945f16d41 100644 --- a/query/benches/suites/mod.rs +++ b/query/benches/suites/mod.rs @@ -33,7 +33,7 @@ pub async fn select_executor(sql: &str) -> Result<()> { if let PlanNode::Select(plan) = PlanParser::create(ctx.clone()).build_from_sql(sql)? { let executor = SelectInterpreter::try_create(ctx, plan)?; - let mut stream = executor.execute().await?; + let mut stream = executor.execute(None).await?; while let Some(_block) = stream.next().await {} } else { assert!(false) diff --git a/query/src/catalogs/table.rs b/query/src/catalogs/table.rs index 85ce395d1434..5b0aef33d86c 100644 --- a/query/src/catalogs/table.rs +++ b/query/src/catalogs/table.rs @@ -91,6 +91,7 @@ pub trait Table: Sync + Send { &self, _io_ctx: Arc, _insert_plan: InsertIntoPlan, + _stream: SendableDataBlockStream, ) -> Result<()> { Err(ErrorCode::UnImplement(format!( "append data for local table {} is not implemented", diff --git a/query/src/configs/config_storage.rs b/query/src/configs/config_storage.rs index 2c8316cdc209..7c3a1d8a91c2 100644 --- a/query/src/configs/config_storage.rs +++ b/query/src/configs/config_storage.rs @@ -24,6 +24,7 @@ pub const STORAGE_TYPE: &str = "STORAGE_TYPE"; // Disk Storage env. pub const DISK_STORAGE_DATA_PATH: &str = "DISK_STORAGE_DATA_PATH"; +pub const DISK_STORAGE_TEMP_DATA_PATH: &str = "DISK_STORAGE_TEMP_DATA_PATH"; // S3 Storage env. const S3_STORAGE_REGION: &str = "S3_STORAGE_REGION"; @@ -63,15 +64,19 @@ impl FromStr for StorageType { Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, StructOpt, StructOptToml, )] pub struct DiskStorageConfig { - #[structopt(long, env = DISK_STORAGE_DATA_PATH, default_value = "", help = "Disk storage backend address")] + #[structopt(long, env = DISK_STORAGE_DATA_PATH, default_value = "", help = "Disk storage backend data path")] #[serde(default)] pub data_path: String, + #[structopt(long, env = DISK_STORAGE_TEMP_DATA_PATH, default_value = "", help = "Disk storage tempory data path for external data")] + #[serde(default)] + pub temp_data_path: String, } impl DiskStorageConfig { pub fn default() -> Self { DiskStorageConfig { data_path: "".to_string(), + temp_data_path: "".to_string(), } } } @@ -199,6 +204,14 @@ impl StorageConfig { DISK_STORAGE_DATA_PATH ); + env_helper!( + mut_config.storage, + disk, + temp_data_path, + String, + DISK_STORAGE_TEMP_DATA_PATH + ); + // S3. env_helper!(mut_config.storage, s3, region, String, S3_STORAGE_REGION); env_helper!( diff --git a/query/src/configs/config_test.rs b/query/src/configs/config_test.rs index ff42e6e1787d..d6d7ea6525c3 100644 --- a/query/src/configs/config_test.rs +++ b/query/src/configs/config_test.rs @@ -76,6 +76,7 @@ storage_type = \"disk\" [storage.disk] data_path = \"\" +temp_data_path = \"\" [storage.s3] region = \"\" diff --git a/query/src/datasources/common/dal_builder_test.rs b/query/src/datasources/common/dal_builder_test.rs index 28860b6e9c49..5805a10e195b 100644 --- a/query/src/datasources/common/dal_builder_test.rs +++ b/query/src/datasources/common/dal_builder_test.rs @@ -13,6 +13,8 @@ // limitations under the License. // +use std::env; + use common_dal::DataAccessorBuilder; use crate::configs::AzureStorageBlobConfig; @@ -27,6 +29,8 @@ fn test_dal_builder() -> common_exception::Result<()> { storage_type: "disk".to_string(), disk: DiskStorageConfig { data_path: "/tmp".to_string(), + /// temporary directory for testing, default to current directory + temp_data_path: env::current_dir()?.display().to_string(), }, s3: S3StorageConfig { region: "".to_string(), diff --git a/query/src/datasources/database/example/README.md b/query/src/datasources/database/example/README.md index 0a3397a86beb..99a47d28a61f 100644 --- a/query/src/datasources/database/example/README.md +++ b/query/src/datasources/database/example/README.md @@ -178,6 +178,7 @@ pub trait Table: Sync + Send { &self, _ctx: DatabendQueryContextRef, _insert_plan: InsertIntoPlan, + _stream: SendableDataBlockStream, ) -> Result<()> { Err(ErrorCode::UnImplement(format!( "append data for local table {} is not implemented", diff --git a/query/src/datasources/database/example/example_table.rs b/query/src/datasources/database/example/example_table.rs index d97ba6503361..d7265a557f70 100644 --- a/query/src/datasources/database/example/example_table.rs +++ b/query/src/datasources/database/example/example_table.rs @@ -88,6 +88,7 @@ impl Table for ExampleTable { &self, _io_ctx: Arc, _insert_plan: InsertIntoPlan, + _stream: SendableDataBlockStream, ) -> Result<()> { Ok(()) } diff --git a/query/src/datasources/table/csv/csv_table.rs b/query/src/datasources/table/csv/csv_table.rs index b0fef92e308d..5efbff4fad11 100644 --- a/query/src/datasources/table/csv/csv_table.rs +++ b/query/src/datasources/table/csv/csv_table.rs @@ -17,26 +17,31 @@ use std::any::Any; use std::fs::File; use std::sync::Arc; +use async_stream::stream; +use common_base::tokio; use common_context::DataContext; use common_context::IOContext; use common_context::TableIOContext; +use common_dal::Local; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::TableInfo; use common_planners::Extras; +use common_planners::Part; use common_planners::Partitions; use common_planners::ReadDataSourcePlan; use common_planners::Statistics; +use common_streams::CsvSource; use common_streams::SendableDataBlockStream; +use common_streams::Source; use crate::catalogs::Table; use crate::datasources::common::count_lines; -use crate::datasources::common::generate_parts; -use crate::datasources::table::csv::csv_table_stream::CsvTableStream; use crate::sessions::DatabendQueryContext; pub struct CsvTable { table_info: TableInfo, + // TODO: support s3 protocol && support gob matcher files file: String, has_header: bool, } @@ -77,35 +82,69 @@ impl Table for CsvTable { fn read_partitions( &self, - io_ctx: Arc, + _io_ctx: Arc, _push_downs: Option, ) -> Result<(Statistics, Partitions)> { - let start_line: usize = if self.has_header { 1 } else { 0 }; let file = &self.file; let lines_count = count_lines(File::open(file.clone())?)?; let bytes = File::open(file.clone())?.metadata()?.len() as usize; - let parts = generate_parts( - start_line as u64, - io_ctx.get_max_threads() as u64, - lines_count as u64, - ); + let parts = vec![Part { + name: file.clone(), + version: 0, + }]; Ok((Statistics::new_estimated(lines_count, bytes), parts)) } async fn read( &self, io_ctx: Arc, - _plan: &ReadDataSourcePlan, + plan: &ReadDataSourcePlan, ) -> Result { let ctx: Arc = io_ctx .get_user_data()? .expect("DatabendQueryContext should not be None"); - Ok(Box::pin(CsvTableStream::try_create( - ctx, - self.table_info.schema(), - self.file.clone(), - )?)) + let conf = ctx.get_config().storage.disk; + let local = Local::new(conf.temp_data_path.as_str()); + + let ctx_clone = ctx.clone(); + let schema = plan.schema(); + let block_size = ctx.get_settings().get_max_block_size()? as usize; + let has_header = self.has_header; + + let s = stream! { + loop { + let partitions = ctx_clone.try_get_partitions(1); + match partitions { + Ok(partitions) => { + if partitions.is_empty() { + break; + } + + /// TODO use dal, but inputstream is not send which is need for csv-async reader + let part = partitions.get(0).unwrap(); + let file = part.name.clone(); + + let path = local.prefix_with_root(&file)?; + let std_file = std::fs::File::open(path)?; + let reader = tokio::fs::File::from_std(std_file); + + let mut source = CsvSource::new(reader, schema.clone(), has_header, block_size); + + loop { + let block = source.read().await; + match block { + Ok(None) => break, + Ok(Some(b)) => yield(Ok(b)), + Err(e) => yield(Err(e)), + } + } + } + Err(e) => yield(Err(e)) + } + } + }; + Ok(Box::pin(s)) } } diff --git a/query/src/datasources/table/csv/csv_table_stream.rs b/query/src/datasources/table/csv/csv_table_stream.rs deleted file mode 100644 index 229d6bace8aa..000000000000 --- a/query/src/datasources/table/csv/csv_table_stream.rs +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2020 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::convert::TryFrom; -use std::sync::Arc; -use std::task::Poll; - -use common_arrow::arrow::io::csv::read; -use common_datablocks::DataBlock; -use common_datavalues::DataSchemaRef; -use common_exception::ErrorCode; -use common_exception::Result; -use futures::Stream; - -use crate::sessions::DatabendQueryContextRef; - -pub struct CsvTableStream { - ctx: DatabendQueryContextRef, - file: String, - schema: DataSchemaRef, -} - -impl CsvTableStream { - pub fn try_create( - ctx: DatabendQueryContextRef, - schema: DataSchemaRef, - file: String, - ) -> Result { - Ok(CsvTableStream { ctx, file, schema }) - } - - pub fn try_get_one_block(&self) -> Result> { - let partitions = self.ctx.try_get_partitions(1)?; - if partitions.is_empty() { - return Ok(None); - } - - let part = partitions[0].clone(); - let names: Vec<_> = part.name.split('-').collect(); - let begin: usize = names[1].parse()?; - let end: usize = names[2].parse()?; - let block_size = end - begin; - - let arrow_schema = Arc::new(self.schema.to_arrow()); - let mut reader = read::ReaderBuilder::new() - .has_headers(false) - .from_path(&self.file) - .map_err(|e| ErrorCode::CannotReadFile(e.to_string()))?; - - let mut rows = vec![read::ByteRecord::default(); block_size]; - let rows_read = read::read_rows(&mut reader, begin, &mut rows)?; - let rows = &rows[..rows_read]; - - let record = read::deserialize_batch( - rows, - arrow_schema.fields(), - None, - 0, - read::deserialize_column, - )?; - - let block = DataBlock::try_from(record)?; - Ok(Some(block)) - } -} - -impl Stream for CsvTableStream { - type Item = Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - _: &mut std::task::Context<'_>, - ) -> Poll> { - let block = self.try_get_one_block()?; - Poll::Ready(block.map(Ok)) - } -} diff --git a/query/src/datasources/table/csv/csv_table_test.rs b/query/src/datasources/table/csv/csv_table_test.rs index 79fac725beaf..3354c050dbe1 100644 --- a/query/src/datasources/table/csv/csv_table_test.rs +++ b/query/src/datasources/table/csv/csv_table_test.rs @@ -136,12 +136,12 @@ async fn test_csv_table_parse_error() -> Result<()> { "+---------+---------+---------+---------+", "| column1 | column2 | column3 | column4 |", "+---------+---------+---------+---------+", - "| 1 | NULL | 100 | NULL |", - "| 2 | NULL | 80 | NULL |", - "| 3 | NULL | 60 | NULL |", - "| 4 | NULL | 70 | NULL |", - "| 5 | NULL | 55 | NULL |", - "| 6 | NULL | 99 | NULL |", + "| 1 | 0 | 100 | NULL |", + "| 2 | 0 | 80 | NULL |", + "| 3 | 0 | 60 | NULL |", + "| 4 | 0 | 70 | NULL |", + "| 5 | 0 | 55 | NULL |", + "| 6 | 0 | 99 | NULL |", "+---------+---------+---------+---------+", ], &result.unwrap(), diff --git a/query/src/datasources/table/csv/mod.rs b/query/src/datasources/table/csv/mod.rs index 18e0882ab904..de1449de024d 100644 --- a/query/src/datasources/table/csv/mod.rs +++ b/query/src/datasources/table/csv/mod.rs @@ -13,6 +13,5 @@ // limitations under the License. // pub mod csv_table; -pub mod csv_table_stream; #[cfg(test)] mod csv_table_test; diff --git a/query/src/datasources/table/fuse/index/min_max_test.rs b/query/src/datasources/table/fuse/index/min_max_test.rs index d5e953c202ce..76b67fe30a34 100644 --- a/query/src/datasources/table/fuse/index/min_max_test.rs +++ b/query/src/datasources/table/fuse/index/min_max_test.rs @@ -25,7 +25,6 @@ use common_datavalues::DataField; use common_datavalues::DataSchemaRefExt; use common_datavalues::DataType; use common_exception::Result; -use common_infallible::Mutex; use common_meta_types::TableMeta; use common_planners::col; use common_planners::lit; @@ -74,10 +73,10 @@ async fn test_min_max_index() -> Result<()> { let blocks = (0..num) .into_iter() .map(|idx| { - DataBlock::create_by_array(test_schema.clone(), vec![ + Ok(DataBlock::create_by_array(test_schema.clone(), vec![ Series::new(vec![idx + 1, idx + 2, idx + 3]), Series::new(vec![idx * num + 1, idx * num + 2, idx * num + 3]), - ]) + ])) }) .collect::>(); @@ -86,11 +85,14 @@ async fn test_min_max_index() -> Result<()> { tbl_name: test_tbl_name.to_string(), tbl_id: table.get_id(), schema: test_schema.clone(), - input_stream: Arc::new(Mutex::new(Some(Box::pin(futures::stream::iter(blocks))))), + values_opt: None, }; let io_ctx = Arc::new(ctx.get_cluster_table_io_context()?); let da = io_ctx.get_data_accessor()?; - table.append_data(io_ctx.clone(), insert_into_plan).await?; + let stream = Box::pin(futures::stream::iter(blocks)); + table + .append_data(io_ctx.clone(), insert_into_plan, stream) + .await?; // get the latest tbl let table = catalog diff --git a/query/src/datasources/table/fuse/io/block_appender.rs b/query/src/datasources/table/fuse/io/block_appender.rs index f21754993aef..fc2e519d2205 100644 --- a/query/src/datasources/table/fuse/io/block_appender.rs +++ b/query/src/datasources/table/fuse/io/block_appender.rs @@ -23,6 +23,7 @@ use common_datablocks::DataBlock; use common_datavalues::DataSchema; use common_exception::ErrorCode; use common_exception::Result; +use common_streams::SendableDataBlockStream; use futures::StreamExt; use rusoto_core::ByteStream; @@ -30,9 +31,6 @@ use crate::datasources::table::fuse::util; use crate::datasources::table::fuse::SegmentInfo; use crate::datasources::table::fuse::Stats; -pub type BlockStream = - std::pin::Pin + Sync + Send + 'static>>; - /// dummy struct, namespace placeholder pub struct BlockAppender; @@ -40,7 +38,7 @@ impl BlockAppender { // TODO should return a stream of SegmentInfo (batch blocks into segments) pub async fn append_blocks( data_accessor: Arc, - mut stream: BlockStream, + mut stream: SendableDataBlockStream, data_schema: &DataSchema, ) -> Result { let mut stats_acc = util::StatisticsAccumulator::new(); @@ -48,6 +46,7 @@ impl BlockAppender { // accumulate the stats and save the blocks while let Some(block) = stream.next().await { + let block = block?; stats_acc.acc(&block)?; let schema = block.schema().to_arrow(); let location = util::gen_unique_block_location(); diff --git a/query/src/datasources/table/fuse/io/block_appender_test.rs b/query/src/datasources/table/fuse/io/block_appender_test.rs index 63e2561bf869..10bc0ab8c2cc 100644 --- a/query/src/datasources/table/fuse/io/block_appender_test.rs +++ b/query/src/datasources/table/fuse/io/block_appender_test.rs @@ -32,7 +32,7 @@ async fn test_fuse_table_block_appender() { let local_fs = common_dal::Local::with_path(tmp_dir.path().to_owned()); let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]); let block = DataBlock::create_by_array(schema.clone(), vec![Series::new(vec![1, 2, 3])]); - let block_stream = futures::stream::iter(vec![block]); + let block_stream = futures::stream::iter(vec![Ok(block)]); let r = BlockAppender::append_blocks(Arc::new(local_fs), Box::pin(block_stream), schema.as_ref()) .await; diff --git a/query/src/datasources/table/fuse/io/block_reader.rs b/query/src/datasources/table/fuse/io/block_reader.rs index b2e4ff01e7ef..0a588c0b2c4f 100644 --- a/query/src/datasources/table/fuse/io/block_reader.rs +++ b/query/src/datasources/table/fuse/io/block_reader.rs @@ -66,7 +66,6 @@ pub async fn do_read( let col_num = projection.len(); // TODO pass in parquet file len let mut reader = data_accessor.get_input_stream(loc, None)?; - // TODO cache parquet meta let metadata = read_metadata_async(&mut reader) .await diff --git a/query/src/datasources/table/fuse/table.rs b/query/src/datasources/table/fuse/table.rs index d43aa0b23871..55d39ecb57d5 100644 --- a/query/src/datasources/table/fuse/table.rs +++ b/query/src/datasources/table/fuse/table.rs @@ -89,8 +89,9 @@ impl Table for FuseTable { &self, io_ctx: Arc, insert_plan: InsertIntoPlan, + stream: SendableDataBlockStream, ) -> Result<()> { - self.do_append(io_ctx, insert_plan).await + self.do_append(io_ctx, insert_plan, stream).await } async fn truncate( diff --git a/query/src/datasources/table/fuse/table_do_append.rs b/query/src/datasources/table/fuse/table_do_append.rs index b1adc9919bfc..ae51838279c7 100644 --- a/query/src/datasources/table/fuse/table_do_append.rs +++ b/query/src/datasources/table/fuse/table_do_append.rs @@ -18,11 +18,11 @@ use std::sync::Arc; use common_context::IOContext; use common_context::TableIOContext; use common_datavalues::DataSchema; -use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::MetaId; use common_meta_types::MetaVersion; use common_planners::InsertIntoPlan; +use common_streams::SendableDataBlockStream; use uuid::Uuid; use crate::datasources::table::fuse::util; @@ -39,25 +39,15 @@ impl FuseTable { &self, io_ctx: Arc, insert_plan: InsertIntoPlan, + stream: SendableDataBlockStream, ) -> Result<()> { - // 1. take out input stream from plan - // Assumes that, insert_interpreter has properly batched data blocks - let block_stream = { - match insert_plan.input_stream.lock().take() { - Some(s) => s, - None => return Err(ErrorCode::EmptyData("input stream consumed")), - } - }; - + // 1. get da let da = io_ctx.get_data_accessor()?; // 2. Append blocks to storage - let segment_info = BlockAppender::append_blocks( - da.clone(), - block_stream, - self.table_info.schema().as_ref(), - ) - .await?; + let segment_info = + BlockAppender::append_blocks(da.clone(), stream, self.table_info.schema().as_ref()) + .await?; // 3. save segment info let seg_loc = util::gen_segment_info_location(); diff --git a/query/src/datasources/table/fuse/table_test.rs b/query/src/datasources/table/fuse/table_test.rs index b8b9fa65d4d7..7dbd3a247200 100644 --- a/query/src/datasources/table/fuse/table_test.rs +++ b/query/src/datasources/table/fuse/table_test.rs @@ -53,8 +53,14 @@ async fn test_fuse_table_simple_case() -> Result<()> { // insert 10 blocks let num_blocks = 5; let io_ctx = Arc::new(ctx.get_cluster_table_io_context()?); - let insert_into_plan = fixture.insert_plan_of_table(table.as_ref(), num_blocks); - table.append_data(io_ctx.clone(), insert_into_plan).await?; + let insert_into_plan = fixture.insert_plan_of_table(table.as_ref()); + let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream( + num_blocks, + ))); + + table + .append_data(io_ctx.clone(), insert_into_plan, stream) + .await?; // get the latest tbl let prev_version = table.get_table_info().ident.version; @@ -150,8 +156,15 @@ async fn test_fuse_table_truncate() -> Result<()> { assert!(r.is_ok()); // 2. truncate table which has data - let insert_into_plan = fixture.insert_plan_of_table(table.as_ref(), 10); - table.append_data(io_ctx.clone(), insert_into_plan).await?; + let num_blocks = 10; + let insert_into_plan = fixture.insert_plan_of_table(table.as_ref()); + let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream( + num_blocks, + ))); + + table + .append_data(io_ctx.clone(), insert_into_plan, stream) + .await?; let source_plan = table.read_plan(io_ctx.clone(), None)?; // get the latest tbl diff --git a/query/src/datasources/table/fuse/table_test_fixture.rs b/query/src/datasources/table/fuse/table_test_fixture.rs index 631b36310284..6b52393c17b7 100644 --- a/query/src/datasources/table/fuse/table_test_fixture.rs +++ b/query/src/datasources/table/fuse/table_test_fixture.rs @@ -13,8 +13,6 @@ // limitations under the License. // -use std::sync::Arc; - use common_datablocks::DataBlock; use common_datavalues::prelude::Series; use common_datavalues::prelude::SeriesFrom; @@ -22,7 +20,7 @@ use common_datavalues::DataField; use common_datavalues::DataSchemaRef; use common_datavalues::DataSchemaRefExt; use common_datavalues::DataType; -use common_infallible::Mutex; +use common_exception::Result; use common_meta_types::TableMeta; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; @@ -49,6 +47,7 @@ impl TestFixture { config.storage.storage_type = "Disk".to_string(); // use `TempDir` as root path (auto clean) config.storage.disk.data_path = tmp_dir.path().to_str().unwrap().to_string(); + config.storage.disk.temp_data_path = tmp_dir.path().to_str().unwrap().to_string(); let ctx = crate::tests::try_create_context_with_config(config).unwrap(); let random_prefix: String = Uuid::new_v4().to_simple().to_string(); @@ -97,25 +96,25 @@ impl TestFixture { } } - pub fn insert_plan_of_table(&self, table: &dyn Table, block_num: u32) -> InsertIntoPlan { + pub fn insert_plan_of_table(&self, table: &dyn Table) -> InsertIntoPlan { InsertIntoPlan { db_name: self.default_db(), tbl_name: self.default_table(), tbl_id: table.get_id(), schema: TestFixture::default_schema(), - input_stream: Arc::new(Mutex::new(Some(Box::pin(futures::stream::iter( - TestFixture::gen_block_stream(block_num), - ))))), + values_opt: None, } } - pub fn gen_block_stream(num: u32) -> Vec { + pub fn gen_block_stream(num: u32) -> Vec> { (0..num) .into_iter() .map(|_v| { let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]); - DataBlock::create_by_array(schema, vec![Series::new(vec![1, 2, 3])]) + Ok(DataBlock::create_by_array(schema, vec![Series::new(vec![ + 1, 2, 3, + ])])) }) .collect() } diff --git a/query/src/datasources/table/fuse/util/statistic_helper_test.rs b/query/src/datasources/table/fuse/util/statistic_helper_test.rs index bba5bd5a48e2..71b91bd0efa8 100644 --- a/query/src/datasources/table/fuse/util/statistic_helper_test.rs +++ b/query/src/datasources/table/fuse/util/statistic_helper_test.rs @@ -43,7 +43,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> { let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]); let col_stats = blocks .iter() - .map(statistic_helper::block_stats) + .map(|b| statistic_helper::block_stats(&b.clone().unwrap())) .collect::>>()?; let r = statistic_helper::column_stats_reduce_with_schema(&col_stats, &schema); assert!(r.is_ok()); @@ -61,7 +61,8 @@ fn test_ft_stats_accumulator() -> common_exception::Result<()> { let mut stats_acc = statistic_helper::StatisticsAccumulator::new(); let mut meta_acc = statistic_helper::BlockMetaAccumulator::new(); blocks.iter().try_for_each(|item| { - stats_acc.acc(item)?; + let item = item.clone().unwrap(); + stats_acc.acc(&item)?; meta_acc.acc(1, "".to_owned(), &mut stats_acc); Ok::<_, ErrorCode>(()) })?; diff --git a/query/src/datasources/table/memory/memory_table.rs b/query/src/datasources/table/memory/memory_table.rs index e877d10dc8a5..6589b50797fd 100644 --- a/query/src/datasources/table/memory/memory_table.rs +++ b/query/src/datasources/table/memory/memory_table.rs @@ -125,19 +125,15 @@ impl Table for MemoryTable { async fn append_data( &self, _io_ctx: Arc, - _insert_plan: InsertIntoPlan, + insert_plan: InsertIntoPlan, + mut stream: SendableDataBlockStream, ) -> Result<()> { - let mut s = { - let mut inner = _insert_plan.input_stream.lock(); - (*inner).take() - } - .ok_or_else(|| ErrorCode::EmptyData("input stream consumed"))?; - - if _insert_plan.schema().as_ref().fields() != self.table_info.schema().as_ref().fields() { + if insert_plan.schema().as_ref().fields() != self.table_info.schema().as_ref().fields() { return Err(ErrorCode::BadArguments("DataBlock schema mismatch")); } - while let Some(block) = s.next().await { + while let Some(block) = stream.next().await { + let block = block?; let mut blocks = self.blocks.write(); blocks.push(block); } diff --git a/query/src/datasources/table/memory/memory_table_test.rs b/query/src/datasources/table/memory/memory_table_test.rs index 141234a8e30f..fa301300920e 100644 --- a/query/src/datasources/table/memory/memory_table_test.rs +++ b/query/src/datasources/table/memory/memory_table_test.rs @@ -21,7 +21,6 @@ use common_datablocks::assert_blocks_sorted_eq; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; -use common_infallible::Mutex; use common_meta_types::TableInfo; use common_meta_types::TableMeta; use common_planners::*; @@ -64,18 +63,18 @@ async fn test_memorytable() -> Result<()> { Series::new(vec![4u64, 3]), Series::new(vec![33u64, 33]), ]); - let blocks = vec![block, block2]; + let blocks = vec![Ok(block), Ok(block2)]; - let input_stream = futures::stream::iter::>(blocks.clone()); + let input_stream = futures::stream::iter::>>(blocks.clone()); let insert_plan = InsertIntoPlan { db_name: "default".to_string(), tbl_name: "a".to_string(), tbl_id: 0, schema, - input_stream: Arc::new(Mutex::new(Some(Box::pin(input_stream)))), + values_opt: None, }; table - .append_data(io_ctx.clone(), insert_plan) + .append_data(io_ctx.clone(), insert_plan, Box::pin(input_stream)) .await .unwrap(); } diff --git a/query/src/datasources/table/null/null_table.rs b/query/src/datasources/table/null/null_table.rs index 24609a84a85f..992475afd035 100644 --- a/query/src/datasources/table/null/null_table.rs +++ b/query/src/datasources/table/null/null_table.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use common_context::DataContext; use common_context::TableIOContext; use common_datablocks::DataBlock; -use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::TableInfo; use common_planners::InsertIntoPlan; @@ -72,14 +71,10 @@ impl Table for NullTable { &self, _io_ctx: Arc, _insert_plan: InsertIntoPlan, + mut stream: SendableDataBlockStream, ) -> Result<()> { - let mut s = { - let mut inner = _insert_plan.input_stream.lock(); - (*inner).take() - } - .ok_or_else(|| ErrorCode::EmptyData("input stream consumed"))?; - - while let Some(block) = s.next().await { + while let Some(block) = stream.next().await { + let block = block?; info!("Ignore one block rows: {}", block.num_rows()) } Ok(()) diff --git a/query/src/datasources/table/null/null_table_test.rs b/query/src/datasources/table/null/null_table_test.rs index ff51da001f13..b1c90e7b051b 100644 --- a/query/src/datasources/table/null/null_table_test.rs +++ b/query/src/datasources/table/null/null_table_test.rs @@ -20,7 +20,6 @@ use common_context::TableDataContext; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; -use common_infallible::Mutex; use common_meta_types::TableInfo; use common_meta_types::TableMeta; use common_planners::*; @@ -64,18 +63,19 @@ async fn test_null_table() -> Result<()> { Series::new(vec![1u64, 2]), Series::new(vec![11u64, 22]), ]); - let blocks = vec![block]; - let input_stream = futures::stream::iter::>(blocks.clone()); + let blocks = vec![Ok(block)]; + + let input_stream = futures::stream::iter::>>(blocks.clone()); let insert_plan = InsertIntoPlan { db_name: "default".to_string(), tbl_name: "a".to_string(), tbl_id: 0, schema: schema.clone(), - input_stream: Arc::new(Mutex::new(Some(Box::pin(input_stream)))), + values_opt: None, }; table - .append_data(io_ctx.clone(), insert_plan) + .append_data(io_ctx.clone(), insert_plan, Box::pin(input_stream)) .await .unwrap(); } diff --git a/query/src/datasources/table/parquet/parquet_table.rs b/query/src/datasources/table/parquet/parquet_table.rs index 119876a86285..63672fd63898 100644 --- a/query/src/datasources/table/parquet/parquet_table.rs +++ b/query/src/datasources/table/parquet/parquet_table.rs @@ -13,26 +13,27 @@ // limitations under the License. use std::any::Any; -use std::convert::TryInto; -use std::fs::File; use std::sync::Arc; -use common_arrow::arrow::io::parquet::read; -use common_base::tokio::task; +use async_stream::stream; use common_context::DataContext; +use common_context::IOContext; use common_context::TableIOContext; -use common_datablocks::DataBlock; +use common_dal::Local; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::TableInfo; +use common_planners::Extras; +use common_planners::Part; +use common_planners::Partitions; use common_planners::ReadDataSourcePlan; -use common_streams::ParquetStream; +use common_planners::Statistics; +use common_streams::ParquetSource; use common_streams::SendableDataBlockStream; -use crossbeam::channel::bounded; -use crossbeam::channel::Receiver; -use crossbeam::channel::Sender; +use common_streams::Source; use crate::catalogs::Table; +use crate::sessions::DatabendQueryContext; pub struct ParquetTable { table_info: TableInfo, @@ -61,36 +62,6 @@ impl ParquetTable { } } -fn read_file( - file: &str, - tx: Sender>>, - projection: &[usize], -) -> Result<()> { - let reader = File::open(file)?; - let reader = read::RecordReader::try_new(reader, Some(projection.to_vec()), None, None, None)?; - - for maybe_batch in reader { - match maybe_batch { - Ok(batch) => { - tx.send(Some(Ok(batch.try_into()?))) - .map_err(|e| ErrorCode::UnknownException(e.to_string()))?; - } - Err(e) => { - let err_msg = format!("Error reading batch from {:?}: {}", file, e.to_string()); - - tx.send(Some(Result::Err(ErrorCode::CannotReadFile( - err_msg.clone(), - )))) - .map_err(|send_error| ErrorCode::UnknownException(send_error.to_string()))?; - - return Result::Err(ErrorCode::CannotReadFile(err_msg)); - } - } - } - - Ok(()) -} - #[async_trait::async_trait] impl Table for ParquetTable { fn as_any(&self) -> &dyn Any { @@ -101,25 +72,61 @@ impl Table for ParquetTable { &self.table_info } - async fn read( + fn benefit_column_prune(&self) -> bool { + true + } + + fn read_partitions( &self, _io_ctx: Arc, + _push_downs: Option, + ) -> Result<(Statistics, Partitions)> { + let parts = vec![Part { + name: self.file.clone(), + version: 0, + }]; + Ok((Statistics::default(), parts)) + } + + async fn read( + &self, + io_ctx: Arc, plan: &ReadDataSourcePlan, ) -> Result { - type BlockSender = Sender>>; - type BlockReceiver = Receiver>>; - - let (response_tx, response_rx): (BlockSender, BlockReceiver) = bounded(2); - - let file = self.file.clone(); - let projection: Vec = plan.scan_fields().keys().cloned().collect::>(); - - task::spawn_blocking(move || { - if let Err(e) = read_file(&file, response_tx, &projection) { - println!("Parquet reader thread terminated due to error: {:?}", e); + let ctx: Arc = io_ctx + .get_user_data()? + .expect("DatabendQueryContext should not be None"); + let ctx_clone = ctx.clone(); + let table_schema = self.get_table_info().schema(); + let projection = plan.projections(); + let conf = ctx.get_config().storage; + let dal = Arc::new(Local::new(conf.disk.temp_data_path.as_str())); + + let s = stream! { + loop { + let partitions = ctx_clone.try_get_partitions(1); + match partitions { + Ok(partitions) => { + if partitions.is_empty() { + break; + } + let part = partitions.get(0).unwrap(); + + let mut source = ParquetSource::new(dal.clone(), part.name.clone(), table_schema.clone(), projection.clone()); + + loop { + let block = source.read().await; + match block { + Ok(None) => break, + Ok(Some(b)) => yield(Ok(b)), + Err(e) => yield(Err(e)), + } + } + } + Err(e) => yield(Err(e)) + } } - }); - - Ok(Box::pin(ParquetStream::try_create(response_rx)?)) + }; + Ok(Box::pin(s)) } } diff --git a/query/src/datasources/table/parquet/parquet_table_test.rs b/query/src/datasources/table/parquet/parquet_table_test.rs index c080d6e13bf3..d4531da37a94 100644 --- a/query/src/datasources/table/parquet/parquet_table_test.rs +++ b/query/src/datasources/table/parquet/parquet_table_test.rs @@ -58,6 +58,7 @@ async fn test_parquet_table() -> Result<()> { let io_ctx = ctx.get_cluster_table_io_context()?; let io_ctx = Arc::new(io_ctx); let source_plan = table.read_plan(io_ctx.clone(), None)?; + ctx.try_set_partitions(source_plan.parts.clone())?; let stream = table.read(io_ctx, &source_plan).await?; let blocks = stream.try_collect::>().await?; diff --git a/query/src/interpreters/interpreter.rs b/query/src/interpreters/interpreter.rs index dbb6ed993c6f..a46c46691e37 100644 --- a/query/src/interpreters/interpreter.rs +++ b/query/src/interpreters/interpreter.rs @@ -20,7 +20,10 @@ use common_streams::SendableDataBlockStream; #[async_trait::async_trait] pub trait Interpreter: Sync + Send { fn name(&self) -> &str; - async fn execute(&self) -> Result; + async fn execute( + &self, + input_stream: Option, + ) -> Result; fn schema(&self) -> DataSchemaRef { DataSchemaRefExt::create(vec![]) diff --git a/query/src/interpreters/interpreter_database_create.rs b/query/src/interpreters/interpreter_database_create.rs index 3b745922b7d0..f8d60bee447d 100644 --- a/query/src/interpreters/interpreter_database_create.rs +++ b/query/src/interpreters/interpreter_database_create.rs @@ -46,8 +46,11 @@ impl Interpreter for CreateDatabaseInterpreter { "CreateDatabaseInterpreter" } - #[tracing::instrument(level = "info", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))] - async fn execute(&self) -> Result { + #[tracing::instrument(level = "info", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))] + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let datasource = self.ctx.get_catalog(); datasource.create_database(self.plan.clone()).await?; diff --git a/query/src/interpreters/interpreter_database_create_test.rs b/query/src/interpreters/interpreter_database_create_test.rs index 4805f934ca39..dc72208e3c6d 100644 --- a/query/src/interpreters/interpreter_database_create_test.rs +++ b/query/src/interpreters/interpreter_database_create_test.rs @@ -32,7 +32,7 @@ async fn test_create_database_interpreter() -> Result<()> { { let executor = CreateDatabaseInterpreter::try_create(ctx, plan.clone())?; assert_eq!(executor.name(), "CreateDatabaseInterpreter"); - let mut stream = executor.execute().await?; + let mut stream = executor.execute(None).await?; while let Some(_block) = stream.next().await {} } else { panic!() diff --git a/query/src/interpreters/interpreter_database_drop.rs b/query/src/interpreters/interpreter_database_drop.rs index 35c9d3f76906..00465ab0547b 100644 --- a/query/src/interpreters/interpreter_database_drop.rs +++ b/query/src/interpreters/interpreter_database_drop.rs @@ -44,7 +44,10 @@ impl Interpreter for DropDatabaseInterpreter { "DropDatabaseInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let datasource = self.ctx.get_catalog(); datasource.drop_database(self.plan.clone()).await?; diff --git a/query/src/interpreters/interpreter_database_drop_test.rs b/query/src/interpreters/interpreter_database_drop_test.rs index 697a57423c3a..14bcfea043f9 100644 --- a/query/src/interpreters/interpreter_database_drop_test.rs +++ b/query/src/interpreters/interpreter_database_drop_test.rs @@ -30,7 +30,7 @@ async fn test_drop_database_interpreter() -> Result<()> { { let executor = DropDatabaseInterpreter::try_create(ctx, plan.clone())?; assert_eq!(executor.name(), "DropDatabaseInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let expected = vec!["++", "++"]; common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice()); diff --git a/query/src/interpreters/interpreter_describe_table.rs b/query/src/interpreters/interpreter_describe_table.rs index e9906e9608fd..f5df2b9f35a0 100644 --- a/query/src/interpreters/interpreter_describe_table.rs +++ b/query/src/interpreters/interpreter_describe_table.rs @@ -46,7 +46,10 @@ impl Interpreter for DescribeTableInterpreter { "DescribeTableInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let table = self .ctx .get_table(self.plan.db.as_str(), self.plan.table.as_str())?; diff --git a/query/src/interpreters/interpreter_describe_table_test.rs b/query/src/interpreters/interpreter_describe_table_test.rs index 9964aa07d8b5..23efc6acc8de 100644 --- a/query/src/interpreters/interpreter_describe_table_test.rs +++ b/query/src/interpreters/interpreter_describe_table_test.rs @@ -31,7 +31,7 @@ async fn interpreter_describe_table_test() -> Result<()> { .build_from_sql("create table default.a(a bigint, b int, c varchar(255), d smallint, e Date ) Engine = Null")? { let executor = CreateTableInterpreter::try_create(ctx.clone(), plan.clone())?; - let _ = executor.execute().await?; + let _ = executor.execute(None).await?; } } @@ -43,7 +43,7 @@ async fn interpreter_describe_table_test() -> Result<()> { let executor = DescribeTableInterpreter::try_create(ctx.clone(), plan.clone())?; assert_eq!(executor.name(), "DescribeTableInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let expected = vec![ "+-------+--------+------+", diff --git a/query/src/interpreters/interpreter_explain.rs b/query/src/interpreters/interpreter_explain.rs index e781038886c0..0e8bff3a5f16 100644 --- a/query/src/interpreters/interpreter_explain.rs +++ b/query/src/interpreters/interpreter_explain.rs @@ -40,7 +40,10 @@ impl Interpreter for ExplainInterpreter { "ExplainInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let schema = self.schema(); let block = match self.explain.typ { diff --git a/query/src/interpreters/interpreter_explain_test.rs b/query/src/interpreters/interpreter_explain_test.rs index 2d6d86d57040..c8ef313ff7df 100644 --- a/query/src/interpreters/interpreter_explain_test.rs +++ b/query/src/interpreters/interpreter_explain_test.rs @@ -31,7 +31,7 @@ async fn test_explain_interpreter() -> Result<()> { let executor = ExplainInterpreter::try_create(ctx, plan)?; assert_eq!(executor.name(), "ExplainInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let block = &result[0]; assert_eq!(block.num_columns(), 1); diff --git a/query/src/interpreters/interpreter_insert_into.rs b/query/src/interpreters/interpreter_insert_into.rs index 56d432c2c51c..00e830bc5924 100644 --- a/query/src/interpreters/interpreter_insert_into.rs +++ b/query/src/interpreters/interpreter_insert_into.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Cursor; use std::sync::Arc; +use common_exception::ErrorCode; use common_exception::Result; use common_planners::InsertIntoPlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; +use common_streams::SourceStream; +use common_streams::ValueSource; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; @@ -43,15 +47,32 @@ impl Interpreter for InsertIntoInterpreter { "InsertIntoInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + mut input_stream: Option, + ) -> Result { let table = self .ctx .get_table(&self.plan.db_name, &self.plan.tbl_name)?; let io_ctx = self.ctx.get_cluster_table_io_context()?; let io_ctx = Arc::new(io_ctx); + let input_stream = if self.plan.values_opt.is_some() { + let values = self.plan.values_opt.clone().take().unwrap(); + let block_size = self.ctx.get_settings().get_max_block_size()? as usize; + let values_source = + ValueSource::new(Cursor::new(values), self.plan.schema(), block_size); + let stream_source = SourceStream::new(Box::new(values_source)); + stream_source.execute().await + } else { + input_stream + .take() + .ok_or_else(|| ErrorCode::EmptyData("input stream not exist or consumed")) + }?; - table.append_data(io_ctx, self.plan.clone()).await?; + table + .append_data(io_ctx, self.plan.clone(), input_stream) + .await?; Ok(Box::pin(DataBlockStream::create( self.plan.schema(), None, diff --git a/query/src/interpreters/interpreter_kill.rs b/query/src/interpreters/interpreter_kill.rs index 7247f6ca15ae..f0c83d0e68e0 100644 --- a/query/src/interpreters/interpreter_kill.rs +++ b/query/src/interpreters/interpreter_kill.rs @@ -42,7 +42,10 @@ impl Interpreter for KillInterpreter { "KillInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let id = &self.plan.id; match self.ctx.get_sessions_manager().get_session(id) { None => Err(ErrorCode::UnknownSession(format!( diff --git a/query/src/interpreters/interpreter_select.rs b/query/src/interpreters/interpreter_select.rs index 4c2d423677aa..631b40f9b93f 100644 --- a/query/src/interpreters/interpreter_select.rs +++ b/query/src/interpreters/interpreter_select.rs @@ -57,8 +57,11 @@ impl Interpreter for SelectInterpreter { "SelectInterpreter" } - #[tracing::instrument(level = "info", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))] - async fn execute(&self) -> Result { + #[tracing::instrument(level = "info", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))] + async fn execute( + &self, + _input_stream: Option, + ) -> Result { // TODO: maybe panic? let mut scheduled = Scheduled::new(); let timeout = self.ctx.get_settings().get_flight_client_timeout()?; diff --git a/query/src/interpreters/interpreter_select_test.rs b/query/src/interpreters/interpreter_select_test.rs index 9c65ef14c0fb..d712c4436503 100644 --- a/query/src/interpreters/interpreter_select_test.rs +++ b/query/src/interpreters/interpreter_select_test.rs @@ -31,7 +31,7 @@ async fn test_select_interpreter() -> anyhow::Result<()> { let executor = SelectInterpreter::try_create(ctx.clone(), plan)?; assert_eq!(executor.name(), "SelectInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let block = &result[0]; assert_eq!(block.num_columns(), 1); @@ -48,7 +48,7 @@ async fn test_select_interpreter() -> anyhow::Result<()> { let executor = SelectInterpreter::try_create(ctx.clone(), plan)?; assert_eq!(executor.name(), "SelectInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let block = &result[0]; assert_eq!(block.num_columns(), 4); diff --git a/query/src/interpreters/interpreter_setting.rs b/query/src/interpreters/interpreter_setting.rs index ec1fd2986b17..b16ef264d747 100644 --- a/query/src/interpreters/interpreter_setting.rs +++ b/query/src/interpreters/interpreter_setting.rs @@ -43,7 +43,10 @@ impl Interpreter for SettingInterpreter { "SettingInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let plan = self.set.clone(); for var in plan.vars { match var.variable.to_lowercase().as_str() { diff --git a/query/src/interpreters/interpreter_setting_test.rs b/query/src/interpreters/interpreter_setting_test.rs index 5f1f7cb5a830..ecb213b8d198 100644 --- a/query/src/interpreters/interpreter_setting_test.rs +++ b/query/src/interpreters/interpreter_setting_test.rs @@ -31,7 +31,7 @@ async fn test_setting_interpreter() -> Result<()> { let executor = SettingInterpreter::try_create(ctx, plan)?; assert_eq!(executor.name(), "SettingInterpreter"); - let mut stream = executor.execute().await?; + let mut stream = executor.execute(None).await?; while let Some(_block) = stream.next().await {} } else { panic!() @@ -48,7 +48,7 @@ async fn test_setting_interpreter_error() -> Result<()> { PlanParser::create(ctx.clone()).build_from_sql("set xx=1")? { let executor = SettingInterpreter::try_create(ctx, plan)?; - if let Err(e) = executor.execute().await { + if let Err(e) = executor.execute(None).await { let expect = "Code: 20, displayText = Unknown variable: \"xx\"."; assert_eq!(expect, format!("{}", e)); } else { diff --git a/query/src/interpreters/interpreter_show_create_table.rs b/query/src/interpreters/interpreter_show_create_table.rs index 3fa23d7c8466..50e21497747e 100644 --- a/query/src/interpreters/interpreter_show_create_table.rs +++ b/query/src/interpreters/interpreter_show_create_table.rs @@ -51,7 +51,10 @@ impl Interpreter for ShowCreateTableInterpreter { "ShowCreateTableInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let catalog = self.ctx.get_catalog(); let table = catalog.get_table(&self.plan.db, &self.plan.table).await?; diff --git a/query/src/interpreters/interpreter_show_create_table_test.rs b/query/src/interpreters/interpreter_show_create_table_test.rs index a1511ae7578c..899a5dc65b23 100644 --- a/query/src/interpreters/interpreter_show_create_table_test.rs +++ b/query/src/interpreters/interpreter_show_create_table_test.rs @@ -31,7 +31,7 @@ async fn interpreter_show_create_table_test() -> Result<()> { .build_from_sql("create table default.a(a bigint, b int, c varchar(255), d smallint, e Date ) Engine = Null")? { let executor = CreateTableInterpreter::try_create(ctx.clone(), plan.clone())?; - let _ = executor.execute().await?; + let _ = executor.execute(None).await?; } } @@ -42,7 +42,7 @@ async fn interpreter_show_create_table_test() -> Result<()> { { let executor = ShowCreateTableInterpreter::try_create(ctx.clone(), plan.clone())?; assert_eq!(executor.name(), "ShowCreateTableInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let expected = vec![ "+-------+--------------------+", diff --git a/query/src/interpreters/interpreter_table_create.rs b/query/src/interpreters/interpreter_table_create.rs index 960a2ada01c4..c7c0463d52cb 100644 --- a/query/src/interpreters/interpreter_table_create.rs +++ b/query/src/interpreters/interpreter_table_create.rs @@ -44,7 +44,10 @@ impl Interpreter for CreateTableInterpreter { "CreateTableInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let catalog = self.ctx.get_catalog(); catalog.create_table(self.plan.clone()).await?; diff --git a/query/src/interpreters/interpreter_table_create_test.rs b/query/src/interpreters/interpreter_table_create_test.rs index 14b0c6dca853..5a5b2750bdc5 100644 --- a/query/src/interpreters/interpreter_table_create_test.rs +++ b/query/src/interpreters/interpreter_table_create_test.rs @@ -38,7 +38,7 @@ async fn test_create_table_interpreter() -> Result<()> { assert_eq!(plan.schema().field_with_name("d")?.data_type(), &DataType::Int16); assert_eq!(plan.schema().field_with_name("e")?.data_type(), &DataType::Date16); - let mut stream = executor.execute().await?; + let mut stream = executor.execute(None).await?; while let Some(_block) = stream.next().await {} } else { panic!() diff --git a/query/src/interpreters/interpreter_table_drop.rs b/query/src/interpreters/interpreter_table_drop.rs index bace3b2892bd..7e2abfa83305 100644 --- a/query/src/interpreters/interpreter_table_drop.rs +++ b/query/src/interpreters/interpreter_table_drop.rs @@ -41,7 +41,10 @@ impl Interpreter for DropTableInterpreter { "DropTableInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let catalog = self.ctx.get_catalog(); catalog.drop_table(self.plan.clone()).await?; diff --git a/query/src/interpreters/interpreter_table_drop_test.rs b/query/src/interpreters/interpreter_table_drop_test.rs index 3a1672a9ab8a..85934e3f459f 100644 --- a/query/src/interpreters/interpreter_table_drop_test.rs +++ b/query/src/interpreters/interpreter_table_drop_test.rs @@ -31,7 +31,7 @@ async fn test_drop_table_interpreter() -> Result<()> { .build_from_sql("create table default.a(a bigint, b int, c varchar(255), d smallint, e Date ) Engine = Null")? { let executor = CreateTableInterpreter::try_create(ctx.clone(), plan.clone())?; - let _ = executor.execute().await?; + let _ = executor.execute(None).await?; } } @@ -42,7 +42,7 @@ async fn test_drop_table_interpreter() -> Result<()> { { let executor = DropTableInterpreter::try_create(ctx.clone(), plan.clone())?; assert_eq!(executor.name(), "DropTableInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let expected = vec!["++", "++"]; common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice()); diff --git a/query/src/interpreters/interpreter_truncate_table.rs b/query/src/interpreters/interpreter_truncate_table.rs index eae4b4843ce7..0dd3e3eb10fc 100644 --- a/query/src/interpreters/interpreter_truncate_table.rs +++ b/query/src/interpreters/interpreter_truncate_table.rs @@ -43,7 +43,10 @@ impl Interpreter for TruncateTableInterpreter { "TruncateTableInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let table = self .ctx .get_table(self.plan.db.as_str(), self.plan.table.as_str())?; diff --git a/query/src/interpreters/interpreter_truncate_table_test.rs b/query/src/interpreters/interpreter_truncate_table_test.rs index c2bd7ccea445..05dd86bb3eeb 100644 --- a/query/src/interpreters/interpreter_truncate_table_test.rs +++ b/query/src/interpreters/interpreter_truncate_table_test.rs @@ -31,7 +31,7 @@ async fn test_truncate_table_interpreter() -> Result<()> { .build_from_sql("create table default.a(a String, b String) Engine = Memory")? { let executor = CreateTableInterpreter::try_create(ctx.clone(), plan.clone())?; - let _ = executor.execute().await?; + let _ = executor.execute(None).await?; } } @@ -41,7 +41,7 @@ async fn test_truncate_table_interpreter() -> Result<()> { .build_from_sql("insert into default.a values('1,1', '2,2')")? { let executor = InsertIntoInterpreter::try_create(ctx.clone(), plan.clone())?; - let _ = executor.execute().await?; + let _ = executor.execute(None).await?; } } @@ -51,7 +51,7 @@ async fn test_truncate_table_interpreter() -> Result<()> { PlanParser::create(ctx.clone()).build_from_sql("select * from default.a")? { let executor = SelectInterpreter::try_create(ctx.clone(), plan.clone())?; - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let expected = vec![ "+-----+-----+", @@ -74,7 +74,7 @@ async fn test_truncate_table_interpreter() -> Result<()> { let executor = TruncateTableInterpreter::try_create(ctx.clone(), plan.clone())?; assert_eq!(executor.name(), "TruncateTableInterpreter"); - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let expected = vec!["++", "++"]; common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice()); @@ -89,7 +89,7 @@ async fn test_truncate_table_interpreter() -> Result<()> { PlanParser::create(ctx.clone()).build_from_sql("select * from default.a")? { let executor = SelectInterpreter::try_create(ctx.clone(), plan.clone())?; - let stream = executor.execute().await?; + let stream = executor.execute(None).await?; let result = stream.try_collect::>().await?; let expected = vec!["++", "++"]; common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice()); diff --git a/query/src/interpreters/interpreter_use_database.rs b/query/src/interpreters/interpreter_use_database.rs index ed66dbaf253e..7ac9108c9c9d 100644 --- a/query/src/interpreters/interpreter_use_database.rs +++ b/query/src/interpreters/interpreter_use_database.rs @@ -44,7 +44,10 @@ impl Interpreter for UseDatabaseInterpreter { "UseDatabaseInterpreter" } - async fn execute(&self) -> Result { + async fn execute( + &self, + _input_stream: Option, + ) -> Result { self.ctx.set_current_database(self.plan.db.clone())?; let schema = Arc::new(DataSchema::empty()); Ok(Box::pin(DataBlockStream::create(schema, None, vec![]))) diff --git a/query/src/interpreters/interpreter_use_database_test.rs b/query/src/interpreters/interpreter_use_database_test.rs index 2b1614c76626..0b0bf1194ff5 100644 --- a/query/src/interpreters/interpreter_use_database_test.rs +++ b/query/src/interpreters/interpreter_use_database_test.rs @@ -31,7 +31,7 @@ async fn test_use_interpreter() -> Result<()> { let executor = UseDatabaseInterpreter::try_create(ctx, plan)?; assert_eq!(executor.name(), "UseDatabaseInterpreter"); - let mut stream = executor.execute().await?; + let mut stream = executor.execute(None).await?; while let Some(_block) = stream.next().await {} } else { panic!() @@ -47,7 +47,7 @@ async fn test_use_database_interpreter_error() -> Result<()> { if let PlanNode::UseDatabase(plan) = PlanParser::create(ctx.clone()).build_from_sql("use xx")? { let executor = UseDatabaseInterpreter::try_create(ctx, plan)?; - if let Err(e) = executor.execute().await { + if let Err(e) = executor.execute(None).await { let expect = "Code: 3, displayText = Cannot USE 'xx', because the 'xx' doesn't exist."; assert_eq!(expect, format!("{}", e)); } else { diff --git a/query/src/interpreters/interpreter_user_alter.rs b/query/src/interpreters/interpreter_user_alter.rs index e6ea73cf1e9e..d1e8004b246a 100644 --- a/query/src/interpreters/interpreter_user_alter.rs +++ b/query/src/interpreters/interpreter_user_alter.rs @@ -42,8 +42,11 @@ impl Interpreter for AlterUserInterpreter { "AlterUserInterpreter" } - #[tracing::instrument(level = "info", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))] - async fn execute(&self) -> Result { + #[tracing::instrument(level = "info", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))] + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let plan = self.plan.clone(); let user_mgr = self.ctx.get_sessions_manager().get_user_manager(); //TODO:alter current user diff --git a/query/src/interpreters/interpreter_user_alter_test.rs b/query/src/interpreters/interpreter_user_alter_test.rs index 810c4be4dc3d..5086c811d6db 100644 --- a/query/src/interpreters/interpreter_user_alter_test.rs +++ b/query/src/interpreters/interpreter_user_alter_test.rs @@ -53,7 +53,7 @@ async fn test_alter_user_interpreter() -> Result<()> { )? { let executor = AlterUserInterpreter::try_create(ctx, plan.clone())?; assert_eq!(executor.name(), "AlterUserInterpreter"); - let mut stream = executor.execute().await?; + let mut stream = executor.execute(None).await?; while let Some(_block) = stream.next().await {} let new_user = user_mgr.get_user(name, hostname).await?; assert_eq!(new_user.password, Vec::from(new_password)) diff --git a/query/src/interpreters/interpreter_user_create.rs b/query/src/interpreters/interpreter_user_create.rs index 6f5fc5573ea9..dbc230c58ea7 100644 --- a/query/src/interpreters/interpreter_user_create.rs +++ b/query/src/interpreters/interpreter_user_create.rs @@ -48,8 +48,11 @@ impl Interpreter for CreatUserInterpreter { "CreateUserInterpreter" } - #[tracing::instrument(level = "info", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))] - async fn execute(&self) -> Result { + #[tracing::instrument(level = "info", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))] + async fn execute( + &self, + _input_stream: Option, + ) -> Result { let plan = self.plan.clone(); let user_mgr = self.ctx.get_sessions_manager().get_user_manager(); let user_info = UserInfo { diff --git a/query/src/interpreters/interpreter_user_create_test.rs b/query/src/interpreters/interpreter_user_create_test.rs index 5e4eb6ec089a..c9f085d53612 100644 --- a/query/src/interpreters/interpreter_user_create_test.rs +++ b/query/src/interpreters/interpreter_user_create_test.rs @@ -32,7 +32,7 @@ async fn test_create_user_interpreter() -> Result<()> { { let executor = CreatUserInterpreter::try_create(ctx, plan.clone())?; assert_eq!(executor.name(), "CreateUserInterpreter"); - let mut stream = executor.execute().await?; + let mut stream = executor.execute(None).await?; while let Some(_block) = stream.next().await {} } else { panic!() diff --git a/query/src/servers/clickhouse/interactive_worker_base.rs b/query/src/servers/clickhouse/interactive_worker_base.rs index d7992e96769f..8af1cb1c8030 100644 --- a/query/src/servers/clickhouse/interactive_worker_base.rs +++ b/query/src/servers/clickhouse/interactive_worker_base.rs @@ -72,7 +72,7 @@ impl InteractiveWorkerBase { let start = Instant::now(); let interpreter = InterpreterFactory::get(ctx.clone(), plan)?; let name = interpreter.name().to_string(); - let async_data_stream = interpreter.execute(); + let async_data_stream = interpreter.execute(None); let mut data_stream = async_data_stream.await?; histogram!( super::clickhouse_metrics::METRIC_INTERPRETER_USEDTIME, @@ -123,7 +123,7 @@ impl InteractiveWorkerBase { input: stream, schema: sc, }; - insert.set_input_stream(Box::pin(stream)); + let interpreter = InterpreterFactory::get(ctx.clone(), PlanNode::InsertInto(insert))?; let name = interpreter.name().to_string(); @@ -134,7 +134,7 @@ impl InteractiveWorkerBase { let sent_all_data = ch_ctx.state.sent_all_data.clone(); let start = Instant::now(); ctx.try_spawn(async move { - interpreter.execute().await.unwrap(); + interpreter.execute(Some(Box::pin(stream))).await.unwrap(); sent_all_data.notify_one(); })?; histogram!( @@ -152,26 +152,14 @@ pub struct FromClickHouseBlockStream { } impl futures::stream::Stream for FromClickHouseBlockStream { - type Item = DataBlock; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { self.input.poll_next_unpin(cx).map(|x| match x { - Some(v) => { - let block = from_clickhouse_block(self.schema.clone(), v); - match block { - Ok(block) => Some(block), - Err(e) => { - log::error!( - "failed to convert ClickHouseBlock to block , breaking out, {:?}", - e - ); - None - } - } - } + Some(v) => Some(from_clickhouse_block(self.schema.clone(), v)), _ => None, }) } diff --git a/query/src/servers/http/v1/query/execute_state.rs b/query/src/servers/http/v1/query/execute_state.rs index 0eca2a7b0829..2fb5a1fd7d6d 100644 --- a/query/src/servers/http/v1/query/execute_state.rs +++ b/query/src/servers/http/v1/query/execute_state.rs @@ -109,7 +109,7 @@ impl ExecuteState { let schema = plan.schema(); let interpreter = InterpreterFactory::get(context.clone(), plan.clone())?; - let data_stream = interpreter.execute().await?; + let data_stream = interpreter.execute(None).await?; let mut data_stream = context.try_create_abortable(data_stream)?; let (abort_tx, mut abort_rx) = mpsc::channel(2); diff --git a/query/src/servers/http/v1/statement.rs b/query/src/servers/http/v1/statement.rs index a8f19939c460..a7f9f9f618ce 100644 --- a/query/src/servers/http/v1/statement.rs +++ b/query/src/servers/http/v1/statement.rs @@ -115,7 +115,7 @@ impl HttpQuery { ctx.attach_query_str(&self.sql); let plan = PlanParser::create(ctx.clone()).build_from_sql(&self.sql)?; let interpreter = InterpreterFactory::get(ctx.clone(), plan.clone())?; - let data_stream = interpreter.execute().await?; + let data_stream = interpreter.execute(None).await?; let state = HttpQueryState { session, data_stream, diff --git a/query/src/servers/mysql/mysql_interactive_worker.rs b/query/src/servers/mysql/mysql_interactive_worker.rs index 90e394381c26..94266c4a4658 100644 --- a/query/src/servers/mysql/mysql_interactive_worker.rs +++ b/query/src/servers/mysql/mysql_interactive_worker.rs @@ -316,7 +316,7 @@ impl InteractiveWorkerBase { let instant = Instant::now(); let interpreter = InterpreterFactory::get(context.clone(), plan?)?; - let data_stream = interpreter.execute().await?; + let data_stream = interpreter.execute(None).await?; histogram!( super::mysql_metrics::METRIC_INTERPRETER_USEDTIME, instant.elapsed() diff --git a/query/src/sql/plan_parser.rs b/query/src/sql/plan_parser.rs index ebf776ea0cdc..d79ce4cc7db8 100644 --- a/query/src/sql/plan_parser.rs +++ b/query/src/sql/plan_parser.rs @@ -15,12 +15,10 @@ use std::collections::HashMap; use std::sync::Arc; -use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; use common_functions::aggregates::AggregateFunctionFactory; -use common_infallible::Mutex; use common_meta_types::TableMeta; use common_planners::expand_aggregate_arg_exprs; use common_planners::expand_wildcard; @@ -53,8 +51,6 @@ use common_planners::ShowCreateTablePlan; use common_planners::TruncateTablePlan; use common_planners::UseDatabasePlan; use common_planners::VarValue; -use common_streams::Source; -use common_streams::ValueSource; use common_tracing::tracing; use nom::FindSubstring; use sqlparser::ast::FunctionArg; @@ -483,25 +479,13 @@ impl PlanParser { schema = DataSchemaRefExt::create(fields); } - let mut input_stream = futures::stream::iter::>(vec![]); - + let mut values_opt = None; if let Some(source) = source { if let sqlparser::ast::SetExpr::Values(_vs) = &source.body { tracing::debug!("{:?}", format_sql); let index = format_sql.find_substring(" VALUES ").unwrap(); let values = &format_sql[index + " VALUES ".len()..]; - - let block_size = self.ctx.get_settings().get_max_block_size()? as usize; - let mut source = ValueSource::new(values.as_bytes(), schema.clone(), block_size); - let mut blocks = vec![]; - loop { - let block = source.read()?; - match block { - Some(b) => blocks.push(b), - None => break, - } - } - input_stream = futures::stream::iter(blocks); + values_opt = Some(values.to_owned()); } } @@ -510,7 +494,7 @@ impl PlanParser { tbl_name, tbl_id, schema, - input_stream: Arc::new(Mutex::new(Some(Box::pin(input_stream)))), + values_opt, }; Ok(PlanNode::InsertInto(plan_node)) } diff --git a/tests/data/sample.sql b/tests/data/sample.sql index 2c122f87dccf..7bd29fbbfdcc 100644 --- a/tests/data/sample.sql +++ b/tests/data/sample.sql @@ -16,7 +16,6 @@ create table test_parquet ( double_col double, date_string_col varchar(255), string_col varchar(255), - timestamp_col Timestamp ) Engine = Parquet location = 'tests/data/alltypes_plain.parquet'; select * from system.tables;