From 2c80f7d27932ad6b131c2ee2fad3204a89de1b96 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 14 Jun 2024 17:09:43 +0530 Subject: [PATCH] feat : added review #1 changes : added error handling for snos workers --- crates/orchestrator/Cargo.toml | 2 +- crates/orchestrator/src/database/mod.rs | 2 +- crates/orchestrator/src/database/mongodb/mod.rs | 2 +- crates/orchestrator/src/tests/workers/mod.rs | 8 ++++---- crates/orchestrator/src/workers/snos.rs | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 528582a1..6c256547 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs" [dependencies] arc-swap = { workspace = true } +async-std = "1.12.0" async-trait = { workspace = true } axum = { workspace = true, features = ["macros"] } axum-macros = { workspace = true } @@ -34,7 +35,6 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } url = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } -async-std = "1.12.0" [features] default = ["ethereum", "with_mongodb", "with_sqs"] diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index 8432c293..3da862af 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -33,7 +33,7 @@ pub trait Database: Send + Sync { ) -> Result<()>; async fn update_metadata(&self, job: &JobItem, metadata: HashMap) -> Result<()>; - async fn get_latest_job_by_type(&self, job_type: JobType) -> Result>; + async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 7e698ceb..308fb43e 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -116,7 +116,7 @@ impl Database for MongoDb { Ok(()) } - async fn get_latest_job_by_type(&self, job_type: JobType) -> Result> { + async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result> { let filter = doc! { "job_type": mongodb::bson::to_bson(&job_type)?, }; diff --git a/crates/orchestrator/src/tests/workers/mod.rs b/crates/orchestrator/src/tests/workers/mod.rs index fc2b53c4..81e1bc43 100644 --- a/crates/orchestrator/src/tests/workers/mod.rs +++ b/crates/orchestrator/src/tests/workers/mod.rs @@ -30,15 +30,15 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { // Mocking db function expectations if !db_val { - db.expect_get_latest_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); + db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); start_job_index = 1; block = 5; } else { let uuid_temp = Uuid::new_v4(); - db.expect_get_latest_job_by_type() + db.expect_get_latest_job_by_type_and_internal_id() .with(eq(JobType::SnosRun)) - .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp.clone())))); + .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); block = 6; start_job_index = 2; } @@ -56,7 +56,7 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { db.expect_create_job() .times(1) .withf(move |item| item.internal_id == i.clone().to_string()) - .returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid.clone()))); + .returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid))); } // Queue function call simulations diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 6853c6cb..f116cf67 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -20,7 +20,7 @@ impl Worker for SnosWorker { let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config .database() - .get_latest_job_by_type(JobType::SnosRun) + .get_latest_job_by_type_and_internal_id(JobType::SnosRun) .await .unwrap() .map(|item| item.internal_id)