Skip to content

Commit

Permalink
refactor(executor): globalize QueriesPipelineExecutor creation meth…
Browse files Browse the repository at this point in the history
…od (#15129)

* refactor: globalize `QueriesPipelineExecutor` creation method

* fix: unit test and clippy warn

* fix: executor should also return execution result

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>

* chore: disable queries executor and ensure not affecting currently using executor

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>

* chore: disable queries executor and ensure not affecting currently using executor

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>

* chore: use compile feature to disable queries executor

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>

* fix: make clippy happy

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>

---------

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>
  • Loading branch information
dqhl76 authored Apr 2, 2024
1 parent da87e6e commit 4107bcf
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ io-uring = [
# "databend-common-meta-raft-store/io-uring",
]

enable_queries_executor = []

[dependencies]
# Workspace dependencies
databend-common-arrow = { path = "../../common/arrow" }
Expand Down Expand Up @@ -139,6 +141,7 @@ match-template = { workspace = true }
metrics = "0.20.1"
minitrace = { workspace = true }
naive-cityhash = "0.2.0"
num_cpus = "1.16.0"
once_cell = { workspace = true }
opendal = { workspace = true }
opensrv-mysql = { version = "0.5.0", features = ["tls"] }
Expand Down
7 changes: 7 additions & 0 deletions src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use crate::auth::AuthMgr;
use crate::catalogs::DatabaseCatalog;
use crate::clusters::ClusterDiscovery;
use crate::locks::LockManager;
#[cfg(feature = "enable_queries_executor")]
use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::QueriesQueueManager;
use crate::sessions::SessionManager;
Expand Down Expand Up @@ -131,6 +133,11 @@ impl GlobalServices {
CloudControlApiProvider::init(addr, config.query.cloud_control_grpc_timeout).await?;
}

#[cfg(feature = "enable_queries_executor")]
{
GlobalQueriesExecutor::init()?;
}

Ok(())
}
}
4 changes: 2 additions & 2 deletions src/query/service/src/pipelines/executor/executor_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use databend_common_exception::Result;
pub struct ExecutorSettings {
pub query_id: Arc<String>,
pub max_threads: u64,
pub enable_new_executor: bool,
pub enable_queries_executor: bool,
pub max_execute_time_in_seconds: Duration,
pub executor_node_id: String,
}
Expand All @@ -35,7 +35,7 @@ impl ExecutorSettings {
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;

Ok(ExecutorSettings {
enable_new_executor: settings.get_enable_experimental_queries_executor()?,
enable_queries_executor: settings.get_enable_experimental_queries_executor()?,
query_id: Arc::new(query_id),
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
max_threads,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::Thread;
use databend_common_exception::Result;
use log::info;

use crate::pipelines::executor::QueriesPipelineExecutor;

/// Entry point for the process-wide [`QueriesPipelineExecutor`].
///
/// The associated functions `init` and `instance` install and fetch the shared
/// executor through `GlobalInstance`. The value registered there is the
/// `Arc<QueriesPipelineExecutor>` itself, not this wrapper.
// NOTE(review): nothing in this file constructs the tuple wrapper, so the
// public field looks unused here — confirm against other callers.
pub struct GlobalQueriesExecutor(pub QueriesPipelineExecutor);

impl GlobalQueriesExecutor {
    /// Builds the shared queries executor and starts running it.
    ///
    /// The executor is created with the CPU count reported by
    /// `num_cpus::get()` as its thread count, registered in `GlobalInstance`,
    /// and then driven by `execute()` on a spawned thread.
    ///
    /// # Errors
    ///
    /// Returns the error from `QueriesPipelineExecutor::create` if
    /// construction fails. A failure of `execute()` on the spawned thread is
    /// only logged; it is not reported to the caller.
    pub fn init() -> Result<()> {
        let threads_num = num_cpus::get();
        let executor = QueriesPipelineExecutor::create(threads_num)?;
        GlobalInstance::set(executor);

        // The spawn handle is not kept, so this thread is never joined here.
        Thread::spawn(|| {
            // NOTE(review): the failure is reported at `info` level — confirm
            // that this is the intended severity.
            if let Err(e) = Self::instance().execute() {
                info!("Executor finished with error: {:?}", e);
            }
        });

        Ok(())
    }

    /// Returns the shared executor stored in `GlobalInstance`.
    pub fn instance() -> Arc<QueriesPipelineExecutor> {
        GlobalInstance::get()
    }
}
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod executor_condvar;
mod executor_graph;
mod executor_settings;
mod executor_worker_context;
mod global_queries_executor;
mod pipeline_complete_executor;
mod pipeline_executor;
mod pipeline_pulling_executor;
Expand All @@ -35,6 +36,7 @@ pub use executor_settings::ExecutorSettings;
pub use executor_worker_context::CompletedAsyncTask;
pub use executor_worker_context::ExecutorTask;
pub use executor_worker_context::ExecutorWorkerContext;
pub use global_queries_executor::GlobalQueriesExecutor;
pub use pipeline_complete_executor::PipelineCompleteExecutor;
pub use pipeline_executor::PipelineExecutor;
pub use pipeline_pulling_executor::PipelinePullingExecutor;
Expand Down
25 changes: 10 additions & 15 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use parking_lot::Condvar;
use parking_lot::Mutex;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::QueriesPipelineExecutor;
use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::pipelines::executor::QueryPipelineExecutor;
use crate::pipelines::executor::RunningGraph;

Expand All @@ -40,8 +40,6 @@ pub type FinishedCallback =
Box<dyn FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static>;

pub struct QueryWrapper {
// TODO: will remove it after refactoring queries pipeline executor
executor: Arc<QueriesPipelineExecutor>,
graph: Arc<RunningGraph>,
settings: ExecutorSettings,
on_init_callback: Mutex<Option<InitCallback>>,
Expand All @@ -58,7 +56,7 @@ pub enum PipelineExecutor {

impl PipelineExecutor {
pub fn create(mut pipeline: Pipeline, settings: ExecutorSettings) -> Result<Self> {
if !settings.enable_new_executor {
if !settings.enable_queries_executor {
Ok(PipelineExecutor::QueryPipelineExecutor(
QueryPipelineExecutor::create(pipeline, settings)?,
))
Expand All @@ -78,7 +76,6 @@ impl PipelineExecutor {
)?;

Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper {
executor: QueriesPipelineExecutor::create(settings.clone())?,
graph,
settings,
on_init_callback: Mutex::new(on_init_callback),
Expand All @@ -93,7 +90,7 @@ impl PipelineExecutor {
mut pipelines: Vec<Pipeline>,
settings: ExecutorSettings,
) -> Result<Self> {
if !settings.enable_new_executor {
if !settings.enable_queries_executor {
Ok(PipelineExecutor::QueryPipelineExecutor(
QueryPipelineExecutor::from_pipelines(pipelines, settings)?,
))
Expand Down Expand Up @@ -141,7 +138,6 @@ impl PipelineExecutor {
)?;

Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper {
executor: QueriesPipelineExecutor::create(settings.clone())?,
graph,
settings,
on_init_callback: Mutex::new(on_init_callback),
Expand Down Expand Up @@ -181,9 +177,7 @@ impl PipelineExecutor {
&query_wrapper.on_init_callback,
&query_wrapper.settings.query_id,
)?;
query_wrapper
.executor
.send_graph(query_wrapper.graph.clone())?;
GlobalQueriesExecutor::instance().send_graph(query_wrapper.graph.clone())?;

let (lock, cvar) = &*query_wrapper.finish_condvar_wait;
let mut finished = lock.lock();
Expand All @@ -192,24 +186,25 @@ impl PipelineExecutor {
}

let may_error = query_wrapper.graph.get_error();
match may_error {
return match may_error {
None => {
let guard = query_wrapper.on_finished_callback.lock().take();
if let Some(on_finished_callback) = guard {
catch_unwind(move || {
on_finished_callback(&Ok(self.get_plans_profile()))
})??;
}
Ok(())
}
Some(cause) => {
let guard = query_wrapper.on_finished_callback.lock().take();
let cause_clone = cause.clone();
if let Some(on_finished_callback) = guard {
catch_unwind(move || on_finished_callback(&Err(cause)))??;
catch_unwind(move || on_finished_callback(&Err(cause_clone)))??;
}
Err(cause)
}
}

Ok(())
};
}
}
}
Expand Down
43 changes: 35 additions & 8 deletions src/query/service/src/pipelines/executor/queries_executor_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,14 @@ impl QueriesExecutorTasksQueue {
}

let workers_condvar = context.get_workers_condvar();
if !workers_condvar.has_waiting_async_task()
&& workers_tasks.workers_waiting_status.is_last_active_worker()
{
drop(workers_tasks);
self.finish(workers_condvar.clone());
return;
}

let worker_id = context.get_worker_id();
workers_tasks.workers_waiting_status.wait_worker(worker_id);
drop(workers_tasks);
workers_condvar.wait(worker_id, self.finished.clone());
}

pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorWrapper>) {
pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorWrapper>, condvar: Arc<WorkersCondvar>) {
let mut workers_tasks = self.workers_tasks.lock();

let mut worker_id = 0;
Expand All @@ -163,6 +156,40 @@ impl QueriesExecutorTasksQueue {
if worker_id == workers_tasks.next_tasks.workers_sync_tasks.len() {
worker_id = 0;
}

if workers_tasks.workers_waiting_status.is_waiting(worker_id) {
workers_tasks
.workers_waiting_status
.wakeup_worker(worker_id);
condvar.wakeup(worker_id);
}
}
}

/// Queues the given processors as `ExecutorTask::Async` tasks, assigning them
/// to worker slots in round-robin order starting from worker 0.
///
/// After each push the cursor advances (wrapping at the number of per-worker
/// queues); if the worker at the advanced cursor is marked as waiting, it is
/// marked awake and signalled through `condvar`. The `workers_tasks` lock is
/// held for the whole call.
pub fn init_async_tasks(
    &self,
    tasks: VecDeque<ProcessorWrapper>,
    condvar: Arc<WorkersCondvar>,
) {
    let mut state = self.workers_tasks.lock();

    let mut slot = 0;
    for processor in tasks {
        state
            .next_tasks
            .push_task(slot, ExecutorTask::Async(processor));

        // Move the round-robin cursor on, wrapping back to the first worker.
        slot += 1;
        if slot == state.next_tasks.workers_sync_tasks.len() {
            slot = 0;
        }

        // NOTE(review): `slot` now names the worker *after* the one that just
        // received the task, so that is the worker being woken — confirm this
        // is intended (presumably any woken worker can pick the task up).
        if !state.workers_waiting_status.is_waiting(slot) {
            continue;
        }
        state.workers_waiting_status.wakeup_worker(slot);
        condvar.wakeup(slot);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -31,8 +30,6 @@ use minitrace::full_name;
use minitrace::prelude::*;
use parking_lot::Mutex;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::ExecutorTask;
use crate::pipelines::executor::ExecutorWorkerContext;
use crate::pipelines::executor::QueriesExecutorTasksQueue;
use crate::pipelines::executor::RunningGraph;
Expand All @@ -51,9 +48,7 @@ pub struct QueriesPipelineExecutor {
}

impl QueriesPipelineExecutor {
pub fn create(settings: ExecutorSettings) -> Result<Arc<QueriesPipelineExecutor>> {
let threads_num = settings.max_threads as usize;

pub fn create(threads_num: usize) -> Result<Arc<QueriesPipelineExecutor>> {
let workers_condvar = WorkersCondvar::create(threads_num);
let global_tasks_queue = QueriesExecutorTasksQueue::create(threads_num);

Expand Down Expand Up @@ -97,22 +92,13 @@ impl QueriesPipelineExecutor {
unsafe {
let mut init_schedule_queue = graph.init_schedule_queue(self.threads_num)?;

let mut wakeup_worker_id = 0;
while let Some(proc) = init_schedule_queue.async_queue.pop_front() {
let mut tasks = VecDeque::with_capacity(1);
tasks.push_back(ExecutorTask::Async(proc));
self.global_tasks_queue
.push_tasks(wakeup_worker_id, None, tasks);

wakeup_worker_id += 1;
if wakeup_worker_id == self.threads_num {
wakeup_worker_id = 0;
}
}
let async_queue = std::mem::take(&mut init_schedule_queue.async_queue);
self.global_tasks_queue
.init_async_tasks(async_queue, self.workers_condvar.clone());

let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue);
self.global_tasks_queue.init_sync_tasks(sync_queue);
self.execute()?;
self.global_tasks_queue
.init_sync_tasks(sync_queue, self.workers_condvar.clone());
Ok(())
}
}
Expand Down Expand Up @@ -203,10 +189,6 @@ impl QueriesPipelineExecutor {
}
}
}
if graph.is_should_finish() {
// TODO: temporary finish method, will remove after change executor to a global service
self.finish(None);
}
if graph.is_all_nodes_finished() {
graph.should_finish(Ok(()))?;
}
Expand All @@ -217,7 +199,6 @@ impl QueriesPipelineExecutor {
if let Some(graph) = graph {
graph.should_finish(Err(cause.clone()))?;
}
self.finish(Some(cause));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ async fn create_executor_with_simple_pipeline(
let settings = ExecutorSettings {
query_id: Arc::new("".to_string()),
max_execute_time_in_seconds: Default::default(),
enable_new_executor: false,
enable_queries_executor: false,
max_threads: 8,
executor_node_id: "".to_string(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn test_always_call_on_finished() -> Result<()> {
let settings = ExecutorSettings {
query_id: Arc::new("".to_string()),
max_execute_time_in_seconds: Default::default(),
enable_new_executor: false,
enable_queries_executor: false,
max_threads: 8,
executor_node_id: "".to_string(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ async fn truncate_table(ctx: Arc<QueryContext>, table: Arc<dyn Table>) -> Result
table.truncate(ctx.clone(), &mut pipeline).await?;
if !pipeline.is_empty() {
pipeline.set_max_threads(1);
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
let mut executor_settings = ExecutorSettings::try_create(ctx.clone())?;
executor_settings.enable_queries_executor = false;
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
ctx.set_executor(executor.get_inner())?;
executor.execute()?;
Expand Down

0 comments on commit 4107bcf

Please sign in to comment.