diff --git a/Cargo.lock b/Cargo.lock index b121698979e5..071d2c436366 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1348,6 +1348,7 @@ dependencies = [ "common-error", "common-macro", "common-meta", + "common-procedure", "common-query", "common-recordbatch", "common-telemetry", @@ -2144,6 +2145,7 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", + "common-time", "futures", "futures-util", "humantime-serde", @@ -4325,7 +4327,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=973f49cde88a582fb65755cc572ebcf6fb93ccf7#973f49cde88a582fb65755cc572ebcf6fb93ccf7" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=796ce9b003c6689e853825f649e03543c81ede99#796ce9b003c6689e853825f649e03543c81ede99" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index d199caac090e..e0ee599cf9e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ etcd-client = { version = "0.13" } fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "973f49cde88a582fb65755cc572ebcf6fb93ccf7" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "796ce9b003c6689e853825f649e03543c81ede99" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index a4841717333a..8e2f9d68302f 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -22,6 +22,7 @@ common-config.workspace = true common-error.workspace = true common-macro.workspace = true common-meta.workspace = true +common-procedure.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-telemetry.workspace = true diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index d44a5b768391..14748a469587 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -82,6 +82,33 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to get procedure client in {mode} mode"))] + GetProcedureClient { + mode: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to list procedures"))] + ListProcedures { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + + #[snafu(display("Procedure id not found"))] + ProcedureIdNotFound { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("convert proto data error"))] + ConvertProtoData { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to re-compile script due to internal error"))] CompileScriptInternal { #[snafu(implicit)] @@ -266,7 +293,9 @@ impl ErrorExt for Error { | Error::FindRegionRoutes { .. } | Error::CacheNotFound { .. } | Error::CastManager { .. } - | Error::Json { .. } => StatusCode::Unexpected, + | Error::Json { .. } + | Error::GetProcedureClient { .. } + | Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected, Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments, @@ -283,7 +312,9 @@ impl ErrorExt for Error { | Error::ListNodes { source, .. } | Error::ListSchemas { source, .. } | Error::ListTables { source, .. } - | Error::ListFlows { source, .. } => source.status_code(), + | Error::ListFlows { source, .. } + | Error::ListProcedures { source, .. } + | Error::ConvertProtoData { source, .. } => source.status_code(), Error::CreateTable { source, .. } => source.status_code(), diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index feb5e31d09bb..0896a82e26a8 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -31,6 +31,7 @@ use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_name::TableNameKey; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; +use common_procedure::ProcedureManagerRef; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; use meta_client::client::MetaClient; @@ -67,6 +68,7 @@ pub struct KvBackendCatalogManager { /// A sub-CatalogManager that handles system tables system_catalog: SystemCatalog, cache_registry: LayeredCacheRegistryRef, + procedure_manager: Option, } const CATALOG_CACHE_MAX_CAPACITY: u64 = 128; @@ -77,6 +79,7 @@ impl KvBackendCatalogManager { meta_client: Option>, backend: KvBackendRef, cache_registry: LayeredCacheRegistryRef, + procedure_manager: Option, ) -> Arc { Arc::new_cyclic(|me| Self { mode, @@ -104,6 +107,7 @@ impl KvBackendCatalogManager { backend, }, cache_registry, + procedure_manager, }) } @@ -130,6 +134,10 @@ impl KvBackendCatalogManager { pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager } + + pub fn procedure_manager(&self) -> Option { + self.procedure_manager.clone() + } } #[async_trait::async_trait] diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 93dfaa75b531..83f2ff492611 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -18,6 +18,7 @@ pub mod flows; mod information_memory_table; pub mod key_column_usage; mod partitions; +mod procedure_info; mod region_peers; mod runtime_metrics; pub mod schemata; @@ -188,6 +189,11 @@ impl SystemSchemaProviderInner for InformationSchemaProvider { self.catalog_name.clone(), self.flow_metadata_manager.clone(), )) as _), + PROCEDURE_INFO => Some( + Arc::new(procedure_info::InformationSchemaProcedureInfo::new( + self.catalog_manager.clone(), + )) as _, + ), _ => None, } } @@ -250,7 +256,10 @@ impl InformationSchemaProvider { self.build_table(TABLE_CONSTRAINTS).unwrap(), ); tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap()); - + tables.insert( + PROCEDURE_INFO.to_string(), + self.build_table(PROCEDURE_INFO).unwrap(), + ); // Add memory tables for name in MEMORY_TABLES.iter() { tables.insert((*name).to_string(), self.build_table(name).expect(name)); diff --git a/src/catalog/src/system_schema/information_schema/procedure_info.rs b/src/catalog/src/system_schema/information_schema/procedure_info.rs new file mode 100644 index 000000000000..56c36c22100a --- /dev/null +++ b/src/catalog/src/system_schema/information_schema/procedure_info.rs @@ -0,0 +1,310 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; + +use api::v1::meta::{ProcedureMeta, ProcedureStatus}; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use common_catalog::consts::INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID; +use common_config::Mode; +use common_error::ext::BoxedError; +use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; +use common_meta::rpc::procedure; +use common_procedure::{ProcedureInfo, ProcedureState}; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use common_time::timestamp::Timestamp; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; +use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::timestamp::TimestampMillisecond; +use datatypes::value::Value; +use datatypes::vectors::{StringVectorBuilder, TimestampMillisecondVectorBuilder}; +use snafu::ResultExt; +use store_api::storage::{ScanRequest, TableId}; + +use super::PROCEDURE_INFO; +use crate::error::{ + ConvertProtoDataSnafu, CreateRecordBatchSnafu, GetProcedureClientSnafu, InternalSnafu, + ListProceduresSnafu, ProcedureIdNotFoundSnafu, Result, +}; +use crate::system_schema::information_schema::{InformationTable, Predicates}; +use crate::system_schema::utils; +use crate::CatalogManager; + +const PROCEDURE_ID: &str = "procedure_id"; +const PROCEDURE_TYPE: &str = "procedure_type"; +const START_TIME: &str = "start_time"; +const END_TIME: &str = "end_time"; +const STATUS: &str = "status"; +const LOCK_KEYS: &str = "lock_keys"; + +const INIT_CAPACITY: usize = 42; + +/// The `PROCEDURE_INFO` table provides information about the current procedure information of the cluster. +/// +/// - `procedure_id`: the unique identifier of the procedure. +/// - `procedure_name`: the name of the procedure. +/// - `start_time`: the starting execution time of the procedure. +/// - `end_time`: the ending execution time of the procedure. +/// - `status`: the status of the procedure. +/// - `lock_keys`: the lock keys of the procedure. +/// +pub(super) struct InformationSchemaProcedureInfo { + schema: SchemaRef, + catalog_manager: Weak, +} + +impl InformationSchemaProcedureInfo { + pub(super) fn new(catalog_manager: Weak) -> Self { + Self { + schema: Self::schema(), + catalog_manager, + } + } + + pub(crate) fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + ColumnSchema::new(PROCEDURE_ID, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(PROCEDURE_TYPE, ConcreteDataType::string_datatype(), false), + ColumnSchema::new( + START_TIME, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + ColumnSchema::new( + END_TIME, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(LOCK_KEYS, ConcreteDataType::string_datatype(), true), + ])) + } + + fn builder(&self) -> InformationSchemaProcedureInfoBuilder { + InformationSchemaProcedureInfoBuilder::new( + self.schema.clone(), + self.catalog_manager.clone(), + ) + } +} + +impl InformationTable for InformationSchemaProcedureInfo { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID + } + + fn table_name(&self) -> &'static str { + PROCEDURE_INFO + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_procedure_info(Some(request)) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + +struct InformationSchemaProcedureInfoBuilder { + schema: SchemaRef, + catalog_manager: Weak, + + procedure_ids: StringVectorBuilder, + procedure_types: StringVectorBuilder, + start_times: TimestampMillisecondVectorBuilder, + end_times: TimestampMillisecondVectorBuilder, + statuses: StringVectorBuilder, + lock_keys: StringVectorBuilder, +} + +impl InformationSchemaProcedureInfoBuilder { + fn new(schema: SchemaRef, catalog_manager: Weak) -> Self { + Self { + schema, + catalog_manager, + procedure_ids: StringVectorBuilder::with_capacity(INIT_CAPACITY), + procedure_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), + start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY), + end_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY), + statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY), + lock_keys: StringVectorBuilder::with_capacity(INIT_CAPACITY), + } + } + + /// Construct the `information_schema.procedure_info` virtual table + async fn make_procedure_info(&mut self, request: Option) -> Result { + let predicates = Predicates::from_scan_request(&request); + let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone); + match mode { + Mode::Standalone => { + if let Some(procedure_manager) = utils::procedure_manager(&self.catalog_manager)? { + let procedures = procedure_manager + .list_procedures() + .await + .map_err(BoxedError::new) + .context(ListProceduresSnafu)?; + for procedure in procedures { + self.add_procedure( + &predicates, + procedure.state.as_str_name().to_string(), + procedure, + ); + } + } else { + return GetProcedureClientSnafu { mode: "standalone" }.fail(); + } + } + Mode::Distributed => { + if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? { + let procedures = meta_client + .list_procedures(&ExecutorContext::default()) + .await + .map_err(BoxedError::new) + .context(ListProceduresSnafu)?; + for procedure in procedures.procedures { + self.add_procedure_info(&predicates, procedure)?; + } + } else { + return GetProcedureClientSnafu { + mode: "distributed", + } + .fail(); + } + } + }; + + self.finish() + } + + fn add_procedure( + &mut self, + predicates: &Predicates, + status: String, + procedure_info: ProcedureInfo, + ) { + let ProcedureInfo { + id, + type_name, + start_time_ms, + end_time_ms, + lock_keys, + .. + } = procedure_info; + let pid = id.to_string(); + let start_time = TimestampMillisecond(Timestamp::new_millisecond(start_time_ms)); + let end_time = TimestampMillisecond(Timestamp::new_millisecond(end_time_ms)); + let lock_keys = lock_keys.join(","); + + let row = [ + (PROCEDURE_ID, &Value::from(pid.clone())), + (PROCEDURE_TYPE, &Value::from(type_name.clone())), + (START_TIME, &Value::from(start_time)), + (END_TIME, &Value::from(end_time)), + (STATUS, &Value::from(status.clone())), + (LOCK_KEYS, &Value::from(lock_keys.clone())), + ]; + if !predicates.eval(&row) { + return; + } + self.procedure_ids.push(Some(&pid)); + self.procedure_types.push(Some(&type_name)); + self.start_times.push(Some(start_time)); + self.end_times.push(Some(end_time)); + self.statuses.push(Some(&status)); + self.lock_keys.push(Some(&lock_keys)); + } + + fn add_procedure_info( + &mut self, + predicates: &Predicates, + procedure: ProcedureMeta, + ) -> Result<()> { + let pid = match procedure.id { + Some(pid) => pid, + None => return ProcedureIdNotFoundSnafu {}.fail(), + }; + let pid = procedure::pb_pid_to_pid(&pid) + .map_err(BoxedError::new) + .context(ConvertProtoDataSnafu)?; + let status = ProcedureStatus::try_from(procedure.status) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown") + .to_string(); + let procedure_info = ProcedureInfo { + id: pid, + type_name: procedure.type_name, + start_time_ms: procedure.start_time_ms, + end_time_ms: procedure.end_time_ms, + state: ProcedureState::Running, + lock_keys: procedure.lock_keys, + }; + self.add_procedure(predicates, status, procedure_info); + Ok(()) + } + + fn finish(&mut self) -> Result { + let columns: Vec = vec![ + Arc::new(self.procedure_ids.finish()), + Arc::new(self.procedure_types.finish()), + Arc::new(self.start_times.finish()), + Arc::new(self.end_times.finish()), + Arc::new(self.statuses.finish()), + Arc::new(self.lock_keys.finish()), + ]; + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) + } +} + +impl DfPartitionStream for InformationSchemaProcedureInfo { + fn schema(&self) -> &ArrowSchemaRef { + self.schema.arrow_schema() + } + + fn execute(&self, _: Arc) -> DfSendableRecordBatchStream { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_procedure_info(None) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )) + } +} diff --git a/src/catalog/src/system_schema/information_schema/table_names.rs b/src/catalog/src/system_schema/information_schema/table_names.rs index c2c9eeff24f5..a62f4ddb4002 100644 --- a/src/catalog/src/system_schema/information_schema/table_names.rs +++ b/src/catalog/src/system_schema/information_schema/table_names.rs @@ -45,3 +45,4 @@ pub const TABLE_CONSTRAINTS: &str = "table_constraints"; pub const CLUSTER_INFO: &str = "cluster_info"; pub const VIEWS: &str = "views"; pub const FLOWS: &str = "flows"; +pub const PROCEDURE_INFO: &str = "procedure_info"; diff --git a/src/catalog/src/system_schema/utils.rs b/src/catalog/src/system_schema/utils.rs index 2a2e89516a6f..b9786bc2600f 100644 --- a/src/catalog/src/system_schema/utils.rs +++ b/src/catalog/src/system_schema/utils.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, Weak}; use common_config::Mode; use common_meta::key::TableMetadataManagerRef; +use common_procedure::ProcedureManagerRef; use meta_client::client::MetaClient; use snafu::OptionExt; @@ -68,3 +69,17 @@ pub fn table_meta_manager( .downcast_ref::() .map(|manager| manager.table_metadata_manager_ref().clone())) } + +/// Try to get the `[ProcedureManagerRef]` from `[CatalogManager]` weak reference. +pub fn procedure_manager( + catalog_manager: &Weak, +) -> Result> { + let catalog_manager = catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + + Ok(catalog_manager + .as_any() + .downcast_ref::() + .and_then(|manager| manager.procedure_manager())) +} diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index d6d81fa1342b..09d3d9d2fd2e 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -327,6 +327,7 @@ mod tests { None, backend.clone(), layered_cache_registry, + None, ); let table_metadata_manager = TableMetadataManager::new(backend); let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]); diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 6d7e211d7cb9..35ab1a3580c6 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -281,6 +281,7 @@ async fn create_query_engine(meta_addr: &str) -> Result { Some(meta_client.clone()), cached_meta_backend.clone(), layered_cache_registry, + None, ); let plugins: Plugins = Default::default(); let state = Arc::new(QueryEngineState::new( diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index e950d9bb43b1..78dfc906077f 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -274,6 +274,7 @@ impl StartCommand { Some(meta_client.clone()), cached_meta_backend.clone(), layered_cache_registry.clone(), + None, ); let table_metadata_manager = diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 6d8fd97070ee..320dc49c1979 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -320,6 +320,7 @@ impl StartCommand { Some(meta_client.clone()), cached_meta_backend.clone(), layered_cache_registry.clone(), + None, ); let executor = HandlerGroupExecutor::new(vec![ diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index c8083c5f80c2..80b38ebaa749 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -482,6 +482,7 @@ impl StartCommand { None, kv_backend.clone(), layered_cache_registry.clone(), + Some(procedure_manager.clone()), ); let table_metadata_manager = diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 6020a4bef805..2a8e2fc0e46e 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -98,6 +98,8 @@ pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31; pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32; /// id for information_schema.FLOWS pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33; +/// id for information_schema.procedure_info +pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34; /// ----- End of information_schema tables ----- /// ----- Begin of pg_catalog tables ----- diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 7186997906e3..11654f04d6a3 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use api::v1::meta::ProcedureDetailResponse; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionId, RegionNumber, TableId}; @@ -82,6 +83,8 @@ pub trait ProcedureExecutor: Send + Sync { ctx: &ExecutorContext, pid: &str, ) -> Result; + + async fn list_procedures(&self, ctx: &ExecutorContext) -> Result; } pub type ProcedureExecutorRef = Arc; diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 152a4631e2dd..1ee148406d04 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::meta::ProcedureDetailResponse; use common_procedure::{ watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, }; @@ -825,6 +826,15 @@ impl ProcedureExecutor for DdlManager { Ok(procedure::procedure_state_to_pb_response(&state)) } + + async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result { + let metas = self + .procedure_manager + .list_procedures() + .await + .context(QueryProcedureSnafu)?; + Ok(procedure::procedure_details_to_pb_response(metas)) + } } #[cfg(test)] diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index 6dabe899a81e..2c2a69b5b603 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -16,10 +16,11 @@ use std::time::Duration; pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse}; use api::v1::meta::{ - ProcedureId as PbProcedureId, ProcedureStateResponse as PbProcedureStateResponse, + ProcedureDetailResponse as PbProcedureDetailResponse, ProcedureId as PbProcedureId, + ProcedureMeta as PbProcedureMeta, ProcedureStateResponse as PbProcedureStateResponse, ProcedureStatus as PbProcedureStatus, }; -use common_procedure::{ProcedureId, ProcedureState}; +use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState}; use snafu::ResultExt; use crate::error::{ParseProcedureIdSnafu, Result}; @@ -49,9 +50,9 @@ pub fn pid_to_pb_pid(pid: ProcedureId) -> PbProcedureId { } } -/// Cast the common [`ProcedureState`] to pb [`ProcedureStateResponse`]. -pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse { - let (status, error) = match state { +/// Cast the [`ProcedureState`] to protobuf [`PbProcedureStatus`]. +pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus, String) { + match state { ProcedureState::Running => (PbProcedureStatus::Running, String::default()), ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()), ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.to_string()), @@ -62,8 +63,12 @@ pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStat ProcedureState::RollingBack { error } => { (PbProcedureStatus::RollingBack, error.to_string()) } - }; + } +} +/// Cast the common [`ProcedureState`] to pb [`ProcedureStateResponse`]. +pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse { + let (status, error) = procedure_state_to_pb_state(state); PbProcedureStateResponse { status: status.into(), error, @@ -71,6 +76,28 @@ pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStat } } +pub fn procedure_details_to_pb_response(metas: Vec) -> PbProcedureDetailResponse { + let procedures = metas + .into_iter() + .map(|meta| { + let (status, error) = procedure_state_to_pb_state(&meta.state); + PbProcedureMeta { + id: Some(pid_to_pb_pid(meta.id)), + type_name: meta.type_name.to_string(), + status: status.into(), + start_time_ms: meta.start_time_ms, + end_time_ms: meta.end_time_ms, + lock_keys: meta.lock_keys, + error, + } + }) + .collect(); + PbProcedureDetailResponse { + procedures, + ..Default::default() + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index b56aa46a91b4..1d8c6736e3fd 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -19,6 +19,7 @@ common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true common-telemetry.workspace = true +common-time.workspace = true futures.workspace = true humantime-serde.workspace = true object-store.workspace = true diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index ece2ce4189a0..269ad2c52904 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -26,7 +26,7 @@ pub mod watcher; pub use crate::error::{Error, Result}; pub use crate::procedure::{ BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError, - Procedure, ProcedureId, ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, - Status, StringKey, + Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef, ProcedureState, + ProcedureWithId, Status, StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 63fc06270a10..da456fb37aef 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -16,7 +16,7 @@ mod runner; mod rwlock; use std::collections::{HashMap, VecDeque}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; @@ -35,7 +35,7 @@ use crate::error::{ StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::runner::Runner; -use crate::procedure::{BoxedProcedureLoader, InitProcedureState}; +use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo}; use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef}; use crate::{ BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState, @@ -57,6 +57,8 @@ const META_TTL: Duration = Duration::from_secs(60 * 10); pub(crate) struct ProcedureMeta { /// Id of this procedure. id: ProcedureId, + /// Type name of this procedure. + type_name: String, /// Parent procedure id. parent_id: Option, /// Notify to wait for subprocedures. @@ -69,6 +71,10 @@ pub(crate) struct ProcedureMeta { state_receiver: Receiver, /// Id of child procedures. children: Mutex>, + /// Start execution time of this procedure. + start_time_ms: AtomicI64, + /// End execution time of this procedure. + end_time_ms: AtomicI64, } impl ProcedureMeta { @@ -77,6 +83,7 @@ impl ProcedureMeta { procedure_state: ProcedureState, parent_id: Option, lock_key: LockKey, + type_name: &str, ) -> ProcedureMeta { let (state_sender, state_receiver) = watch::channel(procedure_state); ProcedureMeta { @@ -87,6 +94,9 @@ impl ProcedureMeta { state_sender, state_receiver, children: Mutex::new(Vec::new()), + start_time_ms: AtomicI64::new(0), + end_time_ms: AtomicI64::new(0), + type_name: type_name.to_string(), } } @@ -117,6 +127,18 @@ impl ProcedureMeta { fn num_children(&self) -> usize { self.children.lock().unwrap().len() } + + /// update the start time of the procedure. + fn set_start_time_ms(&self) { + self.start_time_ms + .store(common_time::util::current_time_millis(), Ordering::Relaxed); + } + + /// update the end time of the procedure. + fn set_end_time_ms(&self) { + self.end_time_ms + .store(common_time::util::current_time_millis(), Ordering::Relaxed); + } } /// Reference counted pointer to [ProcedureMeta]. @@ -210,6 +232,22 @@ impl ManagerContext { procedures.get(&procedure_id).map(|meta| meta.state()) } + /// Returns the [ProcedureMeta] of all procedures. + fn list_procedure(&self) -> Vec { + let procedures = self.procedures.read().unwrap(); + procedures + .values() + .map(|meta| ProcedureInfo { + id: meta.id, + type_name: meta.type_name.clone(), + start_time_ms: meta.start_time_ms.load(Ordering::Relaxed), + end_time_ms: meta.end_time_ms.load(Ordering::Relaxed), + state: meta.state(), + lock_keys: meta.lock_key.get_keys(), + }) + .collect() + } + /// Returns the [Watcher] of specific `procedure_id`. fn watcher(&self, procedure_id: ProcedureId) -> Option { let procedures = self.procedures.read().unwrap(); @@ -438,6 +476,7 @@ impl LocalManager { procedure_state, None, procedure.lock_key(), + procedure.type_name(), )); let runner = Runner { meta: meta.clone(), @@ -641,6 +680,10 @@ impl ProcedureManager for LocalManager { fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option { self.manager_ctx.watcher(procedure_id) } + + async fn list_procedures(&self) -> Result> { + Ok(self.manager_ctx.list_procedure()) + } } struct RemoveOutdatedMetaFunction { @@ -675,6 +718,7 @@ pub(crate) mod test_util { ProcedureState::Running, None, LockKey::default(), + "ProcedureAdapter", ) } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 2f38ae135d23..c2d15001fba3 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -27,7 +27,9 @@ use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::procedure::{Output, StringKey}; use crate::store::{ProcedureMessage, ProcedureStore}; -use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status}; +use crate::{ + BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status, +}; /// A guard to cleanup procedure state. struct ProcedureGuard { @@ -129,7 +131,9 @@ impl Runner { // Execute the procedure. We need to release the lock whenever the execution // is successful or fail. + self.meta.set_start_time_ms(); self.execute_procedure_in_loop().await; + self.meta.set_end_time_ms(); // We can't remove the metadata of the procedure now as users and its parent might // need to query its state. @@ -368,6 +372,7 @@ impl Runner { procedure_state, Some(self.meta.id), procedure.lock_key(), + procedure.type_name(), )); let runner = Runner { meta: meta.clone(), diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 6c694315e93b..ddf5dc74a35d 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -159,6 +159,14 @@ impl Procedure for Box { (**self).execute(ctx).await } + async fn rollback(&mut self, ctx: &Context) -> Result<()> { + (**self).rollback(ctx).await + } + + fn rollback_supported(&self) -> bool { + (**self).rollback_supported() + } + fn dump(&self) -> Result { (**self).dump() } @@ -227,6 +235,11 @@ impl LockKey { pub fn keys_to_lock(&self) -> impl Iterator { self.0.iter() } + + /// Returns the keys to lock. + pub fn get_keys(&self) -> Vec { + self.0.iter().map(|key| format!("{:?}", key)).collect() + } } /// Boxed [Procedure]. @@ -374,6 +387,18 @@ impl ProcedureState { _ => None, } } + + /// Return the string values of the enum field names. + pub fn as_str_name(&self) -> &str { + match self { + ProcedureState::Running => "Running", + ProcedureState::Done { .. } => "Done", + ProcedureState::Retrying { .. } => "Retrying", + ProcedureState::Failed { .. } => "Failed", + ProcedureState::PrepareRollback { .. } => "PrepareRollback", + ProcedureState::RollingBack { .. } => "RollingBack", + } + } } /// The initial procedure state. @@ -412,11 +437,30 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Returns a [Watcher] to watch [ProcedureState] of specific procedure. fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option; + + /// Returns the details of the procedure. + async fn list_procedures(&self) -> Result>; } /// Ref-counted pointer to the [ProcedureManager]. pub type ProcedureManagerRef = Arc; +#[derive(Debug, Clone)] +pub struct ProcedureInfo { + /// Id of this procedure. + pub id: ProcedureId, + /// Type name of this procedure. + pub type_name: String, + /// Start execution time of this procedure. + pub start_time_ms: i64, + /// End execution time of this procedure. + pub end_time_ms: i64, + /// status of this procedure. + pub state: ProcedureState, + /// Lock keys of this procedure. + pub lock_keys: Vec, +} + #[cfg(test)] mod tests { use common_error::mock::MockError; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 7c2a6ed92382..0a9df7293fb9 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -22,7 +22,7 @@ mod cluster; mod store; mod util; -use api::v1::meta::Role; +use api::v1::meta::{ProcedureDetailResponse, Role}; use cluster::Client as ClusterClient; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -259,6 +259,16 @@ impl ProcedureExecutor for MetaClient { .map_err(BoxedError::new) .context(meta_error::ExternalSnafu) } + + async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult { + self.procedure_client() + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)? + .list_procedures() + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } } #[async_trait::async_trait] diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index 32049dbabdf0..f45cfb787988 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -18,8 +18,9 @@ use std::time::Duration; use api::v1::meta::procedure_service_client::ProcedureServiceClient; use api::v1::meta::{ - DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureId, - ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role, + DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, + ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, + QueryProcedureRequest, ResponseHeader, Role, }; use common_grpc::channel_manager::ChannelManager; use common_telemetry::tracing_context::TracingContext; @@ -89,6 +90,11 @@ impl Client { .migrate_region(region_id, from_peer, to_peer, replay_timeout) .await } + + pub async fn list_procedures(&self) -> Result { + let inner = self.inner.read().await; + inner.list_procedures().await + } } #[derive(Debug)] @@ -279,4 +285,23 @@ impl Inner { ) .await } + + async fn list_procedures(&self) -> Result { + let mut req = ProcedureDetailRequest::default(); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); + + self.with_retry( + "list procedure", + move |mut client| { + let req = req.clone(); + async move { client.details(req).await.map(|res| res.into_inner()) } + }, + |resp: &ProcedureDetailResponse| &resp.header, + ) + .await + } } diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 05e6d98fa47a..d19d0902ae05 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -18,7 +18,7 @@ use std::time::Duration; use api::v1::meta::{ procedure_service_server, DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, - ProcedureStateResponse, QueryProcedureRequest, + ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, }; use common_meta::ddl::ExecutorContext; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; @@ -146,4 +146,20 @@ impl procedure_service_server::ProcedureService for Metasrv { Ok(Response::new(resp)) } + + async fn details( + &self, + request: Request, + ) -> GrpcResult { + let ProcedureDetailRequest { header } = request.into_inner(); + let _header = header.context(error::MissingRequestHeaderSnafu)?; + let metas = self + .procedure_manager() + .list_procedures() + .await + .context(error::QueryProcedureSnafu)?; + Ok(Response::new(procedure::procedure_details_to_pb_response( + metas, + ))) + } } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 19cb36a5b9b1..ce2803996aa5 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -370,6 +370,7 @@ impl GreptimeDbClusterBuilder { Some(meta_client.clone()), cached_meta_backend.clone(), cache_registry.clone(), + None, ); let handlers_executor = HandlerGroupExecutor::new(vec![ diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 9f7188568fa4..fa6e8f2a9e5c 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -149,6 +149,7 @@ impl GreptimeDbStandaloneBuilder { None, kv_backend.clone(), cache_registry.clone(), + Some(procedure_manager.clone()), ); let flow_builder = FlownodeBuilder::new( diff --git a/tests/cases/standalone/common/information_schema/procedure_info.result b/tests/cases/standalone/common/information_schema/procedure_info.result new file mode 100644 index 000000000000..965e02b8ba33 --- /dev/null +++ b/tests/cases/standalone/common/information_schema/procedure_info.result @@ -0,0 +1,40 @@ +--- test information_schema.procedure_info ---- +USE public; + +Affected Rows: 0 + +CREATE TABLE procedure_info_for_sql_test1( + ts TIMESTAMP TIME INDEX, + temperature DOUBLE DEFAULT 10, +) engine=mito with('append_mode'='true'); + +Affected Rows: 0 + +CREATE TABLE procedure_info_for_sql_test2( + ts TIMESTAMP TIME INDEX, + temperature DOUBLE DEFAULT 10, +) engine=mito with('append_mode'='true'); + +Affected Rows: 0 + +use INFORMATION_SCHEMA; + +Affected Rows: 0 + +select procedure_type from procedure_info where lock_keys like '%procedure_info_for_sql_test%'; + ++--------------------------------+ +| procedure_type | ++--------------------------------+ +| metasrv-procedure::CreateTable | +| metasrv-procedure::CreateTable | ++--------------------------------+ + +use public; + +Affected Rows: 0 + +DROP TABLE procedure_info_for_sql_test1, procedure_info_for_sql_test2; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/information_schema/procedure_info.sql b/tests/cases/standalone/common/information_schema/procedure_info.sql new file mode 100644 index 000000000000..763dde12ef85 --- /dev/null +++ b/tests/cases/standalone/common/information_schema/procedure_info.sql @@ -0,0 +1,20 @@ +--- test information_schema.procedure_info ---- +USE public; + +CREATE TABLE procedure_info_for_sql_test1( + ts TIMESTAMP TIME INDEX, + temperature DOUBLE DEFAULT 10, +) engine=mito with('append_mode'='true'); + +CREATE TABLE procedure_info_for_sql_test2( + ts TIMESTAMP TIME INDEX, + temperature DOUBLE DEFAULT 10, +) engine=mito with('append_mode'='true'); + +use INFORMATION_SCHEMA; + +select procedure_type from procedure_info where lock_keys like '%procedure_info_for_sql_test%'; + +use public; + +DROP TABLE procedure_info_for_sql_test1, procedure_info_for_sql_test2; diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index fa50fb2aab3a..c92a5b33c8ad 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -45,6 +45,7 @@ SHOW TABLES; | optimizer_trace | | parameters | | partitions | +| procedure_info | | profiling | | referential_constraints | | region_peers | @@ -91,6 +92,7 @@ SHOW FULL TABLES; | optimizer_trace | LOCAL TEMPORARY | | parameters | LOCAL TEMPORARY | | partitions | LOCAL TEMPORARY | +| procedure_info | LOCAL TEMPORARY | | profiling | LOCAL TEMPORARY | | referential_constraints | LOCAL TEMPORARY | | region_peers | LOCAL TEMPORARY | @@ -131,6 +133,7 @@ SHOW TABLE STATUS; |optimizer_trace||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |parameters||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |partitions||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| +|procedure_info||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |profiling||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |referential_constraints||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |region_peers||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index cbb0c12b6f3e..03b65cf390ea 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -32,6 +32,7 @@ order by table_schema, table_name; |greptime|information_schema|optimizer_trace|LOCALTEMPORARY|17|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|parameters|LOCALTEMPORARY|18|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|partitions|LOCALTEMPORARY|28|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| +|greptime|information_schema|procedure_info|LOCALTEMPORARY|34|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|profiling|LOCALTEMPORARY|19|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|20|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|29|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| @@ -256,6 +257,12 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | partitions | table_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | partitions | tablespace_name | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | partitions | update_time | 20 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | Yes | datetime | | | +| greptime | information_schema | procedure_info | end_time | 4 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | | +| greptime | information_schema | procedure_info | lock_keys | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | procedure_info | procedure_id | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | procedure_info | procedure_type | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | procedure_info | start_time | 3 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | | +| greptime | information_schema | procedure_info | status | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | profiling | block_ops_in | 9 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | profiling | block_ops_out | 10 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | profiling | context_involuntary | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index 4cf8084cd364..4fd2f59356f6 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -106,6 +106,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; |greptime|information_schema|optimizer_trace|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|parameters|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|partitions|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| +|greptime|information_schema|procedure_info|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|