Skip to content

Commit

Permalink
Merge pull request #8651 from BohuTANG/dev-refine-3
Browse files Browse the repository at this point in the history
chore: merge query/streams to some crates
  • Loading branch information
BohuTANG authored Nov 5, 2022
2 parents c76112b + 72112eb commit 01d97e7
Show file tree
Hide file tree
Showing 36 changed files with 53 additions and 108 deletions.
19 changes: 3 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ members = [
"src/query/storages/stage",
"src/query/storages/system",
"src/query/storages/view",
"src/query/streams",
"src/query/users",
# databend-query
"src/query/service",
Expand Down
2 changes: 2 additions & 0 deletions src/query/datablocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ common-io = { path = "../../common/io" }
# Crates.io dependencies
ahash = "0.8.0"
comfy-table = "6.1.0"
futures = "0.3.24"
parking_lot = "0.12.1"
pin-project-lite = "0.2.9"
primitive-types = "0.12.0"
regex = "1.6.0"

Expand Down
6 changes: 4 additions & 2 deletions src/query/datablocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ mod data_block_debug;
mod kernels;
mod memory;
mod meta_info;
mod utils;
mod serialize;
mod stream;

pub use block_compact_thresholds::BlockCompactThresholds;
pub use data_block::DataBlock;
Expand All @@ -30,4 +31,5 @@ pub use kernels::*;
pub use memory::InMemoryData;
pub use meta_info::MetaInfo;
pub use meta_info::MetaInfoPtr;
pub use utils::*;
pub use serialize::*;
pub use stream::SendableDataBlockStream;
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Datafuse Labs.
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datablocks::DataBlock;
use common_exception::Result;

use crate::DataBlock;

pub type SendableDataBlockStream =
std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<DataBlock>> + Send>>;
1 change: 0 additions & 1 deletion src/query/pipeline/sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ common-io = { path = "../../../common/io" }
common-meta-types = { path = "../../../meta/types" }
common-pipeline-core = { path = "../core" }
common-settings = { path = "../../settings" }
common-streams = { path = "../../streams" }

async-trait = { version = "0.1.57", package = "async-trait-fn" }
bstr = "1.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_datablocks::DataBlock;
use common_datablocks::SendableDataBlockStream;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;

use crate::processors::sources::AsyncSource;
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ common-storages-stage = { path = "../storages/stage" }
common-storages-system = { path = "../storages/system" }
common-storages-table-meta = { path = "../storages/table-meta" }
common-storages-view = { path = "../storages/view" }
common-streams = { path = "../streams" }
common-tracing = { path = "../../common/tracing" }
common-users = { path = "../users" }

Expand Down Expand Up @@ -107,6 +106,7 @@ opensrv-mysql = "0.2.0"
openssl = { version = "0.10.41", features = ["vendored"] }
parking_lot = "0.12.1"
petgraph = "0.6.2"
pin-project-lite = "0.2.9"
poem = { version = "1", features = ["rustls", "multipart", "compression"] }
primitive-types = "0.12.0"
rand = "0.8.5"
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/api/http/v1/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

use std::sync::Arc;

use common_datablocks::SendableDataBlockStream;
use common_exception::ErrorCode;
use common_exception::Result;
use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use common_streams::SendableDataBlockStream;
use poem::http::StatusCode;
use poem::Body;
use poem::IntoResponse;
Expand All @@ -27,7 +27,7 @@ use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::sessions::TableContext;
use crate::stream::DataBlockStream;
use crate::stream::ReadDataBlockStream;

// read log files from cfg.log.log_dir
#[poem::handler]
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ use std::sync::Arc;
use std::time::SystemTime;

use common_catalog::table_context::TableContext;
use common_datablocks::SendableDataBlockStream;
use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataSchemaRefExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_streams::DataBlockStream;
use common_streams::ProgressStream;
use common_streams::SendableDataBlockStream;

use crate::interpreters::InterpreterQueryLog;
use crate::pipelines::executor::ExecutorSettings;
Expand All @@ -33,6 +31,8 @@ use crate::pipelines::PipelineBuildResult;
use crate::pipelines::SourcePipeBuilder;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
use crate::stream::DataBlockStream;
use crate::stream::ProgressStream;
use crate::stream::PullingExecutorStream;

#[async_trait::async_trait]
Expand Down
5 changes: 2 additions & 3 deletions src/query/service/src/procedures/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datablocks::SendableDataBlockStream;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -23,8 +24,6 @@ use common_pipeline_core::Pipe;
use common_pipeline_core::Pipeline;
use common_pipeline_sources::processors::sources::StreamSource;
use common_sql::validate_function_arg;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;

use crate::procedures::ProcedureFeatures;
Expand Down Expand Up @@ -132,8 +131,8 @@ where Self: Sized
}

