Skip to content

Commit

Permalink
feat(query): support settings admin api for global level settings
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Mar 11, 2024
1 parent 1732d8d commit 9de3d31
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/query/service/src/api/http/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod instance_status;
pub mod logs;
pub mod processes;
pub mod queries_queue;
pub mod settings;
pub mod stream_status;
pub mod system;
pub mod tenant_tables;
143 changes: 143 additions & 0 deletions src/query/service/src/api/http/v1/settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_types::NonEmptyString;
use databend_common_settings::Settings;
use poem::web::Json;
use poem::web::Path;
use poem::IntoResponse;

#[derive(serde::Serialize, serde::Deserialize)]
pub struct SettingsItem {
pub name: String,
pub desc: &'static str,
pub user_value: String,
pub default_value: String,
pub range: Option<String>,
}

async fn list_settings_impl(tenant: &str) -> Result<Vec<SettingsItem>> {
if tenant.is_empty() {
return Err(ErrorCode::TenantIsEmpty(
"Tenant can not empty(while list settings)",
));
}

let settings = Settings::create(NonEmptyString::new(tenant)?);
settings.load_changes().await?;

Ok(settings
.into_iter()
.map(|item| SettingsItem {
name: item.name,
desc: item.desc,
user_value: item.user_value.to_string(),
default_value: item.default_value.to_string(),
range: item.range.map(|x| x.to_string()),
})
.collect::<Vec<_>>())
}

async fn set_setting_impl(tenant: &str, key: &str, value: String) -> Result<Vec<SettingsItem>> {
if tenant.is_empty() {
return Err(ErrorCode::TenantIsEmpty(
"Tenant can not empty(while set setting)",
));
}

if key.is_empty() || key.len() > 1024 {
return Err(ErrorCode::BadArguments(
"Setting key is empty or large length(while set setting)",
));
}

let settings = Settings::create(NonEmptyString::new(tenant)?);
settings.set_global_setting(key.to_string(), value).await?;

Ok(settings
.into_iter()
.map(|item| SettingsItem {
name: item.name,
desc: item.desc,
user_value: item.user_value.to_string(),
default_value: item.default_value.to_string(),
range: item.range.map(|x| x.to_string()),
})
.collect::<Vec<_>>())
}

async fn unset_setting_impl(tenant: &str, key: &str) -> Result<Vec<SettingsItem>> {
if tenant.is_empty() {
return Err(ErrorCode::TenantIsEmpty(
"Tenant can not empty(while unset setting)",
));
}

if key.is_empty() || key.len() > 1024 {
return Err(ErrorCode::BadArguments(
"Setting key is empty or large length(while unset setting)",
));
}

let settings = Settings::create(NonEmptyString::new(tenant)?);
settings.try_drop_global_setting(key).await?;

Ok(settings
.into_iter()
.map(|item| SettingsItem {
name: item.name,
desc: item.desc,
user_value: item.user_value.to_string(),
default_value: item.default_value.to_string(),
range: item.range.map(|x| x.to_string()),
})
.collect::<Vec<_>>())
}

#[poem::handler]
#[async_backtrace::framed]
pub async fn list_settings(Path(tenant): Path<String>) -> poem::Result<impl IntoResponse> {
Ok(Json(
list_settings_impl(&tenant)
.await
.map_err(poem::error::InternalServerError)?,
))
}

#[poem::handler]
#[async_backtrace::framed]
pub async fn set_settings(
Path((tenant, key)): Path<(String, String)>,
value: Json<String>,
) -> poem::Result<impl IntoResponse> {
Ok(Json(
set_setting_impl(&tenant, &key, value.0)
.await
.map_err(poem::error::InternalServerError)?,
))
}

#[poem::handler]
#[async_backtrace::framed]
pub async fn unset_settings(
Path((tenant, key)): Path<(String, String)>,
) -> poem::Result<impl IntoResponse> {
Ok(Json(
unset_setting_impl(&tenant, &key)
.await
.map_err(poem::error::InternalServerError)?,
))
}
26 changes: 21 additions & 5 deletions src/query/service/src/api/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use log::warn;
use poem::get;
use poem::listener::RustlsCertificate;
use poem::listener::RustlsConfig;
use poem::post;
use poem::Endpoint;
use poem::Route;

