diff --git a/Cargo.lock b/Cargo.lock index db7b55dcdace..7cc4a59b648e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1259,7 +1259,6 @@ dependencies = [ "async-trait", "common-base", "common-config", - "common-contexts", "common-datablocks", "common-datavalues", "common-exception", @@ -1298,14 +1297,6 @@ dependencies = [ "thrift 0.17.0", ] -[[package]] -name = "common-contexts" -version = "0.1.0" -dependencies = [ - "async-trait", - "opendal", -] - [[package]] name = "common-datablocks" version = "0.1.0" @@ -2019,7 +2010,6 @@ dependencies = [ "common-base", "common-catalog", "common-config", - "common-contexts", "common-datablocks", "common-datavalues", "common-exception", @@ -2097,7 +2087,6 @@ dependencies = [ "async-trait", "backon", "common-base", - "common-contexts", "common-exception", "globiter", "once_cell", @@ -2126,10 +2115,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "common-storages-context" -version = "0.1.0" - [[package]] name = "common-storages-factory" version = "0.1.0" @@ -2151,7 +2136,6 @@ dependencies = [ "common-pipeline-sources", "common-pipeline-transforms", "common-storage", - "common-storages-context", "common-storages-fuse", "common-storages-index", "common-storages-preludes", @@ -2194,7 +2178,6 @@ dependencies = [ "common-storage", "common-storages-cache", "common-storages-constants", - "common-storages-context", "common-storages-index", "futures", "futures-util", @@ -2269,7 +2252,6 @@ dependencies = [ "common-base", "common-building", "common-catalog", - "common-contexts", "common-datablocks", "common-datavalues", "common-exception", @@ -2282,7 +2264,7 @@ dependencies = [ "common-pipeline-sinks", "common-pipeline-sources", "common-pipeline-transforms", - "common-storages-context", + "common-storage", "common-users", "futures", "itertools", @@ -2900,7 +2882,6 @@ dependencies = [ "common-building", "common-catalog", "common-config", - "common-contexts", "common-datablocks", "common-datavalues", "common-exception", @@ -2932,7 
+2913,6 @@ dependencies = [ "common-storage", "common-storages-cache", "common-storages-constants", - "common-storages-context", "common-storages-factory", "common-storages-fuse", "common-storages-hive", diff --git a/Cargo.toml b/Cargo.toml index 309f6cd9dda4..0aedcb0a07bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ members = [ "src/common/base", "src/common/building", "src/common/cache", - "src/common/contexts", "src/common/exception", "src/common/grpc", "src/common/hashtable", @@ -48,7 +47,6 @@ members = [ "src/query/settings", "src/query/sql", "src/query/storages/cache", - "src/query/storages/context", "src/query/storages/constants", "src/query/storages/fuse", "src/query/storages/fuse-meta", diff --git a/src/common/contexts/Cargo.toml b/src/common/contexts/Cargo.toml deleted file mode 100644 index 405387aee747..000000000000 --- a/src/common/contexts/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "common-contexts" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[lib] -doctest = false -test = false - -[dependencies] -async-trait = "0.1.57" -opendal = { version = "0.19", features = ["layers-retry"] } diff --git a/src/common/contexts/src/dal/dal_context.rs b/src/common/contexts/src/dal/dal_context.rs deleted file mode 100644 index f1d508515b37..000000000000 --- a/src/common/contexts/src/dal/dal_context.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// See the License for the specific language governing permissions and -// limitations under the License. -use std::io::Error; -use std::io::ErrorKind; -use std::io::Result; -use std::sync::Arc; -use std::time::Instant; - -use async_trait::async_trait; -use opendal::io_util::observe_read; -use opendal::io_util::ReadEvent; -use opendal::ops::OpCreate; -use opendal::ops::OpDelete; -use opendal::ops::OpList; -use opendal::ops::OpPresign; -use opendal::ops::OpRead; -use opendal::ops::OpStat; -use opendal::ops::OpWrite; -use opendal::ops::PresignedRequest; -use opendal::Accessor; -use opendal::AccessorMetadata; -use opendal::BytesReader; -use opendal::Layer; -use opendal::ObjectMetadata; -use opendal::ObjectStreamer; - -use crate::DalMetrics; - -#[derive(Clone, Default, Debug)] -pub struct DalContext { - inner: Option>, - metrics: Arc, -} - -impl DalContext { - pub fn new(inner: Arc) -> Self { - DalContext { - inner: Some(inner), - metrics: Arc::new(Default::default()), - } - } - - fn get_inner(&self) -> Result> { - match &self.inner { - None => Err(Error::new( - ErrorKind::Other, - "dal context must init wrongly, inner accessor is empty", - )), - Some(inner) => Ok(inner.clone()), - } - } - - pub fn get_metrics(&self) -> Arc { - self.metrics.clone() - } -} - -impl Layer for DalContext { - fn layer(&self, inner: Arc) -> Arc { - Arc::new(DalContext { - inner: Some(inner), - metrics: self.metrics.clone(), - }) - } -} - -#[async_trait] -impl Accessor for DalContext { - fn metadata(&self) -> AccessorMetadata { - self.get_inner() - .expect("must have valid accessor") - .metadata() - } - - async fn create(&self, path: &str, args: OpCreate) -> Result<()> { - self.get_inner()?.create(path, args).await - } - - async fn read(&self, path: &str, args: OpRead) -> Result { - let metric = self.metrics.clone(); - - self.get_inner()?.read(path, args).await.map(|r| { - let mut last_pending = None; - let r = observe_read(r, move |e| { - let start = match last_pending { - None => 
Instant::now(), - Some(t) => t, - }; - match e { - ReadEvent::Pending => last_pending = Some(start), - ReadEvent::Read(n) => { - last_pending = None; - metric.inc_read_bytes(n); - } - ReadEvent::Error(_) => last_pending = None, - _ => {} - } - metric.inc_read_bytes_cost(start.elapsed().as_millis() as u64); - }); - - Box::new(r) as BytesReader - }) - } - - async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result { - let metric = self.metrics.clone(); - - let mut last_pending = None; - - let r = observe_read(r, move |e| { - let start = match last_pending { - None => Instant::now(), - Some(t) => t, - }; - match e { - ReadEvent::Pending => last_pending = Some(start), - ReadEvent::Read(n) => { - last_pending = None; - metric.inc_write_bytes(n); - } - ReadEvent::Error(_) => last_pending = None, - _ => {} - } - metric.inc_write_bytes_cost(start.elapsed().as_millis() as u64); - }); - - self.get_inner()?.write(path, args, Box::new(r)).await - } - - async fn stat(&self, path: &str, args: OpStat) -> Result { - self.get_inner()?.stat(path, args).await - } - - async fn delete(&self, path: &str, args: OpDelete) -> Result<()> { - self.get_inner()?.delete(path, args).await - } - - async fn list(&self, path: &str, args: OpList) -> Result { - self.get_inner()?.list(path, args).await - } - - fn presign(&self, path: &str, args: OpPresign) -> Result { - self.get_inner()?.presign(path, args) - } -} diff --git a/src/common/contexts/src/dal/dal_metrics.rs b/src/common/contexts/src/dal/dal_metrics.rs deleted file mode 100644 index 605868d5c225..000000000000 --- a/src/common/contexts/src/dal/dal_metrics.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::atomic::AtomicU64; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::sync::Arc; - -/// DalMetrics represents the metrics of a DAL (all bytes metrics are compressed size). -#[derive(Clone, Debug, Default)] -pub struct DalMetrics { - /// Read bytes. - read_bytes: Arc, - /// Cost(in ms) of read bytes. - read_bytes_cost_ms: Arc, - /// Bytes written by data access layer - write_bytes: Arc, - /// Cost(in ms) of write bytes. - write_bytes_cost_ms: Arc, - /// Number of partitions scanned, after pruning - partitions_scanned: Arc, - /// Number of partitions, before pruning - partitions_total: Arc, -} - -impl DalMetrics { - pub fn inc_read_bytes(&self, v: usize) { - if v > 0 { - self.read_bytes.fetch_add(v, Ordering::Relaxed); - } - } - - pub fn get_read_bytes(&self) -> usize { - self.read_bytes.load(Ordering::Relaxed) - } - - pub fn inc_read_bytes_cost(&self, ms: u64) { - if ms > 0 { - self.read_bytes_cost_ms.fetch_add(ms, Ordering::Relaxed); - } - } - - pub fn inc_write_bytes_cost(&self, ms: u64) { - if ms > 0 { - self.write_bytes_cost_ms.fetch_add(ms, Ordering::Relaxed); - } - } - - pub fn get_read_bytes_cost(&self) -> u64 { - self.read_bytes_cost_ms.load(Ordering::Relaxed) - } - - pub fn get_write_bytes_cost(&self) -> u64 { - self.write_bytes_cost_ms.load(Ordering::Relaxed) - } - - pub fn inc_write_bytes(&self, v: usize) { - if v > 0 { - self.write_bytes.fetch_add(v, Ordering::Relaxed); - } - } - - pub fn get_write_bytes(&self) -> usize { - self.write_bytes.load(Ordering::Relaxed) - } - - pub fn 
inc_partitions_scanned(&self, v: u64) { - if v > 0 { - self.partitions_scanned.fetch_add(v, Ordering::Relaxed); - } - } - - pub fn get_partitions_scanned(&self) -> u64 { - self.partitions_scanned.load(Ordering::Relaxed) - } - - pub fn inc_partitions_total(&self, v: u64) { - if v > 0 { - self.partitions_total.fetch_add(v, Ordering::Relaxed); - } - } - - pub fn get_partitions_total(&self) -> u64 { - self.partitions_total.load(Ordering::Relaxed) - } -} diff --git a/src/common/contexts/src/dal/mod.rs b/src/common/contexts/src/dal/mod.rs deleted file mode 100644 index eaaea50dc16f..000000000000 --- a/src/common/contexts/src/dal/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod dal_context; -mod dal_metrics; - -pub use dal_context::DalContext; -pub use dal_metrics::DalMetrics; diff --git a/src/common/contexts/src/lib.rs b/src/common/contexts/src/lib.rs deleted file mode 100644 index 1161cbc4e106..000000000000 --- a/src/common/contexts/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![deny(unused_crate_dependencies)] - -mod dal; - -pub use dal::DalContext; -pub use dal::DalMetrics; diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml index fff8c3bdc820..234bd05d17c0 100644 --- a/src/common/storage/Cargo.toml +++ b/src/common/storage/Cargo.toml @@ -11,7 +11,6 @@ storage-hdfs = ["opendal/services-hdfs"] [dependencies] common-base = { path = "../base" } -common-contexts = { path = "../contexts" } common-exception = { path = "../exception" } anyhow = { workspace = true } diff --git a/src/common/storage/src/lib.rs b/src/common/storage/src/lib.rs index 0c34ad938cd4..1c980c73dea2 100644 --- a/src/common/storage/src/lib.rs +++ b/src/common/storage/src/lib.rs @@ -56,5 +56,9 @@ mod location; pub use location::parse_uri_location; pub use location::UriLocation; +mod metrics; +pub use metrics::StorageMetrics; +pub use metrics::StorageMetricsLayer; + mod runtime_layer; mod utils; diff --git a/src/common/storage/src/metrics.rs b/src/common/storage/src/metrics.rs new file mode 100644 index 000000000000..345927a8e7fc --- /dev/null +++ b/src/common/storage/src/metrics.rs @@ -0,0 +1,243 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Result; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Instant; + +use async_trait::async_trait; +use opendal::io_util::observe_read; +use opendal::io_util::ReadEvent; +use opendal::ops::OpCreate; +use opendal::ops::OpDelete; +use opendal::ops::OpList; +use opendal::ops::OpPresign; +use opendal::ops::OpRead; +use opendal::ops::OpStat; +use opendal::ops::OpWrite; +use opendal::ops::PresignedRequest; +use opendal::Accessor; +use opendal::AccessorMetadata; +use opendal::BytesReader; +use opendal::Layer; +use opendal::ObjectMetadata; +use opendal::ObjectStreamer; + +/// StorageMetrics represents the metrics of storage (all bytes metrics are compressed size). +#[derive(Debug, Default)] +pub struct StorageMetrics { + /// Read bytes. + read_bytes: AtomicUsize, + /// Cost(in ms) of read bytes. + read_bytes_cost_ms: AtomicU64, + /// Bytes written by data access layer + write_bytes: AtomicUsize, + /// Cost(in ms) of write bytes. + write_bytes_cost_ms: AtomicU64, + /// Number of partitions scanned, after pruning + partitions_scanned: AtomicU64, + /// Number of partitions, before pruning + partitions_total: AtomicU64, +} + +impl StorageMetrics { + /// Merge give metrics into one. 
+ pub fn merge(vs: &[impl AsRef]) -> StorageMetrics { + StorageMetrics { + read_bytes: AtomicUsize::new(vs.iter().map(|v| v.as_ref().get_read_bytes()).sum()), + read_bytes_cost_ms: AtomicU64::new( + vs.iter().map(|v| v.as_ref().get_read_bytes_cost()).sum(), + ), + write_bytes: AtomicUsize::new(vs.iter().map(|v| v.as_ref().get_write_bytes()).sum()), + write_bytes_cost_ms: AtomicU64::new( + vs.iter().map(|v| v.as_ref().get_write_bytes_cost()).sum(), + ), + partitions_scanned: AtomicU64::new( + vs.iter().map(|v| v.as_ref().get_partitions_scanned()).sum(), + ), + partitions_total: AtomicU64::new( + vs.iter().map(|v| v.as_ref().get_partitions_total()).sum(), + ), + } + } + + pub fn inc_read_bytes(&self, v: usize) { + if v > 0 { + self.read_bytes.fetch_add(v, Ordering::Relaxed); + } + } + + pub fn get_read_bytes(&self) -> usize { + self.read_bytes.load(Ordering::Relaxed) + } + + pub fn inc_read_bytes_cost(&self, ms: u64) { + if ms > 0 { + self.read_bytes_cost_ms.fetch_add(ms, Ordering::Relaxed); + } + } + + pub fn inc_write_bytes_cost(&self, ms: u64) { + if ms > 0 { + self.write_bytes_cost_ms.fetch_add(ms, Ordering::Relaxed); + } + } + + pub fn get_read_bytes_cost(&self) -> u64 { + self.read_bytes_cost_ms.load(Ordering::Relaxed) + } + + pub fn get_write_bytes_cost(&self) -> u64 { + self.write_bytes_cost_ms.load(Ordering::Relaxed) + } + + pub fn inc_write_bytes(&self, v: usize) { + if v > 0 { + self.write_bytes.fetch_add(v, Ordering::Relaxed); + } + } + + pub fn get_write_bytes(&self) -> usize { + self.write_bytes.load(Ordering::Relaxed) + } + + pub fn inc_partitions_scanned(&self, v: u64) { + if v > 0 { + self.partitions_scanned.fetch_add(v, Ordering::Relaxed); + } + } + + pub fn get_partitions_scanned(&self) -> u64 { + self.partitions_scanned.load(Ordering::Relaxed) + } + + pub fn inc_partitions_total(&self, v: u64) { + if v > 0 { + self.partitions_total.fetch_add(v, Ordering::Relaxed); + } + } + + pub fn get_partitions_total(&self) -> u64 { + 
self.partitions_total.load(Ordering::Relaxed) + } +} + +#[derive(Clone, Debug)] +pub struct StorageMetricsLayer { + metrics: Arc, +} + +impl StorageMetricsLayer { + /// Create a new storage metrics layer. + pub fn new(metrics: Arc) -> Self { + StorageMetricsLayer { metrics } + } +} + +impl Layer for StorageMetricsLayer { + fn layer(&self, inner: Arc) -> Arc { + Arc::new(StorageMetricsAccessor { + inner, + metrics: self.metrics.clone(), + }) + } +} + +#[derive(Clone, Debug)] +struct StorageMetricsAccessor { + inner: Arc, + metrics: Arc, +} + +#[async_trait] +impl Accessor for StorageMetricsAccessor { + fn metadata(&self) -> AccessorMetadata { + self.inner.metadata() + } + + async fn create(&self, path: &str, args: OpCreate) -> Result<()> { + self.inner.create(path, args).await + } + + async fn read(&self, path: &str, args: OpRead) -> Result { + let metric = self.metrics.clone(); + + self.inner.read(path, args).await.map(|r| { + let mut last_pending = None; + let r = observe_read(r, move |e| { + let start = match last_pending { + None => Instant::now(), + Some(t) => t, + }; + match e { + ReadEvent::Pending => last_pending = Some(start), + ReadEvent::Read(n) => { + last_pending = None; + metric.inc_read_bytes(n); + } + ReadEvent::Error(_) => last_pending = None, + _ => {} + } + metric.inc_read_bytes_cost(start.elapsed().as_millis() as u64); + }); + + Box::new(r) as BytesReader + }) + } + + async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result { + let metric = self.metrics.clone(); + + let mut last_pending = None; + + let r = observe_read(r, move |e| { + let start = match last_pending { + None => Instant::now(), + Some(t) => t, + }; + match e { + ReadEvent::Pending => last_pending = Some(start), + ReadEvent::Read(n) => { + last_pending = None; + metric.inc_write_bytes(n); + } + ReadEvent::Error(_) => last_pending = None, + _ => {} + } + metric.inc_write_bytes_cost(start.elapsed().as_millis() as u64); + }); + + self.inner.write(path, args, 
Box::new(r)).await + } + + async fn stat(&self, path: &str, args: OpStat) -> Result { + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await + } + + /// TODO: we need to make sure returning object's accessor is correct. + async fn list(&self, path: &str, args: OpList) -> Result { + self.inner.list(path, args).await + } + + fn presign(&self, path: &str, args: OpPresign) -> Result { + self.inner.presign(path, args) + } +} diff --git a/src/query/catalog/Cargo.toml b/src/query/catalog/Cargo.toml index 32b9dd7f492e..581b0b9cdd05 100644 --- a/src/query/catalog/Cargo.toml +++ b/src/query/catalog/Cargo.toml @@ -11,7 +11,6 @@ test = false [dependencies] common-base = { path = "../../common/base" } common-config = { path = "../config" } -common-contexts = { path = "../../common/contexts" } common-datablocks = { path = "../datablocks" } common-datavalues = { path = "../datavalues" } common-exception = { path = "../../common/exception" } diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 46e08e6feb86..b854ee5d6ada 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -33,6 +33,7 @@ use common_legacy_planners::Statistics; use common_meta_app::schema::TableInfo; use common_meta_types::MetaId; use common_pipeline_core::Pipeline; +use common_storage::StorageMetrics; use crate::table::column_stats_provider_impls::DummyColumnStatisticsProvider; use crate::table_context::TableContext; @@ -74,6 +75,11 @@ pub trait Table: Sync + Send { fn get_table_info(&self) -> &TableInfo; + /// get_data_metrics will get data metrics from table. 
+ fn get_data_metrics(&self) -> Option> { + None + } + /// whether column prune(projection) can help in table read fn benefit_column_prune(&self) -> bool { false diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 668f1d98dc9c..93d7a0c65829 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -20,8 +20,6 @@ use std::time::SystemTime; use common_base::base::Progress; use common_base::base::ProgressValues; use common_config::Config; -use common_contexts::DalContext; -use common_contexts::DalMetrics; use common_datablocks::DataBlock; use common_exception::Result; use common_functions::scalars::FunctionContext; @@ -32,6 +30,7 @@ use common_legacy_planners::ReadDataSourcePlan; use common_meta_types::UserInfo; use common_settings::Settings; use common_storage::DataOperator; +use common_storage::StorageMetrics; use crate::catalog::Catalog; use crate::cluster_info::Cluster; @@ -47,7 +46,8 @@ pub struct ProcessInfo { pub client_address: Option, pub session_extra_info: Option, pub memory_usage: i64, - pub dal_metrics: Option, + /// storage metrics for persisted data reading. + pub data_metrics: Option, pub scan_progress_value: Option, pub mysql_connection_id: Option, pub created_time: SystemTime, @@ -84,15 +84,12 @@ pub trait TableContext: Send + Sync { fn apply_changed_settings(&self, changed_settings: Arc) -> Result<()>; fn get_format_settings(&self) -> Result; fn get_tenant(&self) -> String; - /// Get the data accessor metrics. - fn get_dal_metrics(&self) -> DalMetrics; /// Get the session running query. fn get_query_str(&self) -> String; /// Get the kind of session running query. fn get_query_kind(&self) -> String; // Get the persist storage data accessor operator from the session manager. 
fn get_data_operator(&self) -> Result; - fn get_dal_context(&self) -> &DalContext; fn push_precommit_block(&self, block: DataBlock); fn consume_precommit_blocks(&self) -> Vec; fn try_get_function_context(&self) -> Result; diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 0aabcc8a4479..a0c8999f4af0 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -32,7 +32,6 @@ common-ast = { path = "../ast" } common-base = { path = "../../common/base" } common-catalog = { path = "../catalog" } common-config = { path = "../config" } -common-contexts = { path = "../../common/contexts" } common-datablocks = { path = "../datablocks" } common-datavalues = { path = "../datavalues" } common-exception = { path = "../../common/exception" } @@ -64,7 +63,6 @@ common-sql = { path = "../sql" } common-storage = { path = "../../common/storage" } common-storages-cache = { path = "../storages/cache" } common-storages-constants = { path = "../storages/constants" } -common-storages-context = { path = "../storages/context" } common-storages-factory = { path = "../storages/factory" } common-storages-fuse = { path = "../storages/fuse" } common-storages-hive = { path = "../storages/hive", optional = true } diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 7f8145b472a3..fb8c57552d8b 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -62,7 +62,6 @@ use crate::catalogs::catalog::Catalog; use crate::databases::Database; use crate::databases::DatabaseContext; use crate::databases::DatabaseFactory; -use crate::storages::StorageContext; use crate::storages::StorageDescription; use crate::storages::StorageFactory; use crate::storages::Table; @@ -211,8 +210,7 @@ impl Catalog for MutableCatalog { fn get_table_by_info(&self, table_info: &TableInfo) -> Result> { let storage = 
self.ctx.storage_factory.clone(); - let ctx = StorageContext {}; - storage.get_table(ctx, table_info) + storage.get_table(table_info) } async fn get_table_meta_by_id( diff --git a/src/query/service/src/databases/default/default_database.rs b/src/query/service/src/databases/default/default_database.rs index ddcf33063fcb..dfa4ea4a2149 100644 --- a/src/query/service/src/databases/default/default_database.rs +++ b/src/query/service/src/databases/default/default_database.rs @@ -41,7 +41,6 @@ use common_meta_app::schema::UpsertTableOptionReq; use crate::databases::Database; use crate::databases::DatabaseContext; -use crate::storages::StorageContext; #[derive(Clone)] pub struct DefaultDatabase { @@ -78,8 +77,7 @@ impl Database for DefaultDatabase { fn get_table_by_info(&self, table_info: &TableInfo) -> Result> { let storage = self.ctx.storage_factory.clone(); - let ctx = StorageContext {}; - storage.get_table(ctx, table_info) + storage.get_table(table_info) } // Get one table by db and table name. diff --git a/src/query/service/src/databases/share/share_database.rs b/src/query/service/src/databases/share/share_database.rs index 7c881bafc120..4e7e2f879023 100644 --- a/src/query/service/src/databases/share/share_database.rs +++ b/src/query/service/src/databases/share/share_database.rs @@ -41,7 +41,6 @@ use common_meta_app::schema::UpsertTableOptionReq; use crate::databases::Database; use crate::databases::DatabaseContext; -use crate::storages::StorageContext; #[derive(Clone)] pub struct ShareDatabase { @@ -77,8 +76,7 @@ impl Database for ShareDatabase { fn get_table_by_info(&self, table_info: &TableInfo) -> Result> { let storage = self.ctx.storage_factory.clone(); - let ctx = StorageContext {}; - storage.get_table(ctx, table_info) + storage.get_table(table_info) } // Get one table by db and table name. 
diff --git a/src/query/service/src/interpreters/interpreter_query_log.rs b/src/query/service/src/interpreters/interpreter_query_log.rs index 0676e0a3df12..a20e3621d46a 100644 --- a/src/query/service/src/interpreters/interpreter_query_log.rs +++ b/src/query/service/src/interpreters/interpreter_query_log.rs @@ -201,20 +201,20 @@ impl InterpreterQueryLog { let event_date = (event_time / (24 * 3_600_000_000)) as i32; let query_start_time = convert_query_log_timestamp(ctx.get_created_time()); let query_duration_ms = (event_time - query_start_time) / 1_000; - let dal_metrics = ctx.get_dal_metrics(); + let data_metrics = ctx.get_data_metrics(); let written_rows = ctx.get_write_progress_value().rows as u64; let written_bytes = ctx.get_write_progress_value().bytes as u64; - let written_io_bytes = dal_metrics.get_write_bytes() as u64; - let written_io_bytes_cost_ms = dal_metrics.get_write_bytes_cost(); + let written_io_bytes = data_metrics.get_write_bytes() as u64; + let written_io_bytes_cost_ms = data_metrics.get_write_bytes_cost(); let scan_rows = ctx.get_scan_progress_value().rows as u64; let scan_bytes = ctx.get_scan_progress_value().bytes as u64; - let scan_io_bytes = dal_metrics.get_read_bytes() as u64; - let scan_io_bytes_cost_ms = dal_metrics.get_read_bytes_cost(); + let scan_io_bytes = data_metrics.get_read_bytes() as u64; + let scan_io_bytes_cost_ms = data_metrics.get_read_bytes_cost(); - let scan_partitions = dal_metrics.get_partitions_scanned(); - let total_partitions = dal_metrics.get_partitions_total(); + let scan_partitions = data_metrics.get_partitions_scanned(); + let total_partitions = data_metrics.get_partitions_total(); let cpu_usage = ctx.get_settings().get_max_threads()? 
as u32; let memory_usage = ctx.get_current_session().get_memory_usage() as u64; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 831e423db01f..1fb2aa29bd64 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -29,8 +29,6 @@ use common_base::base::ProgressValues; use common_base::base::TrySpawn; use common_config::Config; use common_config::DATABEND_COMMIT_VERSION; -use common_contexts::DalContext; -use common_contexts::DalMetrics; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; @@ -45,6 +43,7 @@ use common_legacy_planners::StageTableInfo; use common_meta_app::schema::TableInfo; use common_meta_types::UserInfo; use common_storage::DataOperator; +use common_storage::StorageMetrics; use parking_lot::RwLock; use tracing::debug; @@ -181,6 +180,10 @@ impl QueryContext { self.shared.get_affect() } + pub fn get_data_metrics(&self) -> StorageMetrics { + self.shared.get_data_metrics() + } + pub fn set_affect(self: &Arc, affect: QueryAffect) { self.shared.set_affect(affect) } @@ -295,10 +298,7 @@ impl TableContext for QueryContext { fn get_tenant(&self) -> String { self.shared.get_tenant() } - /// Get the data accessor metrics. - fn get_dal_metrics(&self) -> DalMetrics { - self.shared.dal_ctx.get_metrics().as_ref().clone() - } + /// Get the session running query. fn get_query_str(&self) -> String { self.shared.get_query_str() @@ -310,15 +310,7 @@ impl TableContext for QueryContext { // Get the storage data accessor operator from the session manager. 
fn get_data_operator(&self) -> Result { - let pop = self.shared.data_operator.clone(); - - Ok(DataOperator::new( - pop.operator().layer(self.shared.dal_ctx.as_ref().clone()), - pop.params().clone(), - )) - } - fn get_dal_context(&self) -> &DalContext { - self.shared.dal_ctx.as_ref() + Ok(self.shared.data_operator.clone()) } fn push_precommit_block(&self, block: DataBlock) { self.shared.push_precommit_block(block) diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 17359d6c61fe..872935444374 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -23,12 +23,12 @@ use std::time::SystemTime; use common_base::base::Progress; use common_base::base::Runtime; use common_config::Config; -use common_contexts::DalContext; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::UserInfo; use common_storage::DataOperator; +use common_storage::StorageMetrics; use common_storage::StorageParams; use parking_lot::Mutex; use parking_lot::RwLock; @@ -74,7 +74,6 @@ pub struct QueryContextShared { pub(in crate::sessions) http_query: Arc>>, pub(in crate::sessions) aborting: Arc, pub(in crate::sessions) tables_refs: Arc>>>, - pub(in crate::sessions) dal_ctx: Arc, pub(in crate::sessions) auth_manager: Arc, pub(in crate::sessions) affect: Arc>>, pub(in crate::sessions) catalog_manager: Arc, @@ -107,7 +106,6 @@ impl QueryContextShared { http_query: Arc::new(RwLock::new(None)), aborting: Arc::new(AtomicBool::new(false)), tables_refs: Arc::new(Mutex::new(HashMap::new())), - dal_ctx: Arc::new(Default::default()), auth_manager: AuthMgr::create(config).await?, affect: Arc::new(Mutex::new(None)), executor: Arc::new(RwLock::new(Weak::new())), @@ -168,6 +166,19 @@ impl QueryContextShared { self.data_operator.get_storage_params() } + /// Get all tables that are already attached in this query.
+ pub fn get_tables_refs(&self) -> Vec> { + let tables = self.tables_refs.lock(); + tables.values().cloned().collect() + } + + pub fn get_data_metrics(&self) -> StorageMetrics { + let tables = self.get_tables_refs(); + let metrics: Vec> = + tables.iter().filter_map(|v| v.get_data_metrics()).collect(); + StorageMetrics::merge(&metrics) + } + pub fn get_tenant(&self) -> String { self.session.get_current_tenant() } diff --git a/src/query/service/src/sessions/session_info.rs b/src/query/service/src/sessions/session_info.rs index 4db8fe442af7..635f490185bc 100644 --- a/src/query/service/src/sessions/session_info.rs +++ b/src/query/service/src/sessions/session_info.rs @@ -17,7 +17,7 @@ use std::time::SystemTime; use common_base::base::ProgressValues; pub use common_catalog::table_context::ProcessInfo; -use common_contexts::DalMetrics; +use common_storage::StorageMetrics; use crate::sessions::Session; use crate::sessions::SessionContext; @@ -50,7 +50,7 @@ impl Session { client_address: status.get_client_host(), session_extra_info: self.process_extra_info(status), memory_usage, - dal_metrics: Self::query_dal_metrics(status), + data_metrics: Self::query_data_metrics(status), scan_progress_value: Self::query_scan_progress_value(status), mysql_connection_id: self.mysql_connection_id, created_time: Self::query_created_time(status), @@ -85,11 +85,11 @@ impl Session { .map(|context_shared| context_shared.get_query_str()) } - fn query_dal_metrics(status: &SessionContext) -> Option { + fn query_data_metrics(status: &SessionContext) -> Option { status .get_query_context_shared() .as_ref() - .map(|context_shared| context_shared.dal_ctx.get_metrics().as_ref().clone()) + .map(|context_shared| context_shared.get_data_metrics()) } fn query_scan_progress_value(status: &SessionContext) -> Option { diff --git a/src/query/service/tests/it/storages/memory.rs b/src/query/service/tests/it/storages/memory.rs index 888d9fe3cacc..cd06e3bcab71 100644 --- 
a/src/query/service/tests/it/storages/memory.rs +++ b/src/query/service/tests/it/storages/memory.rs @@ -34,19 +34,18 @@ async fn test_memorytable() -> Result<()> { DataField::new("a", u32::to_data_type()), DataField::new("b", u64::to_data_type()), ]); - let table = - MemoryTable::try_create(crate::tests::create_storage_context().await?, TableInfo { - desc: "'default'.'a'".into(), - name: "a".into(), - ident: Default::default(), - meta: TableMeta { - schema: schema.clone(), - engine: "Memory".to_string(), - options: TableOptions::default(), - ..Default::default() - }, + let table = MemoryTable::try_create(TableInfo { + desc: "'default'.'a'".into(), + name: "a".into(), + ident: Default::default(), + meta: TableMeta { + schema: schema.clone(), + engine: "Memory".to_string(), + options: TableOptions::default(), ..Default::default() - })?; + }, + ..Default::default() + })?; // append data. { diff --git a/src/query/service/tests/it/storages/null.rs b/src/query/service/tests/it/storages/null.rs index 13895e027c31..d822d64267ee 100644 --- a/src/query/service/tests/it/storages/null.rs +++ b/src/query/service/tests/it/storages/null.rs @@ -27,7 +27,7 @@ use futures::TryStreamExt; async fn test_null_table() -> Result<()> { let (_guard, ctx) = crate::tests::create_query_context().await?; - let table = NullTable::try_create(crate::tests::create_storage_context().await?, TableInfo { + let table = NullTable::try_create(TableInfo { desc: "'default'.'a'".into(), name: "a".into(), ident: Default::default(), diff --git a/src/query/service/tests/it/storages/testdata/system-tables.txt b/src/query/service/tests/it/storages/testdata/system-tables.txt index afd651ab959d..ebf2d36ba068 100644 --- a/src/query/service/tests/it/storages/testdata/system-tables.txt +++ b/src/query/service/tests/it/storages/testdata/system-tables.txt @@ -23,12 +23,12 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | created_on | system | tables_with_history | VARCHAR | | | false 
| | | creator | system | stages | VARCHAR | | | true | | | current_database | system | query_log | VARCHAR | | | false | | -| dal_metrics_read_bytes | system | processes | BIGINT UNSIGNED | | | true | | -| dal_metrics_write_bytes | system | processes | BIGINT UNSIGNED | | | true | | | data_compressed_size | system | tables | BIGINT UNSIGNED | | | true | | | data_compressed_size | system | tables_with_history | BIGINT UNSIGNED | | | true | | +| data_read_bytes | system | processes | BIGINT UNSIGNED | | | true | | | data_size | system | tables | BIGINT UNSIGNED | | | true | | | data_size | system | tables_with_history | BIGINT UNSIGNED | | | true | | +| data_write_bytes | system | processes | BIGINT UNSIGNED | | | true | | | database | system | clustering_history | VARCHAR | | | false | | | database | system | columns | VARCHAR | | | false | | | database | system | processes | VARCHAR | | | false | | diff --git a/src/query/service/tests/it/tests/context.rs b/src/query/service/tests/it/tests/context.rs index 78535682b28a..3c739ee87387 100644 --- a/src/query/service/tests/it/tests/context.rs +++ b/src/query/service/tests/it/tests/context.rs @@ -30,7 +30,6 @@ use databend_query::sessions::QueryContextShared; use databend_query::sessions::SessionManager; use databend_query::sessions::SessionType; use databend_query::sessions::TableContext; -use databend_query::storages::StorageContext; use crate::tests::sessions::TestGuard; use crate::tests::TestGlobalServices; @@ -100,10 +99,6 @@ pub async fn create_query_context_with_config( Ok((guard, dummy_query_context)) } -pub async fn create_storage_context() -> Result { - Ok(StorageContext {}) -} - #[allow(dead_code)] pub struct ClusterDescriptor { local_node_id: String, diff --git a/src/query/service/tests/it/tests/mod.rs b/src/query/service/tests/it/tests/mod.rs index 01498212d932..f3e2c65a868b 100644 --- a/src/query/service/tests/it/tests/mod.rs +++ b/src/query/service/tests/it/tests/mod.rs @@ -25,7 +25,6 @@ pub use 
context::create_query_context; pub use context::create_query_context_with_cluster; pub use context::create_query_context_with_config; pub use context::create_query_context_with_type; -pub use context::create_storage_context; pub use context::ClusterDescriptor; pub use sessions::TestGlobalServices; pub use sessions::TestGuard; diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index 6678cd1ff139..a013b7a89080 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -17,7 +17,6 @@ common-ast = { path = "../ast" } common-base = { path = "../../common/base" } common-catalog = { path = "../catalog" } common-config = { path = "../config" } -common-contexts = { path = "../../common/contexts" } common-datablocks = { path = "../datablocks" } common-datavalues = { path = "../datavalues" } common-exception = { path = "../../common/exception" } diff --git a/src/query/storages/context/Cargo.toml b/src/query/storages/context/Cargo.toml deleted file mode 100644 index b5572d90b58c..000000000000 --- a/src/query/storages/context/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "common-storages-context" -version = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -publish = { workspace = true } -edition = { workspace = true } - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[lib] -doctest = false -test = false - -[dependencies] diff --git a/src/query/storages/context/src/context.rs b/src/query/storages/context/src/context.rs deleted file mode 100644 index 39c751efc266..000000000000 --- a/src/query/storages/context/src/context.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// Storage Context. -#[derive(Clone)] -pub struct StorageContext {} diff --git a/src/query/storages/context/src/lib.rs b/src/query/storages/context/src/lib.rs deleted file mode 100644 index a9a8f254e254..000000000000 --- a/src/query/storages/context/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#![deny(unused_crate_dependencies)] - -#[allow(clippy::module_inception)] -mod context; - -pub use context::StorageContext; diff --git a/src/query/storages/factory/Cargo.toml b/src/query/storages/factory/Cargo.toml index d632b4b0d3ac..b4cfc51897e1 100644 --- a/src/query/storages/factory/Cargo.toml +++ b/src/query/storages/factory/Cargo.toml @@ -27,7 +27,6 @@ common-pipeline-core = { path = "../../pipeline/core" } common-pipeline-sources = { path = "../../pipeline/sources" } common-pipeline-transforms = { path = "../../pipeline/transforms" } common-storage = { path = "../../../common/storage" } -common-storages-context = { path = "../context" } common-storages-fuse = { path = "../fuse" } common-storages-index = { path = "../index" } common-storages-preludes = { path = "../preludes" } diff --git a/src/query/storages/factory/src/lib.rs b/src/query/storages/factory/src/lib.rs index 4669909cb740..15f46b604118 100644 --- a/src/query/storages/factory/src/lib.rs +++ b/src/query/storages/factory/src/lib.rs @@ -24,7 +24,6 @@ pub mod system; pub use common_catalog::table::NavigationPoint; pub use common_catalog::table::TableStatistics; -pub use common_storages_context::StorageContext; pub use common_storages_preludes::information_schema; pub use common_storages_preludes::memory; pub use common_storages_preludes::null; diff --git a/src/query/storages/factory/src/stage/stage_table.rs b/src/query/storages/factory/src/stage/stage_table.rs index b57eedcf1a49..4cc5b4304779 100644 --- a/src/query/storages/factory/src/stage/stage_table.rs +++ b/src/query/storages/factory/src/stage/stage_table.rs @@ -40,6 +40,7 @@ use tracing::info; use super::stage_table_sink::StageTableSink; use crate::Table; +/// TODO: we need to track the data metrics in stage table. pub struct StageTable { table_info: StageTableInfo, // This is no used but a placeholder. 
diff --git a/src/query/storages/factory/src/stage/stage_table_sink.rs b/src/query/storages/factory/src/stage/stage_table_sink.rs index 882c8678b0f8..7c6c9146a28e 100644 --- a/src/query/storages/factory/src/stage/stage_table_sink.rs +++ b/src/query/storages/factory/src/stage/stage_table_sink.rs @@ -46,7 +46,6 @@ enum State { pub struct StageTableSink { state: State, input: Arc, - ctx: Arc, data_accessor: Operator, output: Option>, @@ -103,7 +102,6 @@ impl StageTableSink { let single = table_info.stage_info.copy_options.single; Ok(ProcessorPtr::create(Box::new(StageTableSink { - ctx, input, data_accessor, table_info, @@ -266,10 +264,11 @@ impl Processor for StageTableSink { match std::mem::replace(&mut self.state, State::None) { State::NeedWrite(bytes, remainng_block) => { let path = self.unload_path(); - self.ctx - .get_dal_context() - .get_metrics() - .inc_write_bytes(bytes.len()); + + // TODO(xuanwo): we used to update the data metrics here. + // + // But all data metrics have been moved to the table, so we can't + // update them here; we need to address this.
let object = self.data_accessor.object(&path); { || object.write(bytes.as_slice()) } diff --git a/src/query/storages/factory/src/storage_factory.rs b/src/query/storages/factory/src/storage_factory.rs index 77643f752012..a74c866f70a1 100644 --- a/src/query/storages/factory/src/storage_factory.rs +++ b/src/query/storages/factory/src/storage_factory.rs @@ -27,20 +27,19 @@ use crate::fuse::FuseTable; use crate::memory::MemoryTable; use crate::null::NullTable; use crate::view::ViewTable; -use crate::StorageContext; use crate::Table; pub trait StorageCreator: Send + Sync { - fn try_create(&self, ctx: StorageContext, table_info: TableInfo) -> Result>; + fn try_create(&self, table_info: TableInfo) -> Result>; } impl StorageCreator for T where - T: Fn(StorageContext, TableInfo) -> Result>, + T: Fn(TableInfo) -> Result>, T: Send + Sync, { - fn try_create(&self, ctx: StorageContext, table_info: TableInfo) -> Result> { - self(ctx, table_info) + fn try_create(&self, table_info: TableInfo) -> Result> { + self(table_info) } } @@ -109,14 +108,14 @@ impl StorageFactory { } } - pub fn get_table(&self, ctx: StorageContext, table_info: &TableInfo) -> Result> { + pub fn get_table(&self, table_info: &TableInfo) -> Result> { let engine = table_info.engine().to_uppercase(); let lock = self.storages.read(); let factory = lock.get(&engine).ok_or_else(|| { ErrorCode::UnknownTableEngine(format!("Unknown table engine {}", engine)) })?; - let table: Arc = factory.creator.try_create(ctx, table_info.clone())?.into(); + let table: Arc = factory.creator.try_create(table_info.clone())?.into(); Ok(table) } diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index e65512805049..6e0453a71dbe 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -33,7 +33,6 @@ common-sharing = { path = "../../sharing" } common-storage = { path = "../../../common/storage" } common-storages-cache = { path = "../cache" } common-storages-constants = { 
path = "../constants" } -common-storages-context = { path = "../context" } common-storages-index = { path = "../index" } async-trait = { version = "0.1.57", package = "async-trait-fn" } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 06dd8ebd41e6..245ed95828c3 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -45,7 +45,8 @@ use common_sharing::create_share_table_operator; use common_storage::init_operator; use common_storage::DataOperator; use common_storage::ShareTableConfig; -use common_storages_context::StorageContext; +use common_storage::StorageMetrics; +use common_storage::StorageMetricsLayer; use opendal::Operator; use uuid::Uuid; @@ -77,10 +78,11 @@ pub struct FuseTable { pub(crate) read_only: bool, pub(crate) operator: Operator, + pub(crate) data_metrics: Arc, } impl FuseTable { - pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result> { + pub fn try_create(table_info: TableInfo) -> Result> { let r = Self::do_create(table_info, false)?; Ok(r) } @@ -92,7 +94,7 @@ impl FuseTable { if let Some((_, order)) = &cluster_key_meta { cluster_keys = ExpressionParser::parse_exprs(order)?; } - let operator = match table_info.from_share { + let mut operator = match table_info.from_share { Some(ref from_share) => create_share_table_operator( ShareTableConfig::share_endpoint_address(), &table_info.tenant, @@ -111,6 +113,8 @@ impl FuseTable { } } }; + let data_metrics = Arc::new(StorageMetrics::default()); + operator = operator.layer(StorageMetricsLayer::new(data_metrics.clone())); Ok(Box::new(FuseTable { table_info, @@ -119,6 +123,7 @@ impl FuseTable { meta_location_generator: TableMetaLocationGenerator::with_prefix(storage_prefix), read_only, operator, + data_metrics, })) } @@ -258,6 +263,10 @@ impl Table for FuseTable { true } + fn get_data_metrics(&self) -> Option> { + Some(self.data_metrics.clone()) + } + async fn alter_table_cluster_keys( 
&self, ctx: Arc, diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index 2c28c27155a0..b80159098907 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -116,7 +116,7 @@ impl FuseTable { } let partitions_total = mutator.partitions_total(); - let (statistics, parts) = Self::read_partitions_with_metas( + let (statistics, parts) = self.read_partitions_with_metas( ctx.clone(), self.table_info.schema(), None, diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 198b5fb73393..3766fc5a1fed 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -166,6 +166,7 @@ impl FuseTable { } if !lazy_init_segments.is_empty() { + let table = self.clone(); let table_info = self.table_info.clone(); let push_downs = plan.push_downs.clone(); let query_ctx = ctx.clone(); @@ -173,6 +174,7 @@ impl FuseTable { // TODO: need refactor pipeline.set_on_init(move || { + let table = table.clone(); let table_info = table_info.clone(); let ctx = query_ctx.clone(); let dal = dal.clone(); @@ -180,15 +182,16 @@ impl FuseTable { let lazy_init_segments = lazy_init_segments.clone(); let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { - let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( - ctx, - dal, - push_downs, - table_info, - lazy_init_segments, - 0, - ) - .await?; + let (_statistics, partitions) = table + .prune_snapshot_blocks( + ctx, + dal, + push_downs, + table_info, + lazy_init_segments, + 0, + ) + .await?; Result::<_, ErrorCode>::Ok(partitions) })?; diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 08037877177a..6c52d3924b35 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ 
b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -78,7 +78,7 @@ impl FuseTable { let table_info = self.table_info.clone(); let segments_location = snapshot.segments.clone(); let summary = snapshot.summary.block_count as usize; - Self::prune_snapshot_blocks( + self.prune_snapshot_blocks( ctx.clone(), self.operator.clone(), push_downs.clone(), @@ -94,6 +94,7 @@ impl FuseTable { #[tracing::instrument(level = "debug", name = "prune_snapshot_blocks", skip_all, fields(ctx.id = ctx.get_id().as_str()))] pub async fn prune_snapshot_blocks( + &self, ctx: Arc, dal: Operator, push_downs: Option, @@ -125,11 +126,12 @@ impl FuseTable { start.elapsed().as_secs() ); - Self::read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, summary) + self.read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, summary) } pub fn read_partitions_with_metas( - ctx: Arc, + &self, + _: Arc, schema: DataSchemaRef, push_downs: Option, block_metas: Vec, @@ -147,11 +149,9 @@ impl FuseTable { statistics.partitions_scanned = partitions_scanned; // Update context statistics. 
- ctx.get_dal_context() - .get_metrics() + self.data_metrics .inc_partitions_total(partitions_total as u64); - ctx.get_dal_context() - .get_metrics() + self.data_metrics .inc_partitions_scanned(partitions_scanned as u64); Ok((statistics, parts)) diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index f049b1ac5d23..d671b7048b78 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -111,7 +111,7 @@ impl FuseTable { } let partitions_total = mutator.partitions_total(); - let (statistics, parts) = Self::read_partitions_with_metas( + let (statistics, parts) = self.read_partitions_with_metas( ctx.clone(), self.table_info.schema(), None, diff --git a/src/query/storages/preludes/Cargo.toml b/src/query/storages/preludes/Cargo.toml index 50abb9435464..915f20a09ae3 100644 --- a/src/query/storages/preludes/Cargo.toml +++ b/src/query/storages/preludes/Cargo.toml @@ -14,7 +14,6 @@ test = false [dependencies] common-base = { path = "../../../common/base" } common-catalog = { path = "../../catalog" } -common-contexts = { path = "../../../common/contexts" } common-datablocks = { path = "../../datablocks" } common-datavalues = { path = "../../datavalues" } common-exception = { path = "../../../common/exception" } @@ -27,7 +26,7 @@ common-pipeline-core = { path = "../../pipeline/core" } common-pipeline-sinks = { path = "../../pipeline/sinks" } common-pipeline-sources = { path = "../../pipeline/sources" } common-pipeline-transforms = { path = "../../pipeline/transforms" } -common-storages-context = { path = "../context" } +common-storage = { path = "../../../common/storage" } common-users = { path = "../../users" } async-trait = { version = "0.1.57", package = "async-trait-fn" } diff --git a/src/query/storages/preludes/src/lib.rs b/src/query/storages/preludes/src/lib.rs index 0a48eaf46e60..1c1e427b8f2d 100644 --- 
a/src/query/storages/preludes/src/lib.rs +++ b/src/query/storages/preludes/src/lib.rs @@ -24,7 +24,6 @@ pub mod view; mod storages { pub use common_catalog::catalog::StorageDescription; pub use common_catalog::table::Table; - pub use common_storages_context::StorageContext; pub use super::memory; pub use super::system; diff --git a/src/query/storages/preludes/src/memory/memory_table.rs b/src/query/storages/preludes/src/memory/memory_table.rs index a097423bff2e..152396ff0959 100644 --- a/src/query/storages/preludes/src/memory/memory_table.rs +++ b/src/query/storages/preludes/src/memory/memory_table.rs @@ -32,6 +32,7 @@ use common_legacy_planners::Projection; use common_legacy_planners::ReadDataSourcePlan; use common_legacy_planners::Statistics; use common_meta_app::schema::TableInfo; +use common_storage::StorageMetrics; use once_cell::sync::Lazy; use parking_lot::Mutex; use parking_lot::RwLock; @@ -44,7 +45,6 @@ use crate::pipelines::processors::SyncSourcer; use crate::pipelines::Pipeline; use crate::sessions::TableContext; use crate::storages::memory::memory_part::MemoryPartInfo; -use crate::storages::StorageContext; use crate::storages::StorageDescription; use crate::storages::Table; @@ -54,10 +54,12 @@ static IN_MEMORY_DATA: Lazy>>> = pub struct MemoryTable { table_info: TableInfo, blocks: Arc>>, + + data_metrics: Arc, } impl MemoryTable { - pub fn try_create(_: StorageContext, table_info: TableInfo) -> Result> { + pub fn try_create(table_info: TableInfo) -> Result> { let table_id = &table_info.ident.table_id; let blocks = { let mut in_mem_data = IN_MEMORY_DATA.write(); @@ -72,7 +74,11 @@ impl MemoryTable { } }; - let table = Self { table_info, blocks }; + let table = Self { + table_info, + blocks, + data_metrics: Arc::new(StorageMetrics::default()), + }; Ok(Box::new(table)) } @@ -135,6 +141,10 @@ impl Table for MemoryTable { true } + fn get_data_metrics(&self) -> Option> { + Some(self.data_metrics.clone()) + } + async fn read_partitions( &self, ctx: Arc, @@ 
-223,15 +233,13 @@ impl Table for MemoryTable { async fn commit_insertion( &self, - ctx: Arc, + _: Arc, operations: Vec, overwrite: bool, ) -> Result<()> { let written_bytes: usize = operations.iter().map(|b| b.memory_size()).sum(); - ctx.get_dal_context() - .get_metrics() - .inc_write_bytes(written_bytes); + self.data_metrics.inc_write_bytes(written_bytes); if overwrite { let mut blocks = self.blocks.write(); diff --git a/src/query/storages/preludes/src/null/null_table.rs b/src/query/storages/preludes/src/null/null_table.rs index 81010e618acf..7f03f5bb9ae2 100644 --- a/src/query/storages/preludes/src/null/null_table.rs +++ b/src/query/storages/preludes/src/null/null_table.rs @@ -32,7 +32,6 @@ use crate::pipelines::processors::SyncSourcer; use crate::pipelines::Pipe; use crate::pipelines::Pipeline; use crate::sessions::TableContext; -use crate::storages::StorageContext; use crate::storages::StorageDescription; use crate::storages::Table; @@ -41,7 +40,7 @@ pub struct NullTable { } impl NullTable { - pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result> { + pub fn try_create(table_info: TableInfo) -> Result> { Ok(Box::new(Self { table_info })) } diff --git a/src/query/storages/preludes/src/random/random_table.rs b/src/query/storages/preludes/src/random/random_table.rs index 1be42e7dc351..f348dd1d0e6a 100644 --- a/src/query/storages/preludes/src/random/random_table.rs +++ b/src/query/storages/preludes/src/random/random_table.rs @@ -34,7 +34,6 @@ use crate::pipelines::processors::SyncSourcer; use crate::pipelines::Pipeline; use crate::pipelines::SourcePipeBuilder; use crate::sessions::TableContext; -use crate::storages::StorageContext; use crate::storages::StorageDescription; use crate::storages::Table; @@ -43,7 +42,7 @@ pub struct RandomTable { } impl RandomTable { - pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result> { + pub fn try_create(table_info: TableInfo) -> Result> { Ok(Box::new(Self { table_info })) } diff --git 
a/src/query/storages/preludes/src/system/processes_table.rs b/src/query/storages/preludes/src/system/processes_table.rs index 7a68de311240..7b1f018ce156 100644 --- a/src/query/storages/preludes/src/system/processes_table.rs +++ b/src/query/storages/preludes/src/system/processes_table.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use std::time::Duration; use common_base::base::ProgressValues; -use common_contexts::DalMetrics; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; @@ -25,6 +24,7 @@ use common_meta_app::schema::TableIdent; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; use common_meta_types::UserInfo; +use common_storage::StorageMetrics; use crate::sessions::TableContext; use crate::storages::Table; @@ -54,8 +54,8 @@ impl SyncSystemTable for ProcessesTable { let mut processes_database = Vec::with_capacity(processes_info.len()); let mut processes_extra_info = Vec::with_capacity(processes_info.len()); let mut processes_memory_usage = Vec::with_capacity(processes_info.len()); - let mut processes_dal_metrics_read_bytes = Vec::with_capacity(processes_info.len()); - let mut processes_dal_metrics_write_bytes = Vec::with_capacity(processes_info.len()); + let mut processes_data_read_bytes = Vec::with_capacity(processes_info.len()); + let mut processes_data_write_bytes = Vec::with_capacity(processes_info.len()); let mut processes_scan_progress_read_rows = Vec::with_capacity(processes_info.len()); let mut processes_scan_progress_read_bytes = Vec::with_capacity(processes_info.len()); let mut processes_mysql_connection_id = Vec::with_capacity(processes_info.len()); @@ -72,10 +72,10 @@ impl SyncSystemTable for ProcessesTable { &process_info.session_extra_info, )); processes_memory_usage.push(process_info.memory_usage); - let (dal_metrics_read_bytes, dal_metrics_write_bytes) = - ProcessesTable::process_dal_metrics(&process_info.dal_metrics); - 
processes_dal_metrics_read_bytes.push(dal_metrics_read_bytes); - processes_dal_metrics_write_bytes.push(dal_metrics_write_bytes); + let (data_read_bytes, data_write_bytes) = + ProcessesTable::process_data_metrics(&process_info.data_metrics); + processes_data_read_bytes.push(data_read_bytes); + processes_data_write_bytes.push(data_write_bytes); let (scan_progress_read_rows, scan_progress_read_bytes) = ProcessesTable::process_scan_progress_values(&process_info.scan_progress_value); processes_scan_progress_read_rows.push(scan_progress_read_rows); @@ -99,8 +99,8 @@ impl SyncSystemTable for ProcessesTable { Series::from_data(processes_database), Series::from_data(processes_extra_info), Series::from_data(processes_memory_usage), - Series::from_data(processes_dal_metrics_read_bytes), - Series::from_data(processes_dal_metrics_write_bytes), + Series::from_data(processes_data_read_bytes), + Series::from_data(processes_data_write_bytes), Series::from_data(processes_scan_progress_read_rows), Series::from_data(processes_scan_progress_read_bytes), Series::from_data(processes_mysql_connection_id), @@ -120,8 +120,8 @@ impl ProcessesTable { DataField::new("database", Vu8::to_data_type()), DataField::new_nullable("extra_info", Vu8::to_data_type()), DataField::new("memory_usage", i64::to_data_type()), - DataField::new_nullable("dal_metrics_read_bytes", u64::to_data_type()), - DataField::new_nullable("dal_metrics_write_bytes", u64::to_data_type()), + DataField::new_nullable("data_read_bytes", u64::to_data_type()), + DataField::new_nullable("data_write_bytes", u64::to_data_type()), DataField::new_nullable("scan_progress_read_rows", u64::to_data_type()), DataField::new_nullable("scan_progress_read_bytes", u64::to_data_type()), DataField::new_nullable("mysql_connection_id", u32::to_data_type()), @@ -156,16 +156,16 @@ impl ProcessesTable { session_extra_info.clone().map(|s| s.into_bytes()) } - fn process_dal_metrics(dal_metrics_opt: &Option) -> (Option, Option) { - if 
dal_metrics_opt.is_some() { - let dal_metrics = dal_metrics_opt.as_ref().unwrap(); - ( - Some(dal_metrics.get_read_bytes() as u64), - Some(dal_metrics.get_write_bytes() as u64), - ) - } else { - (None, None) - } + fn process_data_metrics(metrics: &Option) -> (Option, Option) { + metrics + .as_ref() + .map(|v| { + ( + Some(v.get_read_bytes() as u64), + Some(v.get_write_bytes() as u64), + ) + }) + .unwrap_or_default() } fn process_scan_progress_values( diff --git a/src/query/storages/preludes/src/view/view_table.rs b/src/query/storages/preludes/src/view/view_table.rs index 9e42eab5d5c1..1318a16180cb 100644 --- a/src/query/storages/preludes/src/view/view_table.rs +++ b/src/query/storages/preludes/src/view/view_table.rs @@ -19,7 +19,6 @@ use common_exception::ErrorCode; use common_exception::Result; use common_meta_app::schema::TableInfo; -use crate::storages::StorageContext; use crate::storages::StorageDescription; use crate::storages::Table; @@ -32,7 +31,7 @@ pub const VIEW_ENGINE: &str = "VIEW"; pub const QUERY: &str = "query"; impl ViewTable { - pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result> { + pub fn try_create(table_info: TableInfo) -> Result> { let query = table_info.options().get(QUERY).cloned(); if let Some(query) = query { Ok(Box::new(ViewTable { query, table_info }))