Skip to content

Commit

Permalink
fix(spans): Fixes span outcomes and inherited rate limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jul 4, 2024
1 parent 1ebb22e commit c434fa7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Fixes raw OS description parsing for iOS and iPadOS originating from the Unity SDK. ([#3780](https://github.com/getsentry/relay/pull/3780))
- Fixes metrics dropped due to missing project state. ([#3553](https://github.com/getsentry/relay/issues/3553))
- Incorrect span outcomes when generated from a indexed transaction quota. ([#3793](https://github.com/getsentry/relay/pull/3793))
- Report outcomes for spans when transactions are rate limited. ([#3749](https://github.com/getsentry/relay/pull/3749))

**Internal**:
Expand Down
19 changes: 9 additions & 10 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tokio::sync::Semaphore;
use {
crate::metrics::MetricsLimiter,
crate::services::store::{Store, StoreEnvelope},
crate::utils::{sample, EnvelopeLimiter, ItemAction},
crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction},
itertools::Itertools,
relay_cardinality::{
CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
Expand Down Expand Up @@ -660,12 +660,12 @@ impl ProcessingExtractedMetrics {
/// This is used to apply rate limits which have been enforced on sampled items of an envelope
/// to also consistently apply to the metrics extracted from these items.
#[cfg(feature = "processing")]
fn enforce_limits(&mut self, scoping: Scoping, limits: &RateLimits) {
for (category, namespace) in [
(DataCategory::Transaction, MetricNamespace::Transactions),
(DataCategory::Span, MetricNamespace::Spans),
fn apply_enforcement(&mut self, enforcement: &Enforcement) {
for (namespace, limit) in [
(MetricNamespace::Transactions, &enforcement.event),
(MetricNamespace::Spans, &enforcement.spans),
] {
if !limits.check(scoping.item(category)).is_empty() {
if limit.is_active() {
relay_log::trace!(
"dropping {namespace} metrics, due to enforced limit on envelope"
);
Expand Down Expand Up @@ -1325,18 +1325,17 @@ impl EnvelopeProcessorService {
let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)?
});
let event_active = enforcement.event_active();
enforcement.apply_with_outcomes(&mut state.managed_envelope);

if event_active {
if enforcement.is_event_active() {
state.remove_event();
debug_assert!(state.envelope().is_empty());
}

// Use the same rate limits as used for the envelope on the metrics.
// Those rate limits should not be checked for expiry or similar to ensure a consistent
// limiting of envelope items and metrics.
state.extracted_metrics.enforce_limits(scoping, &limits);
state.extracted_metrics.apply_enforcement(&enforcement);
enforcement.apply_with_outcomes(&mut state.managed_envelope);

if !limits.is_empty() {
self.inner
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ impl Project {
/// as top-level spans, thus if we limited a transaction, we want to count and emit negative
/// outcomes for each of the spans nested inside that transaction.
fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
if !enforcement.event_active() {
if !enforcement.is_event_active() {
return;
}

Expand Down
38 changes: 27 additions & 11 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,12 @@ impl CategoryLimit {
}
}

/// Recreates the category limit for a new category with the same reason.
/// Recreates the category limit, if active, for a new category with the same reason.
pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
if !self.is_active() {
return Self::default();
}

Self {
category,
quantity,
Expand Down Expand Up @@ -344,9 +348,22 @@ pub struct Enforcement {
}

impl Enforcement {
/// Returns `true` if the event should be rate limited.
pub fn event_active(&self) -> bool {
self.event.is_active() || self.event_indexed.is_active()
/// Returns the `CategoryLimit` for the event.
///
/// `None` if the event is not rate limited.
pub fn active_event(&self) -> Option<&CategoryLimit> {
if self.event.is_active() {
Some(&self.event)
} else if self.event_indexed.is_active() {
Some(&self.event_indexed)
} else {
None
}
}

/// Returns `true` if the event is rate limited.
pub fn is_event_active(&self) -> bool {
self.active_event().is_some()
}

/// Helper for `track_outcomes`.
Expand Down Expand Up @@ -580,10 +597,9 @@ where
rate_limits.merge(event_limits);
}

if enforcement.event_active() {
enforcement.attachments = enforcement
.event
.clone_for(DataCategory::Attachment, summary.attachment_quantity);
if let Some(limit) = enforcement.active_event() {
enforcement.attachments =
limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
} else if summary.attachment_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Attachment);
let attachment_limits = (self.check)(item_scoping, summary.attachment_quantity)?;
Expand Down Expand Up @@ -612,7 +628,7 @@ where
rate_limits.merge(session_limits);
}

if enforcement.event_active() {
if enforcement.is_event_active() {
enforcement.profiles = enforcement
.event
.clone_for(DataCategory::Profile, summary.profile_quantity);
Expand Down Expand Up @@ -669,14 +685,14 @@ where
rate_limits.merge(checkin_limits);
}

if enforcement.event_active() {
if enforcement.is_event_active() {
enforcement.spans = enforcement
.event
.clone_for(DataCategory::Span, summary.span_quantity);

enforcement.spans_indexed = enforcement
.event_indexed
.clone_for(DataCategory::SpanIndexed, summary.span_quantity)
.clone_for(DataCategory::SpanIndexed, summary.span_quantity);
} else if summary.span_quantity > 0 {
let mut span_limits =
(self.check)(scoping.item(DataCategory::Span), summary.span_quantity)?;
Expand Down
43 changes: 25 additions & 18 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -1720,21 +1720,15 @@ def summarize_outcomes():
metrics_consumer.assert_empty()


@pytest.mark.parametrize(
"category,raises_rate_limited",
[
("transaction", True),
("transaction_indexed", False),
],
)
@pytest.mark.parametrize("category", ["transaction", "transaction_indexed"])
def test_rate_limit_is_consistent_between_transaction_and_spans(
mini_sentry,
relay_with_processing,
transactions_consumer,
spans_consumer,
metrics_consumer,
outcomes_consumer,
category,
raises_rate_limited,
):
"""
Rate limits are consistent between transactions and nested spans.
Expand Down Expand Up @@ -1763,18 +1757,31 @@ def test_rate_limit_is_consistent_between_transaction_and_spans(
transactions_consumer = transactions_consumer()
spans_consumer = spans_consumer()
outcomes_consumer = outcomes_consumer()
metrics_consumer = metrics_consumer()

start = datetime.now(timezone.utc)
end = start + timedelta(seconds=1)

envelope = envelope_with_transaction_and_spans(start, end)
def usage_metrics():
metrics = metrics_consumer.get_metrics()
transaction_count = sum(
m[0]["value"]
for m in metrics
if m[0]["name"] == "c:transactions/usage@none"
)
span_count = sum(
m[0]["value"] for m in metrics if m[0]["name"] == "c:spans/usage@none"
)
return (transaction_count, span_count)

def summarize_outcomes():
counter = Counter()
for outcome in outcomes_consumer.get_outcomes(timeout=10):
counter[(outcome["category"], outcome["outcome"])] += outcome["quantity"]
return dict(counter)

start = datetime.now(timezone.utc)
end = start + timedelta(seconds=1)

envelope = envelope_with_transaction_and_spans(start, end)

# First batch passes
relay.send_envelope(project_id, envelope)

Expand All @@ -1784,6 +1791,7 @@ def summarize_outcomes():
spans = spans_consumer.get_spans(n=2, timeout=10)
assert len(spans) == 2
assert summarize_outcomes() == {(16, 0): 2} # SpanIndexed, Accepted
assert usage_metrics() == (1, 2)

# Second batch nothing passes
relay.send_envelope(project_id, envelope)
Expand All @@ -1797,17 +1805,18 @@ def summarize_outcomes():
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (0, 0)
elif category == "transaction_indexed":
assert summarize_outcomes() == {
(9, 2): 1, # TransactionIndexed, Rate Limited
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (1, 2)

# Third batch might raise 429 since it hits the fast path
maybe_raises = (
pytest.raises(HTTPError, match="429 Client Error")
if raises_rate_limited
if category == "transaction"
else contextlib.nullcontext()
)
with maybe_raises:
Expand All @@ -1820,15 +1829,13 @@ def summarize_outcomes():
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (0, 0)
elif category == "transaction_indexed":
assert summarize_outcomes() == {
(9, 2): 1, # TransactionIndexed, Rate Limited
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}

transactions_consumer.assert_empty()
spans_consumer.assert_empty()
assert usage_metrics() == (1, 2)


@pytest.mark.parametrize(
Expand Down

0 comments on commit c434fa7

Please sign in to comment.