diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 56f08375..f369abee 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -1,75 +1,157 @@ -use std::collections::HashMap; - -use crate::config::{config, config_force_init}; -use crate::data_storage::MockDataStorage; -use da_client_interface::{DaVerificationStatus, MockDaClient}; -use httpmock::prelude::*; +use crate::jobs::da_job::DaJob; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::{ + config::{config, TestConfigBuilder}, + jobs::Job, +}; +use color_eyre::{eyre::eyre, Result}; +use da_client_interface::MockDaClient; use rstest::*; use serde_json::json; -use starknet_core::types::{FieldElement, MaybePendingStateUpdate, StateDiff, StateUpdate}; +use starknet_core::types::{FieldElement, MaybePendingStateUpdate, PendingStateUpdate, StateDiff, StateUpdate}; +use std::collections::HashMap; +use utils::env_utils::get_env_var_or_panic; use uuid::Uuid; -use super::super::common::constants::{ETHEREUM_MAX_BLOB_PER_TXN, ETHEREUM_MAX_BYTES_PER_BLOB}; -use super::super::common::{default_job_item, init_config}; -use crate::jobs::da_job::DaJob; -use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; -use crate::jobs::Job; - -#[rstest] -#[tokio::test] -async fn test_create_job() { - let config = init_config(None, None, None, None, None, None, None).await; - let job = DaJob.create_job(&config, String::from("0"), HashMap::new()).await; - assert!(job.is_ok()); - - let job = job.unwrap(); - - let job_type = job.job_type; - assert_eq!(job_type, JobType::DataSubmission, "job_type should be DataSubmission"); - assert!(!(job.id.is_nil()), "id should not be nil"); - assert_eq!(job.status, JobStatus::Created, "status should be Created"); - assert_eq!(job.version, 0_i32, "version should be 0"); - assert_eq!(job.external_id.unwrap_string().unwrap(), String::new(), "external_id should be empty string"); -} +// TODO : How to know which DA client we have enabled ? +// feature flag the tests ? #[rstest] -#[tokio::test] -async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) { +async fn test_da_job_process_job_failure_on_impossible_blob_length() -> Result<()> { + // Mocking DA client calls let mut da_client = MockDaClient::new(); - da_client.expect_verify_inclusion().times(1).returning(|_| Ok(DaVerificationStatus::Verified)); + da_client.expect_max_blob_per_txn().with().returning(|| 6); + da_client.expect_max_bytes_per_blob().with().returning(|| 131072); + + let server = TestConfigBuilder::new().mock_da_client(Box::new(da_client)).build().await; + let config = config().await; + + let internal_id = "1"; + + let state_update = MaybePendingStateUpdate::Update(StateUpdate { + block_hash: FieldElement::default(), + new_root: FieldElement::default(), + old_root: FieldElement::default(), + state_diff: StateDiff { + storage_diffs: vec![], + deprecated_declared_classes: vec![], + declared_classes: vec![], + deployed_contracts: vec![], + replaced_classes: vec![], + nonces: vec![], + }, + }); + let state_update = serde_json::to_value(&state_update).unwrap(); + let response = json!({ "id": 1,"jsonrpc":"2.0","result": state_update }); + + let state_update_mock = server.mock(|when, then| { + when.path(get_env_var_or_panic("MADARA_RPC_URL").as_str()).body_contains("starknet_getStateUpdate"); + then.status(200).body(serde_json::to_vec(&response).unwrap()); + }); + + state_update_mock.assert(); + + let max_blob_per_txn = config.da_client().max_blob_per_txn().await; + let current_blob_length: u64 = 100; + + assert_eq!( + DaJob + .process_job( + config.as_ref(), + &mut JobItem { + id: Uuid::default(), + internal_id: internal_id.to_string(), + job_type: JobType::DataSubmission, + status: JobStatus::Created, + external_id: ExternalId::String("1".to_string().into_boxed_str()), + metadata: HashMap::default(), + version: 0, + } + ) + .await + .unwrap(), + eyre!( + "Exceeded the maximum number of blobs per transaction: allowed {}, found {} for block {} and job id {}", + max_blob_per_txn, + current_blob_length, + internal_id.to_string(), + Uuid::default() + ) + .to_string() + ); - let config = init_config(None, None, None, Some(da_client), None, None, None).await; - assert!(DaJob.verify_job(&config, &mut job_item).await.is_ok()); + Ok(()) } #[rstest] -#[tokio::test] -async fn test_process_job() { - let server = MockServer::start(); - +async fn test_da_job_process_job_failure_on_pending_block() -> Result<()> { + // Mocking DA client calls let mut da_client = MockDaClient::new(); - let mut storage_client = MockDataStorage::new(); + da_client.expect_max_blob_per_txn().with().returning(|| 6); + da_client.expect_max_bytes_per_blob().with().returning(|| 131072); + + let server = TestConfigBuilder::new().mock_da_client(Box::new(da_client)).build().await; + let config = config().await; let internal_id = "1"; - da_client.expect_max_bytes_per_blob().times(2).returning(move || ETHEREUM_MAX_BYTES_PER_BLOB); - da_client.expect_max_blob_per_txn().times(1).returning(move || ETHEREUM_MAX_BLOB_PER_TXN); - da_client.expect_publish_state_diff().times(1).returning(|_, _| Ok("0xbeef".to_string())); + let pending_state_update = MaybePendingStateUpdate::PendingUpdate(PendingStateUpdate { + old_root: FieldElement::default(), + state_diff: StateDiff { + storage_diffs: vec![], + deprecated_declared_classes: vec![], + declared_classes: vec![], + deployed_contracts: vec![], + replaced_classes: vec![], + nonces: vec![], + }, + }); + + let pending_state_update = serde_json::to_value(&pending_state_update).unwrap(); + let response = json!({ "id": 1,"jsonrpc":"2.0","result": pending_state_update }); + + let state_update_mock = server.mock(|when, then| { + when.path(get_env_var_or_panic("MADARA_RPC_URL").as_str()).body_contains("starknet_getStateUpdate"); + then.status(200).body(serde_json::to_vec(&response).unwrap()); + }); + + state_update_mock.assert(); + + assert_eq!( + DaJob + .process_job( + config.as_ref(), + &mut JobItem { + id: Uuid::default(), + internal_id: internal_id.to_string(), + job_type: JobType::DataSubmission, + status: JobStatus::Created, + external_id: ExternalId::String("1".to_string().into_boxed_str()), + metadata: HashMap::default(), + version: 0, + } + ) + .await + .unwrap(), + eyre!( + "Cannot process block {} for job id {} as it's still in pending state", + internal_id.to_string(), + Uuid::default() + ).to_string() + ); - // Mocking storage client - storage_client.expect_put_data().returning(|_, _| Ok(())).times(1); + Ok(()) +} - let config_init = init_config( - Some(format!("http://localhost:{}", server.port())), - None, - None, - Some(da_client), - None, - None, - Some(storage_client), - ) - .await; +#[rstest] +async fn test_da_job_process_job_success() -> Result<()> { + // Mocking DA client calls + let mut da_client = MockDaClient::new(); + da_client.expect_max_blob_per_txn().with().returning(|| 6); + da_client.expect_max_bytes_per_blob().with().returning(|| 131072); - config_force_init(config_init).await; + let server = TestConfigBuilder::new().mock_da_client(Box::new(da_client)).build().await; + let config = config().await; + let internal_id = "1"; let state_update = MaybePendingStateUpdate::Update(StateUpdate { block_hash: FieldElement::default(), @@ -88,14 +170,16 @@ async fn test_process_job() { let response = json!({ "id": 1,"jsonrpc":"2.0","result": state_update }); let state_update_mock = server.mock(|when, then| { - when.path("/").body_contains("starknet_getStateUpdate"); + when.path(get_env_var_or_panic("MADARA_RPC_URL").as_str()).body_contains("starknet_getStateUpdate"); then.status(200).body(serde_json::to_vec(&response).unwrap()); }); + state_update_mock.assert(); + assert_eq!( DaJob .process_job( - config().await.as_ref(), + config.as_ref(), &mut JobItem { id: Uuid::default(), internal_id: internal_id.to_string(), @@ -111,5 +195,5 @@ async fn test_process_job() { "0xbeef" ); - state_update_mock.assert(); + Ok(()) }