Skip to content

Commit

Permalink
feat : added review #1 changes : added error handling for snos workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Arun Jangra authored and Arun Jangra committed Jun 14, 2024
1 parent 2e9cf16 commit 10ba1e8
Show file tree
Hide file tree
Showing 15 changed files with 72 additions and 59 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
url = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
async-std = "1.12.0"

[features]
default = ["ethereum", "with_mongodb", "with_sqs"]
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub async fn config() -> Guard<Arc<Config>> {
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
/// stored config inside `ArcSwap` with the new configuration and pool settings.
#[cfg(test)]
pub async fn config_force_init(config: Config) {
match CONFIG.get() {
Some(arc) => arc.store(Arc::new(config)),
Expand Down
2 changes: 0 additions & 2 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::jobs::types::{JobItem, JobStatus, JobType};
use ::mongodb::Cursor;
use async_trait::async_trait;
use color_eyre::Result;
use mockall::automock;
Expand Down Expand Up @@ -35,7 +34,6 @@ pub trait Database: Send + Sync {

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>>;
}

pub trait DatabaseConfig {
Expand Down
9 changes: 1 addition & 8 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::{
bson::doc,
options::{ClientOptions, ServerApi, ServerApiVersion},
Client, Collection, Cursor,
Client, Collection,
};
use std::collections::HashMap;
use uuid::Uuid;
Expand Down Expand Up @@ -127,11 +127,4 @@ impl Database for MongoDb {
.await
.expect("Failed to fetch latest job by given job type"))
}

async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
Ok(self.get_job_collection().find(filter, None).await.expect("Failed to fetch jobs with given job type"))
}
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(snos_job::SnosJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::collections::HashMap;
use uuid::Uuid;

/// An external id.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(untagged)]
pub enum ExternalId {
/// A string.
Expand Down Expand Up @@ -98,7 +98,7 @@ pub enum JobStatus {
VerificationFailed,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct JobItem {
/// an uuid to identify a job
#[serde(with = "uuid_1_as_binary")]
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() {

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
worker.run_worker().await;
worker.run_worker().await.expect("Error in running the worker.");
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
3 changes: 3 additions & 0 deletions crates/orchestrator/src/tests/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use serde_json::json;

use super::super::common::{default_job_item, init_config};
use starknet_core::types::{FieldElement, MaybePendingStateUpdate, StateDiff};
use tracing::log;
use uuid::Uuid;

use crate::jobs::types::ExternalId;
Expand Down Expand Up @@ -55,6 +56,8 @@ async fn test_process_job() {
da_client.expect_publish_state_diff().times(1).returning(|_| Ok(internal_id.to_string()));
let config = init_config(Some(format!("http://localhost:{}", server.port())), None, None, Some(da_client)).await;

log::info!("this is the port {}", server.port());

let state_update = MaybePendingStateUpdate::Update(StateUpdate {
block_hash: FieldElement::default(),
new_root: FieldElement::default(),
Expand Down
76 changes: 46 additions & 30 deletions crates/orchestrator/src/tests/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,94 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::queue::job_queue::JobQueueMessage;
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::workers::snos::SnosWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use serde_json::json;
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;

#[rstest]
#[case(false)]
#[case(true)]
#[tokio::test]
async fn test_create_job() {
async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();
let start_job_index;
let block;

// Mocking db functions
db.expect_get_latest_job_by_type().returning(|_| Ok(None)).call(JobType::SnosRun).expect("Failed to call.");
// Getting jobs for check expectations
for i in 1..6 {
db.expect_get_job_by_internal_id_and_type()
.returning(|_, _| Ok(None))
.call(&i.to_string(), &JobType::SnosRun)
.expect("Failed to call.");
const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";

// Mocking db function expectations
if !db_val {
db.expect_get_latest_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_latest_job_by_type()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp.clone()))));
block = 6;
start_job_index = 2;
}

// Creating jobs expectations
for i in 1..6 {
for i in start_job_index..block + 1 {
// Getting jobs for check expectations
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(i.clone().to_string()), eq(JobType::SnosRun))
.returning(|_, _| Ok(None));

let uuid = Uuid::new_v4();

// creating jobs call expectations
db.expect_create_job()
.returning(|_| Ok(get_job_item_mock_by_id("1".to_string())))
.call(get_job_item_mock_by_id(i.to_string()))
.expect("Failed to call");
.times(1)
.withf(move |item| item.internal_id == i.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid.clone())));
}

// Queue function call simulations
queue
.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.call(
"madara_orchestrator_job_processing_queue".to_string(),
serde_json::to_string(&JobQueueMessage { id: Uuid::new_v4() }).unwrap(),
None,
)
.expect("Failed to call");
queue.expect_send_message_to_queue().returning(|_, _, _| Ok(())).withf(
|queue, _payload, _delay| {
queue == JOB_PROCESSING_QUEUE
}
);

// mock block number (madara) : 5
let rpc_response_block_number = 5;
let rpc_response_block_number = block;
let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number });

let config =
init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await;
config_force_init(config).await;


// mocking block call
let rpc_block_call_mock = server.mock(|when, then| {
when.path("/").body_contains("starknet_blockNumber");
then.status(200).body(serde_json::to_vec(&response).unwrap());
});

let snos_worker = SnosWorker {};
snos_worker.run_worker().await;
snos_worker.run_worker().await?;

rpc_block_call_mock.assert();
}

fn get_job_item_mock_by_id(id: String) -> JobItem {
let uuid = Uuid::new_v4();
Ok(())
}

fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
JobItem {
id: uuid,
internal_id: id.clone(),
Expand Down
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use std::error::Error;

pub mod proof_registration;
pub mod proving;
Expand All @@ -7,5 +8,5 @@ pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
async fn run_worker(&self);
async fn run_worker(&self) -> Result<(), Box<dyn Error>>;
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/proof_registration.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::workers::Worker;
use async_trait::async_trait;
use std::error::Error;

pub struct ProofRegistrationWorker;

Expand All @@ -8,7 +9,7 @@ impl Worker for ProofRegistrationWorker {
/// 1. Fetch all blocks with a successful proving job run
/// 2. Group blocks that have the same proof
/// 3. For each group, create a proof registration job with from and to block in metadata
async fn run_worker(&self) {
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
todo!()
}
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::workers::Worker;
use async_trait::async_trait;
use std::error::Error;

pub struct ProvingWorker;

#[async_trait]
impl Worker for ProvingWorker {
/// 1. Fetch all successful SNOS job runs that don't have a proving job
/// 2. Create a proving job for each SNOS job run
async fn run_worker(&self) {
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
todo!()
}
}
18 changes: 7 additions & 11 deletions crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::workers::Worker;
use async_trait::async_trait;
use starknet::providers::Provider;
use std::collections::HashMap;
use tracing::log;
use std::error::Error;

pub struct SnosWorker;

Expand All @@ -14,11 +14,10 @@ impl Worker for SnosWorker {
/// 1. Fetch the latest completed block from the Starknet chain
/// 2. Fetch the last block that had a SNOS job run.
/// 3. Create SNOS run jobs for all the remaining blocks
// TEST : added config temporarily to test
async fn run_worker(&self) {
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;
let provider = config.starknet_client();
let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc");
let latest_block_number = provider.block_number().await?;
let latest_block_processed_data = config
.database()
.get_latest_job_by_type(JobType::SnosRun)
Expand All @@ -27,22 +26,19 @@ impl Worker for SnosWorker {
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

let latest_block_processed: u64 =
latest_block_processed_data.parse().expect("Failed to convert block number from JobItem into u64");
let latest_block_processed: u64 = latest_block_processed_data.parse()?;

let block_diff = latest_block_number - latest_block_processed;

// if all blocks are processed
if block_diff == 0 {
return;
return Ok(());
}

for x in latest_block_processed + 1..latest_block_number + 1 {
create_job(JobType::SnosRun, x.to_string(), HashMap::new())
.await
.expect("Error : failed to create job for snos workers.");
create_job(JobType::SnosRun, x.to_string(), HashMap::new()).await?;
}

log::info!("jobs created !!");
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::workers::Worker;
use async_trait::async_trait;
use std::error::Error;

pub struct UpdateStateWorker;

Expand All @@ -8,7 +9,7 @@ impl Worker for UpdateStateWorker {
/// 1. Fetch the last succesful state update job
/// 2. Fetch all succesful proving jobs covering blocks after the last state update
/// 3. Create state updates for all the blocks that don't have a state update job
async fn run_worker(&self) {
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
todo!()
}
}

0 comments on commit 10ba1e8

Please sign in to comment.