Feature: Implement proving worker #22

Merged: 19 commits, Jun 24, 2024
Changes from 10 commits
2 changes: 2 additions & 0 deletions crates/orchestrator/src/database/mod.rs
@@ -34,6 +34,8 @@ pub trait Database: Send + Sync {

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;

async fn get_successful_snos_jobs_without_proving(&self) -> Result<Vec<JobItem>>;
Contributor:
I think the database trait shouldn't have functions with "snos" or "proving" in the name, as that leaks internal types. We should generalise the function over all types and take the type as an input. The function could look like:

fn get_jobs_without_successor(job_a_type, job_a_status, job_b_type, job_b_status)

wdyt?

Member Author:
Makes sense.
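A minimal sketch of the generalisation suggested above, using an in-memory stand-in for the database (the trait shape and all helper types here are illustrative, not code from this PR):

```rust
// Illustrative sketch of the generalised query; the in-memory database
// is a stand-in for the real MongoDB-backed implementation.
#[derive(Clone, Debug, PartialEq)]
pub enum JobType {
    SnosRun,
    ProofCreation,
}

#[derive(Clone, Debug, PartialEq)]
pub enum JobStatus {
    Created,
    Completed,
}

#[derive(Clone, Debug)]
pub struct JobItem {
    pub internal_id: String,
    pub job_type: JobType,
    pub status: JobStatus,
}

pub trait Database {
    /// Jobs of `job_a_type` in `job_a_status` that have no job of
    /// `job_b_type` in `job_b_status` sharing the same `internal_id`.
    fn get_jobs_without_successor(
        &self,
        job_a_type: JobType,
        job_a_status: JobStatus,
        job_b_type: JobType,
        job_b_status: JobStatus,
    ) -> Vec<JobItem>;
}

pub struct InMemoryDb {
    pub jobs: Vec<JobItem>,
}

impl Database for InMemoryDb {
    fn get_jobs_without_successor(
        &self,
        job_a_type: JobType,
        job_a_status: JobStatus,
        job_b_type: JobType,
        job_b_status: JobStatus,
    ) -> Vec<JobItem> {
        self.jobs
            .iter()
            // keep only "predecessor" jobs in the requested state...
            .filter(|a| a.job_type == job_a_type && a.status == job_a_status)
            // ...that have no matching "successor" job
            .filter(|a| {
                !self.jobs.iter().any(|b| {
                    b.job_type == job_b_type
                        && b.status == job_b_status
                        && b.internal_id == a.internal_id
                })
            })
            .cloned()
            .collect()
    }
}
```

With this shape, the SNOS-to-proving query becomes `get_jobs_without_successor(SnosRun, Completed, ProofCreation, Completed)` and no job-specific names leak into the trait.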

}

pub trait DatabaseConfig {
53 changes: 52 additions & 1 deletion crates/orchestrator/src/database/mongodb/mod.rs
@@ -4,9 +4,11 @@ use crate::jobs::types::{JobItem, JobStatus, JobType};
use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::Document;
use futures::TryStreamExt;
use mongodb::bson::{Bson, Document};
use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::{
bson,
bson::doc,
options::{ClientOptions, ServerApi, ServerApiVersion},
Client, Collection,
@@ -127,4 +129,53 @@ impl Database for MongoDb {
.await
.expect("Failed to fetch latest job by given job type"))
}

async fn get_successful_snos_jobs_without_proving(&self) -> Result<Vec<JobItem>> {
Contributor:
Probably not an immediate issue, but we might need pagination here: imagine if there's a problem with the prover that lasts for quite some time.

Member Author:
Yeah, I will look into it, discuss, and let you know. Can you explain a bit more about why we need pagination here in a DB call?

Contributor:
Just in case there is a really large number of pending jobs :) Anyway, that should not happen in practice; feel free to resolve.
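If pagination does become necessary, one option would be appending MongoDB's `$skip` and `$limit` stages to the pipeline below and fetching in a loop. The client-side shape of that batching can be sketched independently of the driver (purely illustrative, not part of this PR):

```rust
// Client-side shape of paginated fetching. In the aggregation pipeline
// itself this corresponds to appending `{ "$skip": page * page_size }`
// and `{ "$limit": page_size }` stages; here it is simulated on a slice.
fn fetch_page(pending: &[u64], page: usize, page_size: usize) -> &[u64] {
    let start = page.saturating_mul(page_size).min(pending.len());
    let end = start.saturating_add(page_size).min(pending.len());
    &pending[start..end]
}
```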

let filter = vec![
// Stage 1: Match successful SNOS job runs
doc! {
"$match": {
"job_type": "SnosRun",
"status": "Completed",
}
},
// Stage 2: Lookup to find corresponding proving jobs
doc! {
"$lookup": {
"from": "jobs",
"let": { "internal_id": "$internal_id" },
"pipeline": [
{
"$match": {
"$expr": {
"$and": [
{ "$eq": ["$job_type", "ProofCreation"] },
{ "$eq": ["$internal_id", "$$internal_id"] }
Contributor:
Maybe we should rename internal_id to block_id at last? :)
The relation between the jobs (and the fact that they can share the same ID) is non-obvious otherwise.

Member Author:
Ok, cool, will rename internal_id to block_id.

Member Author:
Hey @unstark, I talked about this with Apoorv, and he told me that the internal_id field is just there so that for a particular job type there is a way to track jobs internally; that's why the generic name internal_id was given to this field.

Contributor:
So @unstark, my hypothesis was that the orchestrator can be designed in a way that we can also have jobs at the txn level later on, maybe something like txn-level proving or a stream of txs to some XYZ place. In that case, the internal_id will be the txn_hash. What are your thoughts on this? Does it feel like overkill?

]
}
}
}
],
"as": "proving_jobs"
}
},
// Stage 3: Filter out SNOS runs that have corresponding proving jobs
doc! {
"$match": {
"proving_jobs": { "$eq": [] }
}
},
];

let mut cursor = self.get_job_collection().aggregate(filter, None).await?;
Contributor:
Assuming jobs are sorted by internal_id here? (Just to confirm that we do not need explicit sorting.)

Member Author:
No, we don't need sorting here, as the result will already be sorted.
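One caveat worth flagging on the exchange above: MongoDB aggregation results carry no ordering guarantee unless the pipeline contains an explicit `$sort` stage, and since `internal_id` is stored as a string, a lexicographic sort would place "10" before "2". If callers ever come to depend on block order, a numeric sort on the client is a cheap safeguard (illustrative only, not code from this PR):

```rust
// Numeric sort for string-encoded block ids; non-numeric ids sort last.
// Illustrative sketch only.
fn sort_ids_numerically(ids: &mut [String]) {
    ids.sort_by_key(|id| id.parse::<u64>().unwrap_or(u64::MAX));
}
```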

let mut vec_jobs: Vec<JobItem> = Vec::new();
while let Some(val) = cursor.try_next().await? {
match bson::from_bson(Bson::Document(val)) {
Ok(job_item) => vec_jobs.push(job_item),
Err(e) => eprintln!("Failed to deserialize JobItem: {:?}", e),
}
}

Ok(vec_jobs)
}
}
2 changes: 2 additions & 0 deletions crates/orchestrator/src/jobs/mod.rs
@@ -12,6 +12,7 @@ use uuid::Uuid;

pub mod constants;
pub mod da_job;
pub mod proving_job;
mod register_proof_job;
pub mod snos_job;
mod state_update_job;
@@ -171,6 +172,7 @@ fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(snos_job::SnosJob),
JobType::ProofCreation => Box::new(proving_job::ProvingJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
55 changes: 55 additions & 0 deletions crates/orchestrator/src/jobs/proving_job/mod.rs
@@ -0,0 +1,55 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

pub struct ProvingJob;

#[async_trait]
impl Job for ProvingJob {
Contributor:
Will need to merge with #6.

Member Author:
Sure, will do it.

async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::ProofCreation,
status: JobStatus::Created,
external_id: String::new().into(),
// metadata must contain the blocks that have been included inside this proof
// this will allow state update jobs to be created for each block
metadata,
version: 0,
})
}

async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result<String> {
// Get proof from S3 and submit on chain for verification
// We need to implement a generic trait for this to support multiple
// base layers
todo!()
}

async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result<JobVerificationStatus> {
// verify that the proof transaction has been included on chain
todo!()
}

fn max_process_attempts(&self) -> u64 {
todo!()
}

fn max_verification_attempts(&self) -> u64 {
todo!()
}

fn verification_polling_delay_seconds(&self) -> u64 {
todo!()
}
}
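The `process_job` comment above mentions a generic trait for supporting multiple base layers. A minimal sketch of what such a settlement-layer abstraction could look like (every name here is hypothetical, not from this PR):

```rust
// Hypothetical settlement-layer abstraction for submitting proofs on chain;
// the trait, the method names, and FakeLayer are all illustrative.
pub trait SettlementLayer {
    /// Submits a proof for on-chain verification; returns a transaction id.
    fn submit_proof(&self, proof: &[u8]) -> Result<String, String>;
    /// Checks whether the proof transaction has been included on chain.
    fn is_included(&self, tx_id: &str) -> Result<bool, String>;
}

/// Trivial in-memory stand-in used only to illustrate the trait shape.
pub struct FakeLayer;

impl SettlementLayer for FakeLayer {
    fn submit_proof(&self, proof: &[u8]) -> Result<String, String> {
        if proof.is_empty() {
            return Err("empty proof".to_string());
        }
        Ok(format!("tx-{}", proof.len()))
    }

    fn is_included(&self, _tx_id: &str) -> Result<bool, String> {
        Ok(true)
    }
}
```

`process_job` could then submit through a `Box<dyn SettlementLayer>` held in the config, keeping Ethereum-specific logic out of the job itself.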
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tests/common/constants.rs
@@ -1,3 +1,5 @@
pub const MADARA_RPC_URL: &str = "http://localhost:9944";
#[allow(dead_code)]
pub const ETHEREUM_MAX_BYTES_PER_BLOB: u64 = 131072;
#[allow(dead_code)]
pub const ETHEREUM_MAX_BLOB_PER_TXN: u64 = 6;
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/mod.rs
@@ -7,4 +7,4 @@ pub mod server;
pub mod queue;

pub mod common;
mod workers;
pub mod workers;
103 changes: 4 additions & 99 deletions crates/orchestrator/src/tests/workers/mod.rs
@@ -1,99 +1,4 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::workers::snos::SnosWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use serde_json::json;
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;

#[rstest]
#[case(false)]
#[case(true)]
#[tokio::test]
async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();
let start_job_index;
let block;

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";

// Mocking db function expectations
if !db_val {
db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_latest_job_by_type_and_internal_id()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}

for i in start_job_index..block + 1 {
// Getting jobs for check expectations
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(i.clone().to_string()), eq(JobType::SnosRun))
.returning(|_, _| Ok(None));

let uuid = Uuid::new_v4();

// creating jobs call expectations
db.expect_create_job()
.times(1)
.withf(move |item| item.internal_id == i.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid)));
}

// Queue function call simulations
queue
.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE);

// mock block number (madara) : 5
let rpc_response_block_number = block;
let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number });
let config =
init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await;
config_force_init(config).await;

// mocking block call
let rpc_block_call_mock = server.mock(|when, then| {
when.path("/").body_contains("starknet_blockNumber");
then.status(200).body(serde_json::to_vec(&response).unwrap());
});

let snos_worker = SnosWorker {};
snos_worker.run_worker().await?;

rpc_block_call_mock.assert();

Ok(())
}

fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
JobItem {
id: uuid,
internal_id: id.clone(),
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
}
}
#[cfg(test)]
pub mod proving;
#[cfg(test)]
pub mod snos;
110 changes: 110 additions & 0 deletions crates/orchestrator/src/tests/workers/proving/mod.rs
@@ -0,0 +1,110 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::workers::proving::ProvingWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;

#[rstest]
#[case(false)]
#[case(true)]
#[tokio::test]
async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dyn Error>> {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";

if incomplete_runs {
let jobs_vec_temp: Vec<JobItem> =
get_job_item_mock_by_id_vec(5).into_iter().filter(|val| val.internal_id != "3").collect();
// Mocking db call for getting successful snos jobs
db.expect_get_successful_snos_jobs_without_proving()
.times(1)
.with()
.returning(move || Ok(jobs_vec_temp.clone()));

let num_vec: Vec<i32> = vec![1, 2, 4, 5];

for i in num_vec {
db_checks(i, &mut db);
}
} else {
// Mocking db call for getting successful snos jobs
db.expect_get_successful_snos_jobs_without_proving()
.times(1)
.with()
.returning(|| Ok(get_job_item_mock_by_id_vec(5)));

for i in 1..5 + 1 {
db_checks(i, &mut db);
}
}

// Queue function call simulations
queue
Contributor:
Should we make this times 5 or 4?

Member Author:
In the first case it is 5; in the second case one job is unsuccessful, which is why we use 4 here.

.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE);

let config =
init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await;
config_force_init(config).await;

let proving_worker = ProvingWorker {};
proving_worker.run_worker().await?;

Ok(())
}

fn get_job_item_mock_by_id_vec(count: i32) -> Vec<JobItem> {
let mut job_vec: Vec<JobItem> = Vec::new();
for i in 1..count + 1 {
let uuid = Uuid::new_v4();
job_vec.push(JobItem {
id: uuid,
internal_id: i.to_string(),
job_type: JobType::ProofCreation,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
})
}
job_vec
}

fn get_job_item_mock_by_id(id: i32) -> JobItem {
let uuid = Uuid::new_v4();
JobItem {
id: uuid,
internal_id: id.to_string(),
job_type: JobType::ProofCreation,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
}
}

fn db_checks(id: i32, db: &mut MockDatabase) {
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(id.clone().to_string()), eq(JobType::ProofCreation))
.returning(|_, _| Ok(None));

db.expect_create_job()
.times(1)
.withf(move |item| item.internal_id == id.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(id)));
}