Skip to content

Commit

Permalink
Feature: Implement proving worker (madara-alliance#22)
Browse files Browse the repository at this point in the history
* feat : added snos worker implementation and unit tests

* feat : added review #1 changes : added error handling for snos workers

* feat : added review #1 changes : added error handling for snos workers

* fix : lint

* fix : lint errors

* feat : added proving worker

* feat : added proving worker

* fix : refactor : uncomment temp changes

* fix : ci fixes

* fix : lint

* fix : lint

* feat : added complete implementation of proving job

* fix : tests fix proving worker

* fix : lint

* feat : db generic fucntion

* feat : refactoring

---------

Co-authored-by: Arun Jangra <ocdbytes@Aruns-MacBook-Pro.local>
  • Loading branch information
2 people authored and Tranduy1dol committed Jul 9, 2024
1 parent 81461a1 commit 5c8b720
Show file tree
Hide file tree
Showing 15 changed files with 6,762 additions and 120 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dotenvy = { workspace = true }
ethereum-da-client = { workspace = true, optional = true }
futures = { workspace = true }
lazy_static = { workspace = true }
log = "0.4.21"
majin-blob-core = { git = "https://github.com/AbdelStark/majin-blob", branch = "main" }
majin-blob-types = { git = "https://github.com/AbdelStark/majin-blob", branch = "main" }
mockall = "0.12.1"
Expand Down
8 changes: 7 additions & 1 deletion crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ::mongodb::bson::doc;
use std::collections::HashMap;

use async_trait::async_trait;
Expand Down Expand Up @@ -33,9 +34,14 @@ pub trait Database: Send + Sync {
new_status: JobStatus,
metadata: HashMap<String, String>,
) -> Result<()>;

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
job_a_status: JobStatus,
job_b_type: JobType,
) -> Result<Vec<JobItem>>;
}

pub trait DatabaseConfig {
Expand Down
133 changes: 130 additions & 3 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use async_std::stream::StreamExt;
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::{doc, Document};
use mongodb::options::{ClientOptions, FindOneOptions, ServerApi, ServerApiVersion, UpdateOptions};
use mongodb::{Client, Collection};
use mongodb::bson::{Bson, Document};
use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::{
bson,
bson::doc,
options::{ClientOptions, ServerApi, ServerApiVersion},
Client, Collection,
};
use uuid::Uuid;

use crate::database::mongodb::config::MongoDbConfig;
Expand Down Expand Up @@ -126,4 +132,125 @@ impl Database for MongoDb {
.await
.expect("Failed to fetch latest job by given job type"))
}

/// function to get jobs that don't have a successor job.
///
/// `job_a_type` : Type of job that we need to get that doesn't have any successor.
///
/// `job_a_status` : Status of job A.
///
/// `job_b_type` : Type of job that we need to have as a successor for Job A.
///
/// `job_b_status` : Status of job B which we want to check with.
///
/// Eg :
///
/// Getting SNOS jobs that do not have a successive proving job initiated yet.
///
/// job_a_type : SnosRun
///
/// job_a_status : Completed
///
/// job_b_type : ProofCreation
///
/// TODO : For now Job B status implementation is pending so we can pass None
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
job_a_status: JobStatus,
job_b_type: JobType,
) -> Result<Vec<JobItem>> {
// Convert enums to Bson strings
let job_a_type_bson = Bson::String(format!("{:?}", job_a_type));
let job_a_status_bson = Bson::String(format!("{:?}", job_a_status));
let job_b_type_bson = Bson::String(format!("{:?}", job_b_type));

// TODO :
// implement job_b_status here in the pipeline

// Construct the initial pipeline
let pipeline = vec![
// Stage 1: Match job_a_type with job_a_status
doc! {
"$match": {
"job_type": job_a_type_bson,
"status": job_a_status_bson,
}
},
// Stage 2: Lookup to find corresponding job_b_type jobs
doc! {
"$lookup": {
"from": "jobs",
"let": { "internal_id": "$internal_id" },
"pipeline": [
{
"$match": {
"$expr": {
"$and": [
{ "$eq": ["$job_type", job_b_type_bson] },
// Conditionally match job_b_status if provided
{ "$eq": ["$internal_id", "$$internal_id"] }
]
}
}
},
// TODO : Job B status code :
// // Add status matching if job_b_status is provided
// if let Some(status) = job_b_status {
// doc! {
// "$match": {
// "$expr": { "$eq": ["$status", status] }
// }
// }
// } else {
// doc! {}
// }
// ].into_iter().filter(|d| !d.is_empty()).collect::<Vec<_>>(),
],
"as": "successor_jobs"
}
},
// Stage 3: Filter out job_a_type jobs that have corresponding job_b_type jobs
doc! {
"$match": {
"successor_jobs": { "$eq": [] }
}
},
];

// TODO : Job B status code :
// // Conditionally add status matching for job_b_status
// if let Some(status) = job_b_status {
// let job_b_status_bson = Bson::String(format!("{:?}", status));
//
// // Access the "$lookup" stage in the pipeline and modify the "pipeline" array inside it
// if let Ok(lookup_stage) = pipeline[1].get_document_mut("pipeline") {
// if let Ok(lookup_pipeline) = lookup_stage.get_array_mut(0) {
// lookup_pipeline.push(Bson::Document(doc! {
// "$match": {
// "$expr": { "$eq": ["$status", job_b_status_bson] }
// }
// }));
// }
// }
// }

let collection = self.get_job_collection();
let mut cursor = collection.aggregate(pipeline, None).await?;

let mut vec_jobs: Vec<JobItem> = Vec::new();

// Iterate over the cursor and process each document
while let Some(result) = cursor.next().await {
match result {
Ok(document) => match bson::from_bson(Bson::Document(document)) {
Ok(job_item) => vec_jobs.push(job_item),
Err(e) => eprintln!("Failed to deserialize JobItem: {:?}", e),
},
Err(e) => eprintln!("Error retrieving document: {:?}", e),
}
}

Ok(vec_jobs)
}
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_

