Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(spooler): Remove use of legacy project cache #4419

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 2 additions & 32 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
Expand All @@ -39,7 +39,6 @@ use relay_redis::AsyncRedisConnection;
#[cfg(feature = "processing")]
use relay_redis::{PooledClient, RedisError, RedisPool, RedisPools, RedisScripts};
use relay_system::{channel, Addr, Service, ServiceRunner};
use tokio::sync::mpsc;

/// Indicates the type of failure of the server.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
Expand Down Expand Up @@ -68,7 +67,6 @@ pub struct Registry {
pub test_store: Addr<TestStore>,
pub relay_cache: Addr<RelayCache>,
pub global_config: Addr<GlobalConfigManager>,
pub legacy_project_cache: Addr<legacy::ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: PartitionedEnvelopeBuffer,

Expand Down Expand Up @@ -197,9 +195,6 @@ impl ServiceState {
// service fail if the service is not running.
let global_config = runner.start(global_config);

let (legacy_project_cache, legacy_project_cache_rx) =
channel(legacy::ProjectCacheService::name());

let project_source = ProjectSource::start_in(
&mut runner,
Arc::clone(&config),
Expand Down Expand Up @@ -268,37 +263,18 @@ impl ServiceState {
processor_rx,
);

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());

let envelope_buffer = PartitionedEnvelopeBuffer::create(
config.spool_partitions(),
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
envelopes_tx.clone(),
project_cache_handle.clone(),
processor.clone(),
outcome_aggregator.clone(),
test_store.clone(),
&mut runner,
);

// Keep all the services in one context.
let project_cache_services = legacy::Services {
envelope_buffer: envelope_buffer.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
};

runner.start_with(
legacy::ProjectCacheService::new(
project_cache_handle.clone(),
project_cache_services,
envelopes_rx,
),
legacy_project_cache_rx,
);

let health_check = runner.start(HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand Down Expand Up @@ -328,7 +304,6 @@ impl ServiceState {
test_store,
relay_cache,
global_config,
legacy_project_cache,
project_cache_handle,
upstream_relay,
envelope_buffer,
Expand Down Expand Up @@ -365,11 +340,6 @@ impl ServiceState {
self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

/// Returns the address of the [`legacy::ProjectCache`] service.
pub fn legacy_project_cache(&self) -> &Addr<legacy::ProjectCache> {
&self.inner.registry.legacy_project_cache
}

/// Returns a [`ProjectCacheHandle`].
pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
&self.inner.registry.project_cache_handle
Expand Down
137 changes: 130 additions & 7 deletions relay-server/src/services/buffer/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,154 @@ use relay_base_schema::project::ProjectKey;
use crate::Envelope;

/// Struct that represents two project keys.
#[derive(Debug, Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)]
#[derive(Debug, Clone, Copy)]
pub struct ProjectKeyPair {
pub own_key: ProjectKey,
pub sampling_key: ProjectKey,
own_key: ProjectKey,
sampling_key: Option<ProjectKey>,
Copy link
Member Author

@iambriccardo iambriccardo Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided that I would prefer to encode in the pair the lack of sampling key and rather expose different methods for making it behave like before, where they were both set and the fallback of sampling_key was own_key.

}

impl ProjectKeyPair {
pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self {
Self {
own_key,
sampling_key,
sampling_key: Some(sampling_key),
}
}

pub fn own_key(&self) -> ProjectKey {
self.own_key
}

pub fn sampling_key(&self) -> Option<ProjectKey> {
self.sampling_key
}

pub fn sampling_key_unwrap(&self) -> ProjectKey {
self.sampling_key.unwrap_or(self.own_key)
}

pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Self::new(own_key, sampling_key)
let sampling_key = envelope.sampling_key();
Self {
own_key,
sampling_key,
}
}

pub fn iter(&self) -> impl Iterator<Item = ProjectKey> {
let Self {
own_key,
sampling_key,
} = self;
std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key))
std::iter::once(*own_key).chain(sampling_key.filter(|k| k != own_key))
}
}

impl PartialEq for ProjectKeyPair {
fn eq(&self, other: &Self) -> bool {
self.own_key() == other.own_key()
&& self.sampling_key_unwrap() == other.sampling_key_unwrap()
}
}

impl Eq for ProjectKeyPair {}

impl PartialOrd for ProjectKeyPair {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ProjectKeyPair {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
let own_comparison = self.own_key().cmp(&other.own_key());
if own_comparison != std::cmp::Ordering::Equal {
return own_comparison;
};
self.sampling_key_unwrap().cmp(&other.sampling_key_unwrap())
}
}

impl std::hash::Hash for ProjectKeyPair {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.own_key().hash(state);
self.sampling_key_unwrap().hash(state);
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;

#[test]
fn test_project_key_pair_new() {
let own = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair = ProjectKeyPair::new(own, sampling);
assert_eq!(pair.own_key(), own);
assert_eq!(pair.sampling_key_unwrap(), sampling);
}

#[test]
fn test_project_key_pair_equality() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key1, key2);
let pair3 = ProjectKeyPair::new(key2, key1);

assert_eq!(pair1, pair2);
assert_ne!(pair1, pair3);
}

#[test]
fn test_project_key_pair_ordering() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key2, key1);
let pair3 = ProjectKeyPair {
own_key: key1,
sampling_key: None,
};

assert!(pair1 < pair2);
assert!(pair3 < pair2);
}

#[test]
fn test_project_key_pair_hash() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key1, key2);
let pair3 = ProjectKeyPair::new(key2, key1);

let mut set = HashSet::new();
set.insert(pair1);
assert!(set.contains(&pair2));
assert!(!set.contains(&pair3));
}

#[test]
fn test_project_key_pair_iter() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

// Test with different sampling key
let pair = ProjectKeyPair::new(key1, key2);
let keys: Vec<_> = pair.iter().collect();
assert_eq!(keys, vec![key1, key2]);

// Test with same key (should only yield one key)
let pair = ProjectKeyPair::new(key1, key1);
let keys: Vec<_> = pair.iter().collect();
assert_eq!(keys, vec![key1]);
}
}
14 changes: 10 additions & 4 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ where

Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(last_received_at), true) => Peek::Ready { last_received_at },
(Some(last_received_at), true) => Peek::Ready {
project_key_pair: *project_key_pair,
last_received_at,
},
(Some(last_received_at), false) => Peek::NotReady {
project_key_pair: *project_key_pair,
next_project_fetch: *next_project_fetch,
Expand Down Expand Up @@ -400,11 +403,11 @@ where
let mut found = false;
for (subkey, readiness) in [
(
project_key_pair.own_key,
project_key_pair.own_key(),
&mut stack.readiness.own_project_ready,
),
(
project_key_pair.sampling_key,
project_key_pair.sampling_key_unwrap(),
&mut stack.readiness.sampling_project_ready,
),
] {
Expand Down Expand Up @@ -561,6 +564,7 @@ where
pub enum Peek {
Empty,
Ready {
project_key_pair: ProjectKeyPair,
last_received_at: DateTime<Utc>,
},
NotReady {
Expand All @@ -574,7 +578,9 @@ impl Peek {
pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
match self {
Self::Empty => None,
Self::Ready { last_received_at }
Self::Ready {
last_received_at, ..
}
| Self::NotReady {
last_received_at, ..
} => Some(*last_received_at),
Expand Down
Loading
Loading