Expand Down Expand Up @@ -77,11 +78,9 @@ impl HttpService {
)
.at("/debug/home", get(debug_home_handler))
.at("/debug/pprof/profile", get(debug_pprof_handler))
.at("/debug/async_tasks/dump", get(debug_dump_stack))
.at(
"/v1/background/:tenant/background_tasks",
get(super::http::v1::background_tasks::list_background_tasks),
);
.at("/debug/async_tasks/dump", get(debug_dump_stack));

// Multiple tenants admin api
if self.config.query.management_mode {
route = route
.at(
Expand All @@ -91,6 +90,23 @@ impl HttpService {
.at(
"v1/tenants/:tenant/stream_status",
get(super::http::v1::stream_status::stream_status_handler),
)
.at(
"/v1/background/:tenant/background_tasks",
get(super::http::v1::background_tasks::list_background_tasks),
)
.at(
"/v1/tenants/:tenant/background_tasks",
get(super::http::v1::background_tasks::list_background_tasks),
)
.at(
"/v1/tenants/:tenant/settings",
get(super::http::v1::settings::list_settings),
)
.at(
"/v1/tenants/:tenant/settings/:key",
post(super::http::v1::settings::set_settings)
.delete(super::http::v1::settings::unset_settings),
);
}

Expand Down
3 changes: 1 addition & 2 deletions src/query/service/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ impl SessionManager {

let tenant = GlobalConfig::instance().query.tenant_id.clone();
let settings = Settings::create(tenant);
self.load_config_changes(&settings)?;
settings.load_global_changes().await?;
settings.load_changes().await?;

self.create_with_settings(typ, settings)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl AsyncSource for LicenseInfoSource {

let settings = self.ctx.get_settings();
// sync global changes on distributed node cluster.
settings.load_global_changes().await?;
settings.load_changes().await?;
let license = unsafe {
settings
.get_enterprise_license()
Expand Down
27 changes: 26 additions & 1 deletion src/query/settings/src/settings_global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::sync::Arc;

use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::UserSetting;
use databend_common_meta_app::principal::UserSettingValue;
Expand Down Expand Up @@ -61,7 +63,30 @@ impl Settings {
}

#[async_backtrace::framed]
pub async fn load_global_changes(&self) -> Result<()> {
pub async fn load_changes(&self) -> Result<()> {
self.load_config_changes()?;
self.load_global_changes().await
}

fn load_config_changes(&self) -> Result<()> {
let query_config = &GlobalConfig::instance().query;
if let Some(parquet_fast_read_bytes) = query_config.parquet_fast_read_bytes {
self.set_parquet_fast_read_bytes(parquet_fast_read_bytes)?;
}

if let Some(max_storage_io_requests) = query_config.max_storage_io_requests {
self.set_max_storage_io_requests(max_storage_io_requests)?;
}

if let Some(enterprise_license_key) = query_config.databend_enterprise_license.clone() {
unsafe {
self.set_enterprise_license(enterprise_license_key)?;
}
}
Ok(())
}

async fn load_global_changes(&self) -> Result<(), ErrorCode> {
let default_settings = DefaultSettings::instance()?;

let api = UserApiProvider::instance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sleep 5
echo "select st.name bt,type, bt.trigger from system.background_tasks AS bt JOIN system.tables st ON bt.table_id = st.table_id where bt.trigger is not null and bt.created_on > TO_TIMESTAMP('$current_time') order by st.name;" | $BENDSQL_CLIENT_CONNECT
echo "select * from system.processes where type != 'HTTPQuery';" | $BENDSQL_CLIENT_CONNECT

table_ids=$(curl -X GET -s http://localhost:8080/v1/background/test_tenant/background_tasks?timestamp=$encoded_time | jq '[.task_infos[] | .[1].compaction_task_stats.table_id]')
table_ids=$(curl -X GET -s http://localhost:8080/v1/tenants/test_tenant/background_tasks?timestamp=$encoded_time | jq '[.task_infos[] | .[1].compaction_task_stats.table_id]')

# Convert the table_ids JSON array to a comma-separated list
table_ids_list=$(echo $table_ids | jq -r 'join(",")')
Expand Down

0 comments on commit 9de3d31

Please sign in to comment.