Skip to content

Commit

Permalink
feat : added review #1 changes : added error handling for snos workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Arun Jangra authored and Arun Jangra committed Jun 14, 2024
1 parent 10ba1e8 commit 2c80f7d
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ path = "src/main.rs"

[dependencies]
arc-swap = { workspace = true }
async-std = "1.12.0"
async-trait = { workspace = true }
axum = { workspace = true, features = ["macros"] }
axum-macros = { workspace = true }
Expand All @@ -34,7 +35,6 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
url = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
async-std = "1.12.0"

[features]
default = ["ethereum", "with_mongodb", "with_sqs"]
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub trait Database: Send + Sync {
) -> Result<()>;

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
}

pub trait DatabaseConfig {
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Database for MongoDb {
Ok(())
}

async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
Expand Down
8 changes: 4 additions & 4 deletions crates/orchestrator/src/tests/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {

// Mocking db function expectations
if !db_val {
db.expect_get_latest_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_latest_job_by_type()
db.expect_get_latest_job_by_type_and_internal_id()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp.clone()))));
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}
Expand All @@ -56,7 +56,7 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
db.expect_create_job()
.times(1)
.withf(move |item| item.internal_id == i.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid.clone())));
.returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid)));
}

// Queue function call simulations
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Worker for SnosWorker {
let latest_block_number = provider.block_number().await?;
let latest_block_processed_data = config
.database()
.get_latest_job_by_type(JobType::SnosRun)
.get_latest_job_by_type_and_internal_id(JobType::SnosRun)
.await
.unwrap()
.map(|item| item.internal_id)
Expand Down

0 comments on commit 2c80f7d

Please sign in to comment.