diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 5b335fd34..8b6bb8407 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -18,6 +18,7 @@ scylla = { path = "../scylla", features = [ "num-bigint-03", "num-bigint-04", "bigdecimal-04", + "metrics", ] } tokio = { version = "1.34", features = ["full"] } tracing = { version = "0.1.25", features = ["log"] } diff --git a/scylla/Cargo.toml b/scylla/Cargo.toml index 92b7dfafb..6580430ad 100644 --- a/scylla/Cargo.toml +++ b/scylla/Cargo.toml @@ -39,6 +39,7 @@ full-serialization = [ "num-bigint-04", "bigdecimal-04", ] +metrics = ["dep:histogram"] [dependencies] scylla-macros = { version = "0.7.0", path = "../scylla-macros" } @@ -47,7 +48,7 @@ byteorder = "1.3.4" bytes = "1.0.1" futures = "0.3.6" hashbrown = "0.14" -histogram = "0.11.1" +histogram = { version = "0.11.1", optional = true } tokio = { version = "1.34", features = [ "net", "time", diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs index f03edf7a9..3d46c1a81 100644 --- a/scylla/src/lib.rs +++ b/scylla/src/lib.rs @@ -286,4 +286,5 @@ pub use transport::load_balancing; pub use transport::retry_policy; pub use transport::speculative_execution; +#[cfg(feature = "metrics")] pub use transport::metrics::{Metrics, MetricsError}; diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 8d7c07be3..cd3874b35 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -37,6 +37,7 @@ use crate::transport::cluster::ClusterData; use crate::transport::connection::{Connection, NonErrorQueryResponse, QueryResponse}; use crate::transport::errors::{ProtocolError, QueryError, UserRequestError}; use crate::transport::load_balancing::{self, RoutingInfo}; +#[cfg(feature = "metrics")] use crate::transport::metrics::Metrics; use crate::transport::retry_policy::{QueryInfo, RetryDecision, RetrySession}; use crate::transport::NodeRef; @@ -67,6 +68,7 @@ pub(crate) struct PreparedIteratorConfig { pub(crate) values: SerializedValues, pub(crate) execution_profile: Arc, pub(crate) cluster_data: Arc, + #[cfg(feature = "metrics")] pub(crate) metrics: Arc, } @@ -142,6 +144,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> { query_consistency: Consistency, retry_session: Box, execution_profile: Arc, + #[cfg(feature = "metrics")] metrics: Arc, paging_state: PagingState, @@ -236,11 +239,13 @@ where self.log_attempt_error(&last_error, &retry_decision); match retry_decision { RetryDecision::RetrySameNode(cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = cl.unwrap_or(current_consistency); continue 'same_node_retries; } RetryDecision::RetryNextNode(cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = cl.unwrap_or(current_consistency); continue 'nodes_in_plan; @@ -298,6 +303,7 @@ where node: NodeRef<'_>, request_span: &RequestSpan, ) -> Result, QueryError> { + #[cfg(feature = "metrics")] self.metrics.inc_total_paged_queries(); let query_start = std::time::Instant::now(); @@ -323,6 +329,7 @@ where tracing_id, .. }) => { + #[cfg(feature = "metrics")] let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); self.log_attempt_success(); self.log_query_success(); @@ -359,6 +366,7 @@ where } Err(err) => { let err = err.into(); + #[cfg(feature = "metrics")] self.metrics.inc_failed_paged_queries(); self.execution_profile .load_balancing_policy @@ -378,6 +386,7 @@ where Ok(ControlFlow::Break(proof)) } Ok(response) => { + #[cfg(feature = "metrics")] self.metrics.inc_failed_paged_queries(); let err = ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into(); @@ -680,7 +689,7 @@ impl QueryPager { query: Query, execution_profile: Arc, cluster_data: Arc, - metrics: Arc, + #[cfg(feature = "metrics")] metrics: Arc, ) -> Result { let (sender, receiver) = mpsc::channel(1); @@ -743,6 +752,7 @@ impl QueryPager { query_consistency: consistency, retry_session, execution_profile, + #[cfg(feature = "metrics")] metrics, paging_state: PagingState::start(), history_listener: query.config.history_listener.clone(), @@ -861,6 +871,7 @@ impl QueryPager { query_consistency: consistency, retry_session, execution_profile: config.execution_profile, + #[cfg(feature = "metrics")] metrics: config.metrics, paging_state: PagingState::start(), history_listener: config.prepared.config.history_listener.clone(), diff --git a/scylla/src/transport/mod.rs b/scylla/src/transport/mod.rs index 544e5644a..d62e93c10 100644 --- a/scylla/src/transport/mod.rs +++ b/scylla/src/transport/mod.rs @@ -11,6 +11,7 @@ pub mod iterator; pub mod legacy_query_result; pub mod load_balancing; pub mod locator; +#[cfg(feature = "metrics")] pub(crate) mod metrics; mod node; pub mod partitioner; diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index b3efa7e07..5222f808c 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -68,6 +68,7 @@ use crate::transport::host_filter::HostFilter; #[allow(deprecated)] use crate::transport::iterator::{LegacyRowIterator, PreparedIteratorConfig}; use crate::transport::load_balancing::{self, RoutingInfo}; +#[cfg(feature = "metrics")] use crate::transport::metrics::Metrics; use crate::transport::node::Node; use crate::transport::query_result::QueryResult; @@ -190,6 +191,7 @@ where cluster: Cluster, default_execution_profile_handle: ExecutionProfileHandle, schema_agreement_interval: Duration, + #[cfg(feature = "metrics")] metrics: Arc, schema_agreement_timeout: Duration, schema_agreement_automatic_waiting: bool, @@ -215,6 +217,7 @@ impl std::fmt::Debug for GenericSession where DeserApi: DeserializationApiKind, { + #[cfg(feature = "metrics")] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Session") .field("cluster", &ClusterNeatDebug(&self.cluster)) @@ -230,6 +233,22 @@ where ) .finish() } + + #[cfg(not(feature = "metrics"))] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Session") + .field("cluster", &ClusterNeatDebug(&self.cluster)) + .field( + "default_execution_profile_handle", + &self.default_execution_profile_handle, + ) + .field("schema_agreement_interval", &self.schema_agreement_interval) + .field( + "auto_await_schema_agreement_timeout", + &self.schema_agreement_timeout, + ) + .finish() + } } /// Configuration options for [`Session`]. @@ -883,6 +902,7 @@ impl GenericSession { LegacySession { cluster: self.cluster.clone(), default_execution_profile_handle: self.default_execution_profile_handle.clone(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), refresh_metadata_on_auto_schema_agreement: self .refresh_metadata_on_auto_schema_agreement, @@ -993,6 +1013,7 @@ impl GenericSession { Session { cluster: self.cluster.clone(), default_execution_profile_handle: self.default_execution_profile_handle.clone(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), refresh_metadata_on_auto_schema_agreement: self .refresh_metadata_on_auto_schema_agreement, @@ -1114,6 +1135,7 @@ where cluster, default_execution_profile_handle, schema_agreement_interval: config.schema_agreement_interval, + #[cfg(feature = "metrics")] metrics: Arc::new(Metrics::new()), schema_agreement_timeout: config.schema_agreement_timeout, schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting, @@ -1324,6 +1346,7 @@ where query, execution_profile, self.cluster.get_data(), + #[cfg(feature = "metrics")] self.metrics.clone(), ) .await @@ -1338,6 +1361,7 @@ where values, execution_profile, cluster_data: self.cluster.get_data(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), }) .await @@ -1592,6 +1616,7 @@ where values: serialized_values, execution_profile, cluster_data: self.cluster.get_data(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), }) .await @@ -1802,6 +1827,7 @@ where /// Access metrics collected by the driver\ /// Driver collects various metrics like number of queries or query latencies. /// They can be read using this method + #[cfg(feature = "metrics")] pub fn get_metrics(&self) -> Arc { self.metrics.clone() } @@ -2020,6 +2046,7 @@ where }; let context = speculative_execution::Context { + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), }; @@ -2119,6 +2146,7 @@ where }; context.request_span.record_shard_id(&connection); + #[cfg(feature = "metrics")] self.metrics.inc_total_nonpaged_queries(); let query_start = std::time::Instant::now(); @@ -2138,6 +2166,7 @@ where last_error = match query_result { Ok(response) => { trace!(parent: &span, "Query succeeded"); + #[cfg(feature = "metrics")] let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); context.log_attempt_success(&attempt_id); execution_profile.load_balancing_policy.on_query_success( @@ -2153,6 +2182,7 @@ where last_error = %e, "Query failed" ); + #[cfg(feature = "metrics")] self.metrics.inc_failed_nonpaged_queries(); execution_profile.load_balancing_policy.on_query_failure( context.query_info, @@ -2182,11 +2212,13 @@ where context.log_attempt_error(&attempt_id, the_error, &retry_decision); match retry_decision { RetryDecision::RetrySameNode(new_cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = new_cl.unwrap_or(current_consistency); continue 'same_node_retries; } RetryDecision::RetryNextNode(new_cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = new_cl.unwrap_or(current_consistency); continue 'nodes_in_plan; diff --git a/scylla/src/transport/speculative_execution.rs b/scylla/src/transport/speculative_execution.rs index 60344d0a0..e2168c5b5 100644 --- a/scylla/src/transport/speculative_execution.rs +++ b/scylla/src/transport/speculative_execution.rs @@ -3,15 +3,19 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, }; use scylla_cql::frame::response::error::DbError; -use std::{future::Future, sync::Arc, time::Duration}; +#[cfg(feature = "metrics")] +use std::sync::Arc; +use std::{future::Future, time::Duration}; use tracing::{trace_span, warn, Instrument}; use crate::transport::errors::QueryError; +#[cfg(feature = "metrics")] use super::metrics::Metrics; /// Context is passed as an argument to `SpeculativeExecutionPolicy` methods pub struct Context { + #[cfg(feature = "metrics")] pub metrics: Arc, } @@ -66,6 +70,7 @@ impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy { self.max_retry_count } + #[cfg(feature = "metrics")] fn retry_interval(&self, context: &Context) -> Duration { let interval = context.metrics.get_latency_percentile_ms(self.percentile); let ms = match interval { @@ -80,6 +85,12 @@ impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy { }; Duration::from_millis(ms) } + + #[cfg(not(feature = "metrics"))] + fn retry_interval(&self, _: &Context) -> Duration { + warn!("PercentileSpeculativeExecutionPolicy requires the 'metrics' feature to work as intended, defaulting to 100 ms"); + Duration::from_millis(100) + } } /// Checks if a result created in a speculative execution branch can be ignored.