Skip to content

Commit

Permalink
chore(query): add current_query_id and database id for admin api (#16703
Browse files Browse the repository at this point in the history
)

* chore(query): add current_query_id and database id for admin api

* chore(query): add current_query_id and database id for admin api
  • Loading branch information
zhang2014 authored Oct 29, 2024
1 parent bd380ee commit e525f4c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/query/service/src/servers/admin/v1/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct ProcessInfo {
pub mysql_connection_id: Option<u32>,
pub created_time: SystemTime,
pub status_info: Option<String>,
pub current_query_id: Option<String>,
}

#[poem::handler]
Expand Down Expand Up @@ -63,6 +64,7 @@ pub async fn processlist_handler() -> poem::Result<impl IntoResponse> {
mysql_connection_id: process.mysql_connection_id,
created_time: process.created_time,
status_info: process.status_info.clone(),
current_query_id: process.current_query_id.clone(),
})
.collect::<Vec<_>>();
Ok(Json(processes))
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/servers/admin/v1/tenant_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct TenantTablesResponse {
pub struct TenantTableInfo {
pub table: String,
pub database: String,
pub database_id: String,
pub engine: String,
pub created_on: DateTime<Utc>,
pub updated_on: DateTime<Utc>,
Expand Down Expand Up @@ -69,6 +70,7 @@ async fn load_tenant_tables(tenant: &Tenant) -> Result<TenantTablesResponse> {
};

for database in databases {
let database_info = database.get_db_info();
let tables = match catalog.list_tables(tenant, database.name()).await {
Ok(v) => v,
Err(err) => {
Expand Down Expand Up @@ -105,6 +107,7 @@ async fn load_tenant_tables(tenant: &Tenant) -> Result<TenantTablesResponse> {
table_infos.push(TenantTableInfo {
table: table.name().to_string(),
database: database.name().to_string(),
database_id: format!("{}", database_info.database_id),
engine: table.engine().to_string(),
created_on: table.get_table_info().meta.created_on,
updated_on: table.get_table_info().meta.updated_on,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
| 'creator' | 'system' | 'background_tasks' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
| 'creator' | 'system' | 'stages' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
| 'current_database' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'current_query_id' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'data_compressed_size' | 'system' | 'tables' | 'Nullable(UInt64)' | 'BIGINT UNSIGNED' | '' | '' | 'YES' | '' |
| 'data_compressed_size' | 'system' | 'tables_with_history' | 'Nullable(UInt64)' | 'BIGINT UNSIGNED' | '' | '' | 'YES' | '' |
| 'data_free' | 'information_schema' | 'tables' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' |
Expand Down
10 changes: 9 additions & 1 deletion src/query/storages/system/src/processes_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl SyncSystemTable for ProcessesTable {
let mut processes_time = Vec::with_capacity(processes_info.len());
let mut processes_created_time = Vec::with_capacity(processes_info.len());
let mut processes_status = Vec::with_capacity(processes_info.len());
let mut processes_current_query_id = Vec::with_capacity(processes_info.len());

for process_info in &processes_info {
let data_metrics = &process_info.data_metrics;
Expand Down Expand Up @@ -104,7 +105,6 @@ impl SyncSystemTable for ProcessesTable {
processes_mysql_connection_id.push(process_info.mysql_connection_id);
processes_time.push(time);
processes_created_time.push(created_time);

if let Some(data_metrics) = data_metrics {
processes_data_read_bytes.push(data_metrics.get_read_bytes() as u64);
processes_data_write_bytes.push(data_metrics.get_write_bytes() as u64);
Expand All @@ -115,6 +115,12 @@ impl SyncSystemTable for ProcessesTable {

// Status info.
processes_status.push(process_info.status_info.clone().unwrap_or("".to_owned()));
processes_current_query_id.push(
process_info
.current_query_id
.clone()
.unwrap_or("".to_owned()),
);
}

Ok(DataBlock::new_from_columns(vec![
Expand All @@ -135,6 +141,7 @@ impl SyncSystemTable for ProcessesTable {
UInt64Type::from_data(processes_time),
TimestampType::from_data(processes_created_time),
StringType::from_data(processes_status),
StringType::from_data(processes_current_query_id),
]))
}
}
Expand Down Expand Up @@ -177,6 +184,7 @@ impl ProcessesTable {
TableField::new("time", TableDataType::Number(NumberDataType::UInt64)),
TableField::new("created_time", TableDataType::Timestamp),
TableField::new("status", TableDataType::String),
TableField::new("current_query_id", TableDataType::String),
]);

let table_info = TableInfo {
Expand Down

0 comments on commit e525f4c

Please sign in to comment.