ref(metrics): Move max aggregator size into config and allow passing now timestamp for merge (#3904)

The `max_total_bucket_bytes` setting can simply be part of the aggregator
config: it is always passed as an argument with the same value, and the
per-project max size is already part of the aggregator config.
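
As a hedged sketch (simplified stand-in types, not the real relay-config structs), this is roughly the shape the aggregator config takes after the move, with both byte limits living side by side:

// Sketch only: the two field names match the diff below; everything
// else is simplified for illustration.
#[derive(Default)]
struct AggregatorConfig {
    /// Process-wide byte limit for aggregation; `None` means no limit.
    max_total_bucket_bytes: Option<usize>,
    /// Per-project-key byte limit; `None` means no limit.
    max_project_key_bucket_bytes: Option<usize>,
}

fn main() {
    // Hypothetical 64 MiB global cap; the default remains unlimited.
    let config = AggregatorConfig {
        max_total_bucket_bytes: Some(64 * 1024 * 1024),
        ..Default::default()
    };
    assert!(config.max_total_bucket_bytes.is_some());
}

With the limit on the config itself, callers no longer have to thread the same value through every `merge` call.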

Additionally, this improves performance by not re-creating the `now`
timestamp for every bucket when merging a large batch of metrics into the
aggregator; creating the timestamp repeatedly takes up a significant
amount of time.
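
A minimal sketch of the hoisted-timestamp pattern (simplified stand-ins for `Bucket`, `UnixTimestamp`, and the aggregator; only the loop structure mirrors the diff below):

use std::time::{SystemTime, UNIX_EPOCH};

struct Bucket;

#[derive(Clone, Copy)]
struct UnixTimestamp(u64);

impl UnixTimestamp {
    fn now() -> Self {
        // One system clock read; doing this per bucket is the cost the
        // commit removes from the hot merge loop.
        let secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before Unix epoch")
            .as_secs();
        UnixTimestamp(secs)
    }
}

#[derive(Default)]
struct Aggregator {
    merged: usize,
}

impl Aggregator {
    /// Mirrors the new `merge_with_options`: the caller supplies `now`.
    fn merge_with_options(&mut self, _bucket: Bucket, _now: UnixTimestamp) {
        // The real aggregator uses `now` for bucket timestamp validation
        // and flush-time computation; this stub only counts merges.
        self.merged += 1;
    }
}

fn main() {
    let mut aggregator = Aggregator::default();
    let buckets = vec![Bucket, Bucket, Bucket];

    // Create the timestamp once per batch instead of once per bucket.
    let now = UnixTimestamp::now();
    for bucket in buckets {
        aggregator.merge_with_options(bucket, now);
    }
    assert_eq!(aggregator.merged, 3);
}

The plain `merge` entry point keeps the old behavior by calling `merge_with_options` with `UnixTimestamp::now()` itself, so single-bucket callers are unaffected.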
Dav1dde authored Aug 7, 2024
1 parent 9a938e9 commit 3f5e6b9
Showing 5 changed files with 76 additions and 77 deletions.
10 changes: 0 additions & 10 deletions relay-config/src/aggregator.rs
@@ -12,15 +12,6 @@ pub struct AggregatorServiceConfig {
#[serde(flatten)]
pub aggregator: AggregatorConfig,

/// Maximum amount of bytes used for metrics aggregation.
///
/// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
/// This is only an approximation and does not take into account things such as pre-allocation
/// in hashmaps.
///
/// Defaults to `None`, i.e. no limit.
pub max_total_bucket_bytes: Option<usize>,

/// The approximate maximum number of bytes submitted within one flush cycle.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
@@ -40,7 +31,6 @@ impl Default for AggregatorServiceConfig {
fn default() -> Self {
Self {
aggregator: AggregatorConfig::default(),
max_total_bucket_bytes: None,
max_flush_bytes: 5_000_000, // 5 MB
flush_interval_ms: 100, // 100 milliseconds
}
3 changes: 3 additions & 0 deletions relay-config/src/config.rs
@@ -2470,6 +2470,7 @@ impl Config {
mut max_tag_key_length,
mut max_tag_value_length,
mut max_project_key_bucket_bytes,
mut max_total_bucket_bytes,
..
} = self.default_aggregator_config().aggregator;

@@ -2484,6 +2485,7 @@
max_tag_value_length = max_tag_value_length.max(agg.max_tag_value_length);
max_project_key_bucket_bytes =
max_project_key_bucket_bytes.max(agg.max_project_key_bucket_bytes);
max_total_bucket_bytes = max_total_bucket_bytes.max(agg.max_total_bucket_bytes);
}

for agg in self
@@ -2505,6 +2507,7 @@
max_tag_key_length,
max_tag_value_length,
max_project_key_bucket_bytes,
max_total_bucket_bytes,
initial_delay: 30,
flush_partitions: None,
flush_batching: FlushBatching::Project,
8 changes: 2 additions & 6 deletions relay-metrics/benches/benches.rs
@@ -196,11 +196,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
#[allow(clippy::unit_arg)]
black_box(
aggregator
.merge(
black_box(project_key),
black_box(bucket),
black_box(None),
)
.merge(black_box(project_key), black_box(bucket))
.unwrap(),
);
}
@@ -221,7 +217,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
let timestamp = UnixTimestamp::now();
let mut aggregator: Aggregator = Aggregator::new(config.clone());
for (project_key, bucket) in input.get_buckets(timestamp) {
aggregator.merge(project_key, bucket, None).unwrap();
aggregator.merge(project_key, bucket).unwrap();
}
aggregator
},
110 changes: 65 additions & 45 deletions relay-metrics/src/aggregator.rs
@@ -182,12 +182,21 @@ pub struct AggregatorConfig {

/// Maximum amount of bytes used for metrics aggregation per project key.
///
/// Similar measuring technique to `max_total_bucket_bytes`, but instead of a
/// Similar measuring technique to [`Self::max_total_bucket_bytes`], but instead of a
/// global/process-wide limit, it is enforced per project key.
///
/// Defaults to `None`, i.e. no limit.
pub max_project_key_bucket_bytes: Option<usize>,

/// Maximum amount of bytes used for metrics aggregation.
///
/// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
/// This is only an approximation and does not take into account things such as pre-allocation
/// in hashmaps.
///
/// Defaults to `None`, i.e. no limit.
pub max_total_bucket_bytes: Option<usize>,

/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % flush_partitions), and routed
@@ -279,6 +288,7 @@ impl Default for AggregatorConfig {
max_tag_key_length: 200,
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
max_total_bucket_bytes: None,
flush_batching: FlushBatching::default(),
flush_partitions: None,
}
@@ -458,12 +468,11 @@ impl fmt::Debug for CostTracker {
/// All buckets are flushed after a grace period of `initial_delay`.
fn get_flush_time(
config: &AggregatorConfig,
now: UnixTimestamp,
reference_time: Instant,
bucket_key: &BucketKey,
) -> Instant {
let initial_flush = bucket_key.timestamp + config.bucket_interval() + config.initial_delay();

let now = UnixTimestamp::now();
let backdated = initial_flush <= now;

let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
@@ -557,9 +566,10 @@ impl Aggregator {
self.buckets.len()
}

/// Returns `true` if the cost tracker's value is larger than the given max cost.
pub fn totals_cost_exceeded(&self, max_total_cost: Option<usize>) -> bool {
self.cost_tracker.totals_cost_exceeded(max_total_cost)
/// Returns `true` if the cost tracker's value is larger than the configured max cost.
pub fn totals_cost_exceeded(&self) -> bool {
self.cost_tracker
.totals_cost_exceeded(self.config.max_total_bucket_bytes)
}

/// Converts this aggregator into a vector of [`Bucket`].
@@ -678,13 +688,14 @@
/// Logs a statsd metric for invalid timestamps.
fn get_bucket_timestamp(
&self,
now: UnixTimestamp,
timestamp: UnixTimestamp,
bucket_width: u64,
) -> Result<UnixTimestamp, AggregateMetricsError> {
let bucket_ts = self.config.get_bucket_timestamp(timestamp, bucket_width);

if !self.config.timestamp_range().contains(&bucket_ts) {
let delta = (bucket_ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
let delta = (bucket_ts.as_secs() as i64) - (now.as_secs() as i64);
relay_statsd::metric!(
histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
aggregator = &self.name,
Expand All @@ -696,16 +707,31 @@ impl Aggregator {
Ok(bucket_ts)
}

/// Merge a preaggregated bucket into this aggregator.
/// Merge a bucket into this aggregator.
///
/// If no bucket exists for the given bucket key, a new bucket will be created.
/// Like [`Self::merge_with_options`] with defaults.
pub fn merge(
&mut self,
project_key: ProjectKey,
bucket: Bucket,
) -> Result<(), AggregateMetricsError> {
self.merge_with_options(project_key, bucket, UnixTimestamp::now())
}

/// Merge a bucket into this aggregator.
///
/// If no bucket exists for the given bucket key, a new bucket will be created.
///
/// Arguments:
/// - `now`: The current timestamp; when merging a large batch of buckets, the
/// timestamp should be created once, outside of the loop.
pub fn merge_with_options(
&mut self,
project_key: ProjectKey,
mut bucket: Bucket,
max_total_bucket_bytes: Option<usize>,
now: UnixTimestamp,
) -> Result<(), AggregateMetricsError> {
let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?;
let timestamp = self.get_bucket_timestamp(now, bucket.timestamp, bucket.width)?;
let key = BucketKey {
project_key,
timestamp,
@@ -744,7 +770,7 @@
// whether it is just a counter, etc.
self.cost_tracker.check_limits_exceeded(
project_key,
max_total_bucket_bytes,
self.config.max_total_bucket_bytes,
self.config.max_project_key_bucket_bytes,
)?;

@@ -784,7 +810,7 @@
namespace = key.namespace().as_str(),
);

let flush_at = get_flush_time(&self.config, self.reference_time, &key);
let flush_at = get_flush_time(&self.config, now, self.reference_time, &key);
let value = bucket.value;
added_cost = key.cost() + value.cost();

@@ -909,6 +935,7 @@ mod tests {
max_tag_key_length: 200,
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
max_total_bucket_bytes: None,
flush_batching: FlushBatching::default(),
flush_partitions: None,
}
@@ -936,8 +963,8 @@

let mut bucket2 = bucket1.clone();
bucket2.value = BucketValue::counter(43.into());
aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
aggregator.merge(project_key, bucket1).unwrap();
aggregator.merge(project_key, bucket2).unwrap();

let buckets: Vec<_> = aggregator
.buckets
@@ -1031,9 +1058,10 @@

let mut bucket3 = bucket1.clone();
bucket3.timestamp = UnixTimestamp::from_secs(999994721);
aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
aggregator.merge(project_key, bucket3, None).unwrap();

aggregator.merge(project_key, bucket1).unwrap();
aggregator.merge(project_key, bucket2).unwrap();
aggregator.merge(project_key, bucket3).unwrap();

let mut buckets: Vec<_> = aggregator
.buckets
@@ -1089,12 +1117,8 @@
let mut aggregator: Aggregator = Aggregator::new(config);

// It's OK to have same metric with different projects:
aggregator
.merge(project_key1, some_bucket(None), None)
.unwrap();
aggregator
.merge(project_key2, some_bucket(None), None)
.unwrap();
aggregator.merge(project_key1, some_bucket(None)).unwrap();
aggregator.merge(project_key2, some_bucket(None)).unwrap();

assert_eq!(aggregator.buckets.len(), 2);
}
@@ -1267,7 +1291,7 @@ mod tests {
bucket.name = metric_name.into();

let current_cost = aggregator.cost_tracker.total_cost;
aggregator.merge(project_key, bucket, None).unwrap();
aggregator.merge(project_key, bucket).unwrap();
let total_cost = aggregator.cost_tracker.total_cost;
assert_eq!(total_cost, current_cost + expected_added_cost);
}
@@ -1300,7 +1324,7 @@
metadata: BucketMetadata::new(timestamp),
};

aggregator.merge(project_key, bucket, None).unwrap();
aggregator.merge(project_key, bucket).unwrap();
}
}

@@ -1366,7 +1390,7 @@

assert!(matches!(
aggregator
.get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
.get_bucket_timestamp(UnixTimestamp::now(), UnixTimestamp::from_secs(u64::MAX), 2)
.unwrap_err(),
AggregateMetricsError::InvalidTimestamp(_)
));
@@ -1548,15 +1572,16 @@
metadata: BucketMetadata::new(timestamp),
};

let mut aggregator: Aggregator = Aggregator::new(test_config());
let mut aggregator = Aggregator::new(AggregatorConfig {
max_total_bucket_bytes: Some(1),
..test_config()
});
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

aggregator
.merge(project_key, bucket.clone(), Some(1))
.unwrap();
aggregator.merge(project_key, bucket.clone()).unwrap();

assert_eq!(
aggregator.merge(project_key, bucket, Some(1)).unwrap_err(),
aggregator.merge(project_key, bucket).unwrap_err(),
AggregateMetricsError::TotalLimitExceeded
);
}
@@ -1580,9 +1605,9 @@
let mut aggregator: Aggregator = Aggregator::new(config);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

aggregator.merge(project_key, bucket.clone(), None).unwrap();
aggregator.merge(project_key, bucket.clone()).unwrap();
assert_eq!(
aggregator.merge(project_key, bucket, None).unwrap_err(),
aggregator.merge(project_key, bucket).unwrap_err(),
AggregateMetricsError::ProjectLimitExceeded
);
}
@@ -1614,15 +1639,9 @@
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999999811)));

aggregator
.merge(project_key, bucket1.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket2.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket3.clone(), None)
.unwrap();
aggregator.merge(project_key, bucket1.clone()).unwrap();
aggregator.merge(project_key, bucket2.clone()).unwrap();
aggregator.merge(project_key, bucket3.clone()).unwrap();

let buckets_metadata: Vec<_> = aggregator
.buckets
@@ -1675,8 +1694,9 @@
extracted_from_indexed: false,
};

let flush_time_1 = get_flush_time(&config, reference_time, &bucket_key_1);
let flush_time_2 = get_flush_time(&config, reference_time, &bucket_key_2);
let now = UnixTimestamp::now();
let flush_time_1 = get_flush_time(&config, now, reference_time, &bucket_key_1);
let flush_time_2 = get_flush_time(&config, now, reference_time, &bucket_key_2);

assert_eq!(flush_time_1, flush_time_2);
}
22 changes: 6 additions & 16 deletions relay-server/src/services/metrics/aggregator.rs
@@ -6,7 +6,7 @@ use hashbrown::HashMap;
use relay_base_schema::project::ProjectKey;
use relay_config::AggregatorServiceConfig;
use relay_metrics::aggregator::AggregateMetricsError;
use relay_metrics::{aggregator, Bucket};
use relay_metrics::{aggregator, Bucket, UnixTimestamp};
use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown};

use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
@@ -90,7 +90,6 @@ pub struct AggregatorService {
aggregator: aggregator::Aggregator,
state: AggregatorState,
receiver: Option<Recipient<FlushBuckets, NoResponse>>,
max_total_bucket_bytes: Option<usize>,
flush_interval_ms: u64,
can_accept_metrics: Arc<AtomicBool>,
}
@@ -117,11 +116,8 @@ impl AggregatorService {
Self {
receiver,
state: AggregatorState::Running,
max_total_bucket_bytes: config.max_total_bucket_bytes,
flush_interval_ms: config.flush_interval_ms,
can_accept_metrics: Arc::new(AtomicBool::new(
!aggregator.totals_cost_exceeded(config.max_total_bucket_bytes),
)),
can_accept_metrics: Arc::new(AtomicBool::new(!aggregator.totals_cost_exceeded())),
aggregator,
}
}
@@ -141,12 +137,8 @@
let force_flush = matches!(&self.state, AggregatorState::ShuttingDown);
let partitions = self.aggregator.pop_flush_buckets(force_flush);

self.can_accept_metrics.store(
!self
.aggregator
.totals_cost_exceeded(self.max_total_bucket_bytes),
Ordering::Relaxed,
);
self.can_accept_metrics
.store(!self.aggregator.totals_cost_exceeded(), Ordering::Relaxed);

if partitions.is_empty() {
return;
@@ -193,11 +185,9 @@ impl AggregatorService {
buckets,
} = msg;

let now = UnixTimestamp::now();
for bucket in buckets.into_iter() {
match self
.aggregator
.merge(project_key, bucket, self.max_total_bucket_bytes)
{
match self.aggregator.merge_with_options(project_key, bucket, now) {
// Ignore invalid timestamp errors.
Err(AggregateMetricsError::InvalidTimestamp(_)) => {}
Err(AggregateMetricsError::TotalLimitExceeded) => {
