Skip to content

Commit

Permalink
refactor: Remove RedisKeyProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed May 13, 2024
1 parent 2c7580e commit a9035d9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 55 deletions.
1 change: 0 additions & 1 deletion coordinator/src/block_streams/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tonic::transport::channel::Channel;
use tonic::Request;

use crate::indexer_config::IndexerConfig;
use crate::redis::RedisKeyProvider;
use crate::utils::exponential_retry;

#[cfg(not(test))]
Expand Down
1 change: 0 additions & 1 deletion coordinator/src/executors/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tonic::transport::channel::Channel;
use tonic::Request;

use crate::indexer_config::IndexerConfig;
use crate::redis::RedisKeyProvider;
use crate::utils::exponential_retry;

#[cfg(not(test))]
Expand Down
28 changes: 20 additions & 8 deletions coordinator/src/indexer_config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use near_primitives::types::AccountId;
use registry_types::{Rule, StartBlock};

use crate::redis::RedisKeyProvider;

#[derive(Debug, Clone, PartialEq)]
pub struct IndexerConfig {
pub account_id: AccountId,
Expand All @@ -16,14 +14,28 @@ pub struct IndexerConfig {
}

impl IndexerConfig {
pub fn get_full_name(&self) -> String {
format!("{}/{}", self.account_id, self.function_name)
}

pub fn get_redis_stream_key(&self) -> String {
format!("{}:block_stream", self.get_full_name())
}

pub fn get_last_published_block_key(&self) -> String {
format!("{}:last_published_block", self.get_full_name())
}

pub fn get_redis_stream_version_key(&self) -> String {
format!("{}:version", self.get_redis_stream_key())
}

pub fn get_state_key(&self) -> String {
format!("{}:state", self.get_full_name())
}

pub fn get_registry_version(&self) -> u64 {
self.updated_at_block_height
.unwrap_or(self.created_at_block_height)
}
}

impl RedisKeyProvider for IndexerConfig {
fn prefix(&self) -> String {
format!("{}/{}", self.account_id, self.function_name)
}
}
69 changes: 24 additions & 45 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,13 @@ use std::fmt::Debug;
use anyhow::Context;
use redis::{aio::ConnectionManager, FromRedisValue, ToRedisArgs};

use crate::indexer_config::IndexerConfig;

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
#[cfg(not(test))]
pub use RedisClientImpl as RedisClient;

pub trait RedisKeyProvider {
fn prefix(&self) -> String;

fn get_redis_stream_key(&self) -> String {
format!("{}:block_stream", self.prefix())
}

fn get_last_published_block_key(&self) -> String {
format!("{}:last_published_block", self.prefix())
}

fn get_redis_stream_version_key(&self) -> String {
format!("{}:version", self.prefix())
}

fn get_state_key(&self) -> String {
format!("{}:state", self.prefix())
}
}

#[derive(Clone)]
pub struct RedisClientImpl {
connection: ConnectionManager,
Expand Down Expand Up @@ -91,42 +73,39 @@ impl RedisClientImpl {
Ok(())
}

pub async fn get_stream_version<K: RedisKeyProvider + 'static>(
pub async fn get_stream_version(
&self,
key_provider: &K,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
self.get::<_, u64>(key_provider.get_redis_stream_version_key())
self.get::<_, u64>(indexer_config.get_redis_stream_version_key())
.await
}

pub async fn get_last_published_block<K: RedisKeyProvider + 'static>(
pub async fn get_last_published_block(
&self,
key_provider: &K,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
self.get::<_, u64>(key_provider.get_last_published_block_key())
self.get::<_, u64>(indexer_config.get_last_published_block_key())
.await
}

pub async fn clear_block_stream<K: RedisKeyProvider + 'static>(
&self,
key_provider: &K,
) -> anyhow::Result<()> {
self.del(key_provider.get_redis_stream_key()).await
pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
self.del(indexer_config.get_redis_stream_key()).await
}

pub async fn get_indexer_state<K: RedisKeyProvider + 'static>(
pub async fn get_indexer_state(
&self,
key_provider: &K,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<String>> {
self.get(key_provider.get_state_key()).await
self.get(indexer_config.get_state_key()).await
}

pub async fn set_indexer_state<K: RedisKeyProvider + 'static>(
pub async fn set_indexer_state(
&self,
key_provider: &K,
indexer_config: &IndexerConfig,
state: String,
) -> anyhow::Result<()> {
self.set(key_provider.get_state_key(), state).await
self.set(indexer_config.get_state_key(), state).await
}

pub async fn set_migration_complete(&self) -> anyhow::Result<()> {
Expand All @@ -143,25 +122,25 @@ mockall::mock! {
pub RedisClientImpl {
pub async fn connect(redis_url: &str) -> anyhow::Result<Self>;

pub async fn get_indexer_state<K: RedisKeyProvider + 'static>(&self, key_provider: &K) -> anyhow::Result<Option<String>>;
pub async fn get_indexer_state(&self, indexer_config: &IndexerConfig) -> anyhow::Result<Option<String>>;

pub async fn set_indexer_state<K: RedisKeyProvider + 'static>(
pub async fn set_indexer_state(
&self,
key_provider: &K,
indexer_config: &IndexerConfig,
state: String,
) -> anyhow::Result<()>;

pub async fn get_stream_version<K: RedisKeyProvider + 'static>(
pub async fn get_stream_version(
&self,
key_provider: &K,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>>;

pub async fn get_last_published_block<K: RedisKeyProvider + 'static>(
pub async fn get_last_published_block(
&self,
key_provider: &K,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>>;

pub async fn clear_block_stream<K: RedisKeyProvider + 'static>(&self, key_provider: &K) -> anyhow::Result<()>;
pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()>;

pub async fn set_migration_complete(&self) -> anyhow::Result<()>;

Expand Down

0 comments on commit a9035d9

Please sign in to comment.