Skip to content

Commit

Permalink
Merge branch 'self_hosted_runner' of https://github.com/karnotxyz/mad…
Browse files Browse the repository at this point in the history
…ara-orchestrator into self_hosted_runner
  • Loading branch information
apoorvsadana committed Sep 18, 2024
2 parents 550990a + 801cf51 commit 0eaad9f
Show file tree
Hide file tree
Showing 49 changed files with 942 additions and 716 deletions.
2 changes: 1 addition & 1 deletion .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ PORT=3000
AWS_ACCESS_KEY_ID="AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY"
AWS_REGION="us-east-1"
AWS_SNS_REGION="us-east-1"
AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566"
AWS_DEFAULT_REGION="localhost"

Expand All @@ -27,7 +28,6 @@ SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:45
##### SNS #####

ALERTS="sns"
AWS_SNS_REGION="us-east-1"
AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
AWS_SNS_ARN_NAME="madara-orchestrator-arn"

Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- moved rust build and tests to self-hosted runner
- e2e flow test
- database timestamps
- moved rust build and tests to self-hosted runner
- alerts module.
- Tests for Settlement client.
- Worker queues to listen for trigger events.
Expand All @@ -34,6 +35,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- ethereum DA client builder
- AWS config built from TestConfigBuilder.
- Better TestConfigBuilder, with sync config clients.
- Drilled Config, removing dirty global reads.
- settings provider
- refactor AWS config usage and clean .env files
- GitHub's coverage CI yml file for localstack and db testing.
Expand Down
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ url = { version = "2.5.0", features = ["serde"] }
uuid = { version = "1.7.0", features = ["v4", "serde"] }
httpmock = { version = "0.7.0", features = ["remote"] }
num-bigint = { version = "0.4.4" }
arc-swap = { version = "1.7.0" }
num-traits = "0.2"
lazy_static = "1.4.0"
stark_evm_adapter = "0.1.1"
Expand Down
20 changes: 17 additions & 3 deletions crates/da-clients/ethereum/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use crate::EthereumDaClient;
use alloy::network::Ethereum;
use alloy::providers::ProviderBuilder;
use alloy::rpc::client::RpcClient;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use url::Url;
use utils::settings::Settings;

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -11,9 +17,17 @@ pub struct EthereumDaConfig {
impl EthereumDaConfig {
pub fn new_with_settings(settings: &impl Settings) -> color_eyre::Result<Self> {
Ok(Self {
rpc_url: settings.get_settings("SETTLEMENT_RPC_URL")?,
memory_pages_contract: settings.get_settings("MEMORY_PAGES_CONTRACT_ADDRESS")?,
private_key: settings.get_settings("PRIVATE_KEY")?,
rpc_url: settings.get_settings_or_panic("SETTLEMENT_RPC_URL"),
memory_pages_contract: settings.get_settings_or_panic("MEMORY_PAGES_CONTRACT_ADDRESS"),
private_key: settings.get_settings_or_panic("PRIVATE_KEY"),
})
}

pub async fn build_client(&self) -> EthereumDaClient {
let client =
RpcClient::new_http(Url::from_str(self.rpc_url.as_str()).expect("Failed to parse SETTLEMENT_RPC_URL"));
let provider = ProviderBuilder::<_, Ethereum>::new().on_client(client);

EthereumDaClient { provider }
}
}
1 change: 0 additions & 1 deletion crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ path = "src/main.rs"

[dependencies]
alloy = { workspace = true }
arc-swap = { workspace = true }
assert_matches = "1.5.0"
async-std = "1.12.0"
async-trait = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/orchestrator/src/alerts/aws_sns/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub struct AWSSNSConfig {
impl AWSSNSConfig {
pub fn new_with_settings(settings: &impl Settings) -> color_eyre::Result<Self> {
Ok(Self {
sns_arn: settings.get_settings("AWS_SNS_ARN")?,
sns_arn_region: settings.get_settings("AWS_SNS_REGION")?,
sns_arn: settings.get_settings_or_panic("AWS_SNS_ARN"),
sns_arn_region: settings.get_settings_or_panic("AWS_SNS_REGION"),
})
}
}
15 changes: 7 additions & 8 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod config;

use std::sync::Arc;

use crate::alerts::aws_sns::config::AWSSNSConfig;
use crate::alerts::Alerts;
use crate::config::ProviderConfig;
Expand All @@ -15,14 +17,11 @@ pub struct AWSSNS {
}

impl AWSSNS {
pub async fn new_with_settings(settings: &impl Settings, provider_config: ProviderConfig) -> Self {
match provider_config {
ProviderConfig::AWS(aws_config) => {
let sns_config = AWSSNSConfig::new_with_settings(settings)
.expect("Not able to get Aws sns config from provided settings");
Self { client: Client::new(&aws_config), topic_arn: sns_config.sns_arn }
}
}
pub async fn new_with_settings(settings: &impl Settings, provider_config: Arc<ProviderConfig>) -> Self {
let sns_config =
AWSSNSConfig::new_with_settings(settings).expect("Not able to get Aws sns config from provided settings");
let config = provider_config.get_aws_client_or_panic();
Self { client: Client::new(config), topic_arn: sns_config.sns_arn }
}
}

Expand Down
105 changes: 43 additions & 62 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,27 @@ use std::str::FromStr;

use std::sync::Arc;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::DataStorage;
use arc_swap::{ArcSwap, Guard};
use aws_config::meta::region::RegionProviderChain;
use aws_config::{Region, SdkConfig};
use aws_credential_types::Credentials;
use da_client_interface::DaClient;
use aws_config::SdkConfig;
use dotenvy::dotenv;
use ethereum_da_client::EthereumDaClient;
use ethereum_da_client::config::EthereumDaConfig;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};

use da_client_interface::DaClient;
use ethereum_settlement_client::EthereumSettlementClient;
use prover_client_interface::ProverClient;
use settlement_client_interface::SettlementClient;
use sharp_service::SharpProverService;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};
use starknet_settlement_client::StarknetSettlementClient;
use tokio::sync::OnceCell;
use utils::env_utils::get_env_var_or_panic;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::DataStorage;
use aws_config::meta::region::RegionProviderChain;
use aws_config::Region;
use aws_credential_types::Credentials;
use utils::settings::env::EnvSettingsProvider;
use utils::settings::Settings;

Expand Down Expand Up @@ -61,55 +62,58 @@ pub struct Config {
///
/// We are using Arc<SdkConfig> because the config size is large and keeping it
/// a pointer is a better way to pass it through.
#[derive(Clone)]
pub enum ProviderConfig {
AWS(Arc<SdkConfig>),
AWS(Box<SdkConfig>),
}

impl ProviderConfig {
pub fn get_aws_client_or_panic(&self) -> &SdkConfig {
match self {
ProviderConfig::AWS(config) => config.as_ref(),
}
}
}

/// To build a `SdkConfig` for AWS provider.
pub async fn get_aws_config(settings_provider: &impl Settings) -> SdkConfig {
let region = settings_provider
.get_settings("AWS_REGION")
.expect("Not able to get AWS_REGION from provided settings provider.");
let region = settings_provider.get_settings_or_panic("AWS_REGION");
let region_provider = RegionProviderChain::first_try(Region::new(region)).or_default_provider();
let credentials = Credentials::from_keys(
settings_provider
.get_settings("AWS_ACCESS_KEY_ID")
.expect("Not able to get AWS_ACCESS_KEY_ID from provided settings provider."),
settings_provider
.get_settings("AWS_SECRET_ACCESS_KEY")
.expect("Not able to get AWS_SECRET_ACCESS_KEY from provided settings provider."),
settings_provider.get_settings_or_panic("AWS_ACCESS_KEY_ID"),
settings_provider.get_settings_or_panic("AWS_SECRET_ACCESS_KEY"),
None,
);
aws_config::from_env().credentials_provider(credentials).region(region_provider).load().await
}

/// Initializes the app config
pub async fn init_config() -> Config {
pub async fn init_config() -> Arc<Config> {
dotenv().ok();

let settings_provider = EnvSettingsProvider {};
let provider_config = Arc::new(ProviderConfig::AWS(Box::new(get_aws_config(&settings_provider).await)));

// init starknet client
let provider = JsonRpcClient::new(HttpTransport::new(
Url::parse(get_env_var_or_panic("MADARA_RPC_URL").as_str()).expect("Failed to parse URL"),
Url::parse(settings_provider.get_settings_or_panic("MADARA_RPC_URL").as_str()).expect("Failed to parse URL"),
));

let settings_provider = EnvSettingsProvider {};
let aws_config = Arc::new(get_aws_config(&settings_provider).await);

// init database
let database = build_database_client(&settings_provider).await;
let da_client = build_da_client(&settings_provider).await;
let settlement_client = build_settlement_client(&settings_provider).await;
let prover_client = build_prover_service(&settings_provider);
let storage_client = build_storage_client(&settings_provider, ProviderConfig::AWS(Arc::clone(&aws_config))).await;
let alerts_client = build_alert_client(&settings_provider, ProviderConfig::AWS(Arc::clone(&aws_config))).await;
let storage_client = build_storage_client(&settings_provider, provider_config.clone()).await;
let alerts_client = build_alert_client(&settings_provider, provider_config.clone()).await;

// init the queue
// TODO: we use omniqueue for now which doesn't support loading AWS config
// from `SdkConfig`. We can later move to using `aws_sdk_sqs`. This would require
// us stop using the generic omniqueue abstractions for message ack/nack
let queue = build_queue_client();

Config::new(
Arc::new(Config::new(
Arc::new(provider),
da_client,
prover_client,
Expand All @@ -118,7 +122,7 @@ pub async fn init_config() -> Config {
queue,
storage_client,
alerts_client,
)
))
}

impl Config {
Expand Down Expand Up @@ -178,37 +182,14 @@ impl Config {
}
}

/// The app config. It can be accessed from anywhere inside the service.
/// It's initialized only once.
/// We are using `ArcSwap` as it allow us to replace the new `Config` with
/// a new one which is required when running test cases. This approach was
/// inspired from here - https://github.com/matklad/once_cell/issues/127
pub static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();

/// Returns the app config. Initializes if not already done.
pub async fn config() -> Guard<Arc<Config>> {
let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await;
cfg.load()
}

/// OnceCell only allows us to initialize the config once and that's how it should be on production.
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
/// stored config inside `ArcSwap` with the new configuration and pool settings.
#[cfg(test)]
pub async fn config_force_init(config: Config) {
match CONFIG.get() {
Some(arc) => arc.store(Arc::new(config)),
None => {
CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await;
}
}
}

/// Builds the DA client based on the environment variable DA_LAYER
pub async fn build_da_client(settings_provider: &impl Settings) -> Box<dyn DaClient + Send + Sync> {
match get_env_var_or_panic("DA_LAYER").as_str() {
"ethereum" => Box::new(EthereumDaClient::new_with_settings(settings_provider)),
"ethereum" => {
let config = EthereumDaConfig::new_with_settings(settings_provider)
.expect("Not able to build config from the given settings provider.");
Box::new(config.build_client().await)
}
_ => panic!("Unsupported DA layer"),
}
}
Expand Down Expand Up @@ -246,7 +227,7 @@ pub async fn build_settlement_client(settings_provider: &impl Settings) -> Box<d

pub async fn build_storage_client(
settings_provider: &impl Settings,
provider_config: ProviderConfig,
provider_config: Arc<ProviderConfig>,
) -> Box<dyn DataStorage + Send + Sync> {
match get_env_var_or_panic("DATA_STORAGE").as_str() {
"s3" => Box::new(AWSS3::new_with_settings(settings_provider, provider_config).await),
Expand All @@ -256,7 +237,7 @@ pub async fn build_storage_client(

pub async fn build_alert_client(
settings_provider: &impl Settings,
provider_config: ProviderConfig,
provider_config: Arc<ProviderConfig>,
) -> Box<dyn Alerts + Send + Sync> {
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => Box::new(AWSSNS::new_with_settings(settings_provider, provider_config).await),
Expand Down
6 changes: 1 addition & 5 deletions crates/orchestrator/src/data_storage/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ pub struct AWSS3Config {
impl DataStorageConfig for AWSS3Config {
/// To return the config struct by creating it from the environment variables.
fn new_with_settings(settings: &impl Settings) -> Self {
Self {
bucket_name: settings
.get_settings("AWS_S3_BUCKET_NAME")
.expect("Not able to get AWS_S3_BUCKET_NAME from settings provided."),
}
Self { bucket_name: settings.get_settings_or_panic("AWS_S3_BUCKET_NAME") }
}
}
23 changes: 11 additions & 12 deletions crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::config::ProviderConfig;
use crate::data_storage::aws_s3::config::AWSS3Config;
use crate::data_storage::{DataStorage, DataStorageConfig};
Expand All @@ -24,18 +26,15 @@ pub struct AWSS3 {
/// - initializing a new AWS S3 client
impl AWSS3 {
/// To init the struct with main settings
pub async fn new_with_settings(settings: &impl Settings, provider_config: ProviderConfig) -> Self {
match provider_config {
ProviderConfig::AWS(aws_config) => {
let s3_config = AWSS3Config::new_with_settings(settings);
// Building AWS S3 config
let mut s3_config_builder = aws_sdk_s3::config::Builder::from(aws_config.as_ref());
// this is necessary for it to work with localstack in test cases
s3_config_builder.set_force_path_style(Some(true));
let client = Client::from_conf(s3_config_builder.build());
Self { client, bucket: s3_config.bucket_name }
}
}
pub async fn new_with_settings(settings: &impl Settings, provider_config: Arc<ProviderConfig>) -> Self {
let s3_config = AWSS3Config::new_with_settings(settings);
let aws_config = provider_config.get_aws_client_or_panic();
// Building AWS S3 config
let mut s3_config_builder = aws_sdk_s3::config::Builder::from(aws_config);
// this is necessary for it to work with localstack in test cases
s3_config_builder.set_force_path_style(Some(true));
let client = Client::from_conf(s3_config_builder.build());
Self { client, bucket: s3_config.bucket_name }
}
}

Expand Down
6 changes: 1 addition & 5 deletions crates/orchestrator/src/database/mongodb/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ pub struct MongoDbConfig {

impl DatabaseConfig for MongoDbConfig {
fn new_with_settings(settings: &impl Settings) -> Self {
Self {
url: settings
.get_settings("MONGODB_CONNECTION_STRING")
.expect("Not able to get MONGODB_CONNECTION_STRING form the given settings"),
}
Self { url: settings.get_settings_or_panic("MONGODB_CONNECTION_STRING") }
}
}
Loading

0 comments on commit 0eaad9f

Please sign in to comment.