Skip to content

Commit

Permalink
feat : added state update worker (#36)
Browse files Browse the repository at this point in the history
* feat : added state update worker

* feat : fixed tests and updated worker code

* refactor and added some comments
  • Loading branch information
ocdbytes committed Jul 17, 2024
1 parent 00fc6f5 commit 33a12b5
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 6,423 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@

*.code-workspace
.vscode

lcov.info
7 changes: 7 additions & 0 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ pub trait Database: Send + Sync {
job_a_status: JobStatus,
job_b_type: JobType,
) -> Result<Vec<JobItem>>;
async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>>;
}

pub trait DatabaseConfig {
Expand Down
46 changes: 46 additions & 0 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,50 @@ impl Database for MongoDb {

Ok(vec_jobs)
}

async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"job_status": bson::to_bson(&JobStatus::Completed)?
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
}

async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>> {
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"job_status": bson::to_bson(&job_status)?,
"internal_id": { "$gt": internal_id }
};

let mut jobs = self
.get_job_collection()
.find(filter, None)
.await
.expect("Failed to fetch latest jobs by given job type and internal_od conditions");

let mut results = Vec::new();

while let Some(result) = jobs.next().await {
match result {
Ok(job_item) => {
results.push(job_item);
}
Err(e) => return Err(e.into()),
}
}

Ok(results)
}
}
1 change: 1 addition & 0 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
JobType::DataSubmission => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(snos_job::SnosJob),
JobType::ProofCreation => Box::new(proving_job::ProvingJob),
JobType::StateTransition => Box::new(state_update_job::StateUpdateJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tests/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
pub mod proving;
#[cfg(test)]
pub mod snos;
mod update_state;
mod utils;
66 changes: 9 additions & 57 deletions crates/orchestrator/src/tests/workers/proving/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::constants::JOB_METADATA_CAIRO_PIE_PATH_KEY;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::jobs::types::{JobItem, JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::tests::workers::utils::{db_checks_proving_worker, get_job_by_mock_id_vector};
use crate::workers::proving::ProvingWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use prover_client_interface::MockProverClient;
use rstest::rstest;
use settlement_client_interface::MockSettlementClient;
use std::collections::HashMap;
use std::error::Error;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;

#[rstest]
#[case(false)]
Expand All @@ -37,8 +34,10 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dy
// Mocking Prover Client

if incomplete_runs {
let jobs_vec_temp: Vec<JobItem> =
get_job_item_mock_by_id_vec(5).into_iter().filter(|val| val.internal_id != "3").collect();
let jobs_vec_temp: Vec<JobItem> = get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Created, 5, 1)
.into_iter()
.filter(|val| val.internal_id != "3")
.collect();
// Mocking db call for getting successful snos jobs
db.expect_get_jobs_without_successor()
.times(1)
Expand All @@ -48,7 +47,7 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dy
let num_vec: Vec<i32> = vec![1, 2, 4, 5];

for i in num_vec {
db_checks(i, &mut db);
db_checks_proving_worker(i, &mut db);
}

prover_client.expect_submit_task().times(4).returning(|_| Ok("task_id".to_string()));
Expand All @@ -61,14 +60,14 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dy
.withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE);
} else {
for i in 1..5 + 1 {
db_checks(i, &mut db);
db_checks_proving_worker(i, &mut db);
}

// Mocking db call for getting successful snos jobs
db.expect_get_jobs_without_successor()
.times(1)
.withf(|_, _, _| true)
.returning(move |_, _, _| Ok(get_job_item_mock_by_id_vec(5)));
.returning(move |_, _, _| Ok(get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Created, 5, 1)));

prover_client.expect_submit_task().times(5).returning(|_| Ok("task_id".to_string()));

Expand Down Expand Up @@ -96,50 +95,3 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dy

Ok(())
}

