Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "DPLT-1074 Queue real-time messages on Redis Streams"" #161

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions indexer/queryapi_coordinator/src/indexer_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ pub struct IndexerFunction {
pub provisioned: bool,
pub indexer_rule: IndexerRule,
}

impl IndexerFunction {
pub fn get_full_name(&self) -> String {
format!("{}/{}", self.account_id, self.function_name)
}
}
22 changes: 20 additions & 2 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
use crate::indexer_types::IndexerFunction;
use indexer_types::{IndexerQueueMessage, IndexerRegistry};
use opts::{Opts, Parser};
use storage::ConnectionManager;
use storage::{self, ConnectionManager};

pub(crate) mod cache;
mod historical_block_processing;
Expand Down Expand Up @@ -111,7 +111,11 @@ async fn main() -> anyhow::Result<()> {
})
.buffer_unordered(1usize);

while let Some(_handle_message) = handlers.next().await {}
while let Some(handle_message) = handlers.next().await {
if let Err(err) = handle_message {
tracing::error!(target: INDEXER, "{:#?}", err);
}
}
drop(handlers); // close the channel so the sender will stop

// propagate errors from the sender
Expand Down Expand Up @@ -195,6 +199,20 @@ async fn handle_streamer_message(
if !indexer_function.provisioned {
set_provisioned_flag(&mut indexer_registry_locked, &indexer_function);
}

storage::set(
context.redis_connection_manager,
&format!("{}:storage", indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
)
.await?;

storage::add_to_registered_stream(
context.redis_connection_manager,
&format!("{}:stream", indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?;
}

stream::iter(indexer_function_messages.into_iter())
Expand Down
57 changes: 57 additions & 0 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

const STREAMS_SET_KEY: &str = "streams";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
}
Expand Down Expand Up @@ -50,6 +52,61 @@ pub async fn get<V: FromRedisValue + std::fmt::Debug>(
tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,);
Ok(value)
}

async fn sadd(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value);

redis::cmd("SADD")
.arg(key)
.arg(value)
.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(())
}

async fn xadd(
redis_connection_manager: &ConnectionManager,
stream_key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields);

// TODO: Remove stream cap when we finally start processing it
redis::cmd("XTRIM")
.arg(stream_key)
.arg("MAXLEN")
.arg(100)
.query_async(&mut redis_connection_manager.clone())
.await?;

let mut cmd = redis::cmd("XADD");
cmd.arg(stream_key).arg("*");

for (field, value) in fields {
cmd.arg(*field).arg(value);
}

cmd.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(())
}

pub async fn add_to_registered_stream(
redis_connection_manager: &ConnectionManager,
key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?;
xadd(redis_connection_manager, key, fields).await?;

Ok(())
}
/// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage.
/// Increments the counter `receipts_{transaction_hash}` by one.
/// The counter holds how many Receipts related to the Transaction are in watching list
Expand Down
Loading