From 2310fcf90d0d6b9f438fd3bec05fae0ed98ff3a8 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 22 Jul 2024 17:47:21 +0530 Subject: [PATCH 01/21] update: DA job draft #1 --- crates/orchestrator/src/main.rs | 2 + .../src/workers/data_submission.rs | 55 +++++++++++++++++++ crates/orchestrator/src/workers/mod.rs | 1 + 3 files changed, 58 insertions(+) create mode 100644 crates/orchestrator/src/workers/data_submission.rs diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 27bdf573..4863843f 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -2,6 +2,7 @@ use dotenvy::dotenv; use orchestrator::config::config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; +use orchestrator::workers::data_submission::DataSubmissionWorker; use orchestrator::workers::proof_registration::ProofRegistrationWorker; use orchestrator::workers::proving::ProvingWorker; use orchestrator::workers::snos::SnosWorker; @@ -33,6 +34,7 @@ async fn main() { tokio::spawn(start_cron(Box::new(ProvingWorker), 60)); tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60)); tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60)); + tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60)); tracing::info!("Listening on http://{}", address); axum::serve(listener, app).await.expect("Failed to start axum server"); diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs new file mode 100644 index 00000000..e1622c7b --- /dev/null +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -0,0 +1,55 @@ +use crate::config::config; +use crate::jobs::create_job; +use crate::jobs::types::JobType; +use crate::workers::Worker; +use async_trait::async_trait; +use std::collections::HashMap; +use std::error::Error; + +pub struct DataSubmissionWorker; + +#[async_trait] +impl Worker for DataSubmissionWorker { + // 0. All ids are assumed to be block numbers. + // 1. Fetch the latest completed Proving job. + // 2. Fetch the latest DA job creation. + // 3. Create jobs from after the lastest DA job already created till latest completed proving job. + async fn run_worker(&self) -> Result<(), Box> { + let config = config().await; + + // provides latest completed proof creation job id + let latest_proven_job_id = config + .database() + .get_last_successful_job_by_type(JobType::ProofCreation) + .await + .unwrap() + .map(|item| item.internal_id) + .unwrap_or("0".to_string()); + + // provides latest triggered data submission job id + let latest_data_submission_job_id = config + .database() + .get_latest_job_by_type_and_internal_id(JobType::DataSubmission) + .await + .unwrap() + .map(|item| item.internal_id) + .unwrap_or("0".to_string()); + + let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?; + let latest_proven_id: u64 = latest_proven_job_id.parse()?; + + let block_diff = latest_proven_id - latest_data_submission_id; + + // if all blocks are processed + if block_diff == 0 { + return Ok(()); + } + + // creating data submission jobs for latest blocks without pre-running data submission jobs jobs don't yet exist. + for x in latest_data_submission_id + 1..latest_proven_id + 1 { + create_job(JobType::DataSubmission, x.to_string(), HashMap::new()).await?; + } + + Ok(()) + } +} diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index ba833b7e..93014cdc 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -2,6 +2,7 @@ use std::error::Error; use async_trait::async_trait; +pub mod data_submission; pub mod proof_registration; pub mod proving; pub mod snos; From df4fa863fdd6d377997e788b61c6421546719907 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 22 Jul 2024 18:57:00 +0530 Subject: [PATCH 02/21] docs: changelog updated --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 40ab2534..6b5b04c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- creation of data submission jobs. ## Changed From d21abff55e3a98513fe085dbfb5bd6bd91347d9b Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 12:21:07 +0530 Subject: [PATCH 03/21] update: is_worker_enabled impl & usage in da_submission, removal of String from VerificationFailed --- crates/orchestrator/src/database/mod.rs | 4 +++ .../orchestrator/src/database/mongodb/mod.rs | 26 +++++++++++++++++++ crates/orchestrator/src/jobs/mod.rs | 7 ++--- crates/orchestrator/src/jobs/types.rs | 2 +- .../src/workers/data_submission.rs | 13 +++++----- crates/orchestrator/src/workers/mod.rs | 25 +++++++++++++++++- 6 files changed, 65 insertions(+), 12 deletions(-) diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index 387457d5..cc29f1e3 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -43,6 +43,10 @@ pub trait Database: Send + Sync { job_status: JobStatus, internal_id: String, ) -> Result>; + + // TODO: can be extendible to support multiple status. + async fn get_jobs_by_status(&self, status : JobStatus ) -> Result>; + } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 5be0cf54..77a2b4db 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -290,4 +290,30 @@ impl Database for MongoDb { Ok(results) } + + async fn get_jobs_by_status(&self, job_status : JobStatus) -> Result> { + + let filter = doc! { + "job_status": bson::to_bson(&job_status)? + }; + + let mut jobs = self + .get_job_collection() + .find(filter, None) + .await + .expect("Failed to fetch jobs by given job type and status"); + + let mut results = Vec::new(); + + while let Some(result) = jobs.next().await { + match result { + Ok(job_item) => { + results.push(job_item); + } + Err(e) => return Err(e.into()), + } + } + + Ok(results) + } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index b501cade..e8dcab4c 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> { match job.status { // we only want to process jobs that are in the created or verification failed state. // verification failed state means that the previous processing failed and we want to retry - JobStatus::Created | JobStatus::VerificationFailed(_) => { + JobStatus::Created | JobStatus::VerificationFailed => { log::info!("Processing job with id {:?}", id); } _ => { @@ -134,8 +134,9 @@ pub async fn verify_job(id: Uuid) -> Result<()> { JobVerificationStatus::Verified => { config.database().update_job_status(&job, JobStatus::Completed).await?; } - JobVerificationStatus::Rejected(e) => { - config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?; + JobVerificationStatus::Rejected(_) => { + // TODO: change '_' to 'e' and add error 'e' to metadata of job status. + config.database().update_job_status(&job, JobStatus::VerificationFailed).await?; // retry job processing if we haven't exceeded the max limit let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index b8e492b4..fce76280 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -98,7 +98,7 @@ pub enum JobStatus { /// The job was processed but the was unable to be verified under the given time VerificationTimeout, /// The job failed processing - VerificationFailed(String), + VerificationFailed, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index e1622c7b..2f118a2c 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -15,6 +15,12 @@ impl Worker for DataSubmissionWorker { // 2. Fetch the latest DA job creation. // 3. Create jobs from after the lastest DA job already created till latest completed proving job. async fn run_worker(&self) -> Result<(), Box> { + + // Return without doing anything if the worker is not enabled. + if !self.is_worker_enabled().await? { + return Ok(()); + } + let config = config().await; // provides latest completed proof creation job id @@ -38,13 +44,6 @@ impl Worker for DataSubmissionWorker { let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?; let latest_proven_id: u64 = latest_proven_job_id.parse()?; - let block_diff = latest_proven_id - latest_data_submission_id; - - // if all blocks are processed - if block_diff == 0 { - return Ok(()); - } - // creating data submission jobs for latest blocks without pre-running data submission jobs jobs don't yet exist. for x in latest_data_submission_id + 1..latest_proven_id + 1 { create_job(JobType::DataSubmission, x.to_string(), HashMap::new()).await?; diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 93014cdc..1c063c37 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -1,5 +1,5 @@ use std::error::Error; - +use crate::{config::config, jobs::types::JobStatus}; use async_trait::async_trait; pub mod data_submission; @@ -11,4 +11,27 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { async fn run_worker(&self) -> Result<(), Box>; + + // TODO: Assumption : False Negative + // we are assuming that the worker will spawn only 1 job for a block and no two jobs will ever exist + // for a single block, the code might fail to work as expected if this happens. + + // Checks if any of the jobs have failed + // Haults any new job creation till all the count of failed jobs is not Zero. + async fn is_worker_enabled(&self) -> Result> { + let config = config().await; + + let failed_da_jobs = config + .database() + .get_jobs_by_status( + JobStatus::VerificationFailed, + ) + .await?; + + if failed_da_jobs.len() > 0 { + return Ok(false); + } + + Ok(true) + } } From f9f4732fcc364a16b7d5f493fea367420048659c Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 12:50:18 +0530 Subject: [PATCH 04/21] update: renamed to --- crates/orchestrator/src/database/mod.rs | 2 +- crates/orchestrator/src/database/mongodb/mod.rs | 2 +- crates/orchestrator/src/tests/workers/snos/mod.rs | 4 ++-- crates/orchestrator/src/workers/data_submission.rs | 2 +- crates/orchestrator/src/workers/snos.rs | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index cc29f1e3..8d346574 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -29,7 +29,7 @@ pub trait Database: Send + Sync { async fn update_job(&self, job: &JobItem) -> Result<()>; async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>; async fn update_metadata(&self, job: &JobItem, metadata: HashMap) -> Result<()>; - async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result>; + async fn get_latest_job_by_type(&self, job_type: JobType) -> Result>; async fn get_jobs_without_successor( &self, job_a_type: JobType, diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 77a2b4db..84f4fecc 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -112,7 +112,7 @@ impl Database for MongoDb { Ok(()) } - async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result> { + async fn get_latest_job_by_type(&self, job_type: JobType) -> Result> { let filter = doc! { "job_type": mongodb::bson::to_bson(&job_type)?, }; diff --git a/crates/orchestrator/src/tests/workers/snos/mod.rs b/crates/orchestrator/src/tests/workers/snos/mod.rs index 2fa4945e..fe235b12 100644 --- a/crates/orchestrator/src/tests/workers/snos/mod.rs +++ b/crates/orchestrator/src/tests/workers/snos/mod.rs @@ -30,13 +30,13 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { // Mocking db function expectations if !db_val { - db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); + db.expect_get_latest_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); start_job_index = 1; block = 5; } else { let uuid_temp = Uuid::new_v4(); - db.expect_get_latest_job_by_type_and_internal_id() + db.expect_get_latest_job_by_type() .with(eq(JobType::SnosRun)) .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); block = 6; diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index 2f118a2c..cee8e257 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -35,7 +35,7 @@ impl Worker for DataSubmissionWorker { // provides latest triggered data submission job id let latest_data_submission_job_id = config .database() - .get_latest_job_by_type_and_internal_id(JobType::DataSubmission) + .get_latest_job_by_type(JobType::DataSubmission) .await .unwrap() .map(|item| item.internal_id) diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 139d79e1..ea84efdc 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -22,7 +22,7 @@ impl Worker for SnosWorker { let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config .database() - .get_latest_job_by_type_and_internal_id(JobType::SnosRun) + .get_latest_job_by_type(JobType::SnosRun) .await .unwrap() .map(|item| item.internal_id) From 55e1bc8802a7fa8d799c13ee1d42dd2ed3a6557d Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 12:59:57 +0530 Subject: [PATCH 05/21] update: run worker only if it's enabled using is_worker_enabled check --- crates/orchestrator/src/main.rs | 2 +- crates/orchestrator/src/workers/data_submission.rs | 6 ------ crates/orchestrator/src/workers/mod.rs | 8 ++++++++ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 4863843f..3c550d75 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -42,7 +42,7 @@ async fn main() { async fn start_cron(worker: Box, interval: u64) { loop { - worker.run_worker().await.expect("Error in running the worker."); + worker.run_worker_if_enabled().await.expect("Error in running the worker."); tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; } } diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index cee8e257..d9e471c1 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -15,12 +15,6 @@ impl Worker for DataSubmissionWorker { // 2. Fetch the latest DA job creation. // 3. Create jobs from after the lastest DA job already created till latest completed proving job. async fn run_worker(&self) -> Result<(), Box> { - - // Return without doing anything if the worker is not enabled. - if !self.is_worker_enabled().await? { - return Ok(()); - } - let config = config().await; // provides latest completed proof creation job id diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 1c063c37..fde40fae 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -10,6 +10,14 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { + + async fn run_worker_if_enabled(&self) -> Result<(), Box> { + if !self.is_worker_enabled().await? { + return Ok(()); + } + self.run_worker().await + } + async fn run_worker(&self) -> Result<(), Box>; // TODO: Assumption : False Negative From 9d7ab217498779dbd822abfc58d21bad89de1fa7 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 13:07:54 +0530 Subject: [PATCH 06/21] build: linter fixes --- CHANGELOG.md | 1 + crates/orchestrator/src/database/mod.rs | 3 +-- crates/orchestrator/src/database/mongodb/mod.rs | 11 +++++------ crates/orchestrator/src/jobs/mod.rs | 2 +- crates/orchestrator/src/workers/mod.rs | 12 +++--------- 5 files changed, 11 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b5b04c0..91ee4649 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added + - creation of data submission jobs. ## Changed diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index 8d346574..e9de6a06 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -45,8 +45,7 @@ pub trait Database: Send + Sync { ) -> Result>; // TODO: can be extendible to support multiple status. - async fn get_jobs_by_status(&self, status : JobStatus ) -> Result>; - + async fn get_jobs_by_status(&self, status: JobStatus) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 84f4fecc..33ed260a 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -291,17 +291,16 @@ impl Database for MongoDb { Ok(results) } - async fn get_jobs_by_status(&self, job_status : JobStatus) -> Result> { - + async fn get_jobs_by_status(&self, job_status: JobStatus) -> Result> { let filter = doc! { "job_status": bson::to_bson(&job_status)? }; let mut jobs = self - .get_job_collection() - .find(filter, None) - .await - .expect("Failed to fetch jobs by given job type and status"); + .get_job_collection() + .find(filter, None) + .await + .expect("Failed to fetch jobs by given job type and status"); let mut results = Vec::new(); diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e8dcab4c..621babf8 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -134,7 +134,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { JobVerificationStatus::Verified => { config.database().update_job_status(&job, JobStatus::Completed).await?; } - JobVerificationStatus::Rejected(_) => { + JobVerificationStatus::Rejected(_) => { // TODO: change '_' to 'e' and add error 'e' to metadata of job status. config.database().update_job_status(&job, JobStatus::VerificationFailed).await?; diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index fde40fae..664b5021 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -1,6 +1,6 @@ -use std::error::Error; use crate::{config::config, jobs::types::JobStatus}; use async_trait::async_trait; +use std::error::Error; pub mod data_submission; pub mod proof_registration; @@ -10,7 +10,6 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { - async fn run_worker_if_enabled(&self) -> Result<(), Box> { if !self.is_worker_enabled().await? { return Ok(()); @@ -29,14 +28,9 @@ pub trait Worker: Send + Sync { async fn is_worker_enabled(&self) -> Result> { let config = config().await; - let failed_da_jobs = config - .database() - .get_jobs_by_status( - JobStatus::VerificationFailed, - ) - .await?; + let failed_da_jobs = config.database().get_jobs_by_status(JobStatus::VerificationFailed).await?; - if failed_da_jobs.len() > 0 { + if !failed_da_jobs.is_empty() { return Ok(false); } From fbd721db649c993d072c3570b8b0ca29eeec72bd Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 17:17:58 +0530 Subject: [PATCH 07/21] Update CHANGELOG.md Co-authored-by: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ae7aaec..888b6452 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added -- creation of data submission jobs. +- implemented DA worker. - Function to calculate the kzg proof of x_0. - Tests for updating the state. - Function to update the state and publish blob on ethereum in state update job. From e475cc438ce98702f5c477a4a31590d7dfe79b0a Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 18:01:35 +0530 Subject: [PATCH 08/21] update: limit_to_one on get_jobs_by_status --- crates/orchestrator/src/database/mod.rs | 2 +- crates/orchestrator/src/database/mongodb/mod.rs | 11 ++++++++--- crates/orchestrator/src/workers/mod.rs | 2 +- crates/orchestrator/src/workers/snos.rs | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index e9de6a06..a9a20536 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -45,7 +45,7 @@ pub trait Database: Send + Sync { ) -> Result>; // TODO: can be extendible to support multiple status. - async fn get_jobs_by_status(&self, status: JobStatus) -> Result>; + async fn get_jobs_by_status(&self, status: JobStatus, limit: Option) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 33ed260a..d74934d6 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use color_eyre::eyre::eyre; use color_eyre::Result; use mongodb::bson::{Bson, Document}; -use mongodb::options::{FindOneOptions, UpdateOptions}; +use mongodb::options::{FindOneOptions, FindOptions, UpdateOptions}; use mongodb::{ bson, bson::doc, @@ -291,14 +291,19 @@ impl Database for MongoDb { Ok(results) } - async fn get_jobs_by_status(&self, job_status: JobStatus) -> Result> { + async fn get_jobs_by_status(&self, job_status: JobStatus, limit: Option) -> Result> { let filter = doc! { "job_status": bson::to_bson(&job_status)? }; + let mut find_options = None; + if let Some(val) = limit { + find_options = Some(FindOptions::builder().limit(Some(val)).build()) + }; + let mut jobs = self .get_job_collection() - .find(filter, None) + .find(filter, find_options) .await .expect("Failed to fetch jobs by given job type and status"); diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 664b5021..d56829cd 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -28,7 +28,7 @@ pub trait Worker: Send + Sync { async fn is_worker_enabled(&self) -> Result> { let config = config().await; - let failed_da_jobs = config.database().get_jobs_by_status(JobStatus::VerificationFailed).await?; + let failed_da_jobs = config.database().get_jobs_by_status(JobStatus::VerificationFailed, Some(1)).await?; if !failed_da_jobs.is_empty() { return Ok(false); diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index ea84efdc..f9791c94 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -22,7 +22,7 @@ impl Worker for SnosWorker { let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config .database() - .get_latest_job_by_type(JobType::SnosRun) + .get_last_successful_job_by_type(JobType::SnosRun) .await .unwrap() .map(|item| item.internal_id) From c13a576f8fa2a0e469d6f5c188f00c656208fc9f Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 18:44:15 +0530 Subject: [PATCH 09/21] update: removed get_last_successful_job_by_type, added get_latest_job_by_type_and_status --- crates/orchestrator/src/database/mod.rs | 6 +++++- crates/orchestrator/src/database/mongodb/mod.rs | 10 +++++++--- .../src/tests/workers/update_state/mod.rs | 11 +++++++---- crates/orchestrator/src/workers/data_submission.rs | 4 ++-- crates/orchestrator/src/workers/snos.rs | 4 ++-- crates/orchestrator/src/workers/update_state.rs | 3 ++- 6 files changed, 25 insertions(+), 13 deletions(-) diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index a9a20536..72903686 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -36,7 +36,11 @@ pub trait Database: Send + Sync { job_a_status: JobStatus, job_b_type: JobType, ) -> Result>; - async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result>; + async fn get_latest_job_by_type_and_status( + &self, + job_type: JobType, + job_status: JobStatus, + ) -> Result>; async fn get_jobs_after_internal_id_by_job_type( &self, job_type: JobType, diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index d74934d6..d51c460f 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -245,10 +245,14 @@ impl Database for MongoDb { Ok(vec_jobs) } - async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result> { + async fn get_latest_job_by_type_and_status( + &self, + job_type: JobType, + job_status: JobStatus, + ) -> Result> { let filter = doc! { "job_type": bson::to_bson(&job_type)?, - "job_status": bson::to_bson(&JobStatus::Completed)? + "job_status": bson::to_bson(&job_status)? }; let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); @@ -256,7 +260,7 @@ impl Database for MongoDb { .get_job_collection() .find_one(filter, find_options) .await - .expect("Failed to fetch latest job by given job type")) + .expect("Failed to fetch the latest job for given type and status.")) } async fn get_jobs_after_internal_id_by_job_type( diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs index a5271f9f..e9062a2a 100644 --- a/crates/orchestrator/src/tests/workers/update_state/mod.rs +++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs @@ -33,15 +33,18 @@ async fn test_update_state_worker( // Mocking db function expectations // If no successful state update jobs exist if !last_successful_job_exists { - db.expect_get_last_successful_job_by_type().with(eq(JobType::StateTransition)).times(1).returning(|_| Ok(None)); + db.expect_get_latest_job_by_type_and_status() + .with(eq(JobType::StateTransition), eq(JobStatus::Completed)) + .times(1) + .returning(|_, _| Ok(None)); } else { // if successful state update job exists // mocking the return value of first function call (getting last successful jobs): - db.expect_get_last_successful_job_by_type() - .with(eq(JobType::StateTransition)) + db.expect_get_latest_job_by_type_and_status() + .with(eq(JobType::StateTransition), eq(JobStatus::Completed)) .times(1) - .returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4())))); + .returning(|_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4())))); // mocking the return values of second function call (getting completed proving worker jobs) db.expect_get_jobs_after_internal_id_by_job_type() diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index d9e471c1..c5467428 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -1,6 +1,6 @@ use crate::config::config; use crate::jobs::create_job; -use crate::jobs::types::JobType; +use crate::jobs::types::{JobStatus, JobType}; use crate::workers::Worker; use async_trait::async_trait; use std::collections::HashMap; @@ -20,7 +20,7 @@ impl Worker for DataSubmissionWorker { // provides latest completed proof creation job id let latest_proven_job_id = config .database() - .get_last_successful_job_by_type(JobType::ProofCreation) + .get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed) .await .unwrap() .map(|item| item.internal_id) diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index f9791c94..0d440f6d 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -6,7 +6,7 @@ use starknet::providers::Provider; use crate::config::config; use crate::jobs::create_job; -use crate::jobs::types::JobType; +use crate::jobs::types::{JobStatus, JobType}; use crate::workers::Worker; pub struct SnosWorker; @@ -22,7 +22,7 @@ impl Worker for SnosWorker { let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config .database() - .get_last_successful_job_by_type(JobType::SnosRun) + .get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed) .await .unwrap() .map(|item| item.internal_id) diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index c100ab0e..654b7e3e 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -16,7 +16,8 @@ impl Worker for UpdateStateWorker { /// 3. Create state updates for all the blocks that don't have a state update job async fn run_worker(&self) -> Result<(), Box> { let config = config().await; - let latest_successful_job = config.database().get_last_successful_job_by_type(JobType::StateTransition).await?; + let latest_successful_job = + config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?; match latest_successful_job { Some(job) => { From e6110403fbb84a144a5a6e9acb59293f64aad3a1 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 23 Jul 2024 20:09:13 +0530 Subject: [PATCH 10/21] update: added error to job metadata --- crates/orchestrator/src/jobs/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 621babf8..e282fdb9 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -134,9 +134,13 @@ pub async fn verify_job(id: Uuid) -> Result<()> { JobVerificationStatus::Verified => { config.database().update_job_status(&job, JobStatus::Completed).await?; } - JobVerificationStatus::Rejected(_) => { - // TODO: change '_' to 'e' and add error 'e' to metadata of job status. + JobVerificationStatus::Rejected(e) => { config.database().update_job_status(&job, JobStatus::VerificationFailed).await?; + let mut metadata = job.metadata.clone(); + metadata.insert("error".to_string(), e); + config.database().update_metadata(&job, metadata).await?; + + log::error!("Invalid status {:?} for job with id {:?}. Cannot verify.", id, job.status); // retry job processing if we haven't exceeded the max limit let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; From 586c7090d471ad06a9a5854b65531d35cf54127f Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 25 Jul 2024 11:52:03 +0530 Subject: [PATCH 11/21] update: pr resolution, simplifying get_jobs_by_status, rejected status in verify_jobs --- .../orchestrator/src/database/mongodb/mod.rs | 25 ++++++------------- crates/orchestrator/src/jobs/mod.rs | 9 ++++--- crates/orchestrator/src/workers/mod.rs | 2 +- crates/orchestrator/src/workers/snos.rs | 2 +- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index d51c460f..7379d08f 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -1,4 +1,5 @@ use async_std::stream::StreamExt; +use futures::TryStreamExt; use std::collections::HashMap; use async_trait::async_trait; @@ -305,23 +306,13 @@ impl Database for MongoDb { find_options = Some(FindOptions::builder().limit(Some(val)).build()) }; - let mut jobs = self - .get_job_collection() - .find(filter, find_options) - .await - .expect("Failed to fetch jobs by given job type and status"); - - let mut results = Vec::new(); + let jobs = self + .get_job_collection() + .find(filter, find_options) + .await? + .try_collect() + .await?; - while let Some(result) = jobs.next().await { - match result { - Ok(job_item) => { - results.push(job_item); - } - Err(e) => return Err(e.into()), - } - } - - Ok(results) + Ok(jobs) } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e282fdb9..6a46ddd6 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -135,10 +135,11 @@ pub async fn verify_job(id: Uuid) -> Result<()> { config.database().update_job_status(&job, JobStatus::Completed).await?; } JobVerificationStatus::Rejected(e) => { - config.database().update_job_status(&job, JobStatus::VerificationFailed).await?; - let mut metadata = job.metadata.clone(); - metadata.insert("error".to_string(), e); - config.database().update_metadata(&job, metadata).await?; + let mut new_job = job.clone(); + new_job.metadata.insert("error".to_string(), e); + new_job.status = JobStatus::VerificationFailed; + + config.database().update_job(&new_job).await?; log::error!("Invalid status {:?} for job with id {:?}. Cannot verify.", id, job.status); diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index d56829cd..608a1d44 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -19,7 +19,7 @@ pub trait Worker: Send + Sync { async fn run_worker(&self) -> Result<(), Box>; - // TODO: Assumption : False Negative + // Assumption : False Negative // we are assuming that the worker will spawn only 1 job for a block and no two jobs will ever exist // for a single block, the code might fail to work as expected if this happens. diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 0d440f6d..7508263a 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -22,7 +22,7 @@ impl Worker for SnosWorker { let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config .database() - .get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed) + .get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed) .await .unwrap() .map(|item| item.internal_id) From b0bf981d16a24c93fabbd26186519f99c764ee4d Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 25 Jul 2024 11:56:22 +0530 Subject: [PATCH 12/21] update: linting fixes --- crates/orchestrator/src/database/mongodb/mod.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 7379d08f..0da61e28 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -306,12 +306,7 @@ impl Database for MongoDb { find_options = Some(FindOptions::builder().limit(Some(val)).build()) }; - let jobs = self - .get_job_collection() - .find(filter, find_options) - .await? - .try_collect() - .await?; + let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?; Ok(jobs) } From 69b05ded17f17232108c614278f4e9a0fdb6c3b6 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 25 Jul 2024 19:15:14 +0530 Subject: [PATCH 13/21] Update crates/orchestrator/src/jobs/mod.rs Co-authored-by: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com> --- crates/orchestrator/src/jobs/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 6a46ddd6..9a49209d 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -141,7 +141,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { config.database().update_job(&new_job).await?; - log::error!("Invalid status {:?} for job with id {:?}. Cannot verify.", id, job.status); + log::error!("Verification failed for job with id {:?}. Cannot verify.", id); // retry job processing if we haven't exceeded the max limit let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; From 46b8c23b78ec8352c0be0bb19611a472d8cd881b Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 25 Jul 2024 19:36:06 +0530 Subject: [PATCH 14/21] update: removing .expect from mongodb mod file --- .../orchestrator/src/database/mongodb/mod.rs | 34 +++---------------- 1 file changed, 5 insertions(+), 29 deletions(-) diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 0da61e28..3efcc39c 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -118,11 +118,7 @@ impl Database for MongoDb { "job_type": mongodb::bson::to_bson(&job_type)?, }; let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); - Ok(self - .get_job_collection() - .find_one(filter, find_options) - .await - .expect("Failed to fetch latest job by given job type")) + Ok(self.get_job_collection().find_one(filter, find_options).await?) } /// function to get jobs that don't have a successor job. @@ -227,8 +223,7 @@ impl Database for MongoDb { // } // } - let collection = self.get_job_collection(); - let mut cursor = collection.aggregate(pipeline, None).await?; + let mut cursor = self.get_job_collection().aggregate(pipeline, None).await?; let mut vec_jobs: Vec = Vec::new(); @@ -257,11 +252,7 @@ impl Database for MongoDb { }; let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); - Ok(self - .get_job_collection() - .find_one(filter, find_options) - .await - .expect("Failed to fetch the latest job for given type and status.")) + Ok(self.get_job_collection().find_one(filter, find_options).await?) } async fn get_jobs_after_internal_id_by_job_type( @@ -276,24 +267,9 @@ impl Database for MongoDb { "internal_id": { "$gt": internal_id } }; - let mut jobs = self - .get_job_collection() - .find(filter, None) - .await - .expect("Failed to fetch latest jobs by given job type and internal_od conditions"); + let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?; - let mut results = Vec::new(); - - while let Some(result) = jobs.next().await { - match result { - Ok(job_item) => { - results.push(job_item); - } - Err(e) => return Err(e.into()), - } - } - - Ok(results) + Ok(jobs) } async fn get_jobs_by_status(&self, job_status: JobStatus, limit: Option) -> Result> { From 8cea54ddfddd33d2ddb091f63f6335867fe22fb1 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 25 Jul 2024 19:55:51 +0530 Subject: [PATCH 15/21] update: fixed testcase for snos worker --- crates/orchestrator/src/tests/workers/snos/mod.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/orchestrator/src/tests/workers/snos/mod.rs b/crates/orchestrator/src/tests/workers/snos/mod.rs index 7b9adc3f..d4903e47 100644 --- a/crates/orchestrator/src/tests/workers/snos/mod.rs +++ b/crates/orchestrator/src/tests/workers/snos/mod.rs @@ -1,6 +1,6 @@ use crate::config::config_force_init; use crate::database::MockDatabase; -use crate::jobs::types::JobType; +use crate::jobs::types::{JobStatus, JobType}; use crate::queue::MockQueueProvider; use crate::tests::common::init_config; use crate::tests::workers::utils::get_job_item_mock_by_id; @@ -30,15 +30,18 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { // Mocking db function expectations if !db_val { - db.expect_get_latest_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); + db.expect_get_latest_job_by_type_and_status() + .times(1) + .with(eq(JobType::SnosRun), eq(JobStatus::Completed)) + .returning(|_, _| Ok(None)); start_job_index = 1; block = 5; } else { let uuid_temp = Uuid::new_v4(); - db.expect_get_latest_job_by_type() - .with(eq(JobType::SnosRun)) - .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); + db.expect_get_latest_job_by_type_and_status() + .with(eq(JobType::SnosRun), eq(JobStatus::Completed)) + .returning(move |_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); block = 6; start_job_index = 2; } From 579d1d18dc385f6abea64c74412558c702bc822a Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 25 Jul 2024 22:01:58 +0530 Subject: [PATCH 16/21] chore: correct variable name --- crates/orchestrator/src/workers/data_submission.rs | 4 ++-- crates/orchestrator/src/workers/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index c5467428..4c7bbb15 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -39,8 +39,8 @@ impl Worker for DataSubmissionWorker { let latest_proven_id: u64 = latest_proven_job_id.parse()?; // creating data submission jobs for latest blocks without pre-running data submission jobs jobs don't yet exist. - for x in latest_data_submission_id + 1..latest_proven_id + 1 { - create_job(JobType::DataSubmission, x.to_string(), HashMap::new()).await?; + for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 { + create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?; } Ok(()) diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 608a1d44..836f3d9d 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -24,13 +24,13 @@ pub trait Worker: Send + Sync { // for a single block, the code might fail to work as expected if this happens. // Checks if any of the jobs have failed - // Haults any new job creation till all the count of failed jobs is not Zero. + // Halts any new job creation till all the count of failed jobs is not Zero. async fn is_worker_enabled(&self) -> Result> { let config = config().await; - let failed_da_jobs = config.database().get_jobs_by_status(JobStatus::VerificationFailed, Some(1)).await?; + let failed_jobs = config.database().get_jobs_by_status(JobStatus::VerificationFailed, Some(1)).await?; - if !failed_da_jobs.is_empty() { + if !failed_jobs.is_empty() { return Ok(false); } From 992190a890ee3c3a2e324744b2de8732d1916384 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 25 Jul 2024 23:15:00 +0530 Subject: [PATCH 17/21] update: added support to check againt multiple status - is_worker_enabled, get_jobs_by_statuses --- crates/orchestrator/src/database/mod.rs | 2 +- crates/orchestrator/src/database/mongodb/mod.rs | 12 ++++++------ crates/orchestrator/src/workers/data_submission.rs | 2 +- crates/orchestrator/src/workers/mod.rs | 6 +++++- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index 72903686..f03c53fd 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -49,7 +49,7 @@ pub trait Database: Send + Sync { ) -> Result>; // TODO: can be extendible to support multiple status. - async fn get_jobs_by_status(&self, status: JobStatus, limit: Option) -> Result>; + async fn get_jobs_by_statuses(&self, status: Vec, limit: Option) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 3efcc39c..557f36d8 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -272,15 +272,15 @@ impl Database for MongoDb { Ok(jobs) } - async fn get_jobs_by_status(&self, job_status: JobStatus, limit: Option) -> Result> { + async fn get_jobs_by_statuses(&self, job_status: Vec, limit: Option) -> Result> { let filter = doc! { - "job_status": bson::to_bson(&job_status)? + "job_status": { + // TODO: Check that the conversion leads to valid output! + "$in": job_status.iter().map(|status| bson::to_bson(status).unwrap_or(Bson::Null)).collect::>() + } }; - let mut find_options = None; - if let Some(val) = limit { - find_options = Some(FindOptions::builder().limit(Some(val)).build()) - }; + let find_options = limit.map(|val| FindOptions::builder().limit(Some(val)).build()); let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?; diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index 4c7bbb15..60c56ffb 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -38,7 +38,7 @@ impl Worker for DataSubmissionWorker { let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?; let latest_proven_id: u64 = latest_proven_job_id.parse()?; - // creating data submission jobs for latest blocks without pre-running data submission jobs jobs don't yet exist. + // creating data submission jobs for latest blocks that don't have pre-running data submission jobs yet. for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 { create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?; } diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 836f3d9d..48bf5cf1 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -24,11 +24,15 @@ pub trait Worker: Send + Sync { // for a single block, the code might fail to work as expected if this happens. // Checks if any of the jobs have failed + // Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed // Halts any new job creation till all the count of failed jobs is not Zero. async fn is_worker_enabled(&self) -> Result> { let config = config().await; - let failed_jobs = config.database().get_jobs_by_status(JobStatus::VerificationFailed, Some(1)).await?; + let failed_jobs = config + .database() + .get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1)) + .await?; if !failed_jobs.is_empty() { return Ok(false); From ee29e87ec017933d75c46bf84840357abdf7d16a Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 26 Jul 2024 11:48:04 +0530 Subject: [PATCH 18/21] docs: rewrote 1 job per block assumption --- crates/orchestrator/src/workers/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 48bf5cf1..7bd6c8c8 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -19,9 +19,9 @@ pub trait Worker: Send + Sync { async fn run_worker(&self) -> Result<(), Box>; - // Assumption : False Negative - // we are assuming that the worker will spawn only 1 job for a block and no two jobs will ever exist - // for a single block, the code might fail to work as expected if this happens. + // Assumption + // If say a job for block X fails, we don't want the worker to respawn another job for the same block + // we will resolve the existing failed job first // Checks if any of the jobs have failed // Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed From 465241c23a604b14a3011147cc784946055db82a Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 30 Jul 2024 11:51:33 +0530 Subject: [PATCH 19/21] docs: DataSubmissionWorker -> DataAvailabilitySynchronizer --- crates/orchestrator/src/main.rs | 4 ++-- crates/orchestrator/src/workers/data_submission.rs | 6 +++--- crates/orchestrator/src/workers/mod.rs | 5 ++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 3c550d75..485c5f4e 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -2,7 +2,7 @@ use dotenvy::dotenv; use orchestrator::config::config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; -use orchestrator::workers::data_submission::DataSubmissionWorker; +use orchestrator::workers::data_submission::DataAvailabilitySynchronizer; use orchestrator::workers::proof_registration::ProofRegistrationWorker; use orchestrator::workers::proving::ProvingWorker; use orchestrator::workers::snos::SnosWorker; @@ -34,7 +34,7 @@ async fn main() { tokio::spawn(start_cron(Box::new(ProvingWorker), 60)); tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60)); tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60)); - tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60)); + tokio::spawn(start_cron(Box::new(DataAvailabilitySynchronizer), 60)); tracing::info!("Listening on http://{}", address); axum::serve(listener, app).await.expect("Failed to start axum server"); diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index 60c56ffb..d5903312 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -6,10 +6,10 @@ use async_trait::async_trait; use std::collections::HashMap; use std::error::Error; -pub struct DataSubmissionWorker; +pub struct DataAvailabilitySynchronizer; #[async_trait] -impl Worker for DataSubmissionWorker { +impl Worker for DataAvailabilitySynchronizer { // 0. All ids are assumed to be block numbers. // 1. Fetch the latest completed Proving job. // 2. Fetch the latest DA job creation. @@ -38,7 +38,7 @@ impl Worker for DataSubmissionWorker { let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?; let latest_proven_id: u64 = latest_proven_job_id.parse()?; - // creating data submission jobs for latest blocks that don't have pre-running data submission jobs yet. + // creating data submission jobs for latest blocks that don't have existing data submission jobs yet. for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 { create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?; } diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 7bd6c8c8..9e0f6e9b 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -21,7 +21,10 @@ pub trait Worker: Send + Sync { // Assumption // If say a job for block X fails, we don't want the worker to respawn another job for the same block - // we will resolve the existing failed job first + // we will resolve the existing failed job first. + + // We assume the system to keep working till a job hasn't failed, + // as soon as it fails we currently halt any more execution and wait for manual intervention. // Checks if any of the jobs have failed // Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed From 4ecf4fcf9965fab49d301e472a5dcf08ea6da541 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 2 Aug 2024 13:00:55 +0530 Subject: [PATCH 20/21] chore: liniting fix --- crates/orchestrator/src/main.rs | 2 +- .../{data_submission.rs => data_availability_synchronizer.rs} | 0 crates/orchestrator/src/workers/mod.rs | 2 +- crates/prover-services/gps-fact-checker/src/fact_node.rs | 4 ++-- 4 files changed, 4 insertions(+), 4 deletions(-) rename crates/orchestrator/src/workers/{data_submission.rs => data_availability_synchronizer.rs} (100%) diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 485c5f4e..ba5a05a6 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -2,7 +2,7 @@ use dotenvy::dotenv; use orchestrator::config::config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; -use orchestrator::workers::data_submission::DataAvailabilitySynchronizer; +use orchestrator::workers::data_availability_synchronizer::DataAvailabilitySynchronizer; use orchestrator::workers::proof_registration::ProofRegistrationWorker; use orchestrator::workers::proving::ProvingWorker; use orchestrator::workers::snos::SnosWorker; diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_availability_synchronizer.rs similarity index 100% rename from crates/orchestrator/src/workers/data_submission.rs rename to crates/orchestrator/src/workers/data_availability_synchronizer.rs diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 9e0f6e9b..35bd3dd5 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -2,7 +2,7 @@ use crate::{config::config, jobs::types::JobStatus}; use async_trait::async_trait; use std::error::Error; -pub mod data_submission; +pub mod data_availability_synchronizer; pub mod proof_registration; pub mod proving; pub mod snos; diff --git a/crates/prover-services/gps-fact-checker/src/fact_node.rs b/crates/prover-services/gps-fact-checker/src/fact_node.rs index 2d66fbac..494c9669 100644 --- a/crates/prover-services/gps-fact-checker/src/fact_node.rs +++ b/crates/prover-services/gps-fact-checker/src/fact_node.rs @@ -12,8 +12,8 @@ //! constructed using a stack of nodes (initialized to an empty stack) by repeating for each pair: //! 1. Add #n_pages lead nodes to the stack. //! 2. Pop the top #n_nodes, construct a parent node for them, and push it back to the stack. -//! After applying the steps above, the stack must contain exactly one node, which will -//! constitute the root of the Merkle tree. +//! After applying the steps above, the stack must contain exactly one node, which will +//! constitute the root of the Merkle tree. //! //! For example, [(2, 2)] will create a Merkle tree with a root and two direct children, while //! [(3, 2), (0, 2)] will create a Merkle tree with a root whose left child is a leaf and From 5905f889a87088cb9c663c25ed7670bf6871c227 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 3 Aug 2024 10:16:07 +0530 Subject: [PATCH 21/21] update: changed name : DataAvailabilitySynchronizer -> DataSubmissionWorker --- crates/orchestrator/src/main.rs | 4 ++-- ...availability_synchronizer.rs => data_submission_worker.rs} | 4 ++-- crates/orchestrator/src/workers/mod.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) rename crates/orchestrator/src/workers/{data_availability_synchronizer.rs => data_submission_worker.rs} (95%) diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index ba5a05a6..b2394093 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -2,7 +2,7 @@ use dotenvy::dotenv; use orchestrator::config::config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; -use orchestrator::workers::data_availability_synchronizer::DataAvailabilitySynchronizer; +use orchestrator::workers::data_submission_worker::DataSubmissionWorker; use orchestrator::workers::proof_registration::ProofRegistrationWorker; use orchestrator::workers::proving::ProvingWorker; use orchestrator::workers::snos::SnosWorker; @@ -34,7 +34,7 @@ async fn main() { tokio::spawn(start_cron(Box::new(ProvingWorker), 60)); tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60)); tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60)); - tokio::spawn(start_cron(Box::new(DataAvailabilitySynchronizer), 60)); + tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60)); tracing::info!("Listening on http://{}", address); axum::serve(listener, app).await.expect("Failed to start axum server"); diff --git a/crates/orchestrator/src/workers/data_availability_synchronizer.rs b/crates/orchestrator/src/workers/data_submission_worker.rs similarity index 95% rename from crates/orchestrator/src/workers/data_availability_synchronizer.rs rename to crates/orchestrator/src/workers/data_submission_worker.rs index d5903312..3c2d4331 100644 --- a/crates/orchestrator/src/workers/data_availability_synchronizer.rs +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -6,10 +6,10 @@ use async_trait::async_trait; use std::collections::HashMap; use std::error::Error; -pub struct DataAvailabilitySynchronizer; +pub struct DataSubmissionWorker; #[async_trait] -impl Worker for DataAvailabilitySynchronizer { +impl Worker for DataSubmissionWorker { // 0. All ids are assumed to be block numbers. // 1. Fetch the latest completed Proving job. // 2. Fetch the latest DA job creation. diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 35bd3dd5..785b1e2d 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -2,7 +2,7 @@ use crate::{config::config, jobs::types::JobStatus}; use async_trait::async_trait; use std::error::Error; -pub mod data_availability_synchronizer; +pub mod data_submission_worker; pub mod proof_registration; pub mod proving; pub mod snos;