pub mod constants;
pub mod da_job;
pub mod prover_job;
pub mod proving_job;
pub mod register_proof_job;
pub mod snos_job;
pub mod state_update_job;
Expand Down Expand Up @@ -176,6 +176,7 @@ fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(snos_job::SnosJob),
JobType::ProofCreation => Box::new(proving_job::ProvingJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::Job;
use crate::config::Config;

pub struct ProverJob;
pub struct ProvingJob;

#[async_trait]
impl Job for ProverJob {
impl Job for ProvingJob {
async fn create_job(
&self,
_config: &Config,
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tests/common/constants.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub const MADARA_RPC_URL: &str = "http://localhost:9944";
#[allow(dead_code)]
pub const ETHEREUM_MAX_BYTES_PER_BLOB: u64 = 131072;
#[allow(dead_code)]
pub const ETHEREUM_MAX_BLOB_PER_TXN: u64 = 6;
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use rstest::rstest;
pub mod da_job;

#[cfg(test)]
pub mod prover_job;
pub mod proving_job;

#[rstest]
#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use uuid::Uuid;

use super::super::common::{default_job_item, init_config};
use crate::jobs::constants::JOB_METADATA_CAIRO_PIE_PATH_KEY;
use crate::jobs::prover_job::ProverJob;
use crate::jobs::proving_job::ProvingJob;
use crate::jobs::types::{JobItem, JobStatus, JobType};
use crate::jobs::Job;

#[rstest]
#[tokio::test]
async fn test_create_job() {
let config = init_config(None, None, None, None, None).await;
let job = ProverJob
let job = ProvingJob
.create_job(
&config,
String::from("0"),
Expand All @@ -41,7 +41,7 @@ async fn test_verify_job(#[from(default_job_item)] job_item: JobItem) {
prover_client.expect_get_task_status().times(1).returning(|_| Ok(TaskStatus::Succeeded));

let config = init_config(None, None, None, None, Some(prover_client)).await;
assert!(ProverJob.verify_job(&config, &job_item).await.is_ok());
assert!(ProvingJob.verify_job(&config, &job_item).await.is_ok());
}

#[rstest]
Expand All @@ -58,7 +58,7 @@ async fn test_process_job() {
let cairo_pie_path = format!("{}/src/tests/artifacts/fibonacci.zip", env!("CARGO_MANIFEST_DIR"));

assert_eq!(
ProverJob
ProvingJob
.process_job(
&config,
&JobItem {
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ pub mod server;
pub mod queue;

pub mod common;
mod workers;
pub mod workers;
106 changes: 4 additions & 102 deletions crates/orchestrator/src/tests/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,102 +1,4 @@
use std::collections::HashMap;
use std::error::Error;

use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use serde_json::json;
use uuid::Uuid;

use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::workers::snos::SnosWorker;
use crate::workers::Worker;

#[rstest]
#[case(false)]
#[case(true)]
#[tokio::test]
async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();
let start_job_index;
let block;

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";

// Mocking db function expectations
if !db_val {
db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_latest_job_by_type_and_internal_id()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}

for i in start_job_index..block + 1 {
// Getting jobs for check expectations
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(i.clone().to_string()), eq(JobType::SnosRun))
.returning(|_, _| Ok(None));

let uuid = Uuid::new_v4();

// creating jobs call expectations
db.expect_create_job()
.times(1)
.withf(move |item| item.internal_id == i.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid)));
}

// Queue function call simulations
queue
.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE);

// mock block number (madara) : 5
let rpc_response_block_number = block;
let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number });
let config =
init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client), None)
.await;
config_force_init(config).await;

// mocking block call
let rpc_block_call_mock = server.mock(|when, then| {
when.path("/").body_contains("starknet_blockNumber");
then.status(200).body(serde_json::to_vec(&response).unwrap());
});

let snos_worker = SnosWorker {};
snos_worker.run_worker().await?;

rpc_block_call_mock.assert();

Ok(())
}

fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
JobItem {
id: uuid,
internal_id: id.clone(),
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
}
}
#[cfg(test)]
pub mod proving;
#[cfg(test)]
pub mod snos;
Loading

0 comments on commit 5c8b720

Please sign in to comment.