diff --git a/.env.example b/.env.example index 57635205..259562b5 100644 --- a/.env.example +++ b/.env.example @@ -21,9 +21,14 @@ STARKNET_CAIRO_CORE_CONTRACT_ADDRESS= # MongoDB connection string MONGODB_CONNECTION_STRING= -# SQS +# AWS AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= +AWS_DEFAULT_REGION= + +# SQS +SQS_JOB_PROCESSING_QUEUE_URL= +SQS_JOB_VERIFICATION_QUEUE_URL= # S3 AWS_S3_BUCKET_NAME= diff --git a/.env.test b/.env.test index 75cfea32..586dbcbc 100644 --- a/.env.test +++ b/.env.test @@ -5,6 +5,9 @@ AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY" AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket" AWS_S3_BUCKET_REGION="us-east-1" AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" +SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue" +SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" +AWS_DEFAULT_REGION="localhost" ##### On chain config ##### diff --git a/CHANGELOG.md b/CHANGELOG.md index 92536c8a..cf35cc49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,13 +10,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - Function to calculate the kzg proof of x_0. - Tests for updating the state. - Function to update the state and publish blob on ethereum in state update job. +- Tests for job handlers in orchestrator/src/jobs/mod.rs. - Fixtures for testing. +- Basic rust-toolchain support. +- `AWS_DEFAULT_REGION="localhost"` var. in .env.test for omniqueue queue testing. - Added basic rust-toolchain support. ## Changed - GitHub's coverage CI yml file for localstack and db testing. - Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder. +- `.env` file requires two more variables which are queue urls for processing + and verification. ## Removed diff --git a/Cargo.lock b/Cargo.lock index be09e96f..da33c8fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2837,9 +2837,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a4a5e448145999d7de17bf44a886900ecb834953408dae8aaf90465ce91c1dd" +checksum = "87c5f920ffd1e0526ec9e70e50bf444db50b204395a0fa7016bbf9e31ea1698f" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -2896,9 +2896,9 @@ dependencies = [ [[package]] name = "aws-sdk-sqs" -version = "1.29.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f3fb8da46554d08e63272e56495f7c94908c16dc62d3c7cc8a0fb4d7591726a" +checksum = "3587fbaf540d65337c2356ebf3f78fba160025b3d69634175f1ea3a7895738e9" dependencies = [ "aws-credential-types", "aws-runtime", @@ -2985,9 +2985,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.2" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31eed8d45759b2c5fe7fd304dd70739060e9e0de509209036eabea14d0720cce" +checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -3057,9 +3057,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.8" +version = "0.60.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a7de001a1b9a25601016d8057ea16e31a45fdca3751304c8edf4ad72e706c08" +checksum = "d9cd0ae3d97daa0a2bf377a4d8e8e1362cae590c4a1aad0d40058ebca18eb91e" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -3097,9 +3097,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.0" +version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db83b08939838d18e33b5dbaf1a0f048f28c10bd28071ab7ce6f245451855414" +checksum = "ce87155eba55e11768b8c1afa607f3e864ae82f03caf63258b37455b0ad02537" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -3124,9 +3124,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b570ea39eb95bd32543f6e4032bce172cb6209b9bc8c83c770d08169e875afc" +checksum = "30819352ed0a04ecf6a2f3477e344d2d1ba33d43e0f09ad9047c12e0d923616f" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -3176,9 +3176,9 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.2" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2009a9733865d0ebf428a314440bbe357cc10d0c16d86a8e15d32e9b47c1e80e" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -5305,7 +5305,7 @@ dependencies = [ "async-trait", "axum 0.7.5", "color-eyre", - "mockall", + "mockall 0.12.1", "starknet", ] @@ -5939,7 +5939,7 @@ dependencies = [ "color-eyre", "da-client-interface", "dotenv", - "mockall", + "mockall 0.12.1", "reqwest 0.12.5", "rstest 0.18.2", "serde", @@ -5959,7 +5959,7 @@ dependencies = [ "c-kzg", "color-eyre", "dotenv", - "mockall", + "mockall 0.12.1", "reqwest 0.12.5", "rstest 0.18.2", "serde", @@ -8240,7 +8240,21 @@ dependencies = [ "downcast", "fragile", "lazy_static", - "mockall_derive", + "mockall_derive 0.12.1", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c28b3fb6d753d28c20e826cd46ee611fda1cf3cde03a443a974043247c065a" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive 0.13.0", "predicates", "predicates-tree", ] @@ -8257,6 +8271,30 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "mockall_derive" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "341014e7f530314e9a1fdbc7400b244efea7122662c96bfa248c31da5bfb2020" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "mockall_double" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "mongodb" version = "2.8.2" @@ -9409,10 +9447,12 @@ dependencies = [ "alloy 0.1.2", "aptos-da-client", "arc-swap", + "assert_matches", "async-std", "async-trait", "aws-config", "aws-sdk-s3", + "aws-sdk-sqs", "axum 0.7.5", "axum-macros", "bincode 1.3.3", @@ -9432,7 +9472,8 @@ dependencies = [ "log", "majin-blob-core", "majin-blob-types", - "mockall", + "mockall 0.13.0", + "mockall_double", "mongodb", "num", "num-bigint 0.4.5", @@ -10585,7 +10626,7 @@ dependencies = [ "async-trait", "cairo-vm 1.0.0-rc3", "gps-fact-checker", - "mockall", + "mockall 0.12.1", "snos", "thiserror", "utils", @@ -12035,7 +12076,7 @@ dependencies = [ "axum 0.7.5", "c-kzg", "color-eyre", - "mockall", + "mockall 0.12.1", "starknet", ] @@ -12677,7 +12718,7 @@ dependencies = [ "color-eyre", "dotenv", "lazy_static", - "mockall", + "mockall 0.12.1", "reqwest 0.12.5", "rstest 0.18.2", "serde", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 4d0cbba2..555e2a14 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -15,10 +15,12 @@ path = "src/main.rs" alloy = { workspace = true } aptos-da-client = { workspace = true, optional = true } arc-swap = { workspace = true } +assert_matches = "1.5.0" async-std = "1.12.0" async-trait = { workspace = true } aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] } +aws-sdk-sqs = "1.36.0" axum = { workspace = true, features = ["macros"] } axum-macros = { workspace = true } bincode = { workspace = true } @@ -36,7 +38,8 @@ lazy_static = { workspace = true } log = "0.4.21" majin-blob-core = { git = "https://github.com/AbdelStark/majin-blob", branch = "main" } majin-blob-types = { git = "https://github.com/AbdelStark/majin-blob", branch = "main" } -mockall = "0.12.1" +mockall = { version = "0.13.0" } +mockall_double = "0.3.1" mongodb = { workspace = true, features = ["bson-uuid-1"], optional = true } num = { workspace = true } num-bigint = { workspace = true } diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index f03c53fd..a410755e 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -1,6 +1,6 @@ -use ::mongodb::bson::doc; use std::collections::HashMap; +use ::mongodb::bson::doc; use async_trait::async_trait; use color_eyre::Result; use mockall::automock; diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 4b21b9c4..7db24a09 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -359,6 +359,7 @@ mod tests { use serde_json::json; use super::*; + // use majin_blob_types::serde; use crate::tests::common::init_config; #[rstest] diff --git a/crates/orchestrator/src/jobs/job_handler_factory.rs b/crates/orchestrator/src/jobs/job_handler_factory.rs new file mode 100644 index 00000000..5a4daef3 --- /dev/null +++ b/crates/orchestrator/src/jobs/job_handler_factory.rs @@ -0,0 +1,65 @@ +use mockall::automock; + +#[automock] +pub mod factory { + use std::sync::Arc; + + #[allow(unused_imports)] + use mockall::automock; + + use crate::jobs::types::JobType; + use crate::jobs::{da_job, proving_job, snos_job, state_update_job, Job}; + + /// To get the job handler + // +-------------------+ + // | | + // | Arc>| + // | | + // +--------+----------+ + // | + // | +----------------+ + // | | | + // +--->| Box | + // | | | + // | +----------------+ + // | | + // | | + // +-------v-------+ | + // | | | + // | Closure 1 | | + // | | | + // +---------------+ | + // | + // +---------------+ | + // | | | + // | Closure x | | + // | | | + // +---------------+ | + // | + // | + // v + // +--------------+ + // | | + // | dyn Job | + // | (job_handler)| + // | | + // +--------------+ + /// We are using Arc so that we can call the Arc::clone while testing that will point + /// to the same Box. So when we are mocking the behaviour : + /// + /// - We create the MockJob + /// - We return this mocked job whenever a function calls `get_job_handler` + /// - Making it an Arc allows us to return the same MockJob in multiple calls to `get_job_handler`. This is needed because `MockJob` doesn't implement Clone + pub async fn get_job_handler(job_type: &JobType) -> Arc> { + // Original implementation + let job: Box = match job_type { + JobType::DataSubmission => Box::new(da_job::DaJob), + JobType::SnosRun => Box::new(snos_job::SnosJob), + JobType::ProofCreation => Box::new(proving_job::ProvingJob), + JobType::StateTransition => Box::new(state_update_job::StateUpdateJob), + _ => unimplemented!("Job type not implemented yet."), + }; + + Arc::new(job) + } +} diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 9a49209d..3a90d2fa 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -4,25 +4,30 @@ use std::time::Duration; use async_trait::async_trait; use color_eyre::eyre::eyre; use color_eyre::Result; +use mockall::automock; +use mockall_double::double; use tracing::log; use uuid::Uuid; use crate::config::{config, Config}; use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; +#[double] +use crate::jobs::job_handler_factory::factory; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue}; pub mod constants; pub mod da_job; +pub mod job_handler_factory; pub mod proving_job; pub mod register_proof_job; pub mod snos_job; pub mod state_update_job; -pub mod types; /// The Job trait is used to define the methods that a job /// should implement to be used as a job for the orchestrator. The orchestrator automatically /// handles queueing and processing of jobs as long as they implement the trait. +#[automock] #[async_trait] pub trait Job: Send + Sync { /// Should build a new job item and return it @@ -50,6 +55,8 @@ pub trait Job: Send + Sync { fn verification_polling_delay_seconds(&self) -> u64; } +pub mod types; + /// Creates the job in the DB in the created state and adds it to the process queue pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMap) -> Result<()> { let config = config().await; @@ -63,7 +70,7 @@ pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMa )); } - let job_handler = get_job_handler(&job_type); + let job_handler = factory::get_job_handler(&job_type).await; let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?; config.database().create_job(job_item.clone()).await?; @@ -93,7 +100,7 @@ pub async fn process_job(id: Uuid) -> Result<()> { // outdated config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?; - let job_handler = get_job_handler(&job.job_type); + let job_handler = factory::get_job_handler(&job.job_type).await; let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; @@ -127,7 +134,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { } } - let job_handler = get_job_handler(&job.job_type); + let job_handler = factory::get_job_handler(&job.job_type).await; let verification_status = job_handler.verify_job(config.as_ref(), &mut job).await?; match verification_status { @@ -162,7 +169,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; if verify_attempts >= job_handler.max_verification_attempts() { // TODO: send alert - log::info!("Verification attempts exceeded for job {}. Marking as timedout.", job.id); + log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id); config.database().update_job_status(&job, JobStatus::VerificationTimeout).await?; return Ok(()); } @@ -179,16 +186,6 @@ pub async fn verify_job(id: Uuid) -> Result<()> { Ok(()) } -fn get_job_handler(job_type: &JobType) -> Box { - match job_type { - JobType::DataSubmission => Box::new(da_job::DaJob), - JobType::SnosRun => Box::new(snos_job::SnosJob), - JobType::ProofCreation => Box::new(proving_job::ProvingJob), - JobType::StateTransition => Box::new(state_update_job::StateUpdateJob), - _ => unimplemented!("Job type not implemented yet."), - } -} - async fn get_job(id: Uuid) -> Result { let config = config().await; let job = config.database().get_job_by_id(id).await?; @@ -201,7 +198,7 @@ async fn get_job(id: Uuid) -> Result { } } -fn increment_key_in_metadata(metadata: &HashMap, key: &str) -> Result> { +pub fn increment_key_in_metadata(metadata: &HashMap, key: &str) -> Result> { let mut new_metadata = metadata.clone(); let attempt = get_u64_from_metadata(metadata, key)?; let incremented_value = attempt.checked_add(1); diff --git a/crates/orchestrator/src/queue/sqs/mod.rs b/crates/orchestrator/src/queue/sqs/mod.rs index 0ba901fd..1598189a 100644 --- a/crates/orchestrator/src/queue/sqs/mod.rs +++ b/crates/orchestrator/src/queue/sqs/mod.rs @@ -1,9 +1,11 @@ use std::time::Duration; +use crate::queue::job_queue::JOB_PROCESSING_QUEUE; use async_trait::async_trait; use color_eyre::Result; use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer}; use omniqueue::{Delivery, QueueError}; +use utils::env_utils::get_env_var_or_panic; use crate::queue::QueueProvider; pub struct SqsQueue; @@ -11,7 +13,8 @@ pub struct SqsQueue; #[async_trait] impl QueueProvider for SqsQueue { async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option) -> Result<()> { - let producer = get_producer(queue).await?; + let queue_url = get_queue_url(queue); + let producer = get_producer(queue_url).await?; match delay { Some(d) => producer.send_raw_scheduled(payload.as_str(), d).await?, @@ -22,20 +25,29 @@ impl QueueProvider for SqsQueue { } async fn consume_message_from_queue(&self, queue: String) -> std::result::Result { - let mut consumer = get_consumer(queue).await?; + let queue_url = get_queue_url(queue); + let mut consumer = get_consumer(queue_url).await?; consumer.receive().await } } +fn get_queue_url(queue_name: String) -> String { + if queue_name == JOB_PROCESSING_QUEUE { + get_env_var_or_panic("SQS_JOB_PROCESSING_QUEUE_URL") + } else { + get_env_var_or_panic("SQS_JOB_VERIFICATION_QUEUE_URL") + } +} + // TODO: store the producer and consumer in memory to avoid creating a new one every time async fn get_producer(queue: String) -> Result { let (producer, _) = - SqsBackend::builder(SqsConfig { queue_dsn: queue, override_endpoint: false }).build_pair().await?; + SqsBackend::builder(SqsConfig { queue_dsn: queue, override_endpoint: true }).build_pair().await?; Ok(producer) } async fn get_consumer(queue: String) -> std::result::Result { let (_, consumer) = - SqsBackend::builder(SqsConfig { queue_dsn: queue, override_endpoint: false }).build_pair().await?; + SqsBackend::builder(SqsConfig { queue_dsn: queue, override_endpoint: true }).build_pair().await?; Ok(consumer) } diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index c008aa22..619591d5 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -4,11 +4,13 @@ use std::collections::HashMap; use std::sync::Arc; use ::uuid::Uuid; +use aws_config::Region; use constants::*; use da_client_interface::MockDaClient; use mongodb::Client; use prover_client_interface::MockProverClient; use rstest::*; +use serde::Deserialize; use settlement_client_interface::MockSettlementClient; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; @@ -24,6 +26,7 @@ use crate::database::{DatabaseConfig, MockDatabase}; use crate::jobs::types::JobStatus::Created; use crate::jobs::types::JobType::DataSubmission; use crate::jobs::types::{ExternalId, JobItem}; +use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; use crate::queue::MockQueueProvider; pub async fn init_config( @@ -89,6 +92,40 @@ pub async fn drop_database() -> color_eyre::Result<()> { Ok(()) } +// SQS structs & functions + +pub async fn create_sqs_queues() -> color_eyre::Result<()> { + let sqs_client = get_sqs_client().await; + + // Dropping sqs queues + let list_queues_output = sqs_client.list_queues().send().await?; + let queue_urls = list_queues_output.queue_urls(); + log::debug!("Found {} queues", queue_urls.len()); + for queue_url in queue_urls { + match sqs_client.delete_queue().queue_url(queue_url).send().await { + Ok(_) => log::debug!("Successfully deleted queue: {}", queue_url), + Err(e) => eprintln!("Error deleting queue {}: {:?}", queue_url, e), + } + } + + // Creating SQS queues + sqs_client.create_queue().queue_name(JOB_PROCESSING_QUEUE).send().await?; + sqs_client.create_queue().queue_name(JOB_VERIFICATION_QUEUE).send().await?; + Ok(()) +} + +async fn get_sqs_client() -> aws_sdk_sqs::Client { + // This function is for localstack. So we can hardcode the region for this as of now. + let region_provider = Region::new("us-east-1"); + let config = aws_config::from_env().region(region_provider).load().await; + aws_sdk_sqs::Client::new(&config) +} + +#[derive(Deserialize, Debug)] +pub struct MessagePayloadType { + pub(crate) id: Uuid, +} + pub async fn get_storage_client() -> Box { Box::new(AWSS3::new(AWSS3ConfigType::WithEndpoint(S3LocalStackConfig::new_from_env())).await) } diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index f64f18aa..2108e1a2 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use crate::config::{build_da_client, build_prover_service, build_settlement_client, config_force_init, Config}; use crate::data_storage::DataStorage; use da_client_interface::DaClient; +use httpmock::MockServer; + use prover_client_interface::ProverClient; use settlement_client_interface::SettlementClient; use starknet::providers::jsonrpc::HttpTransport; @@ -15,9 +17,8 @@ use crate::database::mongodb::MongoDb; use crate::database::{Database, DatabaseConfig}; use crate::queue::sqs::SqsQueue; use crate::queue::QueueProvider; +use crate::tests::common::{create_sqs_queues, drop_database, get_storage_client}; -use crate::tests::common::{drop_database, get_storage_client}; -use httpmock::MockServer; // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html // TestConfigBuilder allows to heavily customise the global configs based on the test's requirement. // Eg: We want to mock only the da client and leave rest to be as it is, use mock_da_client. @@ -65,6 +66,11 @@ impl TestConfigBuilder { self } + pub fn mock_db_client(mut self, db_client: Box) -> TestConfigBuilder { + self.database = Some(db_client); + self + } + pub async fn build(mut self) -> MockServer { dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); @@ -101,7 +107,10 @@ impl TestConfigBuilder { } } - drop_database().await.unwrap(); + // Deleting and Creating the queues in sqs. + create_sqs_queues().await.expect("Not able to delete and create the queues."); + // Deleting the database + drop_database().await.expect("Unable to drop the database."); let config = Config::new( self.starknet_client.unwrap_or_else(|| { diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index 7a707131..b40326aa 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -1,5 +1,3 @@ -use rstest::rstest; - #[cfg(test)] pub mod da_job; @@ -9,14 +7,497 @@ pub mod proving_job; #[cfg(test)] pub mod state_update_job; +use assert_matches::assert_matches; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use mockall::predicate::eq; +use mongodb::bson::doc; +use omniqueue::QueueError; +use rstest::rstest; +use tokio::time::sleep; +use uuid::Uuid; + +use crate::config::config; +use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; +use crate::jobs::job_handler_factory::mock_factory; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; +use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob}; +use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; +use crate::tests::common::MessagePayloadType; +use crate::tests::config::TestConfigBuilder; + +/// Tests `create_job` function when job is not existing in the db. +#[rstest] +#[tokio::test] +async fn create_job_job_does_not_exists_in_db_works() { + let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "0".to_string()); + let mut job_handler = MockJob::new(); + + // Adding expectation for creation of new job. + let job_item_clone = job_item.clone(); + job_handler.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item_clone.clone())); + + TestConfigBuilder::new().build().await; + let config = config().await; + + // Mocking the `get_job_handler` call in create_job function. + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().times(1).with(eq(JobType::SnosRun)).return_once(move |_| Arc::clone(&job_handler)); + + assert!(create_job(JobType::SnosRun, "0".to_string(), HashMap::new()).await.is_ok()); + + let mut hashmap: HashMap = HashMap::new(); + hashmap.insert(JOB_PROCESS_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); + hashmap.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); + + // Db checks. + let job_in_db = config.database().get_job_by_id(job_item.id).await.unwrap().unwrap(); + assert_eq!(job_in_db.id, job_item.id); + assert_eq!(job_in_db.internal_id, job_item.internal_id); + assert_eq!(job_in_db.metadata, hashmap); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages = config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); + assert_eq!(consumed_message_payload.id, job_item.id); +} + +/// Tests `create_job` function when job is already existing in the db. #[rstest] #[tokio::test] -async fn create_job_fails_job_already_exists() { - // TODO +async fn create_job_job_exists_in_db_works() { + let job_item = build_job_item_by_type_and_status(JobType::ProofCreation, JobStatus::Created, "0".to_string()); + + TestConfigBuilder::new().build().await; + + let config = config().await; + let database_client = config.database(); + database_client.create_job(job_item).await.unwrap(); + + assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new()).await.is_err()); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages = + config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages, QueueError::NoData); } +/// Tests `create_job` function when job handler is not implemented in the `get_job_handler` +/// This test should fail as job handler is not implemented in the `factory.rs` #[rstest] +#[should_panic(expected = "Job type not implemented yet.")] #[tokio::test] -async fn create_job_fails_works_new_job() { - // TODO +async fn create_job_job_handler_is_not_implemented_panics() { + TestConfigBuilder::new().build().await; + let config = config().await; + + // Mocking the `get_job_handler` call in create_job function. + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().times(1).returning(|_| panic!("Job type not implemented yet.")); + + assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new()).await.is_err()); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages = + config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages, QueueError::NoData); +} + +/// Tests `process_job` function when job is already existing in the db and job status is either +/// `Created` or `VerificationFailed`. +#[rstest] +#[case(JobType::SnosRun, JobStatus::Created)] +#[case(JobType::DataSubmission, JobStatus::VerificationFailed)] +#[tokio::test] +async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works( + #[case] job_type: JobType, + #[case] job_status: JobStatus, +) { + let job_item = build_job_item_by_type_and_status(job_type.clone(), job_status.clone(), "1".to_string()); + + // Building config + TestConfigBuilder::new().build().await; + let config = config().await; + let database_client = config.database(); + + let mut job_handler = MockJob::new(); + + // Creating job in database + database_client.create_job(job_item.clone()).await.unwrap(); + // Expecting process job function in job processor to return the external ID. + job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); + job_handler.expect_verification_polling_delay_seconds().return_const(1u64); + + // Mocking the `get_job_handler` call in create_job function. + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().times(1).with(eq(job_type.clone())).returning(move |_| Arc::clone(&job_handler)); + + assert!(process_job(job_item.id).await.is_ok()); + // Getting the updated job. + let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + // checking if job_status is updated in db + assert_eq!(updated_job.status, JobStatus::PendingVerification); + assert_eq!(updated_job.external_id, ExternalId::String(Box::from("0xbeef"))); + assert_eq!(updated_job.metadata.get(JOB_PROCESS_ATTEMPT_METADATA_KEY).unwrap(), "1"); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks + let consumed_messages = + config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); + assert_eq!(consumed_message_payload.id, job_item.id); +} + +/// Tests `process_job` function when job is already existing in the db and job status is not +/// `Created` or `VerificationFailed`. +#[rstest] +#[tokio::test] +async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_errors() { + // Creating a job with Completed status which is invalid processing. + let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Completed, "1".to_string()); + + // building config + TestConfigBuilder::new().build().await; + let config = config().await; + let database_client = config.database(); + + // creating job in database + database_client.create_job(job_item.clone()).await.unwrap(); + + assert!(process_job(job_item.id).await.is_err()); + + let job_in_db = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + // Job should be untouched in db. + assert_eq!(job_in_db, job_item); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages = + config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages, QueueError::NoData); +} + +/// Tests `process_job` function when job is not in the db +/// This test should fail +#[rstest] +#[tokio::test] +async fn process_job_job_does_not_exists_in_db_works() { + // Creating a valid job which is not existing in the db. + let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string()); + + // building config + TestConfigBuilder::new().build().await; + let config = config().await; + + assert!(process_job(job_item.id).await.is_err()); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages = + config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages, QueueError::NoData); +} + +/// Tests `process_job` function when 2 workers try to process the same job. +/// This test should fail because once the job is locked for processing on one +/// worker it should not be accessed by another worker and should throw an error +/// when updating the job status. +#[rstest] +#[tokio::test] +async fn process_job_two_workers_process_same_job_works() { + let mut job_handler = MockJob::new(); + // Expecting process job function in job processor to return the external ID. + job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); + job_handler.expect_verification_polling_delay_seconds().return_const(1u64); + + // Mocking the `get_job_handler` call in create_job function. + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().times(1).with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler)); + + // building config + TestConfigBuilder::new().build().await; + let config = config().await; + let db_client = config.database(); + + let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string()); + + // Creating the job in the db + db_client.create_job(job_item.clone()).await.unwrap(); + + // Simulating the two workers + let worker_1 = tokio::spawn(async move { process_job(job_item.id).await }); + let worker_2 = tokio::spawn(async move { process_job(job_item.id).await }); + + // waiting for workers to complete the processing + let (result_1, result_2) = tokio::join!(worker_1, worker_2); + + assert_ne!( + result_1.unwrap().is_ok(), + result_2.unwrap().is_ok(), + "One worker should succeed and the other should fail" + ); + + // Waiting for 5 secs for job to be updated in the db + sleep(Duration::from_secs(5)).await; + + let final_job_in_db = db_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + assert_eq!(final_job_in_db.status, JobStatus::PendingVerification); +} + +/// Tests `verify_job` function when job is having expected status +/// and returns a `Verified` verification status. +#[rstest] +#[tokio::test] +async fn verify_job_with_verified_status_works() { + let job_item = + build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + + // building config + TestConfigBuilder::new().build().await; + + let config = config().await; + let database_client = config.database(); + let mut job_handler = MockJob::new(); + + // creating job in database + database_client.create_job(job_item.clone()).await.unwrap(); + // expecting process job function in job processor to return the external ID + job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Verified)); + job_handler.expect_max_process_attempts().returning(move || 2u64); + + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); + + assert!(verify_job(job_item.id).await.is_ok()); + + // DB checks. + let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + assert_eq!(updated_job.status, JobStatus::Completed); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages_verification_queue = + config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages_verification_queue, QueueError::NoData); + let consumed_messages_processing_queue = + config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages_processing_queue, QueueError::NoData); +} + +/// Tests `verify_job` function when job is having expected status +/// and returns a `Rejected` verification status. +#[rstest] +#[tokio::test] +async fn verify_job_with_rejected_status_adds_to_queue_works() { + let job_item = + build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + + // building config + TestConfigBuilder::new().build().await; + + let config = config().await; + let database_client = config.database(); + let mut job_handler = MockJob::new(); + + // creating job in database + database_client.create_job(job_item.clone()).await.unwrap(); + job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Rejected("".to_string()))); + job_handler.expect_max_process_attempts().returning(move || 2u64); + + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); + + assert!(verify_job(job_item.id).await.is_ok()); + + // DB checks. + let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + assert_eq!(updated_job.status, JobStatus::VerificationFailed); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages = config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); + assert_eq!(consumed_message_payload.id, job_item.id); +} + +/// Tests `verify_job` function when job is having expected status +/// and returns a `Rejected` verification status but doesn't add +/// the job to process queue because of maximum attempts reached. +#[rstest] +#[tokio::test] +async fn verify_job_with_rejected_status_works() { + let mut job_item = + build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + + // increasing JOB_VERIFICATION_ATTEMPT_METADATA_KEY to simulate max. attempts reached. + let metadata = increment_key_in_metadata(&job_item.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY).unwrap(); + job_item.metadata = metadata; + + // building config + TestConfigBuilder::new().build().await; + + let config = config().await; + let database_client = config.database(); + let mut job_handler = MockJob::new(); + + // creating job in database + database_client.create_job(job_item.clone()).await.unwrap(); + // expecting process job function in job processor to return the external ID + job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Rejected("".to_string()))); + job_handler.expect_max_process_attempts().returning(move || 1u64); + + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); + + assert!(verify_job(job_item.id).await.is_ok()); + + // DB checks. + let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + assert_eq!(updated_job.status, JobStatus::VerificationFailed); + assert_eq!(updated_job.metadata.get(JOB_PROCESS_ATTEMPT_METADATA_KEY).unwrap(), "1"); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages_processing_queue = + config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages_processing_queue, QueueError::NoData); +} + +/// Tests `verify_job` function when job is having expected status +/// and returns a `Pending` verification status. +#[rstest] +#[tokio::test] +async fn verify_job_with_pending_status_adds_to_queue_works() { + let job_item = + build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + + // building config + TestConfigBuilder::new().build().await; + + let config = config().await; + let database_client = config.database(); + let mut job_handler = MockJob::new(); + + // creating job in database + database_client.create_job(job_item.clone()).await.unwrap(); + // expecting process job function in job processor to return the external ID + job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Pending)); + job_handler.expect_max_verification_attempts().returning(move || 2u64); + job_handler.expect_verification_polling_delay_seconds().returning(move || 2u64); + + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); + + assert!(verify_job(job_item.id).await.is_ok()); + + // DB checks. + let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + assert_eq!(updated_job.metadata.get(JOB_VERIFICATION_ATTEMPT_METADATA_KEY).unwrap(), "1"); + assert_eq!(updated_job.status, JobStatus::PendingVerification); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks + let consumed_messages = + config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); + assert_eq!(consumed_message_payload.id, job_item.id); +} + +/// Tests `verify_job` function when job is having expected status +/// and returns a `Pending` verification status but doesn't add +/// the job to process queue because of maximum attempts reached. +#[rstest] +#[tokio::test] +async fn verify_job_with_pending_status_works() { + let mut job_item = + build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + + // increasing JOB_VERIFICATION_ATTEMPT_METADATA_KEY to simulate max. attempts reached. + let metadata = increment_key_in_metadata(&job_item.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY).unwrap(); + job_item.metadata = metadata; + + // building config + TestConfigBuilder::new().build().await; + + let config = config().await; + let database_client = config.database(); + let mut job_handler = MockJob::new(); + + // creating job in database + database_client.create_job(job_item.clone()).await.unwrap(); + // expecting process job function in job processor to return the external ID + job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Pending)); + job_handler.expect_max_verification_attempts().returning(move || 1u64); + job_handler.expect_verification_polling_delay_seconds().returning(move || 2u64); + + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); + + assert!(verify_job(job_item.id).await.is_ok()); + + // DB checks. + let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); + assert_eq!(updated_job.status, JobStatus::VerificationTimeout); + assert_eq!(updated_job.metadata.get(JOB_VERIFICATION_ATTEMPT_METADATA_KEY).unwrap(), "1"); + + // Waiting for 5 secs for message to be passed into the queue + sleep(Duration::from_secs(5)).await; + + // Queue checks. + let consumed_messages_verification_queue = + config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + assert_matches!(consumed_messages_verification_queue, QueueError::NoData); +} + +fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, internal_id: String) -> JobItem { + let mut hashmap: HashMap = HashMap::new(); + hashmap.insert(JOB_PROCESS_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); + hashmap.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); + JobItem { + id: Uuid::new_v4(), + internal_id, + job_type, + status: job_status, + external_id: ExternalId::Number(0), + metadata: hashmap, + version: 0, + } } diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 082af9ca..cd5d8f85 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -1,29 +1,26 @@ +use std::collections::HashMap; +use std::fs; +use std::path::PathBuf; + +use bytes::Bytes; +use httpmock::prelude::*; +use lazy_static::lazy_static; use mockall::predicate::eq; use rstest::*; use settlement_client_interface::MockSettlementClient; -use bytes::Bytes; -use std::path::PathBuf; -use std::{collections::HashMap, fs}; - use super::super::common::init_config; - -use crate::jobs::{ - constants::{ - JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY, JOB_METADATA_STATE_UPDATE_FETCH_FROM_TESTS, - JOB_PROCESS_ATTEMPT_METADATA_KEY, - }, - state_update_job::StateUpdateJob, - types::{JobStatus, JobType}, - Job, -}; - use crate::config::{config, config_force_init}; use crate::constants::{BLOB_DATA_FILE_NAME, SNOS_OUTPUT_FILE_NAME}; use crate::data_storage::MockDataStorage; +use crate::jobs::constants::{ + JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY, JOB_METADATA_STATE_UPDATE_FETCH_FROM_TESTS, + JOB_PROCESS_ATTEMPT_METADATA_KEY, +}; use crate::jobs::state_update_job::utils::hex_string_to_u8_vec; -use httpmock::prelude::*; -use lazy_static::lazy_static; +use crate::jobs::state_update_job::StateUpdateJob; +use crate::jobs::types::{JobStatus, JobType}; +use crate::jobs::Job; lazy_static! { pub static ref CURRENT_PATH: PathBuf = std::env::current_dir().unwrap(); diff --git a/crates/orchestrator/src/tests/workers/proving/mod.rs b/crates/orchestrator/src/tests/workers/proving/mod.rs index 79a076f4..4bf0ef75 100644 --- a/crates/orchestrator/src/tests/workers/proving/mod.rs +++ b/crates/orchestrator/src/tests/workers/proving/mod.rs @@ -1,23 +1,26 @@ +use std::error::Error; +use std::sync::Arc; + +use da_client_interface::MockDaClient; +use httpmock::MockServer; +use mockall::predicate::eq; +use prover_client_interface::MockProverClient; +use rstest::rstest; +use settlement_client_interface::MockSettlementClient; + use crate::config::config_force_init; use crate::database::MockDatabase; +use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{JobItem, JobStatus, JobType}; +use crate::jobs::{Job, MockJob}; use crate::queue::MockQueueProvider; use crate::tests::common::init_config; use crate::tests::workers::utils::{db_checks_proving_worker, get_job_by_mock_id_vector}; use crate::workers::proving::ProvingWorker; -use crate::workers::Worker; -use da_client_interface::MockDaClient; -use httpmock::MockServer; -use prover_client_interface::MockProverClient; -use rstest::rstest; -use settlement_client_interface::MockSettlementClient; -use std::error::Error; -use std::time::Duration; -use tokio::time::sleep; #[rstest] -#[case(false)] #[case(true)] +#[case(false)] #[tokio::test] async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box> { let server = MockServer::start(); @@ -27,12 +30,15 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box = get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Created, 5, 1) .into_iter() @@ -47,7 +53,7 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box = vec![1, 2, 4, 5]; for i in num_vec { - db_checks_proving_worker(i, &mut db); + db_checks_proving_worker(i, &mut db, &mut job_handler); } prover_client.expect_submit_task().times(4).returning(|_| Ok("task_id".to_string())); @@ -60,7 +66,7 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box Result<(), Box> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + if incomplete_runs { + ctx.expect().times(4).with(eq(JobType::ProofCreation)).returning(move |_| Arc::clone(&job_handler)); + } else { + ctx.expect().times(5).with(eq(JobType::ProofCreation)).returning(move |_| Arc::clone(&job_handler)); + } + let proving_worker = ProvingWorker {}; proving_worker.run_worker().await?; Ok(()) } + +use crate::workers::Worker; diff --git a/crates/orchestrator/src/tests/workers/snos/mod.rs b/crates/orchestrator/src/tests/workers/snos/mod.rs index 4579e651..45917b30 100644 --- a/crates/orchestrator/src/tests/workers/snos/mod.rs +++ b/crates/orchestrator/src/tests/workers/snos/mod.rs @@ -1,19 +1,24 @@ +use std::error::Error; +use std::sync::Arc; + +use da_client_interface::MockDaClient; +use httpmock::MockServer; +use mockall::predicate::eq; +use rstest::rstest; +use serde_json::json; +use uuid::Uuid; + use crate::config::config_force_init; use crate::database::MockDatabase; +use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{JobStatus, JobType}; +use crate::jobs::{Job, MockJob}; use crate::queue::job_queue::JOB_PROCESSING_QUEUE; use crate::queue::MockQueueProvider; use crate::tests::common::init_config; use crate::tests::workers::utils::get_job_item_mock_by_id; use crate::workers::snos::SnosWorker; use crate::workers::Worker; -use da_client_interface::MockDaClient; -use httpmock::MockServer; -use mockall::predicate::eq; -use rstest::rstest; -use serde_json::json; -use std::error::Error; -use uuid::Uuid; #[rstest] #[case(false)] @@ -27,6 +32,9 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { let start_job_index; let block; + // Mocking the get_job_handler function. + let mut job_handler = MockJob::new(); + // Mocking db function expectations if !db_val { db.expect_get_latest_job_by_type_and_status() @@ -54,13 +62,23 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { let uuid = Uuid::new_v4(); + let job_item = get_job_item_mock_by_id(i.clone().to_string(), uuid); + let job_item_cloned = job_item.clone(); + + job_handler.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item_cloned.clone())); + // creating jobs call expectations db.expect_create_job() .times(1) .withf(move |item| item.internal_id == i.clone().to_string()) - .returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid))); + .returning(move |_| Ok(job_item.clone())); } + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + ctx.expect().times(5).with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler)); + // Queue function call simulations queue .expect_send_message_to_queue() diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs index e9062a2a..8cc8485d 100644 --- a/crates/orchestrator/src/tests/workers/update_state/mod.rs +++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs @@ -1,6 +1,17 @@ +use std::error::Error; +use std::sync::Arc; + +use da_client_interface::MockDaClient; +use httpmock::MockServer; +use mockall::predicate::eq; +use rstest::rstest; +use uuid::Uuid; + use crate::config::config_force_init; use crate::database::MockDatabase; +use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{JobStatus, JobType}; +use crate::jobs::{Job, MockJob}; use crate::queue::MockQueueProvider; use crate::tests::common::init_config; use crate::tests::workers::utils::{ @@ -8,12 +19,6 @@ use crate::tests::workers::utils::{ }; use crate::workers::update_state::UpdateStateWorker; use crate::workers::Worker; -use da_client_interface::MockDaClient; -use httpmock::MockServer; -use mockall::predicate::eq; -use rstest::rstest; -use std::error::Error; -use uuid::Uuid; #[rstest] #[case(false, 0)] @@ -30,6 +35,9 @@ async fn test_update_state_worker( const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; + // Mocking the get_job_handler function. + let mut job_handler = MockJob::new(); + // Mocking db function expectations // If no successful state update jobs exist if !last_successful_job_exists { @@ -58,7 +66,8 @@ async fn test_update_state_worker( )) }); - // mocking getting of the jobs (when there is a safety check for any pre-existing job during job creation) + // mocking getting of the jobs (when there is a safety check for any pre-existing job during job + // creation) let completed_jobs = get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2); for job in completed_jobs { @@ -72,9 +81,17 @@ async fn test_update_state_worker( db_create_job_expectations_update_state_worker( &mut db, get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2), + &mut job_handler, ); } + let y: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + // Mocking the `get_job_handler` call in create_job function. + if last_successful_job_exists { + ctx.expect().times(5).with(eq(JobType::StateTransition)).returning(move |_| Arc::clone(&y)); + } + // Queue function call simulations queue .expect_send_message_to_queue() diff --git a/crates/orchestrator/src/tests/workers/utils/mod.rs b/crates/orchestrator/src/tests/workers/utils/mod.rs index 8e776155..03dd0cd3 100644 --- a/crates/orchestrator/src/tests/workers/utils/mod.rs +++ b/crates/orchestrator/src/tests/workers/utils/mod.rs @@ -1,6 +1,7 @@ use crate::database::MockDatabase; use crate::jobs::constants::JOB_METADATA_CAIRO_PIE_PATH_KEY; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::jobs::MockJob; use mockall::predicate::eq; use std::collections::HashMap; use uuid::Uuid; @@ -52,24 +53,34 @@ pub fn get_job_by_mock_id_vector( jobs_vec } -pub fn db_create_job_expectations_update_state_worker(db: &mut MockDatabase, proof_creation_jobs: Vec) { +pub fn db_create_job_expectations_update_state_worker( + db: &mut MockDatabase, + proof_creation_jobs: Vec, + mock_job: &mut MockJob, +) { for job in proof_creation_jobs { let internal_id = job.internal_id.clone(); - db.expect_create_job().times(1).withf(move |item| item.internal_id == job.internal_id).returning(move |_| { - Ok(JobItem { - id: Uuid::new_v4(), - internal_id: internal_id.clone(), - job_type: JobType::StateTransition, - status: JobStatus::Created, - external_id: ExternalId::Number(0), - metadata: get_hashmap(), - version: 0, - }) - }); + let job_item = JobItem { + id: Uuid::new_v4(), + internal_id: internal_id.clone(), + job_type: JobType::StateTransition, + status: JobStatus::Created, + external_id: ExternalId::Number(0), + metadata: get_hashmap(), + version: 0, + }; + let job_item_cloned = job_item.clone(); + + mock_job.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item.clone())); + + db.expect_create_job() + .times(1) + .withf(move |item| item.internal_id == job.internal_id) + .returning(move |_| Ok(job_item_cloned.clone())); } } -pub fn db_checks_proving_worker(id: i32, db: &mut MockDatabase) { +pub fn db_checks_proving_worker(id: i32, db: &mut MockDatabase, mock_job: &mut MockJob) { fn get_job_item_mock_by_id(id: i32) -> JobItem { let uuid = Uuid::new_v4(); JobItem { @@ -88,10 +99,15 @@ pub fn db_checks_proving_worker(id: i32, db: &mut MockDatabase) { .with(eq(id.clone().to_string()), eq(JobType::ProofCreation)) .returning(|_, _| Ok(None)); + let job_item = get_job_item_mock_by_id(id); + let job_item_cloned = job_item.clone(); + + mock_job.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item.clone())); + db.expect_create_job() .times(1) .withf(move |item| item.internal_id == id.clone().to_string()) - .returning(move |_| Ok(get_job_item_mock_by_id(id))); + .returning(move |_| Ok(job_item_cloned.clone())); } pub fn get_hashmap() -> HashMap { diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index 654b7e3e..d8198181 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -1,10 +1,10 @@ use std::error::Error; +use async_trait::async_trait; + use crate::config::config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; -use async_trait::async_trait; - use crate::workers::Worker; pub struct UpdateStateWorker;