Async datastream #2786

Merged (7 commits) on Nov 15, 2021
Changes from all commits
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions cli/src/cmds/status.rs
@@ -504,6 +504,10 @@ impl LocalRuntime for LocalQueryConfig {
databend_query::configs::config_storage::DISK_STORAGE_DATA_PATH,
conf.storage.disk.data_path,
)
.env(
databend_query::configs::config_storage::DISK_STORAGE_TEMP_DATA_PATH,
conf.storage.disk.temp_data_path,
)
.env(
databend_query::configs::config_query::QUERY_HTTP_HANDLER_HOST,
conf.query.http_handler_host,
11 changes: 6 additions & 5 deletions common/dal/src/impls/local.rs
@@ -45,18 +45,19 @@ impl Local {
pub fn with_path(root_path: PathBuf) -> Local {
Local { root: root_path }
}
}

impl Local {
fn prefix_with_root(&self, path: &str) -> Result<PathBuf> {
pub fn prefix_with_root(&self, path: &str) -> Result<PathBuf> {
let path = normalize_path(&self.root.join(&path));
if path.starts_with(&self.root) {
Ok(path)
} else {
// TODO customize error code
Err(ErrorCode::from(Error::new(
ErrorKind::Other,
format!("please dont play with me, malicious path {:?}", path),
format!(
"please dont play with me, malicious path {:?}, root path {:?}",
path, self.root
),
)))
}
}
@@ -120,7 +121,7 @@ impl DataAccessor for Local {
}

// from cargo::util::path
fn normalize_path(path: &Path) -> PathBuf {
pub fn normalize_path(path: &Path) -> PathBuf {
let mut components = path.components().peekable();
let mut ret = if let Some(c @ Component::Prefix(..)) = components.peek().cloned() {
components.next();
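The guard in prefix_with_root works by joining the requested path onto the root, collapsing . and .. components with normalize_path, and rejecting any result that no longer starts with the root. A minimal sketch of the behavior now that both functions are public (assumes Local is re-exported from common-dal; paths are illustrative):

use std::path::PathBuf;
use common_dal::Local;

fn traversal_guard_sketch() {
    let dal = Local::with_path(PathBuf::from("/data/root"));
    // Normalizes to /data/root/parquet/part-0.parquet, still under the root.
    assert!(dal.prefix_with_root("parquet/part-0.parquet").is_ok());
    // Normalizes to /data/etc/passwd, which escapes the root, so the
    // "malicious path" error above is returned.
    assert!(dal.prefix_with_root("../etc/passwd").is_err());
}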
11 changes: 10 additions & 1 deletion common/datavalues/src/data_schema.rs
@@ -116,7 +116,16 @@ impl DataSchema {
}

/// project will do column pruning.
pub fn project(&self, fields: Vec<DataField>) -> Self {
pub fn project(&self, projection: Vec<usize>) -> Self {
let fields = projection
.iter()
.map(|idx| self.fields()[*idx].clone())
.collect();
Self::new_from(fields, self.meta().clone())
}

/// project will do column pruning.
pub fn project_by_fields(&self, fields: Vec<DataField>) -> Self {
Self::new_from(fields, self.meta().clone())
}

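Splitting projection into two entry points means callers that carry pushed-down column indices prune with project, while callers that already hold DataField values use project_by_fields. A hedged sketch of the index-based path (constructor and type names are assumptions):

use common_datavalues::{DataField, DataSchema, DataType};

fn projection_sketch() {
    let schema = DataSchema::new(vec![
        DataField::new("id", DataType::UInt64, false),
        DataField::new("name", DataType::String, true),
    ]);
    // Keep only column 1 by index, as a pushed-down projection would.
    let pruned = schema.project(vec![1]);
    assert_eq!(pruned.fields().len(), 1);
    assert_eq!(pruned.fields()[0].name(), "name");
}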
2 changes: 1 addition & 1 deletion common/datavalues/src/types/serializations/mod.rs
@@ -32,7 +32,7 @@ pub use number::*;
pub use string::*;

// capacity.
pub trait TypeSerializer {
pub trait TypeSerializer: Send + Sync {
fn serialize_strings(&self, column: &DataColumn) -> Result<Vec<String>>;

fn de(&mut self, reader: &mut &[u8]) -> Result<()>;
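The new Send + Sync supertraits let boxed serializers live across .await points and move between threads, which the async sources below depend on: CsvSource builds a vector of boxed serializers and keeps it alive while awaiting the reader. A tiny sketch of what the bound buys (the function is illustrative):

// Compiles only because dyn TypeSerializer is now itself Send:
fn movable_to_another_task(s: Box<dyn TypeSerializer>) -> impl Send {
    s
}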
1 change: 1 addition & 0 deletions common/planners/Cargo.toml
@@ -9,6 +9,7 @@ edition = "2021"
[dependencies] # In alphabetical order
# Workspace dependencies
common-datavalues = {path = "../datavalues"}
common-streams = {path = "../streams"}
common-functions = {path = "../functions"}
common-exception = {path = "../exception"}
common-datablocks = {path = "../datablocks"}
17 changes: 1 addition & 16 deletions common/planners/src/plan_insert_into.rs
@@ -12,25 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_infallible::Mutex;
use common_meta_types::MetaId;

type BlockStream =
std::pin::Pin<Box<dyn futures::stream::Stream<Item = DataBlock> + Sync + Send + 'static>>;

#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct InsertIntoPlan {
pub db_name: String,
pub tbl_name: String,
pub tbl_id: MetaId,
pub schema: DataSchemaRef,

#[serde(skip, default = "InsertIntoPlan::empty_stream")]
pub input_stream: Arc<Mutex<Option<BlockStream>>>,
pub values_opt: Option<String>,
}

impl PartialEq for InsertIntoPlan {
@@ -42,14 +34,7 @@ impl PartialEq for InsertIntoPlan {
}

impl InsertIntoPlan {
pub fn empty_stream() -> Arc<Mutex<Option<BlockStream>>> {
Arc::new(Mutex::new(None))
}
pub fn schema(&self) -> DataSchemaRef {
self.schema.clone()
}
pub fn set_input_stream(&self, input_stream: BlockStream) {
let mut writer = self.input_stream.lock();
*writer = Some(input_stream);
}
}
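InsertIntoPlan no longer smuggles a live block stream through serde: the old input_stream field could not be serialized, so it was skipped and rebuilt empty via empty_stream on deserialize. Carrying the raw VALUES text instead makes the plan plain data, and the executor constructs the stream at run time. A hedged sketch of the round-trip this enables (serde_json and the field values are illustrative):

let plan = InsertIntoPlan {
    db_name: "default".to_string(),
    tbl_name: "t1".to_string(),
    tbl_id: 42,
    schema: schema.clone(),
    values_opt: Some("(1, 'a'), (2, 'b')".to_string()),
};
// The whole plan round-trips now that the stream field is gone.
let json = serde_json::to_string(&plan).unwrap();
let _back: InsertIntoPlan = serde_json::from_str(&json).unwrap();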
20 changes: 19 additions & 1 deletion common/planners/src/plan_read_datasource.rs
@@ -52,7 +52,7 @@ impl ReadDataSourcePlan {
.clone()
.map(|x| {
let fields: Vec<_> = x.iter().map(|(_, f)| f.clone()).collect();
Arc::new(self.table_info.schema().project(fields))
Arc::new(self.table_info.schema().project_by_fields(fields))
})
.unwrap_or_else(|| self.table_info.schema())
}
@@ -63,4 +63,22 @@
.clone()
.unwrap_or_else(|| self.table_info.schema().fields_map())
}

pub fn projections(&self) -> Vec<usize> {
let default_proj = || {
(0..self.table_info.schema().fields().len())
.into_iter()
.collect::<Vec<usize>>()
};

if let Some(Extras {
projection: Some(prj),
..
}) = &self.push_downs
{
prj.clone()
} else {
default_proj()
}
}
}
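projections() gives readers one code path: the pushed-down indices when present, otherwise the identity projection over every field. Together with the new index-based DataSchema::project it replaces ad-hoc field cloning. Sketch of the contract (plan construction elided):

// No push-downs on a three-column table: identity projection.
assert_eq!(plan.projections(), vec![0, 1, 2]);
// With Extras { projection: Some(vec![2, 0]), .. } it returns vec![2, 0],
// which can feed straight into schema pruning:
let pruned = plan.table_info.schema().project(plan.projections());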
5 changes: 5 additions & 0 deletions common/streams/Cargo.toml
@@ -14,13 +14,18 @@ common-datablocks = {path = "../datablocks"}
common-datavalues = {path = "../datavalues"}
common-exception = {path = "../exception"}
common-io = {path = "../io"}
common-dal = {path = "../dal"}

# Github dependencies

# Crates.io dependencies
crossbeam = "0.8"
futures = "0.3"
pin-project-lite = "^0.2"
async-trait = "0.1"
async-stream = "0.3.2"
csv-async = {version = "1.1", features = ["tokio"] }
tokio-stream = { version = "0.1", features = ["net"] }

[dev-dependencies]
pretty_assertions = "1.0"
4 changes: 1 addition & 3 deletions common/streams/src/lib.rs
@@ -30,7 +30,6 @@ mod stream_abort;
mod stream_correct_with_schema;
mod stream_datablock;
mod stream_limit_by;
mod stream_parquet;
mod stream_progress;
mod stream_skip;
mod stream_sort;
@@ -39,12 +38,11 @@ mod stream_sub_queries;
mod stream_take;

pub use sources::*;
pub use stream::SendableDataBlockStream;
pub use stream::*;
pub use stream_abort::AbortStream;
pub use stream_correct_with_schema::CorrectWithSchemaStream;
pub use stream_datablock::DataBlockStream;
pub use stream_limit_by::LimitByStream;
pub use stream_parquet::ParquetStream;
pub use stream_progress::ProgressStream;
pub use stream_skip::SkipStream;
pub use stream_sort::SortStream;
2 changes: 2 additions & 0 deletions common/streams/src/sources/mod.rs
@@ -14,6 +14,7 @@

mod source;
mod source_csv;
mod source_parquet;
mod source_values;

#[cfg(test)]
@@ -22,4 +23,5 @@ mod source_test;
pub use source::FormatSettings;
pub use source::Source;
pub use source_csv::CsvSource;
pub use source_parquet::ParquetSource;
pub use source_values::ValueSource;
6 changes: 4 additions & 2 deletions common/streams/src/sources/source.rs
@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_datablocks::DataBlock;
use common_exception::Result;

pub trait Source: Sync + Send {
fn read(&mut self) -> Result<Option<DataBlock>>;
#[async_trait]
pub trait Source: Send {
async fn read(&mut self) -> Result<Option<DataBlock>>;
}

#[allow(dead_code)]
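Every source now exposes a single async read that returns Ok(None) once the input is exhausted. A minimal sketch of an implementor under the new contract (the type is illustrative):

use async_trait::async_trait;
use common_datablocks::DataBlock;
use common_exception::Result;

struct OneShotSource {
    block: Option<DataBlock>,
}

#[async_trait]
impl Source for OneShotSource {
    async fn read(&mut self) -> Result<Option<DataBlock>> {
        // First call yields the block; every later call returns Ok(None).
        Ok(self.block.take())
    }
}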
64 changes: 35 additions & 29 deletions common/streams/src/sources/source_csv.rs
@@ -12,31 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;

use common_arrow::arrow::io::csv::read::ByteRecord;
use common_arrow::arrow::io::csv::read::Reader;
use common_arrow::arrow::io::csv::read::ReaderBuilder;
use async_trait::async_trait;
use common_base::tokio;
use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::ToErrorCode;
use csv_async::AsyncReader;
use csv_async::AsyncReaderBuilder;
use tokio_stream::StreamExt;

use crate::Source;

pub struct CsvSource<R> {
reader: Reader<R>,
reader: AsyncReader<R>,
schema: DataSchemaRef,
block_size: usize,
rows: usize,
}

impl<R> CsvSource<R>
where R: io::Read + Sync + Send
where R: tokio::io::AsyncRead + Unpin + Send + Sync
{
pub fn new(reader: R, schema: DataSchemaRef, block_size: usize) -> Self {
let reader = ReaderBuilder::new().has_headers(false).from_reader(reader);
pub fn new(reader: R, schema: DataSchemaRef, header: bool, block_size: usize) -> Self {
let reader = AsyncReaderBuilder::new()
.has_headers(header)
.create_reader(reader);

Self {
reader,
@@ -47,41 +49,45 @@ where R: io::Read + Sync + Send
}
}

#[async_trait]
impl<R> Source for CsvSource<R>
where R: io::Read + Sync + Send
where R: tokio::io::AsyncRead + Unpin + Send + Sync
{
fn read(&mut self) -> Result<Option<DataBlock>> {
let mut record = ByteRecord::new();
async fn read(&mut self) -> Result<Option<DataBlock>> {
let mut desers = self
.schema
.fields()
.iter()
.map(|f| f.data_type().create_serializer(self.block_size))
.collect::<Result<Vec<_>>>()?;

for row in 0..self.block_size {
let v = self
.reader
.read_byte_record(&mut record)
.map_err_to_code(ErrorCode::BadBytes, || {
format!("Parse csv error at line {}", self.rows)
})?;
let mut rows = 0;
let mut records = self.reader.byte_records();

if !v {
if row == 0 {
return Ok(None);
}
while let Some(record) = records.next().await {
let record = record.map_err_to_code(ErrorCode::BadBytes, || {
format!("Parse csv error at line {}", self.rows)
})?;

if record.is_empty() {
break;
}
desers
.iter_mut()
.enumerate()
.for_each(|(col, deser)| match record.get(col) {
for (col, deser) in desers.iter_mut().enumerate() {
match record.get(col) {
Some(bytes) => deser.de_text(bytes).unwrap(),
None => deser.de_null(),
});

}
}
rows += 1;
self.rows += 1;

if rows >= self.block_size {
break;
}
}

if rows == 0 {
return Ok(None);
}

let series = desers
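A hedged sketch of driving the reworked CsvSource end to end (the path is illustrative; tokio::fs::File satisfies the AsyncRead + Unpin + Send + Sync bound, and num_rows on DataBlock is assumed):

use common_streams::{CsvSource, Source};

async fn drain_csv(schema: DataSchemaRef) -> Result<()> {
    let file = tokio::fs::File::open("/tmp/data.csv").await?;
    // header = false, block_size = 4096 rows per emitted DataBlock.
    let mut source = CsvSource::new(file, schema, false, 4096);
    while let Some(block) = source.read().await? {
        println!("block with {} rows", block.num_rows());
    }
    Ok(())
}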