From 78b359233555bbace2863ada12a6a4f55d84c391 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 2 Aug 2023 08:24:58 +1200 Subject: [PATCH] Revert "DPLT-1074 Queue real-time messages on Redis Streams (#157)" This reverts commit d2f44e857403eb1b1b6a7c8b534ed83c04db11a0. --- .../queryapi_coordinator/src/indexer_types.rs | 6 -- indexer/queryapi_coordinator/src/main.rs | 16 +----- indexer/storage/src/lib.rs | 56 ------------------- 3 files changed, 1 insertion(+), 77 deletions(-) diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs index f3880320b..4a4993ce5 100644 --- a/indexer/queryapi_coordinator/src/indexer_types.rs +++ b/indexer/queryapi_coordinator/src/indexer_types.rs @@ -40,9 +40,3 @@ pub struct IndexerFunction { pub provisioned: bool, pub indexer_rule: IndexerRule, } - -impl IndexerFunction { - pub fn get_full_name(&self) -> String { - format!("{}/{}", self.account_id, self.function_name) - } -} diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 6cd7398af..e447f3316 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::{self, ConnectionManager}; +use storage::ConnectionManager; pub(crate) mod cache; mod historical_block_processing; @@ -195,20 +195,6 @@ async fn handle_streamer_message( if !indexer_function.provisioned { set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); } - - storage::set( - context.redis_connection_manager, - &format!("{}:storage", indexer_function.get_full_name()), - serde_json::to_string(indexer_function)?, - ) - .await?; - - storage::add_to_registered_stream( - context.redis_connection_manager, - &format!("{}:stream", indexer_function.get_full_name()), - &[("block_height", block_height)], - ) - .await? } stream::iter(indexer_function_messages.into_iter()) diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 332d1c789..59e8c8b04 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,8 +2,6 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; -const STREAMS_SET_KEY: &str = "streams"; - pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") } @@ -52,60 +50,6 @@ pub async fn get( tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,); Ok(value) } - -async fn sadd( - redis_connection_manager: &ConnectionManager, - key: impl ToRedisArgs + std::fmt::Debug, - value: impl ToRedisArgs + std::fmt::Debug, -) -> anyhow::Result<()> { - tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value); - - redis::cmd("SADD") - .arg(key) - .arg(value) - .query_async(&mut redis_connection_manager.clone()) - .await?; - - Ok(()) -} - -async fn xadd( - redis_connection_manager: &ConnectionManager, - stream_key: &str, - fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], -) -> anyhow::Result<()> { - tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields); - - // TODO: Remove stream cap when we finally start processing it - redis::cmd("XTRIM") - .arg("MAXLEN") - .arg(100) - .query_async(&mut redis_connection_manager.clone()) - .await?; - - let mut cmd = redis::cmd("XADD"); - cmd.arg(stream_key).arg("*"); - - for (field, value) in fields { - cmd.arg(*field).arg(value); - } - - cmd.query_async(&mut redis_connection_manager.clone()) - .await?; - - Ok(()) -} - -pub async fn add_to_registered_stream( - redis_connection_manager: &ConnectionManager, - key: &str, - fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], -) -> anyhow::Result<()> { - sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?; - xadd(redis_connection_manager, key, fields).await?; - - Ok(()) -} /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. /// The counter holds how many Receipts related to the Transaction are in watching list