Skip to content

Commit

Permalink
fix(spans): Fixes span outcomes and inherited rate limits (#3793)
Browse files Browse the repository at this point in the history
Span outcomes are currently off when created from rate limited
transactions in the `transaction_indexed` category. All of the dropped
outcomes are also added to the accepted, because the metrics are not
dropped.

Additionally outcomes for the span category were emitted when there was
a limit for `transaction_indexed` instead of just emitting them from
`span_indexed`.

Implementation notes:
Need to drop the metrics based on the enforcement instead of the rate
limits, because it is correct that there are no rate limits added for
the inherited category. We wouldn't want to start dropping standalone
spans because of a transaction quota.
  • Loading branch information
Dav1dde authored Jul 5, 2024
1 parent 1ebb22e commit 906a14e
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Fixes raw OS description parsing for iOS and iPadOS originating from the Unity SDK. ([#3780](https://github.com/getsentry/relay/pull/3780))
- Fixes metrics dropped due to missing project state. ([#3553](https://github.com/getsentry/relay/issues/3553))
- Incorrect span outcomes when generated from a indexed transaction quota. ([#3793](https://github.com/getsentry/relay/pull/3793))
- Report outcomes for spans when transactions are rate limited. ([#3749](https://github.com/getsentry/relay/pull/3749))

**Internal**:
Expand Down
24 changes: 12 additions & 12 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tokio::sync::Semaphore;
use {
crate::metrics::MetricsLimiter,
crate::services::store::{Store, StoreEnvelope},
crate::utils::{sample, EnvelopeLimiter, ItemAction},
crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction},
itertools::Itertools,
relay_cardinality::{
CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
Expand Down Expand Up @@ -660,12 +660,12 @@ impl ProcessingExtractedMetrics {
/// This is used to apply rate limits which have been enforced on sampled items of an envelope
/// to also consistently apply to the metrics extracted from these items.
#[cfg(feature = "processing")]
fn enforce_limits(&mut self, scoping: Scoping, limits: &RateLimits) {
for (category, namespace) in [
(DataCategory::Transaction, MetricNamespace::Transactions),
(DataCategory::Span, MetricNamespace::Spans),
fn apply_enforcement(&mut self, enforcement: &Enforcement) {
for (namespace, limit) in [
(MetricNamespace::Transactions, &enforcement.event),
(MetricNamespace::Spans, &enforcement.spans),
] {
if !limits.check(scoping.item(category)).is_empty() {
if limit.is_active() {
relay_log::trace!(
"dropping {namespace} metrics, due to enforced limit on envelope"
);
Expand Down Expand Up @@ -1325,19 +1325,19 @@ impl EnvelopeProcessorService {
let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)?
});
let event_active = enforcement.event_active();
let event_active = enforcement.is_event_active();

// Use the same rate limits as used for the envelope on the metrics.
// Those rate limits should not be checked for expiry or similar to ensure a consistent
// limiting of envelope items and metrics.
state.extracted_metrics.apply_enforcement(&enforcement);
enforcement.apply_with_outcomes(&mut state.managed_envelope);

if event_active {
state.remove_event();
debug_assert!(state.envelope().is_empty());
}

// Use the same rate limits as used for the envelope on the metrics.
// Those rate limits should not be checked for expiry or similar to ensure a consistent
// limiting of envelope items and metrics.
state.extracted_metrics.enforce_limits(scoping, &limits);

if !limits.is_empty() {
self.inner
.addrs
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ impl Project {
/// as top-level spans, thus if we limited a transaction, we want to count and emit negative
/// outcomes for each of the spans nested inside that transaction.
fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
if !enforcement.event_active() {
if !enforcement.is_event_active() {
return;
}

Expand Down
38 changes: 27 additions & 11 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,12 @@ impl CategoryLimit {
}
}

/// Recreates the category limit for a new category with the same reason.
/// Recreates the category limit, if active, for a new category with the same reason.
pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
if !self.is_active() {
return Self::default();
}

Self {
category,
quantity,
Expand Down Expand Up @@ -344,9 +348,22 @@ pub struct Enforcement {
}

impl Enforcement {
/// Returns `true` if the event should be rate limited.
pub fn event_active(&self) -> bool {
self.event.is_active() || self.event_indexed.is_active()
/// Returns the `CategoryLimit` for the event.
///
/// `None` if the event is not rate limited.
pub fn active_event(&self) -> Option<&CategoryLimit> {
if self.event.is_active() {
Some(&self.event)
} else if self.event_indexed.is_active() {
Some(&self.event_indexed)
} else {
None
}
}

/// Returns `true` if the event is rate limited.
pub fn is_event_active(&self) -> bool {
self.active_event().is_some()
}

/// Helper for `track_outcomes`.
Expand Down Expand Up @@ -580,10 +597,9 @@ where
rate_limits.merge(event_limits);
}

if enforcement.event_active() {
enforcement.attachments = enforcement
.event
.clone_for(DataCategory::Attachment, summary.attachment_quantity);
if let Some(limit) = enforcement.active_event() {
enforcement.attachments =
limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
} else if summary.attachment_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Attachment);
let attachment_limits = (self.check)(item_scoping, summary.attachment_quantity)?;
Expand Down Expand Up @@ -612,7 +628,7 @@ where
rate_limits.merge(session_limits);
}

if enforcement.event_active() {
if enforcement.is_event_active() {
enforcement.profiles = enforcement
.event
.clone_for(DataCategory::Profile, summary.profile_quantity);
Expand Down Expand Up @@ -669,14 +685,14 @@ where
rate_limits.merge(checkin_limits);
}

if enforcement.event_active() {
if enforcement.is_event_active() {
enforcement.spans = enforcement
.event
.clone_for(DataCategory::Span, summary.span_quantity);

enforcement.spans_indexed = enforcement
.event_indexed
.clone_for(DataCategory::SpanIndexed, summary.span_quantity)
.clone_for(DataCategory::SpanIndexed, summary.span_quantity);
} else if summary.span_quantity > 0 {
let mut span_limits =
(self.check)(scoping.item(DataCategory::Span), summary.span_quantity)?;
Expand Down
43 changes: 25 additions & 18 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -1720,21 +1720,15 @@ def summarize_outcomes():
metrics_consumer.assert_empty()


@pytest.mark.parametrize(
"category,raises_rate_limited",
[
("transaction", True),
("transaction_indexed", False),
],
)
@pytest.mark.parametrize("category", ["transaction", "transaction_indexed"])
def test_rate_limit_is_consistent_between_transaction_and_spans(
mini_sentry,
relay_with_processing,
transactions_consumer,
spans_consumer,
metrics_consumer,
outcomes_consumer,
category,
raises_rate_limited,
):
"""
Rate limits are consistent between transactions and nested spans.
Expand Down Expand Up @@ -1763,18 +1757,31 @@ def test_rate_limit_is_consistent_between_transaction_and_spans(
transactions_consumer = transactions_consumer()
spans_consumer = spans_consumer()
outcomes_consumer = outcomes_consumer()
metrics_consumer = metrics_consumer()

start = datetime.now(timezone.utc)
end = start + timedelta(seconds=1)

envelope = envelope_with_transaction_and_spans(start, end)
def usage_metrics():
metrics = metrics_consumer.get_metrics()
transaction_count = sum(
m[0]["value"]
for m in metrics
if m[0]["name"] == "c:transactions/usage@none"
)
span_count = sum(
m[0]["value"] for m in metrics if m[0]["name"] == "c:spans/usage@none"
)
return (transaction_count, span_count)

def summarize_outcomes():
counter = Counter()
for outcome in outcomes_consumer.get_outcomes(timeout=10):
counter[(outcome["category"], outcome["outcome"])] += outcome["quantity"]
return dict(counter)

start = datetime.now(timezone.utc)
end = start + timedelta(seconds=1)

envelope = envelope_with_transaction_and_spans(start, end)

# First batch passes
relay.send_envelope(project_id, envelope)

Expand All @@ -1784,6 +1791,7 @@ def summarize_outcomes():
spans = spans_consumer.get_spans(n=2, timeout=10)
assert len(spans) == 2
assert summarize_outcomes() == {(16, 0): 2} # SpanIndexed, Accepted
assert usage_metrics() == (1, 2)

# Second batch nothing passes
relay.send_envelope(project_id, envelope)
Expand All @@ -1797,17 +1805,18 @@ def summarize_outcomes():
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (0, 0)
elif category == "transaction_indexed":
assert summarize_outcomes() == {
(9, 2): 1, # TransactionIndexed, Rate Limited
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (1, 2)

# Third batch might raise 429 since it hits the fast path
maybe_raises = (
pytest.raises(HTTPError, match="429 Client Error")
if raises_rate_limited
if category == "transaction"
else contextlib.nullcontext()
)
with maybe_raises:
Expand All @@ -1820,15 +1829,13 @@ def summarize_outcomes():
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (0, 0)
elif category == "transaction_indexed":
assert summarize_outcomes() == {
(9, 2): 1, # TransactionIndexed, Rate Limited
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}

transactions_consumer.assert_empty()
spans_consumer.assert_empty()
assert usage_metrics() == (1, 2)


@pytest.mark.parametrize(
Expand Down

0 comments on commit 906a14e

Please sign in to comment.