diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d3c7337..d05b9e87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- implemented DA worker. - Function to calculate the kzg proof of x_0. - Tests for updating the state. - Function to update the state and publish blob on ethereum in state update job. diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index 387457d5..f03c53fd 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -29,20 +29,27 @@ pub trait Database: Send + Sync { async fn update_job(&self, job: &JobItem) -> Result<()>; async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>; async fn update_metadata(&self, job: &JobItem, metadata: HashMap) -> Result<()>; - async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result>; + async fn get_latest_job_by_type(&self, job_type: JobType) -> Result>; async fn get_jobs_without_successor( &self, job_a_type: JobType, job_a_status: JobStatus, job_b_type: JobType, ) -> Result>; - async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result>; + async fn get_latest_job_by_type_and_status( + &self, + job_type: JobType, + job_status: JobStatus, + ) -> Result>; async fn get_jobs_after_internal_id_by_job_type( &self, job_type: JobType, job_status: JobStatus, internal_id: String, ) -> Result>; + + // TODO: can be extendible to support multiple status. + async fn get_jobs_by_statuses(&self, status: Vec, limit: Option) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 5be0cf54..557f36d8 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -1,11 +1,12 @@ use async_std::stream::StreamExt; +use futures::TryStreamExt; use std::collections::HashMap; use async_trait::async_trait; use color_eyre::eyre::eyre; use color_eyre::Result; use mongodb::bson::{Bson, Document}; -use mongodb::options::{FindOneOptions, UpdateOptions}; +use mongodb::options::{FindOneOptions, FindOptions, UpdateOptions}; use mongodb::{ bson, bson::doc, @@ -112,16 +113,12 @@ impl Database for MongoDb { Ok(()) } - async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result> { + async fn get_latest_job_by_type(&self, job_type: JobType) -> Result> { let filter = doc! { "job_type": mongodb::bson::to_bson(&job_type)?, }; let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); - Ok(self - .get_job_collection() - .find_one(filter, find_options) - .await - .expect("Failed to fetch latest job by given job type")) + Ok(self.get_job_collection().find_one(filter, find_options).await?) } /// function to get jobs that don't have a successor job. @@ -226,8 +223,7 @@ impl Database for MongoDb { // } // } - let collection = self.get_job_collection(); - let mut cursor = collection.aggregate(pipeline, None).await?; + let mut cursor = self.get_job_collection().aggregate(pipeline, None).await?; let mut vec_jobs: Vec = Vec::new(); @@ -245,18 +241,18 @@ impl Database for MongoDb { Ok(vec_jobs) } - async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result> { + async fn get_latest_job_by_type_and_status( + &self, + job_type: JobType, + job_status: JobStatus, + ) -> Result> { let filter = doc! { "job_type": bson::to_bson(&job_type)?, - "job_status": bson::to_bson(&JobStatus::Completed)? + "job_status": bson::to_bson(&job_status)? }; let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); - Ok(self - .get_job_collection() - .find_one(filter, find_options) - .await - .expect("Failed to fetch latest job by given job type")) + Ok(self.get_job_collection().find_one(filter, find_options).await?) } async fn get_jobs_after_internal_id_by_job_type( @@ -271,23 +267,23 @@ impl Database for MongoDb { "internal_id": { "$gt": internal_id } }; - let mut jobs = self - .get_job_collection() - .find(filter, None) - .await - .expect("Failed to fetch latest jobs by given job type and internal_od conditions"); + let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?; - let mut results = Vec::new(); + Ok(jobs) + } - while let Some(result) = jobs.next().await { - match result { - Ok(job_item) => { - results.push(job_item); - } - Err(e) => return Err(e.into()), + async fn get_jobs_by_statuses(&self, job_status: Vec, limit: Option) -> Result> { + let filter = doc! { + "job_status": { + // TODO: Check that the conversion leads to valid output! + "$in": job_status.iter().map(|status| bson::to_bson(status).unwrap_or(Bson::Null)).collect::>() } - } + }; + + let find_options = limit.map(|val| FindOptions::builder().limit(Some(val)).build()); + + let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?; - Ok(results) + Ok(jobs) } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index b501cade..9a49209d 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> { match job.status { // we only want to process jobs that are in the created or verification failed state. // verification failed state means that the previous processing failed and we want to retry - JobStatus::Created | JobStatus::VerificationFailed(_) => { + JobStatus::Created | JobStatus::VerificationFailed => { log::info!("Processing job with id {:?}", id); } _ => { @@ -135,7 +135,13 @@ pub async fn verify_job(id: Uuid) -> Result<()> { config.database().update_job_status(&job, JobStatus::Completed).await?; } JobVerificationStatus::Rejected(e) => { - config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?; + let mut new_job = job.clone(); + new_job.metadata.insert("error".to_string(), e); + new_job.status = JobStatus::VerificationFailed; + + config.database().update_job(&new_job).await?; + + log::error!("Verification failed for job with id {:?}. Cannot verify.", id); // retry job processing if we haven't exceeded the max limit let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index b8e492b4..fce76280 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -98,7 +98,7 @@ pub enum JobStatus { /// The job was processed but the was unable to be verified under the given time VerificationTimeout, /// The job failed processing - VerificationFailed(String), + VerificationFailed, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 27bdf573..b2394093 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -2,6 +2,7 @@ use dotenvy::dotenv; use orchestrator::config::config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; +use orchestrator::workers::data_submission_worker::DataSubmissionWorker; use orchestrator::workers::proof_registration::ProofRegistrationWorker; use orchestrator::workers::proving::ProvingWorker; use orchestrator::workers::snos::SnosWorker; @@ -33,6 +34,7 @@ async fn main() { tokio::spawn(start_cron(Box::new(ProvingWorker), 60)); tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60)); tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60)); + tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60)); tracing::info!("Listening on http://{}", address); axum::serve(listener, app).await.expect("Failed to start axum server"); @@ -40,7 +42,7 @@ async fn main() { async fn start_cron(worker: Box, interval: u64) { loop { - worker.run_worker().await.expect("Error in running the worker."); + worker.run_worker_if_enabled().await.expect("Error in running the worker."); tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; } } diff --git a/crates/orchestrator/src/tests/workers/snos/mod.rs b/crates/orchestrator/src/tests/workers/snos/mod.rs index fa9a4a1e..d4903e47 100644 --- a/crates/orchestrator/src/tests/workers/snos/mod.rs +++ b/crates/orchestrator/src/tests/workers/snos/mod.rs @@ -1,6 +1,6 @@ use crate::config::config_force_init; use crate::database::MockDatabase; -use crate::jobs::types::JobType; +use crate::jobs::types::{JobStatus, JobType}; use crate::queue::MockQueueProvider; use crate::tests::common::init_config; use crate::tests::workers::utils::get_job_item_mock_by_id; @@ -30,15 +30,18 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { // Mocking db function expectations if !db_val { - db.expect_get_last_successful_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); + db.expect_get_latest_job_by_type_and_status() + .times(1) + .with(eq(JobType::SnosRun), eq(JobStatus::Completed)) + .returning(|_, _| Ok(None)); start_job_index = 1; block = 5; } else { let uuid_temp = Uuid::new_v4(); - db.expect_get_last_successful_job_by_type() - .with(eq(JobType::SnosRun)) - .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); + db.expect_get_latest_job_by_type_and_status() + .with(eq(JobType::SnosRun), eq(JobStatus::Completed)) + .returning(move |_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); block = 6; start_job_index = 2; } diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs index a5271f9f..e9062a2a 100644 --- a/crates/orchestrator/src/tests/workers/update_state/mod.rs +++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs @@ -33,15 +33,18 @@ async fn test_update_state_worker( // Mocking db function expectations // If no successful state update jobs exist if !last_successful_job_exists { - db.expect_get_last_successful_job_by_type().with(eq(JobType::StateTransition)).times(1).returning(|_| Ok(None)); + db.expect_get_latest_job_by_type_and_status() + .with(eq(JobType::StateTransition), eq(JobStatus::Completed)) + .times(1) + .returning(|_, _| Ok(None)); } else { // if successful state update job exists // mocking the return value of first function call (getting last successful jobs): - db.expect_get_last_successful_job_by_type() - .with(eq(JobType::StateTransition)) + db.expect_get_latest_job_by_type_and_status() + .with(eq(JobType::StateTransition), eq(JobStatus::Completed)) .times(1) - .returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4())))); + .returning(|_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4())))); // mocking the return values of second function call (getting completed proving worker jobs) db.expect_get_jobs_after_internal_id_by_job_type() diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs new file mode 100644 index 00000000..3c2d4331 --- /dev/null +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -0,0 +1,48 @@ +use crate::config::config; +use crate::jobs::create_job; +use crate::jobs::types::{JobStatus, JobType}; +use crate::workers::Worker; +use async_trait::async_trait; +use std::collections::HashMap; +use std::error::Error; + +pub struct DataSubmissionWorker; + +#[async_trait] +impl Worker for DataSubmissionWorker { + // 0. All ids are assumed to be block numbers. + // 1. Fetch the latest completed Proving job. + // 2. Fetch the latest DA job creation. + // 3. Create jobs from after the lastest DA job already created till latest completed proving job. + async fn run_worker(&self) -> Result<(), Box> { + let config = config().await; + + // provides latest completed proof creation job id + let latest_proven_job_id = config + .database() + .get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed) + .await + .unwrap() + .map(|item| item.internal_id) + .unwrap_or("0".to_string()); + + // provides latest triggered data submission job id + let latest_data_submission_job_id = config + .database() + .get_latest_job_by_type(JobType::DataSubmission) + .await + .unwrap() + .map(|item| item.internal_id) + .unwrap_or("0".to_string()); + + let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?; + let latest_proven_id: u64 = latest_proven_job_id.parse()?; + + // creating data submission jobs for latest blocks that don't have existing data submission jobs yet. + for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 { + create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?; + } + + Ok(()) + } +} diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index ba833b7e..785b1e2d 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -1,7 +1,8 @@ -use std::error::Error; - +use crate::{config::config, jobs::types::JobStatus}; use async_trait::async_trait; +use std::error::Error; +pub mod data_submission_worker; pub mod proof_registration; pub mod proving; pub mod snos; @@ -9,5 +10,37 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { + async fn run_worker_if_enabled(&self) -> Result<(), Box> { + if !self.is_worker_enabled().await? { + return Ok(()); + } + self.run_worker().await + } + async fn run_worker(&self) -> Result<(), Box>; + + // Assumption + // If say a job for block X fails, we don't want the worker to respawn another job for the same block + // we will resolve the existing failed job first. + + // We assume the system to keep working till a job hasn't failed, + // as soon as it fails we currently halt any more execution and wait for manual intervention. + + // Checks if any of the jobs have failed + // Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed + // Halts any new job creation till all the count of failed jobs is not Zero. + async fn is_worker_enabled(&self) -> Result> { + let config = config().await; + + let failed_jobs = config + .database() + .get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1)) + .await?; + + if !failed_jobs.is_empty() { + return Ok(false); + } + + Ok(true) + } } diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index f9791c94..7508263a 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -6,7 +6,7 @@ use starknet::providers::Provider; use crate::config::config; use crate::jobs::create_job; -use crate::jobs::types::JobType; +use crate::jobs::types::{JobStatus, JobType}; use crate::workers::Worker; pub struct SnosWorker; @@ -22,7 +22,7 @@ impl Worker for SnosWorker { let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config .database() - .get_last_successful_job_by_type(JobType::SnosRun) + .get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed) .await .unwrap() .map(|item| item.internal_id) diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index c100ab0e..654b7e3e 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -16,7 +16,8 @@ impl Worker for UpdateStateWorker { /// 3. Create state updates for all the blocks that don't have a state update job async fn run_worker(&self) -> Result<(), Box> { let config = config().await; - let latest_successful_job = config.database().get_last_successful_job_by_type(JobType::StateTransition).await?; + let latest_successful_job = + config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?; match latest_successful_job { Some(job) => { diff --git a/crates/prover-services/gps-fact-checker/src/fact_node.rs b/crates/prover-services/gps-fact-checker/src/fact_node.rs index 2d66fbac..494c9669 100644 --- a/crates/prover-services/gps-fact-checker/src/fact_node.rs +++ b/crates/prover-services/gps-fact-checker/src/fact_node.rs @@ -12,8 +12,8 @@ //! constructed using a stack of nodes (initialized to an empty stack) by repeating for each pair: //! 1. Add #n_pages lead nodes to the stack. //! 2. Pop the top #n_nodes, construct a parent node for them, and push it back to the stack. -//! After applying the steps above, the stack must contain exactly one node, which will -//! constitute the root of the Merkle tree. +//! After applying the steps above, the stack must contain exactly one node, which will +//! constitute the root of the Merkle tree. //! //! For example, [(2, 2)] will create a Merkle tree with a root and two direct children, while //! [(3, 2), (0, 2)] will create a Merkle tree with a root whose left child is a leaf and