diff --git a/rust/cubestore/cubestore-sql-tests/src/multiproc.rs b/rust/cubestore/cubestore-sql-tests/src/multiproc.rs index 1f8a22ea086eb..1db6649ec1bd6 100644 --- a/rust/cubestore/cubestore-sql-tests/src/multiproc.rs +++ b/rust/cubestore/cubestore-sql-tests/src/multiproc.rs @@ -37,7 +37,7 @@ where for inputs in worker_inputs { let (send_done, recv_done) = ipc_channel::ipc::bytes_channel().unwrap(); let args = (send_init.clone(), recv_done, inputs, timeout); - let handle = respawn(args, &[], &[]).unwrap(); + let handle = respawn(args, &["--".to_string(), "--nocapture".to_string()], &[]).unwrap(); // Ensure we signal completion to all started workers even if errors occur along the way. join_workers.push(scopeguard::guard( (send_done, handle), diff --git a/rust/cubestore/cubestore-sql-tests/tests/cluster.rs b/rust/cubestore/cubestore-sql-tests/tests/cluster.rs index 7a94659b78eff..460d9d64b0bfd 100644 --- a/rust/cubestore/cubestore-sql-tests/tests/cluster.rs +++ b/rust/cubestore/cubestore-sql-tests/tests/cluster.rs @@ -6,6 +6,7 @@ use serde_derive::{Deserialize, Serialize}; use cubestore::config::Config; use cubestore::util::respawn; +use cubestore::util::respawn::register_pushdownable_envs; use cubestore_sql_tests::multiproc::{ multiproc_child_main, run_multiproc_test, MultiProcTest, SignalInit, WaitCompletion, WorkerProc, }; @@ -16,6 +17,7 @@ const WORKER_PORTS: [u16; 2] = [51337, 51338]; #[cfg(not(target_os = "windows"))] fn main() { + register_pushdownable_envs(&["CUBESTORE_TEST_LOG_WORKER"]); respawn::register_handler(multiproc_child_main::<ClusterSqlTest>); respawn::init(); // TODO: logs in worker processes. 
@@ -99,7 +101,11 @@ impl WorkerProc<WorkerArgs> for WorkerFn { } Config::test(&test_name) .update_config(|mut c| { - c.select_worker_pool_size = 2; + c.select_worker_pool_size = if std::env::var("CUBESTORE_TEST_LOG_WORKER").is_ok() { + 0 + } else { + 2 + }; c.server_name = format!("localhost:{}", WORKER_PORTS[id]); c.worker_bind_address = Some(c.server_name.clone()); c.metastore_remote_address = Some(format!("localhost:{}", METASTORE_PORT)); diff --git a/rust/cubestore/cubestore/src/queryplanner/planning.rs b/rust/cubestore/cubestore/src/queryplanner/planning.rs index bea1b76dc98eb..35b47504095f4 100644 --- a/rust/cubestore/cubestore/src/queryplanner/planning.rs +++ b/rust/cubestore/cubestore/src/queryplanner/planning.rs @@ -1627,30 +1627,15 @@ impl CubeExtensionPlanner { } // Note that MergeExecs are added automatically when needed. if let Some(c) = self.cluster.as_ref() { - let mut send: Arc<dyn ExecutionPlan> = Arc::new(ClusterSendExec::new( + Ok(Arc::new(ClusterSendExec::new( schema, c.clone(), self.serialized_plan.clone(), snapshots, input, use_streaming, - )?); - // TODO upgrade DF - if send.properties().partitioning.partition_count() != 1 { - send = Arc::new(RepartitionExec::try_new( - send, - Partitioning::UnknownPartitioning(1), - )?); - } - Ok(send) + )?)) } else { - // TODO upgrade DF - if input.output_partitioning().partition_count() != 1 { - input = Arc::new(RepartitionExec::try_new( - input, - Partitioning::UnknownPartitioning(1), - )?); - } Ok(Arc::new(WorkerExec { input, schema, diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 43685d702715b..163d5accfa168 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -748,12 +748,9 @@ impl CubeTable { } let schema = table_projected_schema; - let partition_num = partition_execs - .iter() - .map(|c| c.properties().partitioning.partition_count()) - .sum(); + let 
partition_num = partition_execs.len(); - let read_data = Arc::new(CubeTableExec { + let read_data: Arc<dyn ExecutionPlan> = Arc::new(CubeTableExec { schema: schema.clone(), partition_execs, index_snapshot: self.index_snapshot.clone(), @@ -856,10 +853,7 @@ impl CubeTable { .collect::<Result<Vec<_>, _>>()?; Arc::new(SortPreservingMergeExec::new(join_columns, read_data)) } else { - Arc::new(RepartitionExec::try_new( - read_data, - Partitioning::UnknownPartitioning(1), - )?) + read_data }; Ok(plan)