Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove async indexer_rules_engine #387

Merged
merged 5 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@ env:

jobs:
check:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Run check
working-directory: ./indexer
run: cargo check

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Run check
working-directory: ./indexer
run: cargo test


rustfmt:
name: rustfmt
runs-on: ubuntu-20.04
Expand Down
3,551 changes: 3,551 additions & 0 deletions indexer/indexer_rules_engine/blocks/93085141.json

Large diffs are not rendered by default.

24 changes: 2 additions & 22 deletions indexer/indexer_rules_engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
pub mod matcher;
pub mod outcomes_reducer;
mod outcomes_reducer_sync;
pub mod types;

use indexer_rule_type::indexer_rule::{IndexerRule, MatchingRule};
use near_lake_framework::near_indexer_primitives::StreamerMessage;
use types::indexer_rule_match::{ChainId, IndexerRuleMatch};

pub async fn reduce_indexer_rule_matches(
indexer_rule: &IndexerRule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
) -> anyhow::Result<Vec<IndexerRuleMatch>> {
Ok(match &indexer_rule.matching_rule {
MatchingRule::ActionAny { .. }
| MatchingRule::ActionFunctionCall { .. }
| MatchingRule::Event { .. } => {
outcomes_reducer::reduce_indexer_rule_matches_from_outcomes(
indexer_rule,
streamer_message,
chain_id,
)
.await?
}
})
}

pub fn reduce_indexer_rule_matches_sync(
pub fn reduce_indexer_rule_matches(
indexer_rule: &IndexerRule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
Expand All @@ -35,7 +15,7 @@ pub fn reduce_indexer_rule_matches_sync(
MatchingRule::ActionAny { .. }
| MatchingRule::ActionFunctionCall { .. }
| MatchingRule::Event { .. } => {
outcomes_reducer_sync::reduce_indexer_rule_matches_from_outcomes(
outcomes_reducer::reduce_indexer_rule_matches_from_outcomes(
indexer_rule,
streamer_message,
chain_id,
Expand Down
54 changes: 21 additions & 33 deletions indexer/indexer_rules_engine/src/outcomes_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use futures::future::try_join_all;

use crate::matcher;
use crate::types::events::Event;
use crate::types::indexer_rule_match::{ChainId, IndexerRuleMatch, IndexerRuleMatchPayload};
Expand All @@ -8,12 +6,12 @@ use near_lake_framework::near_indexer_primitives::{
IndexerExecutionOutcomeWithReceipt, StreamerMessage,
};

pub async fn reduce_indexer_rule_matches_from_outcomes(
pub fn reduce_indexer_rule_matches_from_outcomes(
indexer_rule: &IndexerRule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
) -> anyhow::Result<Vec<IndexerRuleMatch>> {
let build_indexer_rule_match_futures = streamer_message
) -> Vec<IndexerRuleMatch> {
streamer_message
.shards
.iter()
.flat_map(|shard| {
Expand All @@ -33,19 +31,18 @@ pub async fn reduce_indexer_rule_matches_from_outcomes(
streamer_message.block.header.height,
chain_id.clone(),
)
});

try_join_all(build_indexer_rule_match_futures).await
})
.collect()
}

async fn build_indexer_rule_match(
fn build_indexer_rule_match(
indexer_rule: &IndexerRule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
block_header_hash: String,
block_height: u64,
chain_id: ChainId,
) -> anyhow::Result<IndexerRuleMatch> {
Ok(IndexerRuleMatch {
) -> IndexerRuleMatch {
IndexerRuleMatch {
chain_id: chain_id.clone(),
indexer_rule_id: indexer_rule.id,
indexer_rule_name: indexer_rule.name.clone(),
Expand All @@ -55,7 +52,7 @@ async fn build_indexer_rule_match(
block_header_hash,
),
block_height,
})
}
}

fn build_indexer_rule_match_payload(
Expand Down Expand Up @@ -127,9 +124,12 @@ mod tests {
std::fs::read_to_string(path).unwrap()
}
fn read_local_streamer_message(block_height: u64) -> StreamerMessage {
let path = format!("../blocks/{}.json", block_height);
let json = serde_json::from_str(&read_local_file(&path)).unwrap();
serde_json::from_value(json).unwrap()
let path = format!(
"{}/blocks/{}.json",
env!("CARGO_MANIFEST_DIR"),
block_height
);
serde_json::from_str(&read_local_file(&path)).unwrap()
}

#[tokio::test]
Expand All @@ -149,9 +149,7 @@ mod tests {
&wildcard_rule,
&streamer_message,
ChainId::Testnet,
)
.await
.unwrap();
);

assert_eq!(result.len(), 0);
}
Expand All @@ -173,9 +171,7 @@ mod tests {
&wildcard_rule,
&streamer_message,
ChainId::Testnet,
)
.await
.unwrap();
);

assert_eq!(result.len(), 1); // There are two matches, until we add Extraction we are just matching the first one (block matching)
}
Expand All @@ -197,9 +193,7 @@ mod tests {
&wildcard_rule,
&streamer_message,
ChainId::Testnet,
)
.await
.unwrap();
);

assert_eq!(result.len(), 1); // see Extraction note in previous test

Expand All @@ -217,9 +211,7 @@ mod tests {
&wildcard_rule,
&streamer_message,
ChainId::Testnet,
)
.await
.unwrap();
);

assert_eq!(result.len(), 1); // see Extraction note in previous test
}
Expand All @@ -241,9 +233,7 @@ mod tests {
&wildcard_rule,
&streamer_message,
ChainId::Testnet,
)
.await
.unwrap();
);

