refactor(executor): globalize QueriesPipelineExecutor creation method #15129

Merged: 11 commits, Apr 2, 2024
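Summary (drawn from the diff below): the queries pipeline executor becomes a process-wide service instead of a per-query object. A new `GlobalQueriesExecutor` is initialized from `GlobalServices::init` with one worker per CPU (via `num_cpus`), `QueryWrapper` no longer owns its own `QueriesPipelineExecutor` and instead submits its running graph to the global instance, the tasks queue gains `init_sync_tasks`/`init_async_tasks` that wake parked workers, and a `query.enable_queries_executor` config flag (default `false`) is added. In rough, non-compilable sketch form, the call-site change is:

```rust
// Before: each query built and owned its own executor (removed lines below).
let executor = QueriesPipelineExecutor::create(settings.clone())?;
executor.send_graph(graph.clone())?;

// After: one executor is created at startup and shared by all queries.
GlobalQueriesExecutor::init()?;                               // in GlobalServices::init
GlobalQueriesExecutor::instance().send_graph(graph.clone())?; // at query submit time
```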
1 change: 1 addition & 0 deletions Cargo.lock


5 changes: 5 additions & 0 deletions src/query/config/src/config.rs
@@ -1656,6 +1656,9 @@ pub struct QueryConfig {
#[clap(long, value_name = "VALUE", default_value = "0")]
pub cloud_control_grpc_timeout: u64,

#[clap(long)]
pub enable_queries_executor: bool,

#[clap(skip)]
pub settings: HashMap<String, SettingValue>,
}
@@ -1739,6 +1742,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
udf_server_allow_list: self.udf_server_allow_list,
cloud_control_grpc_server_address: self.cloud_control_grpc_server_address,
cloud_control_grpc_timeout: self.cloud_control_grpc_timeout,
enable_queries_executor: self.enable_queries_executor,
settings: self
.settings
.into_iter()
@@ -1836,6 +1840,7 @@ impl From<InnerQueryConfig> for QueryConfig {
udf_server_allow_list: inner.udf_server_allow_list,
cloud_control_grpc_server_address: inner.cloud_control_grpc_server_address,
cloud_control_grpc_timeout: inner.cloud_control_grpc_timeout,
enable_queries_executor: inner.enable_queries_executor,
settings: HashMap::new(),
}
}
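For readers unfamiliar with the flag style used above: a bare `#[clap(long)]` on a `bool` field produces an on/off switch, so `enable_queries_executor` stays `false` unless `--enable-queries-executor` is passed. A minimal standalone sketch (toy `DemoConfig` struct, not the real `QueryConfig`, assuming a clap 3/4-style derive):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct DemoConfig {
    // Mirrors the new field: present on the command line => true, absent => false.
    #[clap(long)]
    enable_queries_executor: bool,
}

fn main() {
    let on = DemoConfig::parse_from(["demo", "--enable-queries-executor"]);
    assert!(on.enable_queries_executor);

    let off = DemoConfig::parse_from(["demo"]);
    assert!(!off.enable_queries_executor);
}
```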
3 changes: 3 additions & 0 deletions src/query/config/src/inner.rs
@@ -231,6 +231,8 @@ pub struct QueryConfig {

pub cloud_control_grpc_server_address: Option<String>,
pub cloud_control_grpc_timeout: u64,

pub enable_queries_executor: bool,
pub settings: HashMap<String, UserSettingValue>,
}

@@ -302,6 +304,7 @@ impl Default for QueryConfig {
cloud_control_grpc_server_address: None,
cloud_control_grpc_timeout: 0,
data_retention_time_in_days_max: 90,
enable_queries_executor: false,
settings: HashMap::new(),
}
}
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
@@ -139,6 +139,7 @@ match-template = { workspace = true }
metrics = "0.20.1"
minitrace = { workspace = true }
naive-cityhash = "0.2.0"
num_cpus = "1.16.0"
once_cell = { workspace = true }
opendal = { workspace = true }
opensrv-mysql = { version = "0.5.0", features = ["tls"] }
6 changes: 6 additions & 0 deletions src/query/service/src/global_services.rs
@@ -40,6 +40,7 @@ use crate::auth::AuthMgr;
use crate::catalogs::DatabaseCatalog;
use crate::clusters::ClusterDiscovery;
use crate::locks::LockManager;
use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::QueriesQueueManager;
use crate::sessions::SessionManager;
@@ -131,6 +132,11 @@ impl GlobalServices {
CloudControlApiProvider::init(addr, config.query.cloud_control_grpc_timeout).await?;
}

// if config.query.enable_queries_executor {
// GlobalQueriesExecutor::init()?;
// }
GlobalQueriesExecutor::init()?;

Ok(())
}
}
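`GlobalQueriesExecutor::init()` follows the same pattern as the other services registered here: set a singleton once during startup, fetch it anywhere later. `GlobalInstance` itself is not part of this diff, so the sketch below illustrates the idea with `once_cell` rather than the real API:

```rust
use std::sync::Arc;

use once_cell::sync::OnceCell;

struct Executor; // stand-in for QueriesPipelineExecutor

static EXECUTOR: OnceCell<Arc<Executor>> = OnceCell::new();

fn init() {
    // Called once from startup code, analogous to GlobalQueriesExecutor::init().
    EXECUTOR.set(Arc::new(Executor)).ok();
}

fn instance() -> Arc<Executor> {
    // Any later call site gets the same shared instance.
    EXECUTOR.get().expect("executor not initialized").clone()
}

fn main() {
    init();
    let _executor = instance();
}
```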
src/query/service/src/pipelines/executor/executor_settings.rs
@@ -35,7 +35,8 @@ impl ExecutorSettings {
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;

Ok(ExecutorSettings {
enable_new_executor: settings.get_enable_experimental_queries_executor()?,
enable_new_executor: true,
// enable_new_executor: settings.get_enable_experimental_queries_executor()?,
query_id: Arc::new(query_id),
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
max_threads,
41 changes: 41 additions & 0 deletions src/query/service/src/pipelines/executor/global_queries_executor.rs
@@ -0,0 +1,41 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::Thread;
use databend_common_exception::Result;
use log::info;

use crate::pipelines::executor::QueriesPipelineExecutor;

pub struct GlobalQueriesExecutor(pub QueriesPipelineExecutor);

impl GlobalQueriesExecutor {
pub fn init() -> Result<()> {
let num_cpus = num_cpus::get();
GlobalInstance::set(QueriesPipelineExecutor::create(num_cpus)?);
Thread::spawn(|| {
if let Err(e) = Self::instance().execute() {
info!("Executor finished with error: {:?}", e);
}
});
Ok(())
}

pub fn instance() -> Arc<QueriesPipelineExecutor> {
GlobalInstance::get()
}
}
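Call sites no longer construct an executor; they fetch the global one and hand over the query's running graph, as pipeline_executor.rs does further down. A crate-internal sketch of that pattern (`send_graph` is assumed to return `Result<()>`, matching how the diff uses it):

```rust
use std::sync::Arc;

use databend_common_exception::Result;

use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::pipelines::executor::RunningGraph;

// Submit a query's graph to the process-wide executor; the submitting thread
// then waits on the query's finish condvar instead of driving the graph itself.
fn submit(graph: Arc<RunningGraph>) -> Result<()> {
    GlobalQueriesExecutor::instance().send_graph(graph)
}
```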
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/executor/mod.rs
@@ -18,6 +18,7 @@ mod executor_condvar;
mod executor_graph;
mod executor_settings;
mod executor_worker_context;
mod global_queries_executor;
mod pipeline_complete_executor;
mod pipeline_executor;
mod pipeline_pulling_executor;
@@ -35,6 +36,7 @@ pub use executor_settings::ExecutorSettings;
pub use executor_worker_context::CompletedAsyncTask;
pub use executor_worker_context::ExecutorTask;
pub use executor_worker_context::ExecutorWorkerContext;
pub use global_queries_executor::GlobalQueriesExecutor;
pub use pipeline_complete_executor::PipelineCompleteExecutor;
pub use pipeline_executor::PipelineExecutor;
pub use pipeline_pulling_executor::PipelinePullingExecutor;
21 changes: 8 additions & 13 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
@@ -30,7 +30,7 @@ use parking_lot::Condvar;
use parking_lot::Mutex;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::QueriesPipelineExecutor;
use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::pipelines::executor::QueryPipelineExecutor;
use crate::pipelines::executor::RunningGraph;

@@ -40,8 +40,6 @@ pub type FinishedCallback =
Box<dyn FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static>;

pub struct QueryWrapper {
// TODO: will remove it after refactoring queries pipeline executor
executor: Arc<QueriesPipelineExecutor>,
graph: Arc<RunningGraph>,
settings: ExecutorSettings,
on_init_callback: Mutex<Option<InitCallback>>,
@@ -78,7 +76,6 @@ impl PipelineExecutor {
)?;

Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper {
executor: QueriesPipelineExecutor::create(settings.clone())?,
graph,
settings,
on_init_callback: Mutex::new(on_init_callback),
@@ -141,7 +138,6 @@ impl PipelineExecutor {
)?;

Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper {
executor: QueriesPipelineExecutor::create(settings.clone())?,
graph,
settings,
on_init_callback: Mutex::new(on_init_callback),
@@ -181,9 +177,7 @@ impl PipelineExecutor {
&query_wrapper.on_init_callback,
&query_wrapper.settings.query_id,
)?;
query_wrapper
.executor
.send_graph(query_wrapper.graph.clone())?;
GlobalQueriesExecutor::instance().send_graph(query_wrapper.graph.clone())?;

let (lock, cvar) = &*query_wrapper.finish_condvar_wait;
let mut finished = lock.lock();
@@ -192,24 +186,25 @@ impl PipelineExecutor {
}

let may_error = query_wrapper.graph.get_error();
match may_error {
return match may_error {
None => {
let guard = query_wrapper.on_finished_callback.lock().take();
if let Some(on_finished_callback) = guard {
catch_unwind(move || {
on_finished_callback(&Ok(self.get_plans_profile()))
})??;
}
Ok(())
}
Some(cause) => {
let guard = query_wrapper.on_finished_callback.lock().take();
let cause_clone = cause.clone();
if let Some(on_finished_callback) = guard {
catch_unwind(move || on_finished_callback(&Err(cause)))??;
catch_unwind(move || on_finished_callback(&Err(cause_clone)))??;
}
Err(cause)
}
}

Ok(())
};
}
}
}
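Since `QueryWrapper` no longer drives its own executor, `execute()` now blocks on the `finish_condvar` pair until the worker threads mark the query done. A standalone sketch of that wait/notify handshake with parking_lot (illustrative names, not the crate's types):

```rust
use std::sync::Arc;
use std::thread;

use parking_lot::{Condvar, Mutex};

fn main() {
    let finished = Arc::new((Mutex::new(false), Condvar::new()));

    let notifier = finished.clone();
    thread::spawn(move || {
        // ... executor threads run the graph to completion ...
        let (lock, cvar) = &*notifier;
        *lock.lock() = true;
        cvar.notify_one(); // analogous to the query's finish notification
    });

    // Submitting thread: park until the query is reported finished.
    let (lock, cvar) = &*finished;
    let mut done = lock.lock();
    if !*done {
        cvar.wait(&mut done);
    }
}
```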
43 changes: 35 additions & 8 deletions src/query/service/src/pipelines/executor/queries_executor_tasks.rs
@@ -136,21 +136,14 @@ impl QueriesExecutorTasksQueue {
}

let workers_condvar = context.get_workers_condvar();
if !workers_condvar.has_waiting_async_task()
&& workers_tasks.workers_waiting_status.is_last_active_worker()
{
drop(workers_tasks);
self.finish(workers_condvar.clone());
return;
}

let worker_id = context.get_worker_id();
workers_tasks.workers_waiting_status.wait_worker(worker_id);
drop(workers_tasks);
workers_condvar.wait(worker_id, self.finished.clone());
}

pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorWrapper>) {
pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorWrapper>, condvar: Arc<WorkersCondvar>) {
let mut workers_tasks = self.workers_tasks.lock();

let mut worker_id = 0;
@@ -163,6 +156,40 @@
if worker_id == workers_tasks.next_tasks.workers_sync_tasks.len() {
worker_id = 0;
}

if workers_tasks.workers_waiting_status.is_waiting(worker_id) {
workers_tasks
.workers_waiting_status
.wakeup_worker(worker_id);
condvar.wakeup(worker_id);
}
}
}

pub fn init_async_tasks(
&self,
tasks: VecDeque<ProcessorWrapper>,
condvar: Arc<WorkersCondvar>,
) {
let mut workers_tasks = self.workers_tasks.lock();

let mut worker_id = 0;
for proc in tasks.into_iter() {
workers_tasks
.next_tasks
.push_task(worker_id, ExecutorTask::Async(proc));

worker_id += 1;
if worker_id == workers_tasks.next_tasks.workers_sync_tasks.len() {
worker_id = 0;
}

if workers_tasks.workers_waiting_status.is_waiting(worker_id) {
workers_tasks
.workers_waiting_status
.wakeup_worker(worker_id);
condvar.wakeup(worker_id);
}
}
}

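The two new init methods spread a whole batch of startup tasks round-robin across the per-worker queues and wake any worker that is currently parked so it notices the new work. A standalone sketch of that distribution logic (illustrative types; the real code wakes workers through `condvar.wakeup(worker_id)`):

```rust
use std::collections::VecDeque;

fn distribute<T>(
    tasks: VecDeque<T>,
    worker_queues: &mut [VecDeque<T>],
    waiting: &mut [bool],
) -> Vec<usize> {
    let mut to_wake = Vec::new();
    let mut worker_id = 0;
    for task in tasks {
        // Round-robin: each task lands on the next worker's private queue.
        worker_queues[worker_id].push_back(task);
        if waiting[worker_id] {
            waiting[worker_id] = false;
            to_wake.push(worker_id); // a parked worker needs a condvar wakeup
        }
        worker_id = (worker_id + 1) % worker_queues.len();
    }
    to_wake
}

fn main() {
    let mut queues = vec![VecDeque::new(), VecDeque::new(), VecDeque::new()];
    let mut waiting = vec![true, false, true];
    let woken = distribute(VecDeque::from([10, 20, 30, 40]), &mut queues, &mut waiting);
    assert_eq!(woken, vec![0, 2]);
    assert_eq!(queues[0].len(), 2); // worker 0 received tasks 10 and 40
}
```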
src/query/service/src/pipelines/executor/queries_pipeline_executor.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -31,8 +30,6 @@ use minitrace::full_name;
use minitrace::prelude::*;
use parking_lot::Mutex;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::ExecutorTask;
use crate::pipelines::executor::ExecutorWorkerContext;
use crate::pipelines::executor::QueriesExecutorTasksQueue;
use crate::pipelines::executor::RunningGraph;
@@ -51,9 +48,7 @@ pub struct QueriesPipelineExecutor {
}

impl QueriesPipelineExecutor {
pub fn create(settings: ExecutorSettings) -> Result<Arc<QueriesPipelineExecutor>> {
let threads_num = settings.max_threads as usize;

pub fn create(threads_num: usize) -> Result<Arc<QueriesPipelineExecutor>> {
let workers_condvar = WorkersCondvar::create(threads_num);
let global_tasks_queue = QueriesExecutorTasksQueue::create(threads_num);

@@ -97,22 +92,13 @@ impl QueriesPipelineExecutor {
unsafe {
let mut init_schedule_queue = graph.init_schedule_queue(self.threads_num)?;

let mut wakeup_worker_id = 0;
while let Some(proc) = init_schedule_queue.async_queue.pop_front() {
let mut tasks = VecDeque::with_capacity(1);
tasks.push_back(ExecutorTask::Async(proc));
self.global_tasks_queue
.push_tasks(wakeup_worker_id, None, tasks);

wakeup_worker_id += 1;
if wakeup_worker_id == self.threads_num {
wakeup_worker_id = 0;
}
}
let async_queue = std::mem::take(&mut init_schedule_queue.async_queue);
self.global_tasks_queue
.init_async_tasks(async_queue, self.workers_condvar.clone());

let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue);
self.global_tasks_queue.init_sync_tasks(sync_queue);
self.execute()?;
self.global_tasks_queue
.init_sync_tasks(sync_queue, self.workers_condvar.clone());
Ok(())
}
}
@@ -203,10 +189,6 @@ impl QueriesPipelineExecutor {
}
}
}
if graph.is_should_finish() {
// TODO: temporary finish method, will remove after change executor to a global service
self.finish(None);
}
if graph.is_all_nodes_finished() {
graph.should_finish(Ok(()))?;
}
Expand All @@ -217,7 +199,6 @@ impl QueriesPipelineExecutor {
if let Some(graph) = graph {
graph.should_finish(Err(cause.clone()))?;
}
self.finish(Some(cause));
}
}
}
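The `init_schedule_queue` loop that pushed async tasks one by one is replaced by handing whole queues to the tasks queue; `std::mem::take` moves each queue out of the schedule structure and leaves an empty one behind, avoiding clones. A small standalone illustration (toy types, not the crate's `ScheduleQueue`):

```rust
use std::collections::VecDeque;

struct ScheduleQueue {
    sync_queue: VecDeque<u32>,
    async_queue: VecDeque<u32>,
}

fn main() {
    let mut q = ScheduleQueue {
        sync_queue: VecDeque::from([1, 2, 3]),
        async_queue: VecDeque::from([4]),
    };

    // Move the whole batch out; the original field is left empty, not moved-out.
    let sync_batch = std::mem::take(&mut q.sync_queue);
    assert_eq!(sync_batch.len(), 3);
    assert!(q.sync_queue.is_empty());
    assert_eq!(q.async_queue.len(), 1);
}
```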
@@ -188,7 +188,8 @@ async fn truncate_table(ctx: Arc<QueryContext>, table: Arc<dyn Table>) -> Result
table.truncate(ctx.clone(), &mut pipeline).await?;
if !pipeline.is_empty() {
pipeline.set_max_threads(1);
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
let mut executor_settings = ExecutorSettings::try_create(ctx.clone())?;
executor_settings.enable_new_executor = false;
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
ctx.set_executor(executor.get_inner())?;
executor.execute()?;
@@ -81,6 +81,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'query' | 'default_compression' | 'auto' | '' |
| 'query' | 'default_storage_format' | 'auto' | '' |
| 'query' | 'disable_system_table_load' | 'false' | '' |
| 'query' | 'enable_queries_executor' | 'false' | '' |
| 'query' | 'enable_udf_server' | 'false' | '' |
| 'query' | 'flight_api_address' | '127.0.0.1:9090' | '' |
| 'query' | 'flight_sql_handler_host' | '127.0.0.1' | '' |