feat: Data Submission Worker Integration. #51

Merged · 23 commits · Aug 3, 2024
Changes from 17 commits
Commits
23 commits
2310fcf
update: DA job draft #1
heemankv Jul 22, 2024
df4fa86
docs: changelog updated
heemankv Jul 22, 2024
d21abff
update: is_worker_enabled impl & usage in da_submission, removal of S…
heemankv Jul 23, 2024
f9f4732
update: renamed to
heemankv Jul 23, 2024
55e1bc8
update: run worker only if it's enabled using is_worker_enabled check
heemankv Jul 23, 2024
9d7ab21
build: linter fixes
heemankv Jul 23, 2024
b209c18
Merge branch 'main' into feat/da_worker
heemankv Jul 23, 2024
fbd721d
Update CHANGELOG.md
heemankv Jul 23, 2024
e475cc4
update: limit_to_one on get_jobs_by_status
heemankv Jul 23, 2024
c13a576
update: removed get_last_successful_job_by_type, added get_latest_job…
heemankv Jul 23, 2024
e611040
update: added error to job metadata
heemankv Jul 23, 2024
586c709
update: pr resolution, simplifying get_jobs_by_status, rejected statu…
heemankv Jul 25, 2024
b0bf981
update: linting fixes
heemankv Jul 25, 2024
69b05de
Update crates/orchestrator/src/jobs/mod.rs
heemankv Jul 25, 2024
46b8c23
update: removing .expect from mongodb mod file
heemankv Jul 25, 2024
8cea54d
update: fixed testcase for snos worker
heemankv Jul 25, 2024
579d1d1
chore: correct variable name
heemankv Jul 25, 2024
992190a
update: added support to check againt multiple status - is_worker_ena…
heemankv Jul 25, 2024
ee29e87
docs: rewrote 1 job per block assumption
heemankv Jul 26, 2024
465241c
docs: DataSubmissionWorker -> DataAvailabilitySynchronizer
heemankv Jul 30, 2024
e4a2aa0
Merge branch 'main' into feat/da_worker
heemankv Aug 2, 2024
4ecf4fc
chore: liniting fix
heemankv Aug 2, 2024
5905f88
update: changed name : DataAvailabilitySynchronizer -> DataSubmission…
heemankv Aug 3, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- implemented DA worker.
- Function to calculate the kzg proof of x_0.
- Tests for updating the state.
- Function to update the state and publish blob on ethereum in state update job.
11 changes: 9 additions & 2 deletions crates/orchestrator/src/database/mod.rs
@@ -29,20 +29,27 @@ pub trait Database: Send + Sync {
async fn update_job(&self, job: &JobItem) -> Result<()>;
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
job_a_status: JobStatus,
job_b_type: JobType,
) -> Result<Vec<JobItem>>;
async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_latest_job_by_type_and_status(
&self,
job_type: JobType,
job_status: JobStatus,
) -> Result<Option<JobItem>>;
async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>>;

// TODO: can be extended to support multiple statuses.
async fn get_jobs_by_status(&self, status: JobStatus, limit: Option<i64>) -> Result<Vec<JobItem>>;
}

pub trait DatabaseConfig {
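
The reshaped trait is easiest to read from the caller's side: a worker asks for the newest job of a given type and status, then derives where to resume. Below is a minimal sketch of that pattern, assuming the crate paths and types this diff already shows; the helper itself is illustrative and not part of the PR.

```rust
use crate::database::Database;
use crate::jobs::types::{JobStatus, JobType};
use color_eyre::Result;

/// Illustrative helper (not in the PR): derive the next block a worker should
/// pick up from the latest completed job of its type. internal_id is assumed
/// to be a block number, as the DA worker's comments state further down.
async fn next_block_to_process(db: &dyn Database, job_type: JobType) -> Result<u64> {
    let latest_completed = db
        .get_latest_job_by_type_and_status(job_type, JobStatus::Completed)
        .await?
        .map(|job| job.internal_id)
        .unwrap_or_else(|| "0".to_string());

    // "0" means no job has completed yet, so the worker starts from block 1.
    Ok(latest_completed.parse::<u64>()? + 1)
}
```
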
58 changes: 27 additions & 31 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -1,11 +1,12 @@
use async_std::stream::StreamExt;
use futures::TryStreamExt;
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::{Bson, Document};
use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::options::{FindOneOptions, FindOptions, UpdateOptions};
use mongodb::{
bson,
bson::doc,
@@ -112,16 +113,12 @@ impl Database for MongoDb {
Ok(())
}

async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>> {
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

/// function to get jobs that don't have a successor job.
@@ -226,8 +223,7 @@ impl Database for MongoDb {
// }
// }

let collection = self.get_job_collection();
let mut cursor = collection.aggregate(pipeline, None).await?;
let mut cursor = self.get_job_collection().aggregate(pipeline, None).await?;

let mut vec_jobs: Vec<JobItem> = Vec::new();

@@ -245,18 +241,18 @@
Ok(vec_jobs)
}

async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
async fn get_latest_job_by_type_and_status(
&self,
job_type: JobType,
job_status: JobStatus,
) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"job_status": bson::to_bson(&JobStatus::Completed)?
"job_status": bson::to_bson(&job_status)?
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

async fn get_jobs_after_internal_id_by_job_type(
@@ -271,23 +267,23 @@
"internal_id": { "$gt": internal_id }
};

let mut jobs = self
.get_job_collection()
.find(filter, None)
.await
.expect("Failed to fetch latest jobs by given job type and internal_od conditions");
let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?;

let mut results = Vec::new();
Ok(jobs)
}

while let Some(result) = jobs.next().await {
match result {
Ok(job_item) => {
results.push(job_item);
}
Err(e) => return Err(e.into()),
}
}
async fn get_jobs_by_status(&self, job_status: JobStatus, limit: Option<i64>) -> Result<Vec<JobItem>> {
let filter = doc! {
"job_status": bson::to_bson(&job_status)?
};

let mut find_options = None;
if let Some(val) = limit {
find_options = Some(FindOptions::builder().limit(Some(val)).build())
};

let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;

Ok(results)
Ok(jobs)
}
}
10 changes: 8 additions & 2 deletions crates/orchestrator/src/jobs/mod.rs
@@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
match job.status {
// we only want to process jobs that are in the created or verification failed state.
// verification failed state means that the previous processing failed and we want to retry
JobStatus::Created | JobStatus::VerificationFailed(_) => {
JobStatus::Created | JobStatus::VerificationFailed => {
log::info!("Processing job with id {:?}", id);
}
_ => {
@@ -135,7 +135,13 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
config.database().update_job_status(&job, JobStatus::Completed).await?;
}
JobVerificationStatus::Rejected(e) => {
config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?;
let mut new_job = job.clone();
new_job.metadata.insert("error".to_string(), e);
new_job.status = JobStatus::VerificationFailed;

config.database().update_job(&new_job).await?;

log::error!("Verification failed for job with id {:?}. Cannot verify.", id);

// retry job processing if we haven't exceeded the max limit
let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/types.rs
@@ -98,7 +98,7 @@ pub enum JobStatus {
/// The job was processed but was unable to be verified under the given time
VerificationTimeout,
/// The job failed processing
VerificationFailed(String),
VerificationFailed,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
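
Together with the `verify_job` change above, the failure reason now travels in the job's metadata under an `"error"` key instead of inside the enum variant. A hedged sketch of reading it back, assuming `JobItem` lives alongside `JobStatus` in `jobs::types` and exposes the `status` and `metadata: HashMap<String, String>` fields the diff manipulates; the helper is illustrative only.

```rust
use crate::jobs::types::{JobItem, JobStatus};

/// Illustrative only: fetch the stored verification error, if any, now that
/// JobStatus::VerificationFailed no longer carries the message itself.
fn verification_error(job: &JobItem) -> Option<&str> {
    if job.status == JobStatus::VerificationFailed {
        job.metadata.get("error").map(String::as_str)
    } else {
        None
    }
}
```
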
4 changes: 3 additions & 1 deletion crates/orchestrator/src/main.rs
@@ -2,6 +2,7 @@ use dotenvy::dotenv;
use orchestrator::config::config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_router;
use orchestrator::workers::data_submission::DataSubmissionWorker;
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
use orchestrator::workers::proving::ProvingWorker;
use orchestrator::workers::snos::SnosWorker;
@@ -33,14 +34,15 @@ async fn main() {
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));
tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60));

tracing::info!("Listening on http://{}", address);
axum::serve(listener, app).await.expect("Failed to start axum server");
}

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
worker.run_worker().await.expect("Error in running the worker.");
worker.run_worker_if_enabled().await.expect("Error in running the worker.");
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
13 changes: 8 additions & 5 deletions crates/orchestrator/src/tests/workers/snos/mod.rs
@@ -1,6 +1,6 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::tests::workers::utils::get_job_item_mock_by_id;
@@ -30,15 +30,18 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {

// Mocking db function expectations
if !db_val {
db.expect_get_last_successful_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
db.expect_get_latest_job_by_type_and_status()
.times(1)
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
.returning(|_, _| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_last_successful_job_by_type()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
.returning(move |_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}
11 changes: 7 additions & 4 deletions crates/orchestrator/src/tests/workers/update_state/mod.rs
@@ -33,15 +33,18 @@ async fn test_update_state_worker(
// Mocking db function expectations
// If no successful state update jobs exist
if !last_successful_job_exists {
db.expect_get_last_successful_job_by_type().with(eq(JobType::StateTransition)).times(1).returning(|_| Ok(None));
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
.times(1)
.returning(|_, _| Ok(None));
} else {
// if successful state update job exists

// mocking the return value of first function call (getting last successful jobs):
db.expect_get_last_successful_job_by_type()
.with(eq(JobType::StateTransition))
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
.times(1)
.returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));
.returning(|_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));

// mocking the return values of second function call (getting completed proving worker jobs)
db.expect_get_jobs_after_internal_id_by_job_type()
48 changes: 48 additions & 0 deletions crates/orchestrator/src/workers/data_submission.rs
@@ -0,0 +1,48 @@
use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;
use async_trait::async_trait;
use std::collections::HashMap;
use std::error::Error;

pub struct DataSubmissionWorker;

#[async_trait]
impl Worker for DataSubmissionWorker {
// 0. All ids are assumed to be block numbers.
// 1. Fetch the latest completed Proving job.
// 2. Fetch the latest DA job creation.
// 3. Create jobs from just after the latest DA job already created up to the latest completed Proving job.
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;

// provides latest completed proof creation job id
let latest_proven_job_id = config
.database()
.get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed)
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

// provides latest triggered data submission job id
let latest_data_submission_job_id = config
.database()
.get_latest_job_by_type(JobType::DataSubmission)
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?;
let latest_proven_id: u64 = latest_proven_job_id.parse()?;

// create data submission jobs for the newly proven blocks that don't yet have a data submission job.
for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 {
create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?;
}

Ok(())
}
}
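
The loop bounds above are the entire scheduling rule: every block strictly after the last created DA job, up to and including the last proven block, gets a new DataSubmission job. A standalone sketch that only illustrates this boundary arithmetic; the helper and test are hypothetical and not part of the PR.

```rust
/// Hypothetical helper mirroring the loop bounds in run_worker above.
fn pending_data_submission_ids(latest_data_submission_id: u64, latest_proven_id: u64) -> Vec<u64> {
    (latest_data_submission_id + 1..=latest_proven_id).collect()
}

#[test]
fn creates_jobs_only_for_newly_proven_blocks() {
    // DA jobs exist up to block 3, proofs are complete up to block 6:
    // the worker should create DataSubmission jobs for blocks 4, 5 and 6.
    assert_eq!(pending_data_submission_ids(3, 6), vec![4, 5, 6]);
    // Nothing new proven since the last DA job: no jobs are created.
    assert!(pending_data_submission_ids(6, 6).is_empty());
}
```
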
30 changes: 28 additions & 2 deletions crates/orchestrator/src/workers/mod.rs
@@ -1,13 +1,39 @@
use std::error::Error;

use crate::{config::config, jobs::types::JobStatus};
use async_trait::async_trait;
use std::error::Error;

pub mod data_submission;
pub mod proof_registration;
pub mod proving;
pub mod snos;
pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
async fn run_worker_if_enabled(&self) -> Result<(), Box<dyn Error>> {
if !self.is_worker_enabled().await? {
return Ok(());
}
self.run_worker().await
}

async fn run_worker(&self) -> Result<(), Box<dyn Error>>;

// Assumption: false negatives are possible.
// We assume the worker spawns only one job per block and that no two jobs will ever exist
// for a single block; the check below might not work as expected if that assumption breaks.

// Checks whether any job has failed verification.
// Halts any new job creation until the count of failed jobs is zero.
async fn is_worker_enabled(&self) -> Result<bool, Box<dyn Error>> {
let config = config().await;

let failed_jobs = config.database().get_jobs_by_status(JobStatus::VerificationFailed, Some(1)).await?;

if !failed_jobs.is_empty() {
return Ok(false);
}

Ok(true)
}
}
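
The design relies on default trait methods: a concrete worker implements only `run_worker`, while the enable check and the gated entry point used by `start_cron` come from the trait itself. Below is a stripped-down sketch of the same pattern with toy types and no database, shown only to make the shape explicit; the method names are taken from the diff, everything else is illustrative.

```rust
use async_trait::async_trait;
use std::error::Error;

#[async_trait]
trait Worker: Send + Sync {
    async fn run_worker(&self) -> Result<(), Box<dyn Error>>;

    // Default: always enabled. The orchestrator's version instead asks the
    // database whether any job is stuck in VerificationFailed.
    async fn is_worker_enabled(&self) -> Result<bool, Box<dyn Error>> {
        Ok(true)
    }

    // Gated entry point, called periodically by the cron loop in main.rs.
    async fn run_worker_if_enabled(&self) -> Result<(), Box<dyn Error>> {
        if !self.is_worker_enabled().await? {
            return Ok(());
        }
        self.run_worker().await
    }
}

struct NoopWorker;

#[async_trait]
impl Worker for NoopWorker {
    async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
        println!("doing work");
        Ok(())
    }
}
```

With this shape, pausing every worker after a single failed verification is a one-place change in `is_worker_enabled`.
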
4 changes: 2 additions & 2 deletions crates/orchestrator/src/workers/snos.rs
@@ -6,7 +6,7 @@ use starknet::providers::Provider;

use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;

pub struct SnosWorker;
@@ -22,7 +22,7 @@ impl Worker for SnosWorker {
let latest_block_number = provider.block_number().await?;
let latest_block_processed_data = config
.database()
.get_last_successful_job_by_type(JobType::SnosRun)
.get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed)
.await
.unwrap()
.map(|item| item.internal_id)
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/update_state.rs
@@ -16,7 +16,8 @@ impl Worker for UpdateStateWorker {
/// 3. Create state updates for all the blocks that don't have a state update job
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;
let latest_successful_job = config.database().get_last_successful_job_by_type(JobType::StateTransition).await?;
let latest_successful_job =
config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?;

match latest_successful_job {
Some(job) => {