Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(filters): Move generic filter checks to event::filter #3216

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtra
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::event::FiltersApplied;
use crate::services::project::ProjectState;
use crate::services::project_cache::{AddMetricMeta, ProjectCache, UpdateRateLimits};
use crate::services::test_store::{Capture, TestStore};
Expand Down Expand Up @@ -1294,25 +1295,11 @@ impl EnvelopeProcessorService {
event::finalize(state, &self.inner.config)?;
self.light_normalize_event(state)?;
dynamic_sampling::normalize(state);
event::filter(state, &self.inner.global_config.current())?;
// Don't extract metrics if relay can't apply generic inbound filters.
// An inbound filter applied in another up-to-date relay in chain may
// need to drop the event, and there should not be metrics from dropped
// events.
// In processing relays, always extract metrics to avoid losing them.
let supported_generic_filters = self.inner.global_config.current().filters.is_ok()
&& relay_filter::are_generic_filters_supported(
self.inner
.global_config
.current()
.filters()
.map(|f| f.version),
state.project_state.config.filter_settings.generic.version,
);
let filter_run = event::filter(state, &self.inner.global_config.current())?;

let mut sampling_should_drop = false;

if self.inner.config.processing_enabled() || supported_generic_filters {
if self.inner.config.processing_enabled() || matches!(filter_run, FiltersApplied::Ok) {
let sampling_result = dynamic_sampling::run(state, &self.inner.config);
// Remember sampling decision, before it is reset in `dynamic_sampling::sample_envelope_items`.
sampling_should_drop = sampling_result.should_drop();
Expand Down
41 changes: 38 additions & 3 deletions relay-server/src/services/processor/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,35 @@ pub fn finalize<G: EventProcessing>(
Ok(())
}

/// Decision for applying some filters that don't drop the event.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bikeshedding a bit: I don't really like "decision" here, but the alternatives "result" and "outcome" have other meanings that may generate some confusion. Open to suggestions.

///
/// The enum represents either the success of running all filters and keeping
/// the event, [`FiltersApplied::Ok`], or not running all the filters because
/// some are unsupported, [`FiltersApplied::Unsupported`].
///
/// If there are unsuppported filters, Relay should forward the event upstream
/// so that a more up-to-date Relay can apply filters appropriately. Actions
/// that depend on the outcome of event filtering, such as metric extraction,
/// should be postponed until a filtering decision is made.
pub enum FiltersApplied {
/// All filters have been applied and the event should be kept.
Ok,
/// Some filters are not supported and were not applied.
///
/// Relay should forward events upstream for a more up-to-date Relay to apply these filters.
/// Supported filters were applied and they don't reject the event.
Unsupported,
}

pub fn filter<G: EventProcessing>(
state: &mut ProcessEnvelopeState<G>,
global_config: &GlobalConfig,
) -> Result<(), ProcessingError> {
) -> Result<FiltersApplied, ProcessingError> {
let event = match state.event.value_mut() {
Some(event) => event,
// Some events are created by processing relays (e.g. unreal), so they do not yet
// exist at this point in non-processing relays.
None => return Ok(()),
None => return Ok(FiltersApplied::Ok),
};

let client_ip = state.managed_envelope.envelope().meta().client_addr();
Expand All @@ -293,7 +313,22 @@ pub fn filter<G: EventProcessing>(
.reject(Outcome::Filtered(err.clone()));
ProcessingError::EventFiltered(err)
})
})
})?;

// Don't extract metrics if relay can't apply generic filters. A filter
// applied in another up-to-date relay in chain may need to drop the event,
// and there should not be metrics from dropped events.
// In processing relays, always extract metrics to avoid losing them.
let supported_generic_filters = global_config.filters.is_ok()
&& relay_filter::are_generic_filters_supported(
global_config.filters().map(|f| f.version),
state.project_state.config.filter_settings.generic.version,
);
if supported_generic_filters {
Ok(FiltersApplied::Ok)
} else {
Ok(FiltersApplied::Unsupported)
}
}

/// Apply data privacy rules to the event payload.
Expand Down
Loading