fn get_job_item_mock_by_id_vec(count: i32) -> Vec<JobItem> {
let mut job_vec: Vec<JobItem> = Vec::new();
for i in 1..count + 1 {
let uuid = Uuid::new_v4();
job_vec.push(JobItem {
id: uuid,
internal_id: i.to_string(),
job_type: JobType::ProofCreation,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: get_hashmap(),
version: 0,
})
}
job_vec
}

fn get_job_item_mock_by_id(id: i32) -> JobItem {
let uuid = Uuid::new_v4();
JobItem {
id: uuid,
internal_id: id.to_string(),
job_type: JobType::ProofCreation,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: get_hashmap(),
version: 0,
}
}

fn db_checks(id: i32, db: &mut MockDatabase) {
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(id.clone().to_string()), eq(JobType::ProofCreation))
.returning(|_, _| Ok(None));

db.expect_create_job()
.times(1)
.withf(move |item| item.internal_id == id.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(id)));
}

fn get_hashmap() -> HashMap<String, String> {
let cairo_pie_path = format!("{}/src/tests/artifacts/fibonacci.zip", env!("CARGO_MANIFEST_DIR"));
HashMap::from([(JOB_METADATA_CAIRO_PIE_PATH_KEY.into(), cairo_pie_path)])
}
16 changes: 2 additions & 14 deletions crates/orchestrator/src/tests/workers/snos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::jobs::types::JobType;
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::tests::workers::utils::get_job_item_mock_by_id;
use crate::workers::snos::SnosWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use serde_json::json;
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;

Expand Down Expand Up @@ -92,15 +92,3 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {

Ok(())
}

fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
JobItem {
id: uuid,
internal_id: id.clone(),
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
}
}
97 changes: 97 additions & 0 deletions crates/orchestrator/src/tests/workers/update_state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::tests::workers::utils::{
db_create_job_expectations_update_state_worker, get_job_by_mock_id_vector, get_job_item_mock_by_id,
};
use crate::workers::update_state::UpdateStateWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use std::error::Error;
use uuid::Uuid;

#[rstest]
#[case(false, 0)]
#[case(true, 5)]
#[tokio::test]
async fn test_update_state_worker(
#[case] last_successful_job_exists: bool,
#[case] number_of_processed_jobs: usize,
) -> Result<(), Box<dyn Error>> {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";

// Mocking db function expectations
// If no successful state update jobs exist
if !last_successful_job_exists {
db.expect_get_last_successful_job_by_type().with(eq(JobType::StateTransition)).times(1).returning(|_| Ok(None));
} else {
// if successful state update job exists

// mocking the return value of first function call (getting last successful jobs):
db.expect_get_last_successful_job_by_type()
.with(eq(JobType::StateTransition))
.times(1)
.returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));

// mocking the return values of second function call (getting completed proving worker jobs)
db.expect_get_jobs_after_internal_id_by_job_type()
.with(eq(JobType::ProofCreation), eq(JobStatus::Completed), eq("1".to_string()))
.returning(move |_, _, _| {
Ok(get_job_by_mock_id_vector(
JobType::ProofCreation,
JobStatus::Completed,
number_of_processed_jobs as u64,
2,
))
});

// mocking getting of the jobs (when there is a safety check for any pre-existing job during job creation)
let completed_jobs =
get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2);
for job in completed_jobs {
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(job.internal_id.to_string()), eq(JobType::StateTransition))
.returning(|_, _| Ok(None));
}

// mocking the creation of jobs
db_create_job_expectations_update_state_worker(
&mut db,
get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2),
);
}

// Queue function call simulations
queue
.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE);

// mock block number (madara) : 5
let config = init_config(
Some(format!("http://localhost:{}", server.port())),
Some(db),
Some(queue),
Some(da_client),
None,
None,
)
.await;
config_force_init(config).await;

let update_state_worker = UpdateStateWorker {};
update_state_worker.run_worker().await?;

Ok(())
}
Loading

0 comments on commit 33a12b5

Please sign in to comment.