Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: data_length, index_length, table_rows in tables #4927

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Partition manager not found, it's not expected."))]
PartitionManagerNotFound {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error },

Expand Down Expand Up @@ -301,6 +307,7 @@ impl ErrorExt for Error {
| Error::CastManager { .. }
| Error::Json { .. }
| Error::GetInformationExtension { .. }
| Error::PartitionManagerNotFound { .. }
| Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected,

Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
Expand Down
35 changes: 9 additions & 26 deletions src/catalog/src/system_schema/information_schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ use datatypes::vectors::{
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use super::PARTITIONS;
use crate::error::{
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates};
Expand Down Expand Up @@ -236,7 +235,8 @@ impl InformationSchemaPartitionsBuilder {
let partition_manager = catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.map(|catalog_manager| catalog_manager.partition_manager());
.map(|catalog_manager| catalog_manager.partition_manager())
.context(PartitionManagerNotFoundSnafu)?;

let predicates = Predicates::from_scan_request(&request);

Expand All @@ -262,27 +262,10 @@ impl InformationSchemaPartitionsBuilder {
let table_ids: Vec<TableId> =
table_infos.iter().map(|info| info.ident.table_id).collect();

let mut table_partitions = if let Some(partition_manager) = &partition_manager {
partition_manager
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu)?
} else {
// Current node must be a standalone instance, contains only one partition by default.
// TODO(dennis): change it when we support multi-regions for standalone.
table_ids
.into_iter()
.map(|table_id| {
(
table_id,
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}],
)
})
.collect()
};
let mut table_partitions = partition_manager
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu)?;

for table_info in table_infos {
let partitions = table_partitions
Expand Down
74 changes: 66 additions & 8 deletions src/catalog/src/system_schema/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID;
use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
use common_error::ext::BoxedError;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
Expand All @@ -31,14 +34,15 @@ use datatypes::vectors::{
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use super::TABLES;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::system_schema::utils;
use crate::CatalogManager;

pub const TABLE_CATALOG: &str = "table_catalog";
Expand Down Expand Up @@ -234,17 +238,50 @@ impl InformationSchemaTablesBuilder {
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);

let information_extension = utils::information_extension(&self.catalog_manager)?;

// TODO(dennis): the `region_stats` API is not stable in a distributed cluster because of network issues, etc.
// We don't want statements such as `show tables` to fail because of that,
// so we use `unwrap_or_else` here instead of the `?` operator.
let region_stats = information_extension
.region_stats()
.await
.map_err(|e| {
error!(e; "Failed to call region_stats");
e
})
.unwrap_or_else(|_| vec![]);
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();

// TODO(dennis): make it work for the metric engine
let table_region_stats = if table_info.meta.engine == MITO_ENGINE {
let region_ids = table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.collect::<HashSet<_>>();

region_stats
.iter()
.filter(|stat| region_ids.contains(&stat.id))
.collect::<Vec<_>>()
} else {
vec![]
};

self.add_table(
&predicates,
&catalog_name,
&schema_name,
table_info,
table.table_type(),
&table_region_stats,
);
}
}
Expand All @@ -260,6 +297,7 @@ impl InformationSchemaTablesBuilder {
schema_name: &str,
table_info: Arc<TableInfo>,
table_type: TableType,
region_stats: &[&RegionStat],
) {
let table_name = table_info.name.as_ref();
let table_id = table_info.table_id();
Expand All @@ -273,7 +311,9 @@ impl InformationSchemaTablesBuilder {

let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_ID, &Value::from(table_id)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(ENGINE, &Value::from(engine)),
(TABLE_NAME, &Value::from(table_name)),
(TABLE_TYPE, &Value::from(table_type_text)),
];
Expand All @@ -287,21 +327,39 @@ impl InformationSchemaTablesBuilder {
self.table_names.push(Some(table_name));
self.table_types.push(Some(table_type_text));
self.table_ids.push(Some(table_id));

let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
let index_length = region_stats.iter().map(|stat| stat.index_size).sum();

// It's not precise, but it is acceptable for long-term data storage.
let avg_row_length = if table_rows > 0 {
let total_data_length = data_length
+ region_stats
.iter()
.map(|stat| stat.memtable_size)
.sum::<u64>();

total_data_length / table_rows
} else {
0
};

self.data_length.push(Some(data_length));
self.index_length.push(Some(index_length));
self.table_rows.push(Some(table_rows));
self.avg_row_length.push(Some(avg_row_length));

// TODO(sunng87): use real data for these fields
self.data_length.push(Some(0));
self.max_data_length.push(Some(0));
self.index_length.push(Some(0));
self.avg_row_length.push(Some(0));
self.max_index_length.push(Some(0));
self.checksum.push(Some(0));
self.table_rows.push(Some(0));
self.max_index_length.push(Some(0));
self.data_free.push(Some(0));
self.auto_increment.push(Some(0));
self.row_format.push(Some("Fixed"));
self.table_collation.push(Some("utf8_bin"));
self.update_time.push(None);
self.check_time.push(None);

// Use MariaDB's default table version number here.
self.version.push(Some(11));
self.table_comment.push(table_info.desc.as_deref());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
| 3 | 2145 | 0 | 0 |
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+

SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';

+-------------+--------------+----------------+------------+
| data_length | index_length | avg_row_length | table_rows |
+-------------+--------------+----------------+------------+
| 0 | 0 | 26 | 3 |
+-------------+--------------+----------------+------------+

DROP TABLE test;

Affected Rows: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');

SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';

DROP TABLE test;