assert_eq!(result.len(), 1); // There are two matches, until we add Extraction we are just matching the first one (block matching)
}
Expand All @@ -265,9 +255,7 @@ mod tests {
&wildcard_rule,
&streamer_message,
ChainId::Testnet,
)
.await
.unwrap();
);

assert_eq!(result.len(), 1); // There are two matches, until we add Extraction we are just matching the first one (block matching)
}
Expand Down
114 changes: 0 additions & 114 deletions indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub(crate) async fn process_historical_messages(
break;
}

let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
let matches = indexer_rules_engine::reduce_indexer_rule_matches(
&indexer_function.indexer_rule,
&streamer_message,
chain_id.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod tests {
/// Parses some env vars from .env, Run with
/// cargo test historical_block_processing_integration_tests::test_indexing_metadata_file;
#[tokio::test]
#[ignore]
async fn test_indexing_metadata_file() {
let aws_config = aws_config::from_env().load().await;
let s3_client = aws_sdk_s3::Client::new(&aws_config);
Expand All @@ -43,6 +44,7 @@ mod tests {
/// Parses some env vars from .env, Run with
/// cargo test historical_block_processing_integration_tests::test_process_historical_messages;
#[tokio::test]
#[ignore]
async fn test_process_historical_messages() {
opts::init_tracing();

Expand Down Expand Up @@ -97,6 +99,7 @@ mod tests {
/// Parses some env vars from .env, Run with
/// cargo test historical_block_processing_integration_tests::test_filter_matching_wildcard_blocks_from_index_files;
#[tokio::test]
#[ignore]
async fn test_filter_matching_wildcard_blocks_from_index_files() {
let contract = "*.keypom.near";
let matching_rule = MatchingRule::ActionAny {
Expand Down Expand Up @@ -154,6 +157,7 @@ mod tests {
/// Parses some env vars from .env, Run with
/// cargo test historical_block_processing_integration_tests::test_filter_matching_blocks_from_index_files;
#[tokio::test]
#[ignore]
async fn test_filter_matching_blocks_from_index_files() {
let contract = "*.agency.near";
let matching_rule = MatchingRule::ActionAny {
Expand Down
11 changes: 4 additions & 7 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ async fn reduce_rule_matches_for_indexer_function<'x>(
&indexer_function.indexer_rule,
streamer_message,
chain_id.clone(),
)
.await?;
);
Ok(IndexerFunctionWithMatches {
indexer_function,
matches,
Expand Down Expand Up @@ -317,13 +316,11 @@ mod tests {

let indexer_registry: SharedIndexerRegistry =
std::sync::Arc::new(Mutex::new(indexer_registry));
let mut indexer_registry_locked = indexer_registry.lock().await;

set_provisioned_flag(&indexer_registry, &indexer_function);
set_provisioned_flag(&indexer_registry, &indexer_function).await;

let account_functions = indexer_registry_locked
.get(&indexer_function.account_id)
.unwrap();
let lock = indexer_registry.lock().await;
let account_functions = lock.get(&indexer_function.account_id).unwrap();
let indexer_function = account_functions
.get(&indexer_function.function_name)
.unwrap();
Expand Down
Loading
Loading