Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Data Submission Worker Integration. #51

Merged
merged 23 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2310fcf
update: DA job draft #1
heemankv Jul 22, 2024
df4fa86
docs: changelog updated
heemankv Jul 22, 2024
d21abff
update: is_worker_enabled impl & usage in da_submission, removal of S…
heemankv Jul 23, 2024
f9f4732
update: renamed to
heemankv Jul 23, 2024
55e1bc8
update: run worker only if it's enabled using is_worker_enabled check
heemankv Jul 23, 2024
9d7ab21
build: linter fixes
heemankv Jul 23, 2024
b209c18
Merge branch 'main' into feat/da_worker
heemankv Jul 23, 2024
fbd721d
Update CHANGELOG.md
heemankv Jul 23, 2024
e475cc4
update: limit_to_one on get_jobs_by_status
heemankv Jul 23, 2024
c13a576
update: removed get_last_successful_job_by_type, added get_latest_job…
heemankv Jul 23, 2024
e611040
update: added error to job metadata
heemankv Jul 23, 2024
586c709
update: pr resolution, simplifying get_jobs_by_status, rejected statu…
heemankv Jul 25, 2024
b0bf981
update: linting fixes
heemankv Jul 25, 2024
69b05de
Update crates/orchestrator/src/jobs/mod.rs
heemankv Jul 25, 2024
46b8c23
update: removing .expect from mongodb mod file
heemankv Jul 25, 2024
8cea54d
update: fixed testcase for snos worker
heemankv Jul 25, 2024
579d1d1
chore: correct variable name
heemankv Jul 25, 2024
992190a
update: added support to check againt multiple status - is_worker_ena…
heemankv Jul 25, 2024
ee29e87
docs: rewrote 1 job per block assumption
heemankv Jul 26, 2024
465241c
docs: DataSubmissionWorker -> DataAvailabilitySynchronizer
heemankv Jul 30, 2024
e4a2aa0
Merge branch 'main' into feat/da_worker
heemankv Aug 2, 2024
4ecf4fc
chore: liniting fix
heemankv Aug 2, 2024
5905f88
update: changed name : DataAvailabilitySynchronizer -> DataSubmission…
heemankv Aug 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added
- creation of data submission jobs.
heemankv marked this conversation as resolved.
Show resolved Hide resolved

## Changed

Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use dotenvy::dotenv;
use orchestrator::config::config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_router;
use orchestrator::workers::data_submission::DataSubmissionWorker;
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
use orchestrator::workers::proving::ProvingWorker;
use orchestrator::workers::snos::SnosWorker;
Expand Down Expand Up @@ -33,6 +34,7 @@ async fn main() {
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));
tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60));

tracing::info!("Listening on http://{}", address);
axum::serve(listener, app).await.expect("Failed to start axum server");
Expand Down
55 changes: 55 additions & 0 deletions crates/orchestrator/src/workers/data_submission.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::JobType;
use crate::workers::Worker;
use async_trait::async_trait;
use std::collections::HashMap;
use std::error::Error;

pub struct DataSubmissionWorker;

#[async_trait]
impl Worker for DataSubmissionWorker {
EvolveArt marked this conversation as resolved.
Show resolved Hide resolved
// 0. All ids are assumed to be block numbers.
// 1. Fetch the latest completed Proving job.
// 2. Fetch the latest DA job creation.
// 3. Create jobs from after the lastest DA job already created till latest completed proving job.
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved

// provides latest completed proof creation job id
let latest_proven_job_id = config
.database()
.get_last_successful_job_by_type(JobType::ProofCreation)
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

// provides latest triggered data submission job id
let latest_data_submission_job_id = config
.database()
.get_latest_job_by_type_and_internal_id(JobType::DataSubmission)
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());
EvolveArt marked this conversation as resolved.
Show resolved Hide resolved

let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?;
let latest_proven_id: u64 = latest_proven_job_id.parse()?;

let block_diff = latest_proven_id - latest_data_submission_id;

// if all blocks are processed
if block_diff == 0 {
return Ok(());
}
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved

// creating data submission jobs for latest blocks without pre-running data submission jobs jobs don't yet exist.
heemankv marked this conversation as resolved.
Show resolved Hide resolved
for x in latest_data_submission_id + 1..latest_proven_id + 1 {
heemankv marked this conversation as resolved.
Show resolved Hide resolved
create_job(JobType::DataSubmission, x.to_string(), HashMap::new()).await?;
}

Ok(())
}
}
1 change: 1 addition & 0 deletions crates/orchestrator/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::error::Error;

use async_trait::async_trait;

pub mod data_submission;
pub mod proof_registration;
pub mod proving;
pub mod snos;
Expand Down
Loading