Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(executor): globalize QueriesPipelineExecutor creation method #15129

Merged
merged 11 commits into from
Apr 2, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ io-uring = [
# "databend-common-meta-raft-store/io-uring",
]

enable_queries_executor = []

[dependencies]
# Workspace dependencies
databend-common-arrow = { path = "../../common/arrow" }
Expand Down Expand Up @@ -139,6 +141,7 @@ match-template = { workspace = true }
metrics = "0.20.1"
minitrace = { workspace = true }
naive-cityhash = "0.2.0"
num_cpus = "1.16.0"
once_cell = { workspace = true }
opendal = { workspace = true }
opensrv-mysql = { version = "0.5.0", features = ["tls"] }
Expand Down
7 changes: 7 additions & 0 deletions src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use crate::auth::AuthMgr;
use crate::catalogs::DatabaseCatalog;
use crate::clusters::ClusterDiscovery;
use crate::locks::LockManager;
#[cfg(feature = "enable_queries_executor")]
use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::QueriesQueueManager;
use crate::sessions::SessionManager;
Expand Down Expand Up @@ -131,6 +133,11 @@ impl GlobalServices {
CloudControlApiProvider::init(addr, config.query.cloud_control_grpc_timeout).await?;
}

#[cfg(feature = "enable_queries_executor")]
{
GlobalQueriesExecutor::init()?;
}

