feat: Data Submission Worker Integration. #51

Merged: 23 commits, Aug 3, 2024

Commits (23)
2310fcf
update: DA job draft #1
heemankv Jul 22, 2024
df4fa86
docs: changelog updated
heemankv Jul 22, 2024
d21abff
update: is_worker_enabled impl & usage in da_submission, removal of S…
heemankv Jul 23, 2024
f9f4732
update: renamed to
heemankv Jul 23, 2024
55e1bc8
update: run worker only if it's enabled using is_worker_enabled check
heemankv Jul 23, 2024
9d7ab21
build: linter fixes
heemankv Jul 23, 2024
b209c18
Merge branch 'main' into feat/da_worker
heemankv Jul 23, 2024
fbd721d
Update CHANGELOG.md
heemankv Jul 23, 2024
e475cc4
update: limit_to_one on get_jobs_by_status
heemankv Jul 23, 2024
c13a576
update: removed get_last_successful_job_by_type, added get_latest_job…
heemankv Jul 23, 2024
e611040
update: added error to job metadata
heemankv Jul 23, 2024
586c709
update: pr resolution, simplifying get_jobs_by_status, rejected statu…
heemankv Jul 25, 2024
b0bf981
update: linting fixes
heemankv Jul 25, 2024
69b05de
Update crates/orchestrator/src/jobs/mod.rs
heemankv Jul 25, 2024
46b8c23
update: removing .expect from mongodb mod file
heemankv Jul 25, 2024
8cea54d
update: fixed testcase for snos worker
heemankv Jul 25, 2024
579d1d1
chore: correct variable name
heemankv Jul 25, 2024
992190a
update: added support to check againt multiple status - is_worker_ena…
heemankv Jul 25, 2024
ee29e87
docs: rewrote 1 job per block assumption
heemankv Jul 26, 2024
465241c
docs: DataSubmissionWorker -> DataAvailabilitySynchronizer
heemankv Jul 30, 2024
e4a2aa0
Merge branch 'main' into feat/da_worker
heemankv Aug 2, 2024
4ecf4fc
chore: liniting fix
heemankv Aug 2, 2024
5905f88
update: changed name : DataAvailabilitySynchronizer -> DataSubmission…
heemankv Aug 3, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- Implemented DA worker.
- Function to calculate the kzg proof of x_0.
- Tests for updating the state.
- Function to update the state and publish blob on ethereum in state update job.
11 changes: 9 additions & 2 deletions crates/orchestrator/src/database/mod.rs
@@ -29,20 +29,27 @@ pub trait Database: Send + Sync {
async fn update_job(&self, job: &JobItem) -> Result<()>;
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
job_a_status: JobStatus,
job_b_type: JobType,
) -> Result<Vec<JobItem>>;
async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_latest_job_by_type_and_status(
&self,
job_type: JobType,
job_status: JobStatus,
) -> Result<Option<JobItem>>;
async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>>;

// TODO: can be extended to support multiple statuses.
async fn get_jobs_by_statuses(&self, status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>>;
}

pub trait DatabaseConfig {
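For orientation, here is a hedged sketch of how the reshaped trait is meant to be called by the workers further down in this PR. The method names come from the trait above; the surrounding setup (`config()`, the job types) is assumed from the rest of the crate, and the snippet itself is not part of the diff.

```rust
use crate::config::config;
use crate::jobs::types::{JobStatus, JobType};

// Sketch only: exercises the renamed/added Database methods from the trait above.
async fn example_queries() -> color_eyre::Result<()> {
    let config = config().await;
    let db = config.database();

    // Latest DataSubmission job regardless of status
    // (replaces get_latest_job_by_type_and_internal_id).
    let latest_da = db.get_latest_job_by_type(JobType::DataSubmission).await?;

    // Latest completed ProofCreation job (replaces get_last_successful_job_by_type).
    let latest_proven =
        db.get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed).await?;

    // At most one job in a failure state, as used by the is_worker_enabled gate added later in this PR.
    let failed = db
        .get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1))
        .await?;

    log::info!(
        "latest DA: {:?}, latest proven: {:?}, failed jobs seen: {}",
        latest_da.map(|j| j.internal_id),
        latest_proven.map(|j| j.internal_id),
        failed.len()
    );
    Ok(())
}
```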
56 changes: 26 additions & 30 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -1,11 +1,12 @@
use async_std::stream::StreamExt;
use futures::TryStreamExt;
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::{Bson, Document};
use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::options::{FindOneOptions, FindOptions, UpdateOptions};
use mongodb::{
bson,
bson::doc,
@@ -112,16 +113,12 @@ impl Database for MongoDb {
Ok(())
}

async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>> {
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

/// function to get jobs that don't have a successor job.
@@ -226,8 +223,7 @@ impl Database for MongoDb {
// }
// }

let collection = self.get_job_collection();
let mut cursor = collection.aggregate(pipeline, None).await?;
let mut cursor = self.get_job_collection().aggregate(pipeline, None).await?;

let mut vec_jobs: Vec<JobItem> = Vec::new();

@@ -245,18 +241,18 @@
Ok(vec_jobs)
}

async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
async fn get_latest_job_by_type_and_status(
&self,
job_type: JobType,
job_status: JobStatus,
) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"job_status": bson::to_bson(&JobStatus::Completed)?
"job_status": bson::to_bson(&job_status)?
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

async fn get_jobs_after_internal_id_by_job_type(
@@ -271,23 +267,23 @@
"internal_id": { "$gt": internal_id }
};

let mut jobs = self
.get_job_collection()
.find(filter, None)
.await
.expect("Failed to fetch latest jobs by given job type and internal_od conditions");
let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?;

let mut results = Vec::new();
Ok(jobs)
}

while let Some(result) = jobs.next().await {
match result {
Ok(job_item) => {
results.push(job_item);
}
Err(e) => return Err(e.into()),
async fn get_jobs_by_statuses(&self, job_status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>> {
let filter = doc! {
"job_status": {
// TODO: Check that the conversion leads to valid output!
"$in": job_status.iter().map(|status| bson::to_bson(status).unwrap_or(Bson::Null)).collect::<Vec<Bson>>()
}
}
};

let find_options = limit.map(|val| FindOptions::builder().limit(Some(val)).build());

let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;

Ok(results)
Ok(jobs)
}
}
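One possible follow-up to the TODO above, sketched here rather than proposed as part of the diff: propagate BSON serialization failures instead of mapping them to `Bson::Null`, so that an unserializable status surfaces as an error. The helper function below is hypothetical.

```rust
use color_eyre::Result;
use mongodb::bson::{self, doc, Bson, Document};

use crate::jobs::types::JobStatus;

// Sketch only: build the `$in` filter used by get_jobs_by_statuses while
// surfacing serialization failures instead of silently inserting Bson::Null.
fn statuses_filter(job_status: &[JobStatus]) -> Result<Document> {
    let statuses = job_status
        .iter()
        .map(bson::to_bson)
        .collect::<Result<Vec<Bson>, _>>()?;

    Ok(doc! { "job_status": { "$in": statuses } })
}
```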
10 changes: 8 additions & 2 deletions crates/orchestrator/src/jobs/mod.rs
@@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
match job.status {
// we only want to process jobs that are in the created or verification failed state.
// verification failed state means that the previous processing failed and we want to retry
JobStatus::Created | JobStatus::VerificationFailed(_) => {
JobStatus::Created | JobStatus::VerificationFailed => {
log::info!("Processing job with id {:?}", id);
}
_ => {
@@ -135,7 +135,13 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
config.database().update_job_status(&job, JobStatus::Completed).await?;
}
JobVerificationStatus::Rejected(e) => {
config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?;
let mut new_job = job.clone();
new_job.metadata.insert("error".to_string(), e);
new_job.status = JobStatus::VerificationFailed;

config.database().update_job(&new_job).await?;

log::error!("Verification failed for job with id {:?}. Cannot verify.", id);

// retry job processing if we haven't exceeded the max limit
let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
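Because the verification error now lives in job metadata rather than inside the `JobStatus` variant, callers recover it from the `"error"` key that `verify_job` writes above. A hedged sketch of such a lookup follows; the `JobItem` field names are taken from the surrounding diff, and the reporting function itself is hypothetical.

```rust
use crate::config::config;
use crate::jobs::types::JobStatus;

// Sketch only: list verification failures together with the error string that
// verify_job stores under the "error" metadata key.
async fn report_verification_failures() -> color_eyre::Result<()> {
    let config = config().await;
    let failed = config.database().get_jobs_by_statuses(vec![JobStatus::VerificationFailed], None).await?;

    for job in failed {
        let reason = job.metadata.get("error").map(String::as_str).unwrap_or("<no error recorded>");
        log::error!("Job {} failed verification: {}", job.internal_id, reason);
    }
    Ok(())
}
```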
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/types.rs
@@ -98,7 +98,7 @@ pub enum JobStatus {
/// The job was processed but was unable to be verified under the given time
VerificationTimeout,
/// The job failed processing
VerificationFailed(String),
VerificationFailed,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
4 changes: 3 additions & 1 deletion crates/orchestrator/src/main.rs
@@ -2,6 +2,7 @@ use dotenvy::dotenv;
use orchestrator::config::config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_router;
use orchestrator::workers::data_submission_worker::DataSubmissionWorker;
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
use orchestrator::workers::proving::ProvingWorker;
use orchestrator::workers::snos::SnosWorker;
@@ -33,14 +34,15 @@ async fn main() {
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));
tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60));

tracing::info!("Listening on http://{}", address);
axum::serve(listener, app).await.expect("Failed to start axum server");
}

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
worker.run_worker().await.expect("Error in running the worker.");
worker.run_worker_if_enabled().await.expect("Error in running the worker.");
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
13 changes: 8 additions & 5 deletions crates/orchestrator/src/tests/workers/snos/mod.rs
@@ -1,6 +1,6 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::tests::workers::utils::get_job_item_mock_by_id;
@@ -30,15 +30,18 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {

// Mocking db function expectations
if !db_val {
db.expect_get_last_successful_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
db.expect_get_latest_job_by_type_and_status()
.times(1)
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
.returning(|_, _| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_last_successful_job_by_type()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
.returning(move |_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}
11 changes: 7 additions & 4 deletions crates/orchestrator/src/tests/workers/update_state/mod.rs
@@ -33,15 +33,18 @@ async fn test_update_state_worker(
// Mocking db function expectations
// If no successful state update jobs exist
if !last_successful_job_exists {
db.expect_get_last_successful_job_by_type().with(eq(JobType::StateTransition)).times(1).returning(|_| Ok(None));
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
.times(1)
.returning(|_, _| Ok(None));
} else {
// if successful state update job exists

// mocking the return value of first function call (getting last successful jobs):
db.expect_get_last_successful_job_by_type()
.with(eq(JobType::StateTransition))
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
.times(1)
.returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));
.returning(|_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));

// mocking the return values of second function call (getting completed proving worker jobs)
db.expect_get_jobs_after_internal_id_by_job_type()
48 changes: 48 additions & 0 deletions crates/orchestrator/src/workers/data_submission_worker.rs
@@ -0,0 +1,48 @@
use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;
use async_trait::async_trait;
use std::collections::HashMap;
use std::error::Error;

pub struct DataSubmissionWorker;

#[async_trait]
impl Worker for DataSubmissionWorker {
// 0. All ids are assumed to be block numbers.
// 1. Fetch the latest completed Proving job.
// 2. Fetch the latest DA job creation.
// 3. Create jobs from after the latest DA job already created till the latest completed proving job.
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;

// provides latest completed proof creation job id
let latest_proven_job_id = config
.database()
.get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed)
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

// provides latest triggered data submission job id
let latest_data_submission_job_id = config
.database()
.get_latest_job_by_type(JobType::DataSubmission)
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?;
let latest_proven_id: u64 = latest_proven_job_id.parse()?;

// creating data submission jobs for latest blocks that don't have existing data submission jobs yet.
for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 {
create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?;
}

Ok(())
}
}
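The heart of run_worker is the range it iterates: one DataSubmission job per block after the latest existing DA job, up to and including the latest proven block. Below is a small sketch that isolates this boundary logic; the helper and its test are hypothetical, not part of the PR.

```rust
// Sketch only: the same range used by run_worker above, extracted as a pure
// function so the inclusive/exclusive boundaries are easy to see and test.
fn blocks_needing_data_submission(latest_data_submission_id: u64, latest_proven_id: u64) -> Vec<u64> {
    // Exclusive of the last DA job already created, inclusive of the last proven block.
    (latest_data_submission_id + 1..=latest_proven_id).collect()
}

#[cfg(test)]
mod range_tests {
    use super::*;

    #[test]
    fn creates_jobs_only_for_newly_proven_blocks() {
        // DA jobs exist up to block 3, proofs are complete up to block 6.
        assert_eq!(blocks_needing_data_submission(3, 6), vec![4, 5, 6]);
        // Nothing proven beyond the last DA job: no new jobs are created.
        assert_eq!(blocks_needing_data_submission(6, 6), Vec::<u64>::new());
    }
}
```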
37 changes: 35 additions & 2 deletions crates/orchestrator/src/workers/mod.rs
@@ -1,13 +1,46 @@
use std::error::Error;

use crate::{config::config, jobs::types::JobStatus};
use async_trait::async_trait;
use std::error::Error;

pub mod data_submission_worker;
pub mod proof_registration;
pub mod proving;
pub mod snos;
pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
async fn run_worker_if_enabled(&self) -> Result<(), Box<dyn Error>> {
if !self.is_worker_enabled().await? {
return Ok(());
}
self.run_worker().await
}

async fn run_worker(&self) -> Result<(), Box<dyn Error>>;

// Assumption:
// If a job for block X fails, we don't want the worker to respawn another job for the same block;
// we will resolve the existing failed job first.

// We assume the system keeps working as long as no job has failed;
// as soon as a job fails, we halt any further execution and wait for manual intervention.

// Checks if any of the jobs have failed.
// Failure: JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed
// Halts any new job creation until the count of failed jobs is zero.
async fn is_worker_enabled(&self) -> Result<bool, Box<dyn Error>> {
let config = config().await;

let failed_jobs = config
.database()
.get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1))
.await?;

if !failed_jobs.is_empty() {
return Ok(false);
}

Ok(true)
}
}
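To make the gate concrete, here is a hedged sketch of a worker that relies on the default methods above; the worker itself is hypothetical, and only the trait contract comes from this PR.

```rust
use async_trait::async_trait;
use std::error::Error;

use crate::workers::Worker;

// Hypothetical worker, used only to illustrate the trait contract: implementors
// supply run_worker, while the provided run_worker_if_enabled / is_worker_enabled
// methods decide whether it actually runs on a given cycle.
pub struct NoopWorker;

#[async_trait]
impl Worker for NoopWorker {
    async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
        log::info!("NoopWorker ran: no job is currently in a failed state");
        Ok(())
    }
}

// Scheduled like the real workers in main.rs:
//   tokio::spawn(start_cron(Box::new(NoopWorker), 60));
// start_cron calls run_worker_if_enabled, so the worker silently skips a cycle
// whenever any job is VerificationFailed or VerificationTimeout.
```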
4 changes: 2 additions & 2 deletions crates/orchestrator/src/workers/snos.rs
@@ -6,7 +6,7 @@ use starknet::providers::Provider;

use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;

pub struct SnosWorker;
@@ -22,7 +22,7 @@ impl Worker for SnosWorker {
let latest_block_number = provider.block_number().await?;
let latest_block_processed_data = config
.database()
.get_last_successful_job_by_type(JobType::SnosRun)
.get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed)
.await
.unwrap()
.map(|item| item.internal_id)
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/update_state.rs
@@ -16,7 +16,8 @@ impl Worker for UpdateStateWorker {
/// 3. Create state updates for all the blocks that don't have a state update job
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;
let latest_successful_job = config.database().get_last_successful_job_by_type(JobType::StateTransition).await?;
let latest_successful_job =
config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?;

match latest_successful_job {
Some(job) => {
4 changes: 2 additions & 2 deletions crates/prover-services/gps-fact-checker/src/fact_node.rs
@@ -12,8 +12,8 @@
//! constructed using a stack of nodes (initialized to an empty stack) by repeating for each pair:
//! 1. Add #n_pages leaf nodes to the stack.
//! 2. Pop the top #n_nodes, construct a parent node for them, and push it back to the stack.
//! After applying the steps above, the stack must contain exactly one node, which will
//! constitute the root of the Merkle tree.
//! After applying the steps above, the stack must contain exactly one node, which will
//! constitute the root of the Merkle tree.
//!
//! For example, [(2, 2)] will create a Merkle tree with a root and two direct children, while
//! [(3, 2), (0, 2)] will create a Merkle tree with a root whose left child is a leaf and
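The doc comment above (only partially visible in this diff) describes building the page-hash Merkle tree from (n_pages, n_nodes) pairs with a stack. Below is a standalone sketch of that procedure with a toy node type and hashing omitted; it is not the crate's actual implementation.

```rust
// Sketch only: illustrates the stack-based construction from the doc comment.
// The real fact_node.rs builds hash-carrying nodes; here the hash is omitted.
#[derive(Debug, Clone)]
struct Node {
    children: Vec<Node>,
}

fn build_tree(structure: &[(usize, usize)]) -> Node {
    let mut stack: Vec<Node> = Vec::new();
    for &(n_pages, n_nodes) in structure {
        // 1. Add n_pages leaf nodes to the stack.
        for _ in 0..n_pages {
            stack.push(Node { children: Vec::new() });
        }
        // 2. Pop the top n_nodes, make them children of a new parent, push it back.
        assert!(n_nodes <= stack.len(), "malformed structure");
        let children = stack.split_off(stack.len() - n_nodes);
        stack.push(Node { children });
    }
    // After all pairs, the stack must contain exactly one node: the root.
    assert_eq!(stack.len(), 1, "structure did not reduce to a single root");
    stack.pop().unwrap()
}

fn main() {
    // [(2, 2)]: a root with two direct (leaf) children.
    assert_eq!(build_tree(&[(2, 2)]).children.len(), 2);

    // [(3, 2), (0, 2)]: a root whose first child is a leaf and whose second
    // child is an inner node with two leaf children.
    let root = build_tree(&[(3, 2), (0, 2)]);
    assert_eq!(root.children.len(), 2);
    assert!(root.children[0].children.is_empty());
    assert_eq!(root.children[1].children.len(), 2);
}
```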