From 56d847b32afbcbb8681279ce178e5b0fdef79b8e Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 7 Aug 2024 12:28:46 +0200 Subject: [PATCH] ref(metrics): Move max aggregator size into config and allow passing now timestamp for merge --- relay-config/src/aggregator.rs | 10 -- relay-config/src/config.rs | 3 + relay-metrics/benches/benches.rs | 8 +- relay-metrics/src/aggregator.rs | 110 +++++++++++------- .../src/services/metrics/aggregator.rs | 22 +--- 5 files changed, 76 insertions(+), 77 deletions(-) diff --git a/relay-config/src/aggregator.rs b/relay-config/src/aggregator.rs index 950d3ce74a..80d34e0121 100644 --- a/relay-config/src/aggregator.rs +++ b/relay-config/src/aggregator.rs @@ -12,15 +12,6 @@ pub struct AggregatorServiceConfig { #[serde(flatten)] pub aggregator: AggregatorConfig, - /// Maximum amount of bytes used for metrics aggregation. - /// - /// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory. - /// This is only an approximation and does not take into account things such as pre-allocation - /// in hashmaps. - /// - /// Defaults to `None`, i.e. no limit. - pub max_total_bucket_bytes: Option, - /// The approximate maximum number of bytes submitted within one flush cycle. /// /// This controls how big flushed batches of buckets get, depending on the number of buckets, @@ -40,7 +31,6 @@ impl Default for AggregatorServiceConfig { fn default() -> Self { Self { aggregator: AggregatorConfig::default(), - max_total_bucket_bytes: None, max_flush_bytes: 5_000_000, // 5 MB flush_interval_ms: 100, // 100 milliseconds } diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 7ab4a77643..88d9269398 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -2470,6 +2470,7 @@ impl Config { mut max_tag_key_length, mut max_tag_value_length, mut max_project_key_bucket_bytes, + mut max_total_bucket_bytes, .. } = self.default_aggregator_config().aggregator; @@ -2484,6 +2485,7 @@ impl Config { max_tag_value_length = max_tag_value_length.max(agg.max_tag_value_length); max_project_key_bucket_bytes = max_project_key_bucket_bytes.max(agg.max_project_key_bucket_bytes); + max_total_bucket_bytes = max_total_bucket_bytes.max(agg.max_total_bucket_bytes); } for agg in self @@ -2505,6 +2507,7 @@ impl Config { max_tag_key_length, max_tag_value_length, max_project_key_bucket_bytes, + max_total_bucket_bytes, initial_delay: 30, flush_partitions: None, flush_batching: FlushBatching::Project, diff --git a/relay-metrics/benches/benches.rs b/relay-metrics/benches/benches.rs index ee620a9f0e..31fea30ef9 100644 --- a/relay-metrics/benches/benches.rs +++ b/relay-metrics/benches/benches.rs @@ -196,11 +196,7 @@ fn bench_insert_and_flush(c: &mut Criterion) { #[allow(clippy::unit_arg)] black_box( aggregator - .merge( - black_box(project_key), - black_box(bucket), - black_box(None), - ) + .merge(black_box(project_key), black_box(bucket)) .unwrap(), ); } @@ -221,7 +217,7 @@ fn bench_insert_and_flush(c: &mut Criterion) { let timestamp = UnixTimestamp::now(); let mut aggregator: Aggregator = Aggregator::new(config.clone()); for (project_key, bucket) in input.get_buckets(timestamp) { - aggregator.merge(project_key, bucket, None).unwrap(); + aggregator.merge(project_key, bucket).unwrap(); } aggregator }, diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index b0aab5fec4..c27b9114f5 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -182,12 +182,21 @@ pub struct AggregatorConfig { /// Maximum amount of bytes used for metrics aggregation per project key. /// - /// Similar measuring technique to `max_total_bucket_bytes`, but instead of a + /// Similar measuring technique to [`Self::max_total_bucket_bytes`], but instead of a /// global/process-wide limit, it is enforced per project key. /// /// Defaults to `None`, i.e. no limit. pub max_project_key_bucket_bytes: Option, + /// Maximum amount of bytes used for metrics aggregation. + /// + /// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory. + /// This is only an approximation and does not take into account things such as pre-allocation + /// in hashmaps. + /// + /// Defaults to `None`, i.e. no limit. + pub max_total_bucket_bytes: Option, + /// The number of logical partitions that can receive flushed buckets. /// /// If set, buckets are partitioned by (bucket key % flush_partitions), and routed @@ -279,6 +288,7 @@ impl Default for AggregatorConfig { max_tag_key_length: 200, max_tag_value_length: 200, max_project_key_bucket_bytes: None, + max_total_bucket_bytes: None, flush_batching: FlushBatching::default(), flush_partitions: None, } @@ -458,12 +468,11 @@ impl fmt::Debug for CostTracker { /// All buckets are flushed after a grace period of `initial_delay`. fn get_flush_time( config: &AggregatorConfig, + now: UnixTimestamp, reference_time: Instant, bucket_key: &BucketKey, ) -> Instant { let initial_flush = bucket_key.timestamp + config.bucket_interval() + config.initial_delay(); - - let now = UnixTimestamp::now(); let backdated = initial_flush <= now; let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64; @@ -557,9 +566,10 @@ impl Aggregator { self.buckets.len() } - /// Returns `true` if the cost trackers value is larger than the given max cost. - pub fn totals_cost_exceeded(&self, max_total_cost: Option) -> bool { - self.cost_tracker.totals_cost_exceeded(max_total_cost) + /// Returns `true` if the cost trackers value is larger than the configured max cost. + pub fn totals_cost_exceeded(&self) -> bool { + self.cost_tracker + .totals_cost_exceeded(self.config.max_total_bucket_bytes) } /// Converts this aggregator into a vector of [`Bucket`]. @@ -678,13 +688,14 @@ impl Aggregator { /// Logs a statsd metric for invalid timestamps. fn get_bucket_timestamp( &self, + now: UnixTimestamp, timestamp: UnixTimestamp, bucket_width: u64, ) -> Result { let bucket_ts = self.config.get_bucket_timestamp(timestamp, bucket_width); if !self.config.timestamp_range().contains(&bucket_ts) { - let delta = (bucket_ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64); + let delta = (bucket_ts.as_secs() as i64) - (now.as_secs() as i64); relay_statsd::metric!( histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64, aggregator = &self.name, @@ -696,16 +707,31 @@ impl Aggregator { Ok(bucket_ts) } - /// Merge a preaggregated bucket into this aggregator. + /// Merge a bucket into this aggregator. /// - /// If no bucket exists for the given bucket key, a new bucket will be created. + /// Like [`Self::merge_with_options`] with defaults. pub fn merge( + &mut self, + project_key: ProjectKey, + bucket: Bucket, + ) -> Result<(), AggregateMetricsError> { + self.merge_with_options(project_key, bucket, UnixTimestamp::now()) + } + + /// Merge a bucket into this aggregator. + /// + /// If no bucket exists for the given bucket key, a new bucket will be created. + /// + /// Arguments: + /// - `now`: The current timestamp, when merging a large batch of buckets, the timestamp + /// should be created outside of the loop. + pub fn merge_with_options( &mut self, project_key: ProjectKey, mut bucket: Bucket, - max_total_bucket_bytes: Option, + now: UnixTimestamp, ) -> Result<(), AggregateMetricsError> { - let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?; + let timestamp = self.get_bucket_timestamp(now, bucket.timestamp, bucket.width)?; let key = BucketKey { project_key, timestamp, @@ -744,7 +770,7 @@ impl Aggregator { // whether it is just a counter, etc. self.cost_tracker.check_limits_exceeded( project_key, - max_total_bucket_bytes, + self.config.max_total_bucket_bytes, self.config.max_project_key_bucket_bytes, )?; @@ -784,7 +810,7 @@ impl Aggregator { namespace = key.namespace().as_str(), ); - let flush_at = get_flush_time(&self.config, self.reference_time, &key); + let flush_at = get_flush_time(&self.config, now, self.reference_time, &key); let value = bucket.value; added_cost = key.cost() + value.cost(); @@ -909,6 +935,7 @@ mod tests { max_tag_key_length: 200, max_tag_value_length: 200, max_project_key_bucket_bytes: None, + max_total_bucket_bytes: None, flush_batching: FlushBatching::default(), flush_partitions: None, } @@ -936,8 +963,8 @@ mod tests { let mut bucket2 = bucket1.clone(); bucket2.value = BucketValue::counter(43.into()); - aggregator.merge(project_key, bucket1, None).unwrap(); - aggregator.merge(project_key, bucket2, None).unwrap(); + aggregator.merge(project_key, bucket1).unwrap(); + aggregator.merge(project_key, bucket2).unwrap(); let buckets: Vec<_> = aggregator .buckets @@ -1031,9 +1058,10 @@ mod tests { let mut bucket3 = bucket1.clone(); bucket3.timestamp = UnixTimestamp::from_secs(999994721); - aggregator.merge(project_key, bucket1, None).unwrap(); - aggregator.merge(project_key, bucket2, None).unwrap(); - aggregator.merge(project_key, bucket3, None).unwrap(); + + aggregator.merge(project_key, bucket1).unwrap(); + aggregator.merge(project_key, bucket2).unwrap(); + aggregator.merge(project_key, bucket3).unwrap(); let mut buckets: Vec<_> = aggregator .buckets @@ -1089,12 +1117,8 @@ mod tests { let mut aggregator: Aggregator = Aggregator::new(config); // It's OK to have same metric with different projects: - aggregator - .merge(project_key1, some_bucket(None), None) - .unwrap(); - aggregator - .merge(project_key2, some_bucket(None), None) - .unwrap(); + aggregator.merge(project_key1, some_bucket(None)).unwrap(); + aggregator.merge(project_key2, some_bucket(None)).unwrap(); assert_eq!(aggregator.buckets.len(), 2); } @@ -1267,7 +1291,7 @@ mod tests { bucket.name = metric_name.into(); let current_cost = aggregator.cost_tracker.total_cost; - aggregator.merge(project_key, bucket, None).unwrap(); + aggregator.merge(project_key, bucket).unwrap(); let total_cost = aggregator.cost_tracker.total_cost; assert_eq!(total_cost, current_cost + expected_added_cost); } @@ -1300,7 +1324,7 @@ mod tests { metadata: BucketMetadata::new(timestamp), }; - aggregator.merge(project_key, bucket, None).unwrap(); + aggregator.merge(project_key, bucket).unwrap(); } } @@ -1366,7 +1390,7 @@ mod tests { assert!(matches!( aggregator - .get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2) + .get_bucket_timestamp(UnixTimestamp::now(), UnixTimestamp::from_secs(u64::MAX), 2) .unwrap_err(), AggregateMetricsError::InvalidTimestamp(_) )); @@ -1548,15 +1572,16 @@ mod tests { metadata: BucketMetadata::new(timestamp), }; - let mut aggregator: Aggregator = Aggregator::new(test_config()); + let mut aggregator = Aggregator::new(AggregatorConfig { + max_total_bucket_bytes: Some(1), + ..test_config() + }); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); - aggregator - .merge(project_key, bucket.clone(), Some(1)) - .unwrap(); + aggregator.merge(project_key, bucket.clone()).unwrap(); assert_eq!( - aggregator.merge(project_key, bucket, Some(1)).unwrap_err(), + aggregator.merge(project_key, bucket).unwrap_err(), AggregateMetricsError::TotalLimitExceeded ); } @@ -1580,9 +1605,9 @@ mod tests { let mut aggregator: Aggregator = Aggregator::new(config); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); - aggregator.merge(project_key, bucket.clone(), None).unwrap(); + aggregator.merge(project_key, bucket.clone()).unwrap(); assert_eq!( - aggregator.merge(project_key, bucket, None).unwrap_err(), + aggregator.merge(project_key, bucket).unwrap_err(), AggregateMetricsError::ProjectLimitExceeded ); } @@ -1614,15 +1639,9 @@ mod tests { .metadata .merge(BucketMetadata::new(UnixTimestamp::from_secs(999999811))); - aggregator - .merge(project_key, bucket1.clone(), None) - .unwrap(); - aggregator - .merge(project_key, bucket2.clone(), None) - .unwrap(); - aggregator - .merge(project_key, bucket3.clone(), None) - .unwrap(); + aggregator.merge(project_key, bucket1.clone()).unwrap(); + aggregator.merge(project_key, bucket2.clone()).unwrap(); + aggregator.merge(project_key, bucket3.clone()).unwrap(); let buckets_metadata: Vec<_> = aggregator .buckets @@ -1675,8 +1694,9 @@ mod tests { extracted_from_indexed: false, }; - let flush_time_1 = get_flush_time(&config, reference_time, &bucket_key_1); - let flush_time_2 = get_flush_time(&config, reference_time, &bucket_key_2); + let now = UnixTimestamp::now(); + let flush_time_1 = get_flush_time(&config, now, reference_time, &bucket_key_1); + let flush_time_2 = get_flush_time(&config, now, reference_time, &bucket_key_2); assert_eq!(flush_time_1, flush_time_2); } diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 320c5494ca..707d0eec72 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -6,7 +6,7 @@ use hashbrown::HashMap; use relay_base_schema::project::ProjectKey; use relay_config::AggregatorServiceConfig; use relay_metrics::aggregator::AggregateMetricsError; -use relay_metrics::{aggregator, Bucket}; +use relay_metrics::{aggregator, Bucket, UnixTimestamp}; use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -90,7 +90,6 @@ pub struct AggregatorService { aggregator: aggregator::Aggregator, state: AggregatorState, receiver: Option>, - max_total_bucket_bytes: Option, flush_interval_ms: u64, can_accept_metrics: Arc, } @@ -117,11 +116,8 @@ impl AggregatorService { Self { receiver, state: AggregatorState::Running, - max_total_bucket_bytes: config.max_total_bucket_bytes, flush_interval_ms: config.flush_interval_ms, - can_accept_metrics: Arc::new(AtomicBool::new( - !aggregator.totals_cost_exceeded(config.max_total_bucket_bytes), - )), + can_accept_metrics: Arc::new(AtomicBool::new(!aggregator.totals_cost_exceeded())), aggregator, } } @@ -141,12 +137,8 @@ impl AggregatorService { let force_flush = matches!(&self.state, AggregatorState::ShuttingDown); let partitions = self.aggregator.pop_flush_buckets(force_flush); - self.can_accept_metrics.store( - !self - .aggregator - .totals_cost_exceeded(self.max_total_bucket_bytes), - Ordering::Relaxed, - ); + self.can_accept_metrics + .store(!self.aggregator.totals_cost_exceeded(), Ordering::Relaxed); if partitions.is_empty() { return; @@ -193,11 +185,9 @@ impl AggregatorService { buckets, } = msg; + let now = UnixTimestamp::now(); for bucket in buckets.into_iter() { - match self - .aggregator - .merge(project_key, bucket, self.max_total_bucket_bytes) - { + match self.aggregator.merge_with_options(project_key, bucket, now) { // Ignore invalid timestamp errors. Err(AggregateMetricsError::InvalidTimestamp(_)) => {} Err(AggregateMetricsError::TotalLimitExceeded) => {