update: introducing DaError
heemankv committed Jul 31, 2024
1 parent 4b4be6e commit 74c8cd0
Showing 2 changed files with 42 additions and 35 deletions.
57 changes: 31 additions & 26 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -1,22 +1,21 @@
use std::collections::HashMap;
use std::ops::{Add, Mul, Rem};
-use std::result::Result::{Err, Ok as OtherOk};
use std::str::FromStr;

use async_trait::async_trait;
-use color_eyre::eyre::{eyre, Ok};
-use color_eyre::Result;
+use color_eyre::eyre::{eyre, WrapErr};
use lazy_static::lazy_static;
use num_bigint::{BigUint, ToBigUint};
use num_traits::{Num, Zero};
-//
use starknet::core::types::{BlockId, FieldElement, MaybePendingStateUpdate, StateUpdate, StorageEntry};
use starknet::providers::Provider;
+use std::result::Result::Ok as StdOk;
+use thiserror::Error;
use tracing::log;
use uuid::Uuid;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
-use super::Job;
+use super::{Job, JobError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;

@@ -49,7 +48,7 @@ impl Job for DaJob {
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
-) -> Result<JobItem> {
+) -> Result<JobItem, JobError> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
@@ -61,19 +60,19 @@ impl Job for DaJob {
})
}

-async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String> {
-let block_no = job.internal_id.parse::<u64>()?;
+async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
+let block_no = job.internal_id.parse::<u64>().wrap_err_with(|| format!("Failed to parse u64"))?;

-let state_update = config.starknet_client().get_state_update(BlockId::Number(block_no)).await?;
+let state_update = config
+    .starknet_client()
+    .get_state_update(BlockId::Number(block_no))
+    .await
+    .wrap_err_with(|| format!("Failed to get state Update."))?;

let state_update = match state_update {
MaybePendingStateUpdate::PendingUpdate(_) => {
log::error!("Cannot process block {} for job id {} as it's still in pending state", block_no, job.id);
-return Err(eyre!(
-    "Cannot process block {} for job id {} as it's still in pending state",
-    block_no,
-    job.id
-));
+Err(DaError::BlockPending { block_no, job_id: job.id })?
}
MaybePendingStateUpdate::Update(state_update) => state_update,
};
@@ -96,13 +95,7 @@ impl Job for DaJob {

// there is a limit on number of blobs per txn, checking that here
if current_blob_length > max_blob_per_txn {
-return Err(eyre!(
-    "Exceeded the maximum number of blobs per transaction: allowed {}, found {} for block {} and job id {}",
-    max_blob_per_txn,
-    current_blob_length,
-    block_no,
-    job.id
-));
+Err(DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no, job_id: job.id })?
}

// making the txn to the DA layer
@@ -111,7 +104,7 @@ impl Job for DaJob {
Ok(external_id)
}

-async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus> {
+async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
Ok(config.da_client().verify_inclusion(job.external_id.unwrap_string()?).await?.into())
}

@@ -169,7 +162,7 @@ pub fn convert_to_biguint(elements: Vec<FieldElement>) -> Vec<BigUint> {
biguint_vec
}

-fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>>> {
+fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> color_eyre::Result<Vec<Vec<u8>>> {
// Validate blob size
if blob_size < 32 {
return Err(eyre!(
@@ -205,7 +198,7 @@ pub async fn state_update_to_blob_data(
block_no: u64,
state_update: StateUpdate,
config: &Config,
-) -> Result<Vec<FieldElement>> {
+) -> color_eyre::Result<Vec<FieldElement>> {
let state_diff = state_update.state_diff;
let mut blob_data: Vec<FieldElement> = vec![
FieldElement::from(state_diff.storage_diffs.len()),
@@ -241,7 +234,7 @@ pub async fn state_update_to_blob_data(
let get_current_nonce_result = config.starknet_client().get_nonce(BlockId::Number(block_no), addr).await;

nonce = match get_current_nonce_result {
-OtherOk(get_current_nonce) => Some(get_current_nonce),
+StdOk(get_current_nonce) => Some(get_current_nonce),
Err(e) => {
log::error!("Failed to get nonce: {}", e);
return Err(eyre!("Failed to get nonce: {}", e));
@@ -284,7 +277,7 @@
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
-async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<()> {
+async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> color_eyre::Result<()> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let data_blob_big_uint = convert_to_biguint(blob_data.clone());
@@ -341,3 +334,15 @@ pub fn da_word(class_flag: bool, nonce_change: Option<FieldElement>, num_changes

FieldElement::from_dec_str(&decimal_string).expect("issue while converting to fieldElement")
}

+#[derive(Error, Debug)]
+pub enum DaError {
+    #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")]
+    BlockPending { block_no: u64, job_id: Uuid },
+
+    #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")]
+    MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid },
+
+    #[error("Other error: {0}")]
+    Other(#[from] color_eyre::eyre::Error),
+}
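
The `Err(DaError::…)?` expressions in the hunks above work because of the `DaJobError(#[from] DaError)` variant added to `JobError` in the second file: `thiserror` generates an `impl From<DaError> for JobError`, and the `?` operator applies it while returning early. A minimal sketch of that mechanism, not taken from this repository (`InnerError` and `OuterError` are hypothetical stand-ins; assumes `thiserror` in Cargo.toml):

```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum InnerError {
    #[error("block {block_no} is still pending")]
    BlockPending { block_no: u64 },
}

#[derive(Error, Debug)]
pub enum OuterError {
    // `#[from]` makes thiserror emit `impl From<InnerError> for OuterError`,
    // which is exactly what `?` uses to lift the inner error automatically.
    #[error("DA error: {0}")]
    Da(#[from] InnerError),
}

fn process(block_no: u64, pending: bool) -> Result<String, OuterError> {
    if pending {
        // The diff writes this as `Err(DaError::BlockPending { .. })?` in value
        // position; `.into()` makes the same `From` conversion explicit here.
        return Err(InnerError::BlockPending { block_no }.into());
    }
    Ok(format!("processed block {block_no}"))
}

fn main() {
    // Prints: failed: DA error: block 7 is still pending
    if let Err(e) = process(7, true) {
        eprintln!("failed: {e}");
    }
}
```

In the diff itself, `Err(DaError::BlockPending { .. })?` sits in value position inside a match arm, so the conversion and the early return both happen without an explicit `return`.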
20 changes: 11 additions & 9 deletions crates/orchestrator/src/jobs/mod.rs
@@ -1,16 +1,15 @@
use std::collections::HashMap;
use std::num::ParseIntError;
use std::time::Duration;

-use async_trait::async_trait;
-use color_eyre::eyre::Context;
-use tracing::log;
-use uuid::Uuid;

use crate::config::{config, Config};
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue};
+use async_trait::async_trait;
+use color_eyre::eyre::WrapErr;
+use da_job::DaError;
+use tracing::log;
+use uuid::Uuid;

pub mod constants;
pub mod da_job;
@@ -32,15 +31,15 @@ pub trait Job: Send + Sync {
config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
-) -> color_eyre::Result<JobItem>;
+) -> Result<JobItem, JobError>;
/// Should process the job and return the external_id which can be used to
/// track the status of the job. For example, a DA job will submit the state diff
/// to the DA layer and return the txn hash.
-async fn process_job(&self, config: &Config, job: &mut JobItem) -> color_eyre::Result<String>;
+async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError>;
/// Should verify the job and return the status of the verification. For example,
/// a DA job will verify the inclusion of the state diff in the DA layer and return
/// the status of the verification.
-async fn verify_job(&self, config: &Config, job: &mut JobItem) -> color_eyre::Result<JobVerificationStatus>;
+async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError>;
/// Should return the maximum number of attempts to process the job. A new attempt is made
/// every time the verification returns `JobVerificationStatus::Rejected`
fn max_process_attempts(&self) -> u64;
@@ -231,6 +230,9 @@ pub enum JobError {
#[error("Arithmetic error: {0}")]
Arithmetic(String),

#[error("DA Error: {0}")]
DaJobError(#[from] DaError),

#[error("Other error: {0}")]
Other(#[from] color_eyre::eyre::Error),
}
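
With `DaJobError` in place, callers can branch on structured error data instead of string-matching eyre reports. A sketch of what that enables, using trimmed, hypothetical mirrors of the two enums (fields reduced and `Uuid` replaced by `u64` for brevity; assumes `thiserror` and `color-eyre` as dependencies), not code from the repository:

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum DaError {
    #[error("Cannot process block {block_no} as it's still in pending state.")]
    BlockPending { block_no: u64 },
}

#[derive(Error, Debug)]
enum JobError {
    #[error("DA Error: {0}")]
    DaJobError(#[from] DaError),
    #[error("Other error: {0}")]
    Other(#[from] color_eyre::eyre::Error),
}

fn handle(err: JobError) {
    match err {
        // A pending block is retryable: a worker could re-queue the job
        // instead of marking it failed.
        JobError::DaJobError(DaError::BlockPending { block_no }) => {
            eprintln!("re-queue: block {block_no} is not final yet");
        }
        // Errors routed through `Other` keep the old eyre behaviour.
        other => eprintln!("permanent failure: {other}"),
    }
}

fn main() {
    handle(JobError::from(DaError::BlockPending { block_no: 42 }));
    handle(JobError::Other(color_eyre::eyre::eyre!("storage unavailable")));
}
```

The catch-all `Other(#[from] color_eyre::eyre::Error)` variant is what keeps the migration incremental: call sites that still produce eyre reports continue to compile through that single conversion.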