diff --git a/Cargo.lock b/Cargo.lock index b3ce6f883a9c..c814e9c1c116 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1320,7 +1320,9 @@ dependencies = [ "common-datavalues", "common-exception", "common-io", + "futures", "parking_lot 0.12.1", + "pin-project-lite", "pretty_assertions", "primitive-types", "regex", @@ -1854,7 +1856,6 @@ dependencies = [ "common-meta-types", "common-pipeline-core", "common-settings", - "common-streams", "crossbeam-channel", "csv-core", "futures", @@ -2148,7 +2149,6 @@ dependencies = [ "common-storages-fuse", "common-storages-table-meta", "common-storages-view", - "common-streams", "common-users", "futures", "itertools", @@ -2505,19 +2505,6 @@ dependencies = [ "common-meta-app", ] -[[package]] -name = "common-streams" -version = "0.1.0" -dependencies = [ - "common-base", - "common-datablocks", - "common-datavalues", - "common-exception", - "futures", - "opendal", - "pin-project-lite", -] - [[package]] name = "common-tracing" version = "0.1.0" @@ -3130,7 +3117,6 @@ dependencies = [ "common-storages-system", "common-storages-table-meta", "common-storages-view", - "common-streams", "common-tracing", "common-users", "criterion", @@ -3155,6 +3141,7 @@ dependencies = [ "openssl", "parking_lot 0.12.1", "petgraph", + "pin-project-lite", "poem", "pretty_assertions", "primitive-types", diff --git a/Cargo.toml b/Cargo.toml index 6aad39ad841a..624591fdcc40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,6 @@ members = [ "src/query/storages/stage", "src/query/storages/system", "src/query/storages/view", - "src/query/streams", "src/query/users", # databend-query "src/query/service", diff --git a/src/query/datablocks/Cargo.toml b/src/query/datablocks/Cargo.toml index b323435c731b..db43af7b4940 100644 --- a/src/query/datablocks/Cargo.toml +++ b/src/query/datablocks/Cargo.toml @@ -22,7 +22,9 @@ common-io = { path = "../../common/io" } # Crates.io dependencies ahash = "0.8.0" comfy-table = "6.1.0" +futures = "0.3.24" parking_lot = "0.12.1" +pin-project-lite = "0.2.9" primitive-types = "0.12.0" regex = "1.6.0" diff --git a/src/query/datablocks/src/lib.rs b/src/query/datablocks/src/lib.rs index eef4063187fa..10fb7d1e02f3 100644 --- a/src/query/datablocks/src/lib.rs +++ b/src/query/datablocks/src/lib.rs @@ -21,7 +21,8 @@ mod data_block_debug; mod kernels; mod memory; mod meta_info; -mod utils; +mod serialize; +mod stream; pub use block_compact_thresholds::BlockCompactThresholds; pub use data_block::DataBlock; @@ -30,4 +31,5 @@ pub use kernels::*; pub use memory::InMemoryData; pub use meta_info::MetaInfo; pub use meta_info::MetaInfoPtr; -pub use utils::*; +pub use serialize::*; +pub use stream::SendableDataBlockStream; diff --git a/src/query/datablocks/src/utils.rs b/src/query/datablocks/src/serialize.rs similarity index 100% rename from src/query/datablocks/src/utils.rs rename to src/query/datablocks/src/serialize.rs diff --git a/src/query/streams/src/stream.rs b/src/query/datablocks/src/stream.rs similarity index 91% rename from src/query/streams/src/stream.rs rename to src/query/datablocks/src/stream.rs index ca54346dcd83..d86642a573d5 100644 --- a/src/query/streams/src/stream.rs +++ b/src/query/datablocks/src/stream.rs @@ -1,4 +1,4 @@ -// Copyright 2021 Datafuse Labs. +// Copyright 2022 Datafuse Labs. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_datablocks::DataBlock; use common_exception::Result; +use crate::DataBlock; + pub type SendableDataBlockStream = std::pin::Pin> + Send>>; diff --git a/src/query/pipeline/sources/Cargo.toml b/src/query/pipeline/sources/Cargo.toml index 9e72869622a0..a5014ac1fec5 100644 --- a/src/query/pipeline/sources/Cargo.toml +++ b/src/query/pipeline/sources/Cargo.toml @@ -24,7 +24,6 @@ common-io = { path = "../../../common/io" } common-meta-types = { path = "../../../meta/types" } common-pipeline-core = { path = "../core" } common-settings = { path = "../../settings" } -common-streams = { path = "../../streams" } async-trait = { version = "0.1.57", package = "async-trait-fn" } bstr = "1.0.1" diff --git a/src/query/pipeline/sources/src/processors/sources/stream_source.rs b/src/query/pipeline/sources/src/processors/sources/stream_source.rs index 5174d24940f8..5ae7d8c49461 100644 --- a/src/query/pipeline/sources/src/processors/sources/stream_source.rs +++ b/src/query/pipeline/sources/src/processors/sources/stream_source.rs @@ -16,11 +16,11 @@ use std::sync::Arc; use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; +use common_datablocks::SendableDataBlockStream; use common_exception::ErrorCode; use common_exception::Result; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::ProcessorPtr; -use common_streams::SendableDataBlockStream; use futures::StreamExt; use crate::processors::sources::AsyncSource; diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 0b008c11cf43..1a90de5088e5 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -72,7 +72,6 @@ common-storages-stage = { path = "../storages/stage" } common-storages-system = { path = "../storages/system" } common-storages-table-meta = { path = "../storages/table-meta" } common-storages-view = { path = "../storages/view" } -common-streams = { path = "../streams" } common-tracing = { path = "../../common/tracing" } common-users = { path = "../users" } @@ -107,6 +106,7 @@ opensrv-mysql = "0.2.0" openssl = { version = "0.10.41", features = ["vendored"] } parking_lot = "0.12.1" petgraph = "0.6.2" +pin-project-lite = "0.2.9" poem = { version = "1", features = ["rustls", "multipart", "compression"] } primitive-types = "0.12.0" rand = "0.8.5" diff --git a/src/query/service/src/api/http/v1/logs.rs b/src/query/service/src/api/http/v1/logs.rs index 5acdbd0bd136..e020e1cb82ee 100644 --- a/src/query/service/src/api/http/v1/logs.rs +++ b/src/query/service/src/api/http/v1/logs.rs @@ -14,10 +14,10 @@ use std::sync::Arc; +use common_datablocks::SendableDataBlockStream; use common_exception::ErrorCode; use common_exception::Result; use common_sql::executor::table_read_plan::ToReadDataSourcePlan; -use common_streams::SendableDataBlockStream; use poem::http::StatusCode; use poem::Body; use poem::IntoResponse; @@ -27,7 +27,7 @@ use crate::sessions::QueryContext; use crate::sessions::SessionManager; use crate::sessions::SessionType; use crate::sessions::TableContext; -use crate::stream::DataBlockStream; +use crate::stream::ReadDataBlockStream; // read log files from cfg.log.log_dir #[poem::handler] diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 1c2809e6958f..ffdf5bb9122a 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -16,14 +16,12 @@ use std::sync::Arc; use std::time::SystemTime; use common_catalog::table_context::TableContext; +use common_datablocks::SendableDataBlockStream; use common_datavalues::DataSchema; use common_datavalues::DataSchemaRef; use common_datavalues::DataSchemaRefExt; use common_exception::ErrorCode; use common_exception::Result; -use common_streams::DataBlockStream; -use common_streams::ProgressStream; -use common_streams::SendableDataBlockStream; use crate::interpreters::InterpreterQueryLog; use crate::pipelines::executor::ExecutorSettings; @@ -33,6 +31,8 @@ use crate::pipelines::PipelineBuildResult; use crate::pipelines::SourcePipeBuilder; use crate::sessions::QueryContext; use crate::sessions::SessionManager; +use crate::stream::DataBlockStream; +use crate::stream::ProgressStream; use crate::stream::PullingExecutorStream; #[async_trait::async_trait] diff --git a/src/query/service/src/procedures/procedure.rs b/src/query/service/src/procedures/procedure.rs index ffdd3462b4d1..521fbcf01820 100644 --- a/src/query/service/src/procedures/procedure.rs +++ b/src/query/service/src/procedures/procedure.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use common_datablocks::DataBlock; +use common_datablocks::SendableDataBlockStream; use common_datavalues::DataSchema; use common_exception::ErrorCode; use common_exception::Result; @@ -23,8 +24,6 @@ use common_pipeline_core::Pipe; use common_pipeline_core::Pipeline; use common_pipeline_sources::processors::sources::StreamSource; use common_sql::validate_function_arg; -use common_streams::DataBlockStream; -use common_streams::SendableDataBlockStream; use futures::StreamExt; use crate::procedures::ProcedureFeatures; @@ -132,8 +131,8 @@ where Self: Sized } mod impls { - use super::*; + use crate::stream::DataBlockStream; // To avoid implementation conflicts, introduce a new type pub(in self::super) struct OneBlockProcedureWrapper(pub T); diff --git a/src/query/service/src/servers/http/v1/download.rs b/src/query/service/src/servers/http/v1/download.rs index 68e167c8aa28..c374ae325bb1 100644 --- a/src/query/service/src/servers/http/v1/download.rs +++ b/src/query/service/src/servers/http/v1/download.rs @@ -27,7 +27,7 @@ use futures::StreamExt; use crate::sessions::QueryContext; use crate::storages::Table; -use crate::stream::DataBlockStream; +use crate::stream::ReadDataBlockStream; pub type SendableVu8Stream = std::pin::Pin>> + Send>>; diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index b51e201184b3..a37770125baa 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -24,6 +24,7 @@ use common_base::base::GlobalIORuntime; use common_base::base::ProgressValues; use common_base::base::Thread; use common_base::base::TrySpawn; +use common_datablocks::SendableDataBlockStream; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; @@ -32,8 +33,6 @@ use common_storages_fuse_result::BlockBufferWriterMemOnly; use common_storages_fuse_result::BlockBufferWriterWithResultTable; use common_storages_fuse_result::ResultQueryInfo; use common_storages_fuse_result::ResultTableSink; -use common_streams::DataBlockStream; -use common_streams::SendableDataBlockStream; use futures::StreamExt; use futures_util::FutureExt; use serde::Deserialize; @@ -55,6 +54,7 @@ use crate::sessions::Session; use crate::sessions::TableContext; use crate::sql::plans::Plan; use crate::sql::Planner; +use crate::stream::DataBlockStream; #[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)] pub enum ExecuteStateKind { diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 2a0e0bf2efd7..15d6e79cfdf1 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -20,12 +20,11 @@ use common_base::base::tokio::io::AsyncWrite; use common_base::base::TrySpawn; use common_config::DATABEND_COMMIT_VERSION; use common_datablocks::DataBlock; +use common_datablocks::SendableDataBlockStream; use common_exception::ErrorCode; use common_exception::Result; use common_exception::ToErrorCode; use common_io::prelude::*; -use common_streams::DataBlockStream; -use common_streams::SendableDataBlockStream; use common_users::CertifiedInfo; use common_users::UserApiProvider; use futures_util::StreamExt; @@ -55,6 +54,7 @@ use crate::sessions::Session; use crate::sessions::TableContext; use crate::sql::plans::Plan; use crate::sql::Planner; +use crate::stream::DataBlockStream; fn has_result_set_by_plan(plan: &Plan) -> bool { matches!( diff --git a/src/query/service/src/servers/mysql/writers/query_result_writer.rs b/src/query/service/src/servers/mysql/writers/query_result_writer.rs index e3c43d8106b6..5fbe7f4f399d 100644 --- a/src/query/service/src/servers/mysql/writers/query_result_writer.rs +++ b/src/query/service/src/servers/mysql/writers/query_result_writer.rs @@ -13,6 +13,7 @@ // limitations under the License. use common_base::base::tokio::io::AsyncWrite; +use common_datablocks::SendableDataBlockStream; use common_datavalues::prelude::TypeID; use common_datavalues::remove_nullable; use common_datavalues::DataField; @@ -24,7 +25,6 @@ use common_datavalues::TypeSerializer; use common_exception::ErrorCode; use common_exception::Result; use common_io::prelude::FormatSettings; -use common_streams::SendableDataBlockStream; use futures_util::StreamExt; use opensrv_mysql::*; use tracing::error; diff --git a/src/query/streams/src/stream_datablock.rs b/src/query/service/src/stream/datablock_stream.rs similarity index 100% rename from src/query/streams/src/stream_datablock.rs rename to src/query/service/src/stream/datablock_stream.rs diff --git a/src/query/service/src/stream/mod.rs b/src/query/service/src/stream/mod.rs index 6aacda1541e1..2c4eb3ee2d75 100644 --- a/src/query/service/src/stream/mod.rs +++ b/src/query/service/src/stream/mod.rs @@ -15,5 +15,10 @@ mod processor_executor_stream; mod table_read_block_stream; +mod datablock_stream; +mod progress_stream; + +pub use datablock_stream::DataBlockStream; pub use processor_executor_stream::PullingExecutorStream; -pub use table_read_block_stream::DataBlockStream; +pub use progress_stream::ProgressStream; +pub use table_read_block_stream::ReadDataBlockStream; diff --git a/src/query/streams/src/stream_progress.rs b/src/query/service/src/stream/progress_stream.rs similarity index 97% rename from src/query/streams/src/stream_progress.rs rename to src/query/service/src/stream/progress_stream.rs index 11bd63955a78..45c25e4c22f1 100644 --- a/src/query/streams/src/stream_progress.rs +++ b/src/query/service/src/stream/progress_stream.rs @@ -19,12 +19,11 @@ use std::task::Poll; use common_base::base::Progress; use common_base::base::ProgressValues; use common_datablocks::DataBlock; +use common_datablocks::SendableDataBlockStream; use common_exception::Result; use futures::Stream; use pin_project_lite::pin_project; -use crate::SendableDataBlockStream; - pin_project! { pub struct ProgressStream { #[pin] diff --git a/src/query/service/src/stream/table_read_block_stream.rs b/src/query/service/src/stream/table_read_block_stream.rs index 41f7258c7f47..8da4ec3689ff 100644 --- a/src/query/service/src/stream/table_read_block_stream.rs +++ b/src/query/service/src/stream/table_read_block_stream.rs @@ -14,9 +14,9 @@ use std::sync::Arc; +use common_datablocks::SendableDataBlockStream; use common_exception::Result; use common_planner::ReadDataSourcePlan; -use common_streams::SendableDataBlockStream; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelinePullingExecutor; @@ -27,7 +27,7 @@ use crate::storages::Table; use crate::stream::PullingExecutorStream; #[async_trait::async_trait] -pub trait DataBlockStream: Send + Sync { +pub trait ReadDataBlockStream: Send + Sync { async fn read_data_block_stream( &self, _ctx: Arc, @@ -36,7 +36,7 @@ pub trait DataBlockStream: Send + Sync { } #[async_trait::async_trait] -impl DataBlockStream for T { +impl ReadDataBlockStream for T { async fn read_data_block_stream( &self, ctx: Arc, diff --git a/src/query/service/tests/it/main.rs b/src/query/service/tests/it/main.rs index 5a49186101d4..85e97638475d 100644 --- a/src/query/service/tests/it/main.rs +++ b/src/query/service/tests/it/main.rs @@ -26,5 +26,6 @@ mod servers; mod sessions; mod sql; mod storages; +mod stream; mod table_functions; mod tests; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index 294d5b9b4457..a987c6eabd61 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -19,10 +19,10 @@ use common_catalog::table::CompactTarget; use common_catalog::table::Table; use common_catalog::table::TableExt; use common_datablocks::DataBlock; +use common_datablocks::SendableDataBlockStream; use common_exception::ErrorCode; use common_exception::Result; use common_storages_fuse::FuseTable; -use common_streams::SendableDataBlockStream; use databend_query::sessions::QueryContext; use databend_query::sessions::TableContext; use futures_util::TryStreamExt; diff --git a/src/query/service/tests/it/storages/fuse/table.rs b/src/query/service/tests/it/storages/fuse/table.rs index ece35bf55158..74cdae2f2c1c 100644 --- a/src/query/service/tests/it/storages/fuse/table.rs +++ b/src/query/service/tests/it/storages/fuse/table.rs @@ -35,7 +35,7 @@ use databend_query::sql::plans::create_table_v2::CreateTablePlanV2; use databend_query::sql::Planner; use databend_query::storages::fuse::io::MetaReaders; use databend_query::storages::fuse::FuseTable; -use databend_query::stream::DataBlockStream; +use databend_query::stream::ReadDataBlockStream; use futures::TryStreamExt; use crate::storages::fuse::table_test_fixture::TestFixture; diff --git a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs index 8aa4eae026a0..4b6e5fcd019f 100644 --- a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs +++ b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs @@ -20,6 +20,7 @@ use common_catalog::catalog::CATALOG_DEFAULT; use common_catalog::table::AppendMode; use common_datablocks::assert_blocks_sorted_eq_with_name; use common_datablocks::DataBlock; +use common_datablocks::SendableDataBlockStream; use common_datavalues::prelude::*; use common_exception::Result; use common_meta_app::schema::DatabaseMeta; @@ -30,7 +31,6 @@ use common_storage::StorageFsConfig; use common_storage::StorageParams; use common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX; use common_storages_table_meta::table::OPT_KEY_DATABASE_ID; -use common_streams::SendableDataBlockStream; use databend_query::interpreters::append2table; use databend_query::interpreters::CreateTableInterpreterV2; use databend_query::interpreters::Interpreter; @@ -49,7 +49,7 @@ use databend_query::storages::fuse::FUSE_TBL_BLOCK_PREFIX; use databend_query::storages::fuse::FUSE_TBL_SEGMENT_PREFIX; use databend_query::storages::fuse::FUSE_TBL_SNAPSHOT_PREFIX; use databend_query::storages::Table; -use databend_query::stream::DataBlockStream; +use databend_query::stream::ReadDataBlockStream; use databend_query::table_functions::TableArgs; use futures::TryStreamExt; use parking_lot::Mutex; diff --git a/src/query/service/tests/it/storages/memory.rs b/src/query/service/tests/it/storages/memory.rs index 6b137a21c6f6..61e0df6c4923 100644 --- a/src/query/service/tests/it/storages/memory.rs +++ b/src/query/service/tests/it/storages/memory.rs @@ -26,7 +26,7 @@ use common_sql::executor::table_read_plan::ToReadDataSourcePlan; use common_storages_memory::MemoryTable; use databend_query::sessions::TableContext; use databend_query::sql::plans::create_table_v2::TableOptions; -use databend_query::stream::DataBlockStream; +use databend_query::stream::ReadDataBlockStream; use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/src/query/service/tests/it/storages/null.rs b/src/query/service/tests/it/storages/null.rs index ba64518e781f..9554ba874876 100644 --- a/src/query/service/tests/it/storages/null.rs +++ b/src/query/service/tests/it/storages/null.rs @@ -20,7 +20,7 @@ use common_meta_app::schema::TableMeta; use common_sql::executor::table_read_plan::ToReadDataSourcePlan; use common_storages_null::NullTable; use databend_query::sql::plans::create_table_v2::TableOptions; -use databend_query::stream::DataBlockStream; +use databend_query::stream::ReadDataBlockStream; use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/src/query/service/tests/it/storages/result/result_table.rs b/src/query/service/tests/it/storages/result/result_table.rs index 5ce5179414f1..766d1b663c87 100644 --- a/src/query/service/tests/it/storages/result/result_table.rs +++ b/src/query/service/tests/it/storages/result/result_table.rs @@ -31,7 +31,7 @@ use common_storages_fuse_result::ResultTable; use common_storages_fuse_result::ResultTableWriter; use databend_query::sessions::TableContext; use databend_query::storages::Table; -use databend_query::stream::DataBlockStream; +use databend_query::stream::ReadDataBlockStream; use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/src/query/service/tests/it/storages/system.rs b/src/query/service/tests/it/storages/system.rs index 03ca73086a76..f4c698db767a 100644 --- a/src/query/service/tests/it/storages/system.rs +++ b/src/query/service/tests/it/storages/system.rs @@ -48,7 +48,7 @@ use common_storages_system::UsersTable; use common_users::UserApiProvider; use databend_query::sessions::QueryContext; use databend_query::sessions::TableContext; -use databend_query::stream::DataBlockStream; +use databend_query::stream::ReadDataBlockStream; use futures::TryStreamExt; use goldenfile::Mint; use wiremock::matchers::method; diff --git a/src/query/streams/tests/it/stream_datablock.rs b/src/query/service/tests/it/stream/datablock_stream.rs similarity index 98% rename from src/query/streams/tests/it/stream_datablock.rs rename to src/query/service/tests/it/stream/datablock_stream.rs index e74bcbca92a5..7f2ebe6852ca 100644 --- a/src/query/streams/tests/it/stream_datablock.rs +++ b/src/query/service/tests/it/stream/datablock_stream.rs @@ -15,7 +15,7 @@ use common_base::base::tokio; use common_datablocks::*; use common_datavalues::prelude::*; -use common_streams::*; +use databend_query::stream::DataBlockStream; use futures::stream::StreamExt; #[tokio::test] diff --git a/src/query/streams/tests/it/main.rs b/src/query/service/tests/it/stream/mod.rs similarity index 93% rename from src/query/streams/tests/it/main.rs rename to src/query/service/tests/it/stream/mod.rs index 125044351bb7..ce9e26a45f80 100644 --- a/src/query/streams/tests/it/main.rs +++ b/src/query/service/tests/it/stream/mod.rs @@ -11,6 +11,5 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - -mod stream_datablock; -mod stream_progress; +mod datablock_stream; +mod progress_stream; diff --git a/src/query/streams/tests/it/stream_progress.rs b/src/query/service/tests/it/stream/progress_stream.rs similarity index 95% rename from src/query/streams/tests/it/stream_progress.rs rename to src/query/service/tests/it/stream/progress_stream.rs index 858e115356f4..1637b6c4099c 100644 --- a/src/query/streams/tests/it/stream_progress.rs +++ b/src/query/service/tests/it/stream/progress_stream.rs @@ -19,7 +19,8 @@ use common_base::base::*; use common_datablocks::*; use common_datavalues::prelude::*; use common_exception::Result; -use common_streams::*; +use databend_query::stream::DataBlockStream; +use databend_query::stream::ProgressStream; use futures::TryStreamExt; #[tokio::test] diff --git a/src/query/service/tests/it/table_functions/numbers_table.rs b/src/query/service/tests/it/table_functions/numbers_table.rs index a81f1678a3e8..41d7e01b896d 100644 --- a/src/query/service/tests/it/table_functions/numbers_table.rs +++ b/src/query/service/tests/it/table_functions/numbers_table.rs @@ -22,7 +22,7 @@ use databend_query::sessions::SessionManager; use databend_query::sessions::SessionType; use databend_query::sessions::TableContext; use databend_query::sql::Planner; -use databend_query::stream::DataBlockStream; +use databend_query::stream::ReadDataBlockStream; use databend_query::table_functions::NumbersTable; use futures::TryStreamExt; use pretty_assertions::assert_eq; diff --git a/src/query/storages/fuse/fuse-result/Cargo.toml b/src/query/storages/fuse/fuse-result/Cargo.toml index 36ca2e943b40..255d36508530 100644 --- a/src/query/storages/fuse/fuse-result/Cargo.toml +++ b/src/query/storages/fuse/fuse-result/Cargo.toml @@ -31,7 +31,6 @@ common-storage = { path = "../../../../common/storage" } common-storages-fuse = { path = "../../fuse/fuse" } common-storages-table-meta = { path = "../../table-meta" } common-storages-view = { path = "../../view" } -common-streams = { path = "../../../streams" } common-users = { path = "../../../users" } async-trait = { version = "0.1.57", package = "async-trait-fn" } diff --git a/src/query/storages/fuse/fuse-result/src/writer.rs b/src/query/storages/fuse/fuse-result/src/writer.rs index d0f3a41511fc..74450aee430c 100644 --- a/src/query/storages/fuse/fuse-result/src/writer.rs +++ b/src/query/storages/fuse/fuse-result/src/writer.rs @@ -20,6 +20,7 @@ use backon::Retryable; use common_catalog::table_context::TableContext; use common_datablocks::serialize_data_blocks; use common_datablocks::DataBlock; +use common_datablocks::SendableDataBlockStream; use common_exception::Result; use common_planner::PartInfoPtr; use common_storages_fuse::statistics::BlockStatistics; @@ -27,7 +28,6 @@ use common_storages_fuse::statistics::StatisticsAccumulator; use common_storages_fuse::FuseTable; use common_storages_table_meta::meta::SegmentInfo; use common_storages_table_meta::meta::Statistics as FuseMetaStatistics; -use common_streams::SendableDataBlockStream; use futures::StreamExt; use opendal::Operator; use tracing::warn; diff --git a/src/query/streams/Cargo.toml b/src/query/streams/Cargo.toml deleted file mode 100644 index ef60d587535a..000000000000 --- a/src/query/streams/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -authors = ["Databend Authors "] -edition = "2021" -license = "Apache-2.0" -name = "common-streams" -publish = false -version = "0.1.0" - -[lib] -doctest = false -test = false - -[dependencies] # In alphabetical order -# Workspace dependencies -common-base = { path = "../../common/base" } -common-datablocks = { path = "../datablocks" } -common-datavalues = { path = "../datavalues" } -common-exception = { path = "../../common/exception" } - -# Github dependencies - -# Crates.io dependencies -futures = "0.3.24" -pin-project-lite = "0.2.9" - -[dev-dependencies] -opendal = { version = "0.19", features = ["layers-retry", "compress"] } diff --git a/src/query/streams/src/lib.rs b/src/query/streams/src/lib.rs deleted file mode 100644 index 819b46b62294..000000000000 --- a/src/query/streams/src/lib.rs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod stream; -mod stream_datablock; -mod stream_progress; - -pub use stream::*; -pub use stream_datablock::DataBlockStream; -pub use stream_progress::ProgressStream;