mod impls {

use super::*;
use crate::stream::DataBlockStream;

// To avoid implementation conflicts, introduce a new type
pub(in self::super) struct OneBlockProcedureWrapper<T>(pub T);
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/servers/http/v1/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::StreamExt;

use crate::sessions::QueryContext;
use crate::storages::Table;
use crate::stream::DataBlockStream;
use crate::stream::ReadDataBlockStream;

pub type SendableVu8Stream =
std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<Vec<u8>>> + Send>>;
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_base::base::GlobalIORuntime;
use common_base::base::ProgressValues;
use common_base::base::Thread;
use common_base::base::TrySpawn;
use common_datablocks::SendableDataBlockStream;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -32,8 +33,6 @@ use common_storages_fuse_result::BlockBufferWriterMemOnly;
use common_storages_fuse_result::BlockBufferWriterWithResultTable;
use common_storages_fuse_result::ResultQueryInfo;
use common_storages_fuse_result::ResultTableSink;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;
use futures_util::FutureExt;
use serde::Deserialize;
Expand All @@ -55,6 +54,7 @@ use crate::sessions::Session;
use crate::sessions::TableContext;
use crate::sql::plans::Plan;
use crate::sql::Planner;
use crate::stream::DataBlockStream;

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
pub enum ExecuteStateKind {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ use common_base::base::tokio::io::AsyncWrite;
use common_base::base::TrySpawn;
use common_config::DATABEND_COMMIT_VERSION;
use common_datablocks::DataBlock;
use common_datablocks::SendableDataBlockStream;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::ToErrorCode;
use common_io::prelude::*;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_users::CertifiedInfo;
use common_users::UserApiProvider;
use futures_util::StreamExt;
Expand Down Expand Up @@ -55,6 +54,7 @@ use crate::sessions::Session;
use crate::sessions::TableContext;
use crate::sql::plans::Plan;
use crate::sql::Planner;
use crate::stream::DataBlockStream;

fn has_result_set_by_plan(plan: &Plan) -> bool {
matches!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use common_base::base::tokio::io::AsyncWrite;
use common_datablocks::SendableDataBlockStream;
use common_datavalues::prelude::TypeID;
use common_datavalues::remove_nullable;
use common_datavalues::DataField;
Expand All @@ -24,7 +25,6 @@ use common_datavalues::TypeSerializer;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::FormatSettings;
use common_streams::SendableDataBlockStream;
use futures_util::StreamExt;
use opensrv_mysql::*;
use tracing::error;
Expand Down
File renamed without changes.
7 changes: 6 additions & 1 deletion src/query/service/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,10 @@
mod processor_executor_stream;
mod table_read_block_stream;

mod datablock_stream;
mod progress_stream;

pub use datablock_stream::DataBlockStream;
pub use processor_executor_stream::PullingExecutorStream;
pub use table_read_block_stream::DataBlockStream;
pub use progress_stream::ProgressStream;
pub use table_read_block_stream::ReadDataBlockStream;
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ use std::task::Poll;
use common_base::base::Progress;
use common_base::base::ProgressValues;
use common_datablocks::DataBlock;
use common_datablocks::SendableDataBlockStream;
use common_exception::Result;
use futures::Stream;
use pin_project_lite::pin_project;

use crate::SendableDataBlockStream;

pin_project! {
pub struct ProgressStream {
#[pin]
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/stream/table_read_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use std::sync::Arc;

use common_datablocks::SendableDataBlockStream;
use common_exception::Result;
use common_planner::ReadDataSourcePlan;
use common_streams::SendableDataBlockStream;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
Expand All @@ -27,7 +27,7 @@ use crate::storages::Table;
use crate::stream::PullingExecutorStream;

#[async_trait::async_trait]
pub trait DataBlockStream: Send + Sync {
pub trait ReadDataBlockStream: Send + Sync {
async fn read_data_block_stream(
&self,
_ctx: Arc<QueryContext>,
Expand All @@ -36,7 +36,7 @@ pub trait DataBlockStream: Send + Sync {
}

#[async_trait::async_trait]
impl<T: ?Sized + Table> DataBlockStream for T {
impl<T: ?Sized + Table> ReadDataBlockStream for T {
async fn read_data_block_stream(
&self,
ctx: Arc<QueryContext>,
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ mod servers;
mod sessions;
mod sql;
mod storages;
mod stream;
mod table_functions;
mod tests;
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use common_catalog::table::CompactTarget;
use common_catalog::table::Table;
use common_catalog::table::TableExt;
use common_datablocks::DataBlock;
use common_datablocks::SendableDataBlockStream;
use common_exception::ErrorCode;
use common_exception::Result;
use common_storages_fuse::FuseTable;
use common_streams::SendableDataBlockStream;
use databend_query::sessions::QueryContext;
use databend_query::sessions::TableContext;
use futures_util::TryStreamExt;
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use databend_query::sql::plans::create_table_v2::CreateTablePlanV2;
use databend_query::sql::Planner;
use databend_query::storages::fuse::io::MetaReaders;
use databend_query::storages::fuse::FuseTable;
use databend_query::stream::DataBlockStream;
use databend_query::stream::ReadDataBlockStream;
use futures::TryStreamExt;

use crate::storages::fuse::table_test_fixture::TestFixture;
Expand Down
Loading

1 comment on commit 01d97e7

@vercel
Copy link

@vercel vercel bot commented on 01d97e7 Nov 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app

Please sign in to comment.