From dddaf1c94504a25015500e6189e50705202160f3 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Sun, 3 Nov 2024 14:29:32 +0800 Subject: [PATCH 1/5] fix: data_length, index_length, table_rows in tables --- src/catalog/src/error.rs | 7 +++ .../information_schema/partitions.rs | 35 ++++-------- .../information_schema/tables.rs | 53 ++++++++++++++++--- .../region_statistics.result | 8 +++ .../information_schema/region_statistics.sql | 2 + 5 files changed, 72 insertions(+), 33 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 0d9e96ab6a44..c7e6f8b55c01 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -178,6 +178,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Partition manager not found, it's not expected."))] + PartitionManagerNotFound { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to find table partitions"))] FindPartitions { source: partition::error::Error }, @@ -301,6 +307,7 @@ impl ErrorExt for Error { | Error::CastManager { .. } | Error::Json { .. } | Error::GetInformationExtension { .. } + | Error::PartitionManagerNotFound { .. } | Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected, Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments, diff --git a/src/catalog/src/system_schema/information_schema/partitions.rs b/src/catalog/src/system_schema/information_schema/partitions.rs index 93d60679901e..4cfeece62637 100644 --- a/src/catalog/src/system_schema/information_schema/partitions.rs +++ b/src/catalog/src/system_schema/information_schema/partitions.rs @@ -34,15 +34,14 @@ use datatypes::vectors::{ }; use futures::{StreamExt, TryStreamExt}; use partition::manager::PartitionInfo; -use partition::partition::PartitionDef; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{RegionId, ScanRequest, TableId}; +use store_api::storage::{ScanRequest, TableId}; use table::metadata::{TableInfo, TableType}; use super::PARTITIONS; use crate::error::{ - CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result, - UpgradeWeakCatalogManagerRefSnafu, + CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu, + Result, UpgradeWeakCatalogManagerRefSnafu, }; use crate::kvbackend::KvBackendCatalogManager; use crate::system_schema::information_schema::{InformationTable, Predicates}; @@ -236,7 +235,8 @@ impl InformationSchemaPartitionsBuilder { let partition_manager = catalog_manager .as_any() .downcast_ref::() - .map(|catalog_manager| catalog_manager.partition_manager()); + .map(|catalog_manager| catalog_manager.partition_manager()) + .context(PartitionManagerNotFoundSnafu)?; let predicates = Predicates::from_scan_request(&request); @@ -262,27 +262,10 @@ impl InformationSchemaPartitionsBuilder { let table_ids: Vec = table_infos.iter().map(|info| info.ident.table_id).collect(); - let mut table_partitions = if let Some(partition_manager) = &partition_manager { - partition_manager - .batch_find_table_partitions(&table_ids) - .await - .context(FindPartitionsSnafu)? - } else { - // Current node must be a standalone instance, contains only one partition by default. - // TODO(dennis): change it when we support multi-regions for standalone. - table_ids - .into_iter() - .map(|table_id| { - ( - table_id, - vec![PartitionInfo { - id: RegionId::new(table_id, 0), - partition: PartitionDef::new(vec![], vec![]), - }], - ) - }) - .collect() - }; + let mut table_partitions = partition_manager + .batch_find_table_partitions(&table_ids) + .await + .context(FindPartitionsSnafu)?; for table_info in table_infos { let partitions = table_partitions diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index 976c920b9ab9..1336c5179904 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID; use common_error::ext::BoxedError; +use common_meta::datanode::RegionStat; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::execution::TaskContext; @@ -31,7 +33,7 @@ use datatypes::vectors::{ }; use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{ScanRequest, TableId}; +use store_api::storage::{RegionId, ScanRequest, TableId}; use table::metadata::{TableInfo, TableType}; use super::TABLES; @@ -39,6 +41,7 @@ use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; use crate::system_schema::information_schema::{InformationTable, Predicates}; +use crate::system_schema::utils; use crate::CatalogManager; pub const TABLE_CATALOG: &str = "table_catalog"; @@ -234,17 +237,32 @@ impl InformationSchemaTablesBuilder { .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); + let information_extension = utils::information_extension(&self.catalog_manager)?; + let region_stats = information_extension.region_stats().await?; + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); while let Some(table) = stream.try_next().await? { let table_info = table.table_info(); + let region_ids = table_info + .meta + .region_numbers + .iter() + .map(|n| RegionId::new(table_info.ident.table_id, *n)) + .collect::>(); + let table_region_stats = region_stats + .iter() + .filter(|stat| region_ids.contains(&stat.id)) + .collect::>(); + self.add_table( &predicates, &catalog_name, &schema_name, table_info, table.table_type(), + &table_region_stats, ); } } @@ -260,6 +278,7 @@ impl InformationSchemaTablesBuilder { schema_name: &str, table_info: Arc, table_type: TableType, + region_stats: &[&RegionStat], ) { let table_name = table_info.name.as_ref(); let table_id = table_info.table_id(); @@ -273,7 +292,9 @@ impl InformationSchemaTablesBuilder { let row = [ (TABLE_CATALOG, &Value::from(catalog_name)), + (TABLE_ID, &Value::from(table_id)), (TABLE_SCHEMA, &Value::from(schema_name)), + (ENGINE, &Value::from(engine)), (TABLE_NAME, &Value::from(table_name)), (TABLE_TYPE, &Value::from(table_type_text)), ]; @@ -287,21 +308,39 @@ impl InformationSchemaTablesBuilder { self.table_names.push(Some(table_name)); self.table_types.push(Some(table_type_text)); self.table_ids.push(Some(table_id)); + + let data_length = region_stats.iter().map(|stat| stat.sst_size).sum(); + let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum(); + let index_length = region_stats.iter().map(|stat| stat.index_size).sum(); + + // It's not precise, but it is acceptable for long-term data storage. + let avg_row_length = if table_rows > 0 { + let total_data_length = data_length + + region_stats + .iter() + .map(|stat| stat.memtable_size) + .sum::(); + + total_data_length / table_rows + } else { + 0 + }; + + self.data_length.push(Some(data_length)); + self.index_length.push(Some(index_length)); + self.table_rows.push(Some(table_rows)); + self.avg_row_length.push(Some(avg_row_length)); + // TODO(sunng87): use real data for these fields - self.data_length.push(Some(0)); self.max_data_length.push(Some(0)); - self.index_length.push(Some(0)); - self.avg_row_length.push(Some(0)); - self.max_index_length.push(Some(0)); self.checksum.push(Some(0)); - self.table_rows.push(Some(0)); + self.max_index_length.push(Some(0)); self.data_free.push(Some(0)); self.auto_increment.push(Some(0)); self.row_format.push(Some("Fixed")); self.table_collation.push(Some("utf8_bin")); self.update_time.push(None); self.check_time.push(None); - // use mariadb default table version number here self.version.push(Some(11)); self.table_comment.push(table_info.desc.as_deref()); diff --git a/tests/cases/standalone/common/information_schema/region_statistics.result b/tests/cases/standalone/common/information_schema/region_statistics.result index 0c62b4ad6cd0..bca7032520ed 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.result +++ b/tests/cases/standalone/common/information_schema/region_statistics.result @@ -33,6 +33,14 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) | 3 | 2145 | 0 | 0 | +-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ +SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test'; + ++-------------+--------------+----------------+------------+ +| data_length | index_length | avg_row_length | table_rows | ++-------------+--------------+----------------+------------+ +| 0 | 0 | 26 | 3 | ++-------------+--------------+----------------+------------+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/information_schema/region_statistics.sql b/tests/cases/standalone/common/information_schema/region_statistics.sql index cbc4424683a6..be1c2ca2e79a 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.sql +++ b/tests/cases/standalone/common/information_schema/region_statistics.sql @@ -22,4 +22,6 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public'); +SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test'; + DROP TABLE test; From 3009e421ad529069ce8f2a886eca1ce5def20e40 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 4 Nov 2024 10:52:58 +0800 Subject: [PATCH 2/5] feat: table stats only works for mito engine currently --- .../information_schema/tables.rs | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index 1336c5179904..b39dcca6ab88 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; -use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID; +use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE}; use common_error::ext::BoxedError; use common_meta::datanode::RegionStat; use common_recordbatch::adapter::RecordBatchStreamAdapter; @@ -245,16 +245,23 @@ impl InformationSchemaTablesBuilder { while let Some(table) = stream.try_next().await? { let table_info = table.table_info(); - let region_ids = table_info - .meta - .region_numbers - .iter() - .map(|n| RegionId::new(table_info.ident.table_id, *n)) - .collect::>(); - let table_region_stats = region_stats - .iter() - .filter(|stat| region_ids.contains(&stat.id)) - .collect::>(); + + // TODO(dennis): make it working for metric engine + let table_region_stats = if table_info.meta.engine == MITO_ENGINE { + let region_ids = table_info + .meta + .region_numbers + .iter() + .map(|n| RegionId::new(table_info.ident.table_id, *n)) + .collect::>(); + + region_stats + .iter() + .filter(|stat| region_ids.contains(&stat.id)) + .collect::>() + } else { + vec![] + }; self.add_table( &predicates, From 3b4c40204001377d6c4232dab4273c1ac18e232e Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 4 Nov 2024 11:33:48 +0800 Subject: [PATCH 3/5] fix: tests --- .../src/system_schema/information_schema/tables.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index b39dcca6ab88..54ad20c2b275 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -238,7 +238,13 @@ impl InformationSchemaTablesBuilder { let predicates = Predicates::from_scan_request(&request); let information_extension = utils::information_extension(&self.catalog_manager)?; - let region_stats = information_extension.region_stats().await?; + + // TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc. + // But we don't want the statments such as `show tables` fails. + let region_stats = information_extension + .region_stats() + .await + .unwrap_or_else(|_| vec![]); for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); From 896d27c8e0044e50fa31aa0490f34de785309755 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 4 Nov 2024 11:38:29 +0800 Subject: [PATCH 4/5] fix: typo --- src/catalog/src/system_schema/information_schema/tables.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index 54ad20c2b275..871a3d09daf9 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -240,7 +240,8 @@ impl InformationSchemaTablesBuilder { let information_extension = utils::information_extension(&self.catalog_manager)?; // TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc. - // But we don't want the statments such as `show tables` fails. + // But we don't want the statements such as `show tables` fail, + // so using `unwrap_or_else` here instead of `?` operator. let region_stats = information_extension .region_stats() .await From 7c30341992e51485b0f18cf311a822ae4948e0bf Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 4 Nov 2024 15:03:23 +0800 Subject: [PATCH 5/5] chore: log error when region_stats fails --- src/catalog/src/system_schema/information_schema/tables.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index 871a3d09daf9..b258b857b2db 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -21,6 +21,7 @@ use common_error::ext::BoxedError; use common_meta::datanode::RegionStat; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use common_telemetry::error; use datafusion::execution::TaskContext; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; @@ -245,6 +246,10 @@ impl InformationSchemaTablesBuilder { let region_stats = information_extension .region_stats() .await + .map_err(|e| { + error!(e; "Failed to call region_stats"); + e + }) .unwrap_or_else(|_| vec![]); for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {