diff --git a/CHANGELOG.md b/CHANGELOG.md index 81816ddb9c..c788b5bb64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Fixes raw OS description parsing for iOS and iPadOS originating from the Unity SDK. ([#3780](https://github.com/getsentry/relay/pull/3780)) - Fixes metrics dropped due to missing project state. ([#3553](https://github.com/getsentry/relay/issues/3553)) +- Incorrect span outcomes when generated from a indexed transaction quota. ([#3793](https://github.com/getsentry/relay/pull/3793)) - Report outcomes for spans when transactions are rate limited. ([#3749](https://github.com/getsentry/relay/pull/3749)) **Internal**: diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3b41c4f690..89dec3a1c5 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -50,7 +50,7 @@ use tokio::sync::Semaphore; use { crate::metrics::MetricsLimiter, crate::services::store::{Store, StoreEnvelope}, - crate::utils::{sample, EnvelopeLimiter, ItemAction}, + crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction}, itertools::Itertools, relay_cardinality::{ CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter, @@ -660,12 +660,12 @@ impl ProcessingExtractedMetrics { /// This is used to apply rate limits which have been enforced on sampled items of an envelope /// to also consistently apply to the metrics extracted from these items. #[cfg(feature = "processing")] - fn enforce_limits(&mut self, scoping: Scoping, limits: &RateLimits) { - for (category, namespace) in [ - (DataCategory::Transaction, MetricNamespace::Transactions), - (DataCategory::Span, MetricNamespace::Spans), + fn apply_enforcement(&mut self, enforcement: &Enforcement) { + for (namespace, limit) in [ + (MetricNamespace::Transactions, &enforcement.event), + (MetricNamespace::Spans, &enforcement.spans), ] { - if !limits.check(scoping.item(category)).is_empty() { + if limit.is_active() { relay_log::trace!( "dropping {namespace} metrics, due to enforced limit on envelope" ); @@ -1325,10 +1325,8 @@ impl EnvelopeProcessorService { let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), { envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)? }); - let event_active = enforcement.event_active(); - enforcement.apply_with_outcomes(&mut state.managed_envelope); - if event_active { + if enforcement.is_event_active() { state.remove_event(); debug_assert!(state.envelope().is_empty()); } @@ -1336,7 +1334,8 @@ impl EnvelopeProcessorService { // Use the same rate limits as used for the envelope on the metrics. // Those rate limits should not be checked for expiry or similar to ensure a consistent // limiting of envelope items and metrics. - state.extracted_metrics.enforce_limits(scoping, &limits); + state.extracted_metrics.apply_enforcement(&enforcement); + enforcement.apply_with_outcomes(&mut state.managed_envelope); if !limits.is_empty() { self.inner diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index cc32103cfe..88f21866ba 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -1051,7 +1051,7 @@ impl Project { /// as top-level spans, thus if we limited a transaction, we want to count and emit negative /// outcomes for each of the spans nested inside that transaction. fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) { - if !enforcement.event_active() { + if !enforcement.is_event_active() { return; } diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index eb39ed2367..c27c6c71e2 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -285,8 +285,12 @@ impl CategoryLimit { } } - /// Recreates the category limit for a new category with the same reason. + /// Recreates the category limit, if active, for a new category with the same reason. pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit { + if !self.is_active() { + return Self::default(); + } + Self { category, quantity, @@ -344,9 +348,22 @@ pub struct Enforcement { } impl Enforcement { - /// Returns `true` if the event should be rate limited. - pub fn event_active(&self) -> bool { - self.event.is_active() || self.event_indexed.is_active() + /// Returns the `CategoryLimit` for the event. + /// + /// `None` if the event is not rate limited. + pub fn active_event(&self) -> Option<&CategoryLimit> { + if self.event.is_active() { + Some(&self.event) + } else if self.event_indexed.is_active() { + Some(&self.event_indexed) + } else { + None + } + } + + /// Returns `true` if the event is rate limited. + pub fn is_event_active(&self) -> bool { + self.active_event().is_some() } /// Helper for `track_outcomes`. @@ -580,10 +597,9 @@ where rate_limits.merge(event_limits); } - if enforcement.event_active() { - enforcement.attachments = enforcement - .event - .clone_for(DataCategory::Attachment, summary.attachment_quantity); + if let Some(limit) = enforcement.active_event() { + enforcement.attachments = + limit.clone_for(DataCategory::Attachment, summary.attachment_quantity); } else if summary.attachment_quantity > 0 { let item_scoping = scoping.item(DataCategory::Attachment); let attachment_limits = (self.check)(item_scoping, summary.attachment_quantity)?; @@ -612,7 +628,7 @@ where rate_limits.merge(session_limits); } - if enforcement.event_active() { + if enforcement.is_event_active() { enforcement.profiles = enforcement .event .clone_for(DataCategory::Profile, summary.profile_quantity); @@ -669,14 +685,14 @@ where rate_limits.merge(checkin_limits); } - if enforcement.event_active() { + if enforcement.is_event_active() { enforcement.spans = enforcement .event .clone_for(DataCategory::Span, summary.span_quantity); enforcement.spans_indexed = enforcement .event_indexed - .clone_for(DataCategory::SpanIndexed, summary.span_quantity) + .clone_for(DataCategory::SpanIndexed, summary.span_quantity); } else if summary.span_quantity > 0 { let mut span_limits = (self.check)(scoping.item(DataCategory::Span), summary.span_quantity)?; diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 9f61a95b9c..efd38336de 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -1720,21 +1720,15 @@ def summarize_outcomes(): metrics_consumer.assert_empty() -@pytest.mark.parametrize( - "category,raises_rate_limited", - [ - ("transaction", True), - ("transaction_indexed", False), - ], -) +@pytest.mark.parametrize("category", ["transaction", "transaction_indexed"]) def test_rate_limit_is_consistent_between_transaction_and_spans( mini_sentry, relay_with_processing, transactions_consumer, spans_consumer, + metrics_consumer, outcomes_consumer, category, - raises_rate_limited, ): """ Rate limits are consistent between transactions and nested spans. @@ -1763,11 +1757,19 @@ def test_rate_limit_is_consistent_between_transaction_and_spans( transactions_consumer = transactions_consumer() spans_consumer = spans_consumer() outcomes_consumer = outcomes_consumer() + metrics_consumer = metrics_consumer() - start = datetime.now(timezone.utc) - end = start + timedelta(seconds=1) - - envelope = envelope_with_transaction_and_spans(start, end) + def usage_metrics(): + metrics = metrics_consumer.get_metrics() + transaction_count = sum( + m[0]["value"] + for m in metrics + if m[0]["name"] == "c:transactions/usage@none" + ) + span_count = sum( + m[0]["value"] for m in metrics if m[0]["name"] == "c:spans/usage@none" + ) + return (transaction_count, span_count) def summarize_outcomes(): counter = Counter() @@ -1775,6 +1777,11 @@ def summarize_outcomes(): counter[(outcome["category"], outcome["outcome"])] += outcome["quantity"] return dict(counter) + start = datetime.now(timezone.utc) + end = start + timedelta(seconds=1) + + envelope = envelope_with_transaction_and_spans(start, end) + # First batch passes relay.send_envelope(project_id, envelope) @@ -1784,6 +1791,7 @@ def summarize_outcomes(): spans = spans_consumer.get_spans(n=2, timeout=10) assert len(spans) == 2 assert summarize_outcomes() == {(16, 0): 2} # SpanIndexed, Accepted + assert usage_metrics() == (1, 2) # Second batch nothing passes relay.send_envelope(project_id, envelope) @@ -1797,17 +1805,18 @@ def summarize_outcomes(): (12, 2): 2, # Span, Rate Limited (16, 2): 2, # SpanIndexed, Rate Limited } + assert usage_metrics() == (0, 0) elif category == "transaction_indexed": assert summarize_outcomes() == { (9, 2): 1, # TransactionIndexed, Rate Limited - (12, 2): 2, # Span, Rate Limited (16, 2): 2, # SpanIndexed, Rate Limited } + assert usage_metrics() == (1, 2) # Third batch might raise 429 since it hits the fast path maybe_raises = ( pytest.raises(HTTPError, match="429 Client Error") - if raises_rate_limited + if category == "transaction" else contextlib.nullcontext() ) with maybe_raises: @@ -1820,15 +1829,13 @@ def summarize_outcomes(): (12, 2): 2, # Span, Rate Limited (16, 2): 2, # SpanIndexed, Rate Limited } + assert usage_metrics() == (0, 0) elif category == "transaction_indexed": assert summarize_outcomes() == { (9, 2): 1, # TransactionIndexed, Rate Limited - (12, 2): 2, # Span, Rate Limited (16, 2): 2, # SpanIndexed, Rate Limited } - - transactions_consumer.assert_empty() - spans_consumer.assert_empty() + assert usage_metrics() == (1, 2) @pytest.mark.parametrize(