Ok(())
}
}
4 changes: 2 additions & 2 deletions src/query/service/src/pipelines/executor/executor_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use databend_common_exception::Result;
pub struct ExecutorSettings {
pub query_id: Arc<String>,
pub max_threads: u64,
pub enable_new_executor: bool,
pub enable_queries_executor: bool,
pub max_execute_time_in_seconds: Duration,
pub executor_node_id: String,
}
Expand All @@ -35,7 +35,7 @@ impl ExecutorSettings {
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;

Ok(ExecutorSettings {
enable_new_executor: settings.get_enable_experimental_queries_executor()?,
enable_queries_executor: settings.get_enable_experimental_queries_executor()?,
query_id: Arc::new(query_id),
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
max_threads,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::Thread;
use databend_common_exception::Result;
use log::info;

use crate::pipelines::executor::QueriesPipelineExecutor;

pub struct GlobalQueriesExecutor(pub QueriesPipelineExecutor);

impl GlobalQueriesExecutor {
pub fn init() -> Result<()> {
let num_cpus = num_cpus::get();
GlobalInstance::set(QueriesPipelineExecutor::create(num_cpus)?);
Thread::spawn(|| {
if let Err(e) = Self::instance().execute() {
info!("Executor finished with error: {:?}", e);
}
});
Ok(())
}

pub fn instance() -> Arc<QueriesPipelineExecutor> {
GlobalInstance::get()
}
}
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod executor_condvar;
mod executor_graph;
mod executor_settings;
mod executor_worker_context;
mod global_queries_executor;
mod pipeline_complete_executor;
mod pipeline_executor;
mod pipeline_pulling_executor;
Expand All @@ -35,6 +36,7 @@ pub use executor_settings::ExecutorSettings;
pub use executor_worker_context::CompletedAsyncTask;
pub use executor_worker_context::ExecutorTask;
pub use executor_worker_context::ExecutorWorkerContext;
pub use global_queries_executor::GlobalQueriesExecutor;
pub use pipeline_complete_executor::PipelineCompleteExecutor;
pub use pipeline_executor::PipelineExecutor;
pub use pipeline_pulling_executor::PipelinePullingExecutor;
Expand Down
25 changes: 10 additions & 15 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use parking_lot::Condvar;
use parking_lot::Mutex;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::QueriesPipelineExecutor;
use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::pipelines::executor::QueryPipelineExecutor;
use crate::pipelines::executor::RunningGraph;

Expand All @@ -40,8 +40,6 @@ pub type FinishedCallback =
Box<dyn FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static>;

pub struct QueryWrapper {
// TODO: will remove it after refactoring queries pipeline executor
executor: Arc<QueriesPipelineExecutor>,
graph: Arc<RunningGraph>,
settings: ExecutorSettings,
on_init_callback: Mutex<Option<InitCallback>>,
Expand All @@ -58,7 +56,7 @@ pub enum PipelineExecutor {

impl PipelineExecutor {
pub fn create(mut pipeline: Pipeline, settings: ExecutorSettings) -> Result<Self> {
if !settings.enable_new_executor {
if !settings.enable_queries_executor {
Ok(PipelineExecutor::QueryPipelineExecutor(
QueryPipelineExecutor::create(pipeline, settings)?,
))
Expand All @@ -78,7 +76,6 @@ impl PipelineExecutor {
)?;

Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper {
executor: QueriesPipelineExecutor::create(settings.clone())?,
graph,
settings,
on_init_callback: Mutex::new(on_init_callback),
Expand All @@ -93,7 +90,7 @@ impl PipelineExecutor {
mut pipelines: Vec<Pipeline>,
settings: ExecutorSettings,
) -> Result<Self> {
if !settings.enable_new_executor {
if !settings.enable_queries_executor {
Ok(PipelineExecutor::QueryPipelineExecutor(
QueryPipelineExecutor::from_pipelines(pipelines, settings)?,
))
Expand Down Expand Up @@ -141,7 +138,6 @@ impl PipelineExecutor {
)?;

Ok(PipelineExecutor::QueriesPipelineExecutor(QueryWrapper {
executor: QueriesPipelineExecutor::create(settings.clone())?,
graph,
settings,
on_init_callback: Mutex::new(on_init_callback),
Expand Down Expand Up @@ -181,9 +177,7 @@ impl PipelineExecutor {
&query_wrapper.on_init_callback,
&query_wrapper.settings.query_id,
)?;
query_wrapper
.executor
.send_graph(query_wrapper.graph.clone())?;
GlobalQueriesExecutor::instance().send_graph(query_wrapper.graph.clone())?;

let (lock, cvar) = &*query_wrapper.finish_condvar_wait;
let mut finished = lock.lock();
Expand All @@ -192,24 +186,25 @@ impl PipelineExecutor {
}

let may_error = query_wrapper.graph.get_error();
match may_error {
return match may_error {
None => {
let guard = query_wrapper.on_finished_callback.lock().take();
if let Some(on_finished_callback) = guard {
catch_unwind(move || {
on_finished_callback(&Ok(self.get_plans_profile()))
})??;
}
Ok(())
}
Some(cause) => {
let guard = query_wrapper.on_finished_callback.lock().take();
let cause_clone = cause.clone();
if let Some(on_finished_callback) = guard {
catch_unwind(move || on_finished_callback(&Err(cause)))??;
catch_unwind(move || on_finished_callback(&Err(cause_clone)))??;
}
Err(cause)
}
}

Ok(())
};
}
}
}
Expand Down
43 changes: 35 additions & 8 deletions src/query/service/src/pipelines/executor/queries_executor_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,14 @@ impl QueriesExecutorTasksQueue {
}

let workers_condvar = context.get_workers_condvar();
if !workers_condvar.has_waiting_async_task()
&& workers_tasks.workers_waiting_status.is_last_active_worker()
{
drop(workers_tasks);
self.finish(workers_condvar.clone());
return;
}

let worker_id = context.get_worker_id();
workers_tasks.workers_waiting_status.wait_worker(worker_id);
drop(workers_tasks);
workers_condvar.wait(worker_id, self.finished.clone());
}

pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorWrapper>) {
pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorWrapper>, condvar: Arc<WorkersCondvar>) {
let mut workers_tasks = self.workers_tasks.lock();

let mut worker_id = 0;
Expand All @@ -163,6 +156,40 @@ impl QueriesExecutorTasksQueue {
if worker_id == workers_tasks.next_tasks.workers_sync_tasks.len() {
worker_id = 0;
}

if workers_tasks.workers_waiting_status.is_waiting(worker_id) {
workers_tasks
.workers_waiting_status
.wakeup_worker(worker_id);
condvar.wakeup(worker_id);
}
}
}

pub fn init_async_tasks(
&self,
tasks: VecDeque<ProcessorWrapper>,
condvar: Arc<WorkersCondvar>,
) {
let mut workers_tasks = self.workers_tasks.lock();

let mut worker_id = 0;
for proc in tasks.into_iter() {
workers_tasks
.next_tasks
.push_task(worker_id, ExecutorTask::Async(proc));

worker_id += 1;
if worker_id == workers_tasks.next_tasks.workers_sync_tasks.len() {
worker_id = 0;
}

if workers_tasks.workers_waiting_status.is_waiting(worker_id) {
workers_tasks
.workers_waiting_status
.wakeup_worker(worker_id);
condvar.wakeup(worker_id);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -31,8 +30,6 @@ use minitrace::full_name;
use minitrace::prelude::*;
use parking_lot::Mutex;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::ExecutorTask;
use crate::pipelines::executor::ExecutorWorkerContext;
use crate::pipelines::executor::QueriesExecutorTasksQueue;
use crate::pipelines::executor::RunningGraph;
Expand All @@ -51,9 +48,7 @@ pub struct QueriesPipelineExecutor {
}

impl QueriesPipelineExecutor {
pub fn create(settings: ExecutorSettings) -> Result<Arc<QueriesPipelineExecutor>> {
let threads_num = settings.max_threads as usize;

pub fn create(threads_num: usize) -> Result<Arc<QueriesPipelineExecutor>> {
let workers_condvar = WorkersCondvar::create(threads_num);
let global_tasks_queue = QueriesExecutorTasksQueue::create(threads_num);

Expand Down Expand Up @@ -97,22 +92,13 @@ impl QueriesPipelineExecutor {
unsafe {
let mut init_schedule_queue = graph.init_schedule_queue(self.threads_num)?;

let mut wakeup_worker_id = 0;
while let Some(proc) = init_schedule_queue.async_queue.pop_front() {
let mut tasks = VecDeque::with_capacity(1);
tasks.push_back(ExecutorTask::Async(proc));
self.global_tasks_queue
.push_tasks(wakeup_worker_id, None, tasks);

wakeup_worker_id += 1;
if wakeup_worker_id == self.threads_num {
wakeup_worker_id = 0;
}
}
let async_queue = std::mem::take(&mut init_schedule_queue.async_queue);
self.global_tasks_queue
.init_async_tasks(async_queue, self.workers_condvar.clone());

let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue);
self.global_tasks_queue.init_sync_tasks(sync_queue);
self.execute()?;
self.global_tasks_queue
.init_sync_tasks(sync_queue, self.workers_condvar.clone());
Ok(())
}
}
Expand Down Expand Up @@ -203,10 +189,6 @@ impl QueriesPipelineExecutor {
}
}
}
if graph.is_should_finish() {
// TODO: temporary finish method, will remove after change executor to a global service
self.finish(None);
}
if graph.is_all_nodes_finished() {
graph.should_finish(Ok(()))?;
}
Expand All @@ -217,7 +199,6 @@ impl QueriesPipelineExecutor {
if let Some(graph) = graph {
graph.should_finish(Err(cause.clone()))?;
}
self.finish(Some(cause));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ async fn create_executor_with_simple_pipeline(
let settings = ExecutorSettings {
query_id: Arc::new("".to_string()),
max_execute_time_in_seconds: Default::default(),
enable_new_executor: false,
enable_queries_executor: false,
max_threads: 8,
executor_node_id: "".to_string(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn test_always_call_on_finished() -> Result<()> {
let settings = ExecutorSettings {
query_id: Arc::new("".to_string()),
max_execute_time_in_seconds: Default::default(),
enable_new_executor: false,
enable_queries_executor: false,
max_threads: 8,
executor_node_id: "".to_string(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ async fn truncate_table(ctx: Arc<QueryContext>, table: Arc<dyn Table>) -> Result
table.truncate(ctx.clone(), &mut pipeline).await?;
if !pipeline.is_empty() {
pipeline.set_max_threads(1);
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
let mut executor_settings = ExecutorSettings::try_create(ctx.clone())?;
executor_settings.enable_queries_executor = false;
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
ctx.set_executor(executor.get_inner())?;
executor.execute()?;
Expand Down
Loading