Skip to content

Commit

Permalink
ref(project-cache): Check revision before loading from Redis (#3892)
Browse files Browse the repository at this point in the history
Checks the 'new' revision key before actually loading and parsing the
project config.

Revision introduced here: getsentry/sentry#75523
Change is backwards compatible, if the key is missing the check is
simply skipped.

I was considering propagating the information whether something was
refreshed or a fresh fetch through the state channel, but after review
and consideration, I don't think that is useful information, we still
have to reset a bunch of internal state and move the project out of the
`in flight` status. All if which we already do, the only thing we could
change is skipping the assignment of the project info contained in the
project fetch state (because that's the same state as the old), but this
is already only an `Arc`.
  • Loading branch information
Dav1dde committed Aug 6, 2024
1 parent ffd218f commit be744a1
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 52 deletions.
12 changes: 10 additions & 2 deletions relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,11 @@ impl Project {
"project {} state requested {attempts} times",
self.project_key
);
project_cache.send(RequestUpdate::new(self.project_key, no_cache));
project_cache.send(RequestUpdate {
project_key: self.project_key,
no_cache,
cached_state: self.state.clone(),
});
}

channel
Expand Down Expand Up @@ -473,7 +477,11 @@ impl Project {
self.project_key
);

project_cache.send(RequestUpdate::new(self.project_key, no_cache));
project_cache.send(RequestUpdate {
project_key: self.project_key,
no_cache,
cached_state: self.state.clone(),
});
return old_state;
}

Expand Down
11 changes: 11 additions & 0 deletions relay-server/src/services/project/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ impl ProjectState {
}
}

/// Returns the revision of the contained project info.
///
/// `None` if the revision is missing or not available.
pub fn revision(&self) -> Option<&str> {
match &self {
ProjectState::Enabled(info) => info.rev.as_deref(),
ProjectState::Disabled => None,
ProjectState::Pending => None,
}
}

/// Creates `Scoping` for this project if the state is loaded.
///
/// Returns `Some` if the project state has been fetched and contains a project identifier,
Expand Down
16 changes: 16 additions & 0 deletions relay-server/src/services/project/state/fetch_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ impl ProjectFetchState {
}
}

/// Refreshes the expiry of the fetch state.
pub fn refresh(old: ProjectFetchState) -> Self {
Self {
last_fetch: Some(Instant::now()),
state: old.state,
}
}

