ref(metrics): Move max aggregator size into config and allow passing now timestamp for merge #3904

Merged 1 commit on Aug 7, 2024
relay-config/src/aggregator.rs (0 additions, 10 deletions)

@@ -12,15 +12,6 @@ pub struct AggregatorServiceConfig {
#[serde(flatten)]
pub aggregator: AggregatorConfig,

/// Maximum amount of bytes used for metrics aggregation.
///
/// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
/// This is only an approximation and does not take into account things such as pre-allocation
/// in hashmaps.
///
/// Defaults to `None`, i.e. no limit.
pub max_total_bucket_bytes: Option<usize>,

/// The approximate maximum number of bytes submitted within one flush cycle.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
@@ -40,7 +31,6 @@ impl Default for AggregatorServiceConfig {
fn default() -> Self {
Self {
aggregator: AggregatorConfig::default(),
max_total_bucket_bytes: None,
max_flush_bytes: 5_000_000, // 5 MB
flush_interval_ms: 100, // 100 milliseconds
}
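The process-wide limit is deleted here from `AggregatorServiceConfig`; the `relay-metrics` file below adds it to `AggregatorConfig` instead. Since `AggregatorConfig` is flattened into the service config via `#[serde(flatten)]`, the `max_total_bucket_bytes` key keeps its position in the configuration file. A minimal sketch of the relocated field, assuming the module path implied by this diff and with illustrative byte values:

```rust
use relay_metrics::aggregator::AggregatorConfig;

fn example_config() -> AggregatorConfig {
    AggregatorConfig {
        // Process-wide cap on aggregator memory, moved here by this PR.
        // The concrete values are made up for illustration.
        max_total_bucket_bytes: Some(500_000_000),
        // Per-project cap, which already lived on AggregatorConfig.
        max_project_key_bucket_bytes: Some(50_000_000),
        ..AggregatorConfig::default()
    }
}
```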
relay-config/src/config.rs (3 additions, 0 deletions)

@@ -2470,6 +2470,7 @@ impl Config {
mut max_tag_key_length,
mut max_tag_value_length,
mut max_project_key_bucket_bytes,
mut max_total_bucket_bytes,
..
} = self.default_aggregator_config().aggregator;

@@ -2484,6 +2485,7 @@
max_tag_value_length = max_tag_value_length.max(agg.max_tag_value_length);
max_project_key_bucket_bytes =
max_project_key_bucket_bytes.max(agg.max_project_key_bucket_bytes);
max_total_bucket_bytes = max_total_bucket_bytes.max(agg.max_total_bucket_bytes);
}

for agg in self
@@ -2505,6 +2507,7 @@
max_tag_key_length,
max_tag_value_length,
max_project_key_bucket_bytes,
max_total_bucket_bytes,
initial_delay: 30,
flush_partitions: None,
flush_batching: FlushBatching::Project,
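A detail worth noting in the `.max()` folding above: `Option<usize>` implements `Ord` with `None` ordered before every `Some`, so a concrete limit always wins over "no limit" when aggregator configs are combined. A quick illustration in plain Rust, independent of Relay:

```rust
// None sorts below Some(_) in Option's Ord implementation, so the
// max-fold keeps a configured limit over an unset one, and the larger
// of two configured limits.
assert_eq!(None.max(Some(100_usize)), Some(100));
assert_eq!(Some(100_usize).max(Some(200)), Some(200));
assert_eq!(None::<usize>.max(None), None);
```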
relay-metrics/benches/benches.rs (2 additions, 6 deletions)

@@ -196,11 +196,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
#[allow(clippy::unit_arg)]
black_box(
aggregator
.merge(
black_box(project_key),
black_box(bucket),
black_box(None),
)
.merge(black_box(project_key), black_box(bucket))
.unwrap(),
);
}
@@ -221,7 +217,7 @@
let timestamp = UnixTimestamp::now();
let mut aggregator: Aggregator = Aggregator::new(config.clone());
for (project_key, bucket) in input.get_buckets(timestamp) {
aggregator.merge(project_key, bucket, None).unwrap();
aggregator.merge(project_key, bucket).unwrap();
}
aggregator
},
relay-metrics/src/aggregator.rs (65 additions, 45 deletions)

@@ -182,12 +182,21 @@ pub struct AggregatorConfig {

/// Maximum amount of bytes used for metrics aggregation per project key.
///
/// Similar measuring technique to `max_total_bucket_bytes`, but instead of a
/// Similar measuring technique to [`Self::max_total_bucket_bytes`], but instead of a
/// global/process-wide limit, it is enforced per project key.
///
/// Defaults to `None`, i.e. no limit.
pub max_project_key_bucket_bytes: Option<usize>,

/// Maximum amount of bytes used for metrics aggregation.
///
/// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
/// This is only an approximation and does not take into account things such as pre-allocation
/// in hashmaps.
///
/// Defaults to `None`, i.e. no limit.
pub max_total_bucket_bytes: Option<usize>,

/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % flush_partitions), and routed
@@ -279,6 +288,7 @@ impl Default for AggregatorConfig {
max_tag_key_length: 200,
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
max_total_bucket_bytes: None,
flush_batching: FlushBatching::default(),
flush_partitions: None,
}
@@ -458,12 +468,11 @@ impl fmt::Debug for CostTracker {
/// All buckets are flushed after a grace period of `initial_delay`.
fn get_flush_time(
config: &AggregatorConfig,
now: UnixTimestamp,
reference_time: Instant,
bucket_key: &BucketKey,
) -> Instant {
let initial_flush = bucket_key.timestamp + config.bucket_interval() + config.initial_delay();

let now = UnixTimestamp::now();
let backdated = initial_flush <= now;

let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
@@ -557,9 +566,10 @@ impl Aggregator {
self.buckets.len()
}

/// Returns `true` if the cost tracker's value is larger than the given max cost.
pub fn totals_cost_exceeded(&self, max_total_cost: Option<usize>) -> bool {
self.cost_tracker.totals_cost_exceeded(max_total_cost)
/// Returns `true` if the cost tracker's value is larger than the configured max cost.
pub fn totals_cost_exceeded(&self) -> bool {
self.cost_tracker
.totals_cost_exceeded(self.config.max_total_bucket_bytes)
}

/// Converts this aggregator into a vector of [`Bucket`].
@@ -678,13 +688,14 @@ impl Aggregator {
/// Logs a statsd metric for invalid timestamps.
fn get_bucket_timestamp(
&self,
now: UnixTimestamp,
timestamp: UnixTimestamp,
bucket_width: u64,
) -> Result<UnixTimestamp, AggregateMetricsError> {
let bucket_ts = self.config.get_bucket_timestamp(timestamp, bucket_width);

if !self.config.timestamp_range().contains(&bucket_ts) {
let delta = (bucket_ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
let delta = (bucket_ts.as_secs() as i64) - (now.as_secs() as i64);
relay_statsd::metric!(
histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
aggregator = &self.name,
@@ -696,16 +707,31 @@
Ok(bucket_ts)
}

/// Merge a preaggregated bucket into this aggregator.
/// Merge a bucket into this aggregator.
///
/// If no bucket exists for the given bucket key, a new bucket will be created.
/// Like [`Self::merge_with_options`] with defaults.
pub fn merge(
&mut self,
project_key: ProjectKey,
bucket: Bucket,
) -> Result<(), AggregateMetricsError> {
self.merge_with_options(project_key, bucket, UnixTimestamp::now())
}

/// Merge a bucket into this aggregator.
///
/// If no bucket exists for the given bucket key, a new bucket will be created.
///
/// Arguments:
/// - `now`: The current timestamp. When merging a large batch of buckets, the
///   timestamp should be created once, outside of the loop.
pub fn merge_with_options(
&mut self,
project_key: ProjectKey,
mut bucket: Bucket,
max_total_bucket_bytes: Option<usize>,
now: UnixTimestamp,
) -> Result<(), AggregateMetricsError> {
let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?;
let timestamp = self.get_bucket_timestamp(now, bucket.timestamp, bucket.width)?;
let key = BucketKey {
project_key,
timestamp,
@@ -744,7 +770,7 @@ impl Aggregator {
// whether it is just a counter, etc.
self.cost_tracker.check_limits_exceeded(
project_key,
max_total_bucket_bytes,
self.config.max_total_bucket_bytes,
self.config.max_project_key_bucket_bytes,
)?;

@@ -784,7 +810,7 @@ impl Aggregator {
namespace = key.namespace().as_str(),
);

let flush_at = get_flush_time(&self.config, self.reference_time, &key);
let flush_at = get_flush_time(&self.config, now, self.reference_time, &key);
let value = bucket.value;
added_cost = key.cost() + value.cost();

@@ -909,6 +935,7 @@ mod tests {
max_tag_key_length: 200,
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
max_total_bucket_bytes: None,
flush_batching: FlushBatching::default(),
flush_partitions: None,
}
@@ -936,8 +963,8 @@

let mut bucket2 = bucket1.clone();
bucket2.value = BucketValue::counter(43.into());
aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
aggregator.merge(project_key, bucket1).unwrap();
aggregator.merge(project_key, bucket2).unwrap();

let buckets: Vec<_> = aggregator
.buckets
@@ -1031,9 +1058,10 @@

let mut bucket3 = bucket1.clone();
bucket3.timestamp = UnixTimestamp::from_secs(999994721);
aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
aggregator.merge(project_key, bucket3, None).unwrap();

aggregator.merge(project_key, bucket1).unwrap();
aggregator.merge(project_key, bucket2).unwrap();
aggregator.merge(project_key, bucket3).unwrap();

let mut buckets: Vec<_> = aggregator
.buckets
@@ -1089,12 +1117,8 @@
let mut aggregator: Aggregator = Aggregator::new(config);

// It's OK to have same metric with different projects:
aggregator
.merge(project_key1, some_bucket(None), None)
.unwrap();
aggregator
.merge(project_key2, some_bucket(None), None)
.unwrap();
aggregator.merge(project_key1, some_bucket(None)).unwrap();
aggregator.merge(project_key2, some_bucket(None)).unwrap();

assert_eq!(aggregator.buckets.len(), 2);
}
@@ -1267,7 +1291,7 @@ mod tests {
bucket.name = metric_name.into();

let current_cost = aggregator.cost_tracker.total_cost;
aggregator.merge(project_key, bucket, None).unwrap();
aggregator.merge(project_key, bucket).unwrap();
let total_cost = aggregator.cost_tracker.total_cost;
assert_eq!(total_cost, current_cost + expected_added_cost);
}
@@ -1300,7 +1324,7 @@
metadata: BucketMetadata::new(timestamp),
};

aggregator.merge(project_key, bucket, None).unwrap();
aggregator.merge(project_key, bucket).unwrap();
}
}

@@ -1366,7 +1390,7 @@

assert!(matches!(
aggregator
.get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
.get_bucket_timestamp(UnixTimestamp::now(), UnixTimestamp::from_secs(u64::MAX), 2)
.unwrap_err(),
AggregateMetricsError::InvalidTimestamp(_)
));
@@ -1548,15 +1572,16 @@
metadata: BucketMetadata::new(timestamp),
};

let mut aggregator: Aggregator = Aggregator::new(test_config());
let mut aggregator = Aggregator::new(AggregatorConfig {
max_total_bucket_bytes: Some(1),
..test_config()
});
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

aggregator
.merge(project_key, bucket.clone(), Some(1))
.unwrap();
aggregator.merge(project_key, bucket.clone()).unwrap();

assert_eq!(
aggregator.merge(project_key, bucket, Some(1)).unwrap_err(),
aggregator.merge(project_key, bucket).unwrap_err(),
AggregateMetricsError::TotalLimitExceeded
);
}
@@ -1580,9 +1605,9 @@
let mut aggregator: Aggregator = Aggregator::new(config);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

aggregator.merge(project_key, bucket.clone(), None).unwrap();
aggregator.merge(project_key, bucket.clone()).unwrap();
assert_eq!(
aggregator.merge(project_key, bucket, None).unwrap_err(),
aggregator.merge(project_key, bucket).unwrap_err(),
AggregateMetricsError::ProjectLimitExceeded
);
}
@@ -1614,15 +1639,9 @@
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999999811)));

aggregator
.merge(project_key, bucket1.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket2.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket3.clone(), None)
.unwrap();
aggregator.merge(project_key, bucket1.clone()).unwrap();
aggregator.merge(project_key, bucket2.clone()).unwrap();
aggregator.merge(project_key, bucket3.clone()).unwrap();

let buckets_metadata: Vec<_> = aggregator
.buckets
@@ -1675,8 +1694,9 @@
extracted_from_indexed: false,
};

let flush_time_1 = get_flush_time(&config, reference_time, &bucket_key_1);
let flush_time_2 = get_flush_time(&config, reference_time, &bucket_key_2);
let now = UnixTimestamp::now();
let flush_time_1 = get_flush_time(&config, now, reference_time, &bucket_key_1);
let flush_time_2 = get_flush_time(&config, now, reference_time, &bucket_key_2);

assert_eq!(flush_time_1, flush_time_2);
}
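The resulting API split: `merge` keeps the old two-argument shape and reads the clock per call, while `merge_with_options` takes a caller-supplied `now` so that a whole batch can share one timestamp. A sketch of the intended call pattern; the import paths are assumptions based on the `use` statements visible in this diff:

```rust
use relay_base_schema::project::ProjectKey;
use relay_metrics::aggregator::Aggregator;
use relay_metrics::{Bucket, UnixTimestamp};

fn merge_batch(aggregator: &mut Aggregator, project_key: ProjectKey, buckets: Vec<Bucket>) {
    // Read the clock once for the whole batch, as the doc comment on
    // `merge_with_options` recommends. Every bucket is then validated
    // and scheduled against the same reference time.
    let now = UnixTimestamp::now();
    for bucket in buckets {
        // Errors (invalid timestamps, exceeded cost limits) are ignored
        // here for brevity; the service code below matches on them.
        let _ = aggregator.merge_with_options(project_key, bucket, now);
    }
}
```

For a single bucket, `aggregator.merge(project_key, bucket)` is equivalent, since it delegates to `merge_with_options` with `UnixTimestamp::now()`.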
relay-server/src/services/metrics/aggregator.rs (6 additions, 16 deletions)

@@ -6,7 +6,7 @@ use hashbrown::HashMap;
use relay_base_schema::project::ProjectKey;
use relay_config::AggregatorServiceConfig;
use relay_metrics::aggregator::AggregateMetricsError;
use relay_metrics::{aggregator, Bucket};
use relay_metrics::{aggregator, Bucket, UnixTimestamp};
use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown};

use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
@@ -90,7 +90,6 @@ pub struct AggregatorService {
aggregator: aggregator::Aggregator,
state: AggregatorState,
receiver: Option<Recipient<FlushBuckets, NoResponse>>,
max_total_bucket_bytes: Option<usize>,
flush_interval_ms: u64,
can_accept_metrics: Arc<AtomicBool>,
}
@@ -117,11 +116,8 @@ impl AggregatorService {
Self {
receiver,
state: AggregatorState::Running,
max_total_bucket_bytes: config.max_total_bucket_bytes,
flush_interval_ms: config.flush_interval_ms,
can_accept_metrics: Arc::new(AtomicBool::new(
!aggregator.totals_cost_exceeded(config.max_total_bucket_bytes),
)),
can_accept_metrics: Arc::new(AtomicBool::new(!aggregator.totals_cost_exceeded())),
aggregator,
}
}
@@ -141,12 +137,8 @@ impl AggregatorService {
let force_flush = matches!(&self.state, AggregatorState::ShuttingDown);
let partitions = self.aggregator.pop_flush_buckets(force_flush);

self.can_accept_metrics.store(
!self
.aggregator
.totals_cost_exceeded(self.max_total_bucket_bytes),
Ordering::Relaxed,
);
self.can_accept_metrics
.store(!self.aggregator.totals_cost_exceeded(), Ordering::Relaxed);

if partitions.is_empty() {
return;
@@ -193,11 +185,9 @@ impl AggregatorService {
buckets,
} = msg;

let now = UnixTimestamp::now();
for bucket in buckets.into_iter() {
match self
.aggregator
.merge(project_key, bucket, self.max_total_bucket_bytes)
{
match self.aggregator.merge_with_options(project_key, bucket, now) {
// Ignore invalid timestamp errors.
Err(AggregateMetricsError::InvalidTimestamp(_)) => {}
Err(AggregateMetricsError::TotalLimitExceeded) => {
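A note on the design choice in the merge handler above: hoisting `UnixTimestamp::now()` out of the per-bucket loop is more than a micro-optimization. Previously every `merge` call read the clock internally (in `get_flush_time` and for the invalid-timestamp delta), so buckets from a single merge message could straddle a second boundary and be scheduled against slightly different instants; with one shared `now`, the whole batch is validated and flushed relative to the same reference time.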