Skip to content

Commit

Permalink
fix(spans): Fixes span outcomes and inherited rate limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jul 4, 2024
1 parent 1ebb22e commit f5a61fd
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Fixes raw OS description parsing for iOS and iPadOS originating from the Unity SDK. ([#3780](https://github.com/getsentry/relay/pull/3780))
- Fixes metrics dropped due to missing project state. ([#3553](https://github.com/getsentry/relay/issues/3553))
- Incorrect span outcomes when generated from a indexed transaction quota. ([#3793](https://github.com/getsentry/relay/pull/3793))
- Report outcomes for spans when transactions are rate limited. ([#3749](https://github.com/getsentry/relay/pull/3749))

**Internal**:
Expand Down
22 changes: 11 additions & 11 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tokio::sync::Semaphore;
use {
crate::metrics::MetricsLimiter,
crate::services::store::{Store, StoreEnvelope},
crate::utils::{sample, EnvelopeLimiter, ItemAction},
crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction},
itertools::Itertools,
relay_cardinality::{
CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
Expand Down Expand Up @@ -660,12 +660,12 @@ impl ProcessingExtractedMetrics {
/// This is used to apply rate limits which have been enforced on sampled items of an envelope
/// to also consistently apply to the metrics extracted from these items.
#[cfg(feature = "processing")]
fn enforce_limits(&mut self, scoping: Scoping, limits: &RateLimits) {
for (category, namespace) in [
(DataCategory::Transaction, MetricNamespace::Transactions),
(DataCategory::Span, MetricNamespace::Spans),
fn apply_enforcement(&mut self, enforcement: &Enforcement) {
for (namespace, limit) in [
(MetricNamespace::Transactions, &enforcement.event),
(MetricNamespace::Spans, &enforcement.spans),
] {
if !limits.check(scoping.item(category)).is_empty() {
if limit.is_active() {
relay_log::trace!(
"dropping {namespace} metrics, due to enforced limit on envelope"
);
Expand Down Expand Up @@ -1326,18 +1326,18 @@ impl EnvelopeProcessorService {
envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)?
});
let event_active = enforcement.event_active();

// Use the same rate limits as used for the envelope on the metrics.
// Those rate limits should not be checked for expiry or similar to ensure a consistent
// limiting of envelope items and metrics.
state.extracted_metrics.apply_enforcement(&enforcement);
enforcement.apply_with_outcomes(&mut state.managed_envelope);

if event_active {
state.remove_event();
debug_assert!(state.envelope().is_empty());
}

// Use the same rate limits as used for the envelope on the metrics.
// Those rate limits should not be checked for expiry or similar to ensure a consistent
// limiting of envelope items and metrics.
state.extracted_metrics.enforce_limits(scoping, &limits);

if !limits.is_empty() {
self.inner
.addrs
Expand Down
8 changes: 6 additions & 2 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,12 @@ impl CategoryLimit {
}
}

/// Recreates the category limit for a new category with the same reason.
/// Recreates the category limit, if active, for a new category with the same reason.
pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
if !self.is_active() {
return Self::default();
}

Self {
category,
quantity,
Expand Down Expand Up @@ -676,7 +680,7 @@ where

enforcement.spans_indexed = enforcement
.event_indexed
.clone_for(DataCategory::SpanIndexed, summary.span_quantity)
.clone_for(DataCategory::SpanIndexed, summary.span_quantity);
} else if summary.span_quantity > 0 {
let mut span_limits =
(self.check)(scoping.item(DataCategory::Span), summary.span_quantity)?;
Expand Down
43 changes: 25 additions & 18 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -1720,21 +1720,15 @@ def summarize_outcomes():
metrics_consumer.assert_empty()


@pytest.mark.parametrize(
"category,raises_rate_limited",
[
("transaction", True),
("transaction_indexed", False),
],
)
@pytest.mark.parametrize("category", ["transaction", "transaction_indexed"])
def test_rate_limit_is_consistent_between_transaction_and_spans(
mini_sentry,
relay_with_processing,
transactions_consumer,
spans_consumer,
metrics_consumer,
outcomes_consumer,
category,
raises_rate_limited,
):
"""
Rate limits are consistent between transactions and nested spans.
Expand Down Expand Up @@ -1763,18 +1757,31 @@ def test_rate_limit_is_consistent_between_transaction_and_spans(
transactions_consumer = transactions_consumer()
spans_consumer = spans_consumer()
outcomes_consumer = outcomes_consumer()
metrics_consumer = metrics_consumer()

start = datetime.now(timezone.utc)
end = start + timedelta(seconds=1)

envelope = envelope_with_transaction_and_spans(start, end)
def usage_metrics():
metrics = metrics_consumer.get_metrics()
transaction_count = sum(
m[0]["value"]
for m in metrics
if m[0]["name"] == "c:transactions/usage@none"
)
span_count = sum(
m[0]["value"] for m in metrics if m[0]["name"] == "c:spans/usage@none"
)
return (transaction_count, span_count)

def summarize_outcomes():
counter = Counter()
for outcome in outcomes_consumer.get_outcomes(timeout=10):
counter[(outcome["category"], outcome["outcome"])] += outcome["quantity"]
return dict(counter)

start = datetime.now(timezone.utc)
end = start + timedelta(seconds=1)

envelope = envelope_with_transaction_and_spans(start, end)

# First batch passes
relay.send_envelope(project_id, envelope)

Expand All @@ -1784,6 +1791,7 @@ def summarize_outcomes():
spans = spans_consumer.get_spans(n=2, timeout=10)
assert len(spans) == 2
assert summarize_outcomes() == {(16, 0): 2} # SpanIndexed, Accepted
assert usage_metrics() == (1, 2)

# Second batch nothing passes
relay.send_envelope(project_id, envelope)
Expand All @@ -1797,17 +1805,18 @@ def summarize_outcomes():
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (0, 0)
elif category == "transaction_indexed":
assert summarize_outcomes() == {
(9, 2): 1, # TransactionIndexed, Rate Limited
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (1, 2)

# Third batch might raise 429 since it hits the fast path
maybe_raises = (
pytest.raises(HTTPError, match="429 Client Error")
if raises_rate_limited
if category == "transaction"
else contextlib.nullcontext()
)
with maybe_raises:
Expand All @@ -1820,15 +1829,13 @@ def summarize_outcomes():
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}
assert usage_metrics() == (0, 0)
elif category == "transaction_indexed":
assert summarize_outcomes() == {
(9, 2): 1, # TransactionIndexed, Rate Limited
(12, 2): 2, # Span, Rate Limited
(16, 2): 2, # SpanIndexed, Rate Limited
}

transactions_consumer.assert_empty()
spans_consumer.assert_empty()
assert usage_metrics() == (1, 2)


@pytest.mark.parametrize(
Expand Down

0 comments on commit f5a61fd

Please sign in to comment.