/// Project state for an unknown but allowed project.
///
/// This state is used for forwarding in Proxy mode.
pub fn allowed() -> Self {
Self::enabled(ProjectInfo {
project_id: None,
last_change: None,
rev: None,
public_keys: Default::default(),
slug: None,
config: ProjectConfig::default(),
Expand Down Expand Up @@ -119,6 +128,13 @@ impl ProjectFetchState {
Expiry::Updated
}
}

/// Returns the revision of the contained project state.
///
/// See: [`ProjectState::revision`].
pub fn revision(&self) -> Option<&str> {
self.state.revision()
}
}

/// Wrapper for a project state, with expiry information.
Expand Down
4 changes: 3 additions & 1 deletion relay-server/src/services/project/state/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ pub struct ProjectInfo {
///
/// This might be `None` in some rare cases like where states
/// are faked locally.
#[serde(default)]
pub last_change: Option<DateTime<Utc>>,
/// The revision id of the project config.
pub rev: Option<String>,
/// Indicates that the project is disabled.
/// A container of known public keys in the project.
///
Expand All @@ -56,6 +57,7 @@ pub struct ProjectInfo {
pub struct LimitedProjectInfo {
pub project_id: Option<ProjectId>,
pub last_change: Option<DateTime<Utc>>,
pub rev: Option<String>,
pub public_keys: SmallVec<[PublicKeyConfig; 1]>,
pub slug: Option<String>,
#[serde(with = "LimitedProjectConfig")]
Expand Down
61 changes: 35 additions & 26 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,15 @@ use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff
#[derive(Clone, Debug)]
pub struct RequestUpdate {
/// The public key to fetch the project by.
project_key: ProjectKey,

pub project_key: ProjectKey,
/// If true, all caches should be skipped and a fresh state should be computed.
no_cache: bool,
}

impl RequestUpdate {
pub fn new(project_key: ProjectKey, no_cache: bool) -> Self {
Self {
project_key,
no_cache,
}
}
pub no_cache: bool,
/// Previously cached fetch state, if available.
///
/// The upstream request will include the revision of the currently cached state,
/// if the upstream does not have a different revision, this cached
/// state is re-used and its expiry bumped.
pub cached_state: ProjectFetchState,
}

/// Returns the project state.
Expand Down Expand Up @@ -459,7 +455,12 @@ impl ProjectSource {
}
}

async fn fetch(self, project_key: ProjectKey, no_cache: bool) -> Result<ProjectFetchState, ()> {
async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
_cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ()> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
Expand All @@ -479,27 +480,34 @@ impl ProjectSource {

#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let revision = _cached_state.revision().map(String::from);

let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?;
let state_fetch_result =
tokio::task::spawn_blocking(move || redis_source.get_config(project_key))
.await
.map_err(|_| ())?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, revision.as_deref())
})
.await
.map_err(|_| ())?;
drop(redis_permit);

let state = match state_fetch_result {
Ok(state) => state.sanitized(),
match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(Some(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up-to-date version of the state already,
// refresh the state and return the old cached state again.
Ok(None) => return Ok(ProjectFetchState::refresh(_cached_state)),
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"failed to fetch project from Redis",
);
ProjectState::Pending
}
};

if !matches!(state, ProjectState::Pending) {
return Ok(ProjectFetchState::new(state));
}
};

self.upstream_source
Expand Down Expand Up @@ -734,6 +742,7 @@ impl ProjectCacheBroker {
let RequestUpdate {
project_key,
no_cache,
cached_state,
} = message;

// Bump the update time of the project in our hashmap to evade eviction.
Expand All @@ -750,14 +759,14 @@ impl ProjectCacheBroker {
tokio::time::sleep_until(next_attempt).await;
}
let state = source
.fetch(project_key, no_cache)
.fetch(project_key, no_cache, cached_state)
.await
.unwrap_or_else(|()| ProjectFetchState::disabled());

let message = UpdateProjectState {
project_key,
state,
no_cache,
state,
};

sender.send(message).ok();
Expand Down
93 changes: 73 additions & 20 deletions relay-server/src/services/project_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,85 @@ impl RedisProjectSource {
RedisProjectSource { config, redis }
}

pub fn get_config(&self, key: ProjectKey) -> Result<ProjectState, RedisProjectError> {
let mut command = relay_redis::redis::cmd("GET");

let prefix = self.config.projectconfig_cache_prefix();
command.arg(format!("{prefix}:{key}"));

let raw_response_opt: Option<Vec<u8>> = command
.query(&mut self.redis.client()?.connection()?)
.map_err(RedisError::Redis)?;

let response = match raw_response_opt {
Some(response) => {
metric!(counter(RelayCounters::ProjectStateRedis) += 1, hit = "true");
let parsed = parse_redis_response(response.as_slice())?;
ProjectState::from(parsed)
}
None => {
/// Fetches a project config from Redis.
///
/// Returns `None` if the the project config stored in Redis has the same `revision`.
/// Always returns a project state if the passed `revision` is `None`.
///
/// The returned project state is [`ProjectState::Pending`] if the requested project config is not
/// stored in Redis.
pub fn get_config_if_changed(
&self,
key: ProjectKey,
revision: Option<&str>,
) -> Result<Option<ProjectState>, RedisProjectError> {
let mut client = self.redis.client()?;
let mut connection = client.connection()?;

// Only check for the revision if we were passed a revision.
if let Some(revision) = revision {
let current_revision: Option<String> = relay_redis::redis::cmd("GET")
.arg(self.get_redis_rev_key(key))
.query(&mut connection)
.map_err(RedisError::Redis)?;

relay_log::trace!(
"Redis revision {current_revision:?}, requested revision {revision:?}"
);
if current_revision.as_deref() == Some(revision) {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "false"
hit = "revision",
);
ProjectState::Pending
return Ok(None);
}
}

let raw_response_opt: Option<Vec<u8>> = relay_redis::redis::cmd("GET")
.arg(self.get_redis_project_config_key(key))
.query(&mut connection)
.map_err(RedisError::Redis)?;

let Some(response) = raw_response_opt else {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "false"
);
return Ok(Some(ProjectState::Pending));
};

Ok(response)
let response = ProjectState::from(parse_redis_response(response.as_slice())?);

// If we were passed a revision, check if we just loaded the same revision from Redis.
//
// We always want to keep the old revision alive if possible, since the already loaded
// version has already initialized caches.
//
// While this is theoretically possible this should always been handled using the above revision
// check using the additional Redis key.
if revision.is_some() && response.revision() == revision {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config_revision"
);
Ok(None)
} else {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config"
);
Ok(Some(response))
}
}

fn get_redis_project_config_key(&self, key: ProjectKey) -> String {
let prefix = self.config.projectconfig_cache_prefix();
format!("{prefix}:{key}")
}

fn get_redis_rev_key(&self, key: ProjectKey) -> String {
let prefix = self.config.projectconfig_cache_prefix();
format!("{prefix}:{key}.rev")
}
}

Expand Down
9 changes: 7 additions & 2 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,13 @@ pub enum RelayCounters {
ProjectStateNoCache,
/// Number of times a project state is requested from the central Redis cache.
///
/// This has a tag `hit` with values `true` or `false`. If false the request will be
/// sent to the sentry endpoint.
/// This metric is tagged with:
/// - `hit`: One of:
/// - `revision`: the cached version was validated to be up to date using its revision.
/// - `project_config`: the request was handled by the cache.
/// - `project_config_revision`: the request was handled by the cache and the revision did
/// not change.
/// - `false`: the request will be sent to the sentry endpoint.
#[cfg(feature = "processing")]
ProjectStateRedis,
/// Number of times a project is looked up from the cache.
Expand Down
38 changes: 37 additions & 1 deletion tests/integration/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,5 +256,41 @@ def test_processing_redis_query_compressed(

relay.send_event(project_id)

event, v = events_consumer.get_event()
event, _ = events_consumer.get_event()
assert event["logentry"] == {"formatted": "Hello, World!"}


def test_processing_redis_query_with_revision(
mini_sentry,
redis_client,
relay_with_processing,
events_consumer,
outcomes_consumer,
):
outcomes_consumer = outcomes_consumer()
events_consumer = events_consumer()

relay = relay_with_processing(
{"limits": {"query_timeout": 10}, "cache": {"project_expiry": 1}}
)
project_id = 42
cfg = mini_sentry.add_full_project_config(project_id)
cfg["rev"] = "123"

key = mini_sentry.get_dsn_public_key(project_id)
projectconfig_cache_prefix = relay.options["processing"][
"projectconfig_cache_prefix"
]
redis_client.setex(f"{projectconfig_cache_prefix}:{key}", 3600, json.dumps(cfg))
redis_client.setex(f"{projectconfig_cache_prefix}:{key}.rev", 3600, cfg["rev"])

relay.send_event(project_id)
event, _ = events_consumer.get_event()
assert event["logentry"] == {"formatted": "Hello, World!"}

# 1 second timeout on the project cache, make sure it times out
time.sleep(2)

relay.send_event(project_id)
event, _ = events_consumer.get_event()
assert event["logentry"] == {"formatted": "Hello, World!"}

0 comments on commit be744a1

Please sign in to comment.