diff --git a/README.md b/README.md index ecb6f0b..ba8e126 100644 --- a/README.md +++ b/README.md @@ -40,15 +40,24 @@ To execute the forward service, use the following command: Example `configuration.toml`: ```toml -beacon-urls = ["beacon-url-1", "beacon-url-2"] +url-provider = "lookahead" +beacon-nodes = ["beacon-url-1", "beacon-url-2"] [[lookahead-providers-relays]] chain-id = 1 -relay-urls = ["relay-1", "relay-2"] +relays = ["relay-1", "relay-2"] [[lookahead-providers-relays]] chain-id = 2 -relay-urls = ["relay-3"] +relays = ["relay-3"] + +[url-mapping] +"bls-public-key-1" = "url-to-forward-request" ``` +### Details +- url-provider: Specifies the source of the URL. It can be either lookahead or url-mapping. + - If set to **lookahead**, the URL is derived from the lookahead entry. + - If set to **url-mapping**, the URL is determined by looking up the public keys between the lookahead entry public key and the map provided in url-mapping. + Make sure to provide the necessary beacon and relay URLs in the configuration file. \ No newline at end of file diff --git a/config.example.toml b/config.example.toml index c154a7d..34b17eb 100644 --- a/config.example.toml +++ b/config.example.toml @@ -1,5 +1,6 @@ -beacon-urls = ["https:://beacon-url"] +url-provider = "lookahead" +beacon-nodes = ["https:://beacon-url"] [[lookahead-providers-relays]] chain-id = 1 -relay-urls = ["https://relay-url"] \ No newline at end of file +relays = ["https://relay-url"] \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 9081b87..d3f7916 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,22 +1,34 @@ use std::{fs, path::Path}; use eyre::{Result, WrapErr}; +use hashbrown::HashMap; use serde::Deserialize; +#[derive(Debug, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum UrlProvider { + Lookahead, + UrlMapping, +} + #[derive(Debug, Deserialize)] pub struct Config { #[serde(rename = "lookahead-providers-relays")] pub lookahead_providers_relays: Vec, - #[serde(rename = "beacon-urls")] - pub beacon_urls: Vec, + #[serde(rename = "beacon-nodes")] + pub beacon_nodes: Vec, + #[serde(rename = "url-provider")] + pub url_provider: UrlProvider, + #[serde(rename = "pubkey-url-map")] + pub url_map: Option>, } #[derive(Debug, Deserialize)] pub struct LookaheadProvider { #[serde(rename = "chain-id")] pub chain_id: u16, - #[serde(rename = "relay-urls")] - pub relay_urls: Vec, + #[serde(rename = "relays")] + pub relays: Vec, } impl Config { diff --git a/src/forward_service.rs b/src/forward_service.rs index 92e254a..e262d61 100644 --- a/src/forward_service.rs +++ b/src/forward_service.rs @@ -94,7 +94,7 @@ impl RpcForward { fn router(shared_state: SharedState) -> Router { Router::new() - .route("/:scan_id", post(scan_id_forward_request)) + .route("/:chain_id", post(scan_id_forward_request)) .route("/", post(forward_request)) .layer(TraceLayer::new_for_http()) .with_state(Arc::new(shared_state)) @@ -107,27 +107,32 @@ async fn scan_id_forward_request( body: Bytes, ) -> Result { if let Some(manager) = state.managers.get(&chain_id) { - if let Some(entry) = manager.get_next_elected_preconfer() { - match inner_forward_request(&state.client, &entry.url, body, headers).await { - Ok(res) => Ok(res), - Err(_) => Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "error while forwarding request".to_string(), - )), - } - } else { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("no preconfer has been elected for chain_id {}", chain_id), - )) + match manager.get_url() { + None => Err(( + StatusCode::BAD_REQUEST, + format!("no lookahead entry found for chain-id {}", chain_id), + )), + Some(url) => match url { + Ok(url) => match inner_forward_request(&state.client, &url, body, headers).await { + Ok(res) => Ok(res), + Err(_) => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "error while forwarding request".to_string(), + )), + }, + Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())), + }, } } else { - Err((StatusCode::BAD_REQUEST, format!("no lookahead provider found for id {}", chain_id))) + Err(( + StatusCode::BAD_REQUEST, + format!("no lookahead provider found for chain-id {}", chain_id), + )) } } async fn forward_request(State(_state): State>) -> impl IntoResponse { - (StatusCode::BAD_REQUEST, "missing chain_id parameter") + (StatusCode::BAD_REQUEST, "missing chain-id parameter") } async fn inner_forward_request( @@ -148,6 +153,7 @@ mod test { time::Duration, }; + use alloy::rpc::types::beacon::{constants::BLS_PUBLIC_KEY_BYTES_LEN, BlsPublicKey}; use axum::{ extract::State, http::HeaderMap, @@ -163,7 +169,8 @@ mod test { use crate::{ forward_service::{router, SharedState}, - lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider}, + lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider, UrlProvider}, + preconf::election::{PreconferElection, SignedPreconferElection}, }; struct DummySharedState { @@ -176,6 +183,7 @@ mod test { let manager = LookaheadManager::new( Lookahead { map: DashMap::new().into() }, LookaheadProvider::None, + UrlProvider::LookaheadEntry, ); let mut managers = HashMap::new(); managers.insert(1u16, manager); @@ -186,7 +194,7 @@ mod test { tokio::time::sleep(Duration::from_secs(1)).await; let res = reqwest::Client::new().post("http://localhost:12001").send().await.unwrap(); assert_eq!(res.status(), StatusCode::BAD_REQUEST); - assert_eq!(res.text().await.unwrap(), "missing chain_id parameter"); + assert_eq!(res.text().await.unwrap(), "missing chain-id parameter"); Ok(()) } @@ -196,6 +204,7 @@ mod test { let manager = LookaheadManager::new( Lookahead { map: DashMap::new().into() }, LookaheadProvider::None, + UrlProvider::LookaheadEntry, ); let mut managers = HashMap::new(); managers.insert(1u16, manager); @@ -206,7 +215,7 @@ mod test { tokio::time::sleep(Duration::from_secs(1)).await; let res = reqwest::Client::new().post("http://localhost:12002/2").send().await.unwrap(); assert_eq!(res.status(), StatusCode::BAD_REQUEST); - assert_eq!(res.text().await.unwrap(), "no lookahead provider found for id 2"); + assert_eq!(res.text().await.unwrap(), "no lookahead provider found for chain-id 2"); Ok(()) } @@ -218,7 +227,11 @@ mod test { url: "http://not-a-valid-url".into(), ..Default::default() }); - let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::LookaheadEntry, + ); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); @@ -248,7 +261,11 @@ mod test { url: "http://localhost:12004".into(), ..Default::default() }); - let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::LookaheadEntry, + ); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); @@ -276,6 +293,96 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_url_map_request() -> Result<()> { + tokio::spawn(async move { + let dst = Arc::new(Mutex::new(DummySharedState { cnt: 0 })); + let router: Router = Router::new() + .route("/", post(handle_request)) + .route("/cnt", get(counter)) + .with_state(dst); + let listener = tokio::net::TcpListener::bind("localhost:12006").await.unwrap(); + axum::serve(listener, router).await.unwrap(); + }); + tokio::spawn(async move { + let map = Arc::new(DashMap::new()); + let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]); + let mut url_mapping = HashMap::new(); + url_mapping.insert(signature.to_string(), "http://localhost:12006".into()); + map.insert(0, LookaheadEntry { + url: "".into(), + election: SignedPreconferElection { + message: PreconferElection { + preconfer_pubkey: signature.clone(), + ..Default::default() + }, + ..Default::default() + }, + }); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::UrlMap(url_mapping), + ); + let mut managers = HashMap::new(); + managers.insert(1u16, manager); + let router = router(SharedState::new(managers).unwrap()); + let listener = tokio::net::TcpListener::bind("localhost:12007").await.unwrap(); + axum::serve(listener, router).await.unwrap(); + }); + tokio::time::sleep(Duration::from_secs(1)).await; + for _ in 0..10 { + let mut headers = HeaderMap::new(); + headers.insert("Content-Type", HeaderValue::from_str("application/json").unwrap()); + let res = reqwest::Client::new() + .post("http://localhost:12007/1") + .body("dummy plain body") + .headers(headers) + .headers(HeaderMap::new()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } + + let cnt_res = reqwest::get("http://localhost:12006/cnt").await.unwrap(); + assert_eq!(StatusCode::OK, cnt_res.status()); + assert_eq!(cnt_res.text().await.unwrap(), "10"); + Ok(()) + } + + #[tokio::test] + async fn test_no_pubkey() -> Result<()> { + tokio::spawn(async move { + let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]); + let map = Arc::new(DashMap::new()); + let mut provider = HashMap::new(); + provider.insert(signature.to_string(), "http:://not-a-valid-http".into()); + map.insert(0, LookaheadEntry { url: "".into(), ..Default::default() }); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::UrlMap(provider), + ); + let mut managers = HashMap::new(); + managers.insert(1u16, manager); + let router = router(SharedState::new(managers).unwrap()); + let listener = tokio::net::TcpListener::bind("localhost:12008").await.unwrap(); + axum::serve(listener, router).await.unwrap(); + }); + tokio::time::sleep(Duration::from_secs(1)).await; + let res = reqwest::Client::new().post("http://localhost:12008/1").send().await.unwrap(); + assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!( + res.text().await.unwrap(), + format!( + "could not find key for pubkey {}", + BlsPublicKey::from([0u8; BLS_PUBLIC_KEY_BYTES_LEN]).to_string() + ) + ); + Ok(()) + } + async fn handle_request( State(state): State>>, headers: HeaderMap, diff --git a/src/lookahead/manager.rs b/src/lookahead/manager.rs index 0e909b8..ae9df15 100644 --- a/src/lookahead/manager.rs +++ b/src/lookahead/manager.rs @@ -1,6 +1,6 @@ use alloy::rpc::types::beacon::events::HeadEvent; use dashmap::DashMap; -use eyre::{bail, Result}; +use eyre::{bail, ContextCompat, Result}; use hashbrown::HashMap; use tokio::sync::broadcast; @@ -16,17 +16,29 @@ enum LookaheadProviderManager { Running, } +#[derive(Debug, Clone)] +pub enum UrlProvider { + LookaheadEntry, + UrlMap(HashMap), +} + /// Manages the lookahead for preconfer elections. pub struct LookaheadManager { lookahead: Lookahead, provider_manager: Option, + url_provider: UrlProvider, } impl LookaheadManager { - pub fn new(lookahead: Lookahead, lookahead_provider: LookaheadProvider) -> Self { + pub fn new( + lookahead: Lookahead, + lookahead_provider: LookaheadProvider, + url_provider: UrlProvider, + ) -> Self { Self { lookahead, provider_manager: Some(LookaheadProviderManager::Initialized(lookahead_provider)), + url_provider, } } @@ -47,9 +59,21 @@ impl LookaheadManager { } } - pub fn get_next_elected_preconfer(&self) -> Option { + fn get_next_elected_preconfer(&self) -> Option { self.lookahead.get_next_elected_preconfer() } + + pub fn get_url(&self) -> Option> { + self.get_next_elected_preconfer().map(|entry| match &self.url_provider { + UrlProvider::LookaheadEntry => Ok(entry.url), + UrlProvider::UrlMap(m) => { + let pubkey = entry.election.preconfer_pubkey().to_string(); + m.get(&pubkey) + .cloned() + .wrap_err(format!("could not find key for pubkey {}", pubkey)) + } + }) + } } /// BBuilds a map of lookahead managers from the configuration, keyed by the chain-id. @@ -57,6 +81,16 @@ pub fn lookahead_managers_from_config( config: Config, beacon_tx: broadcast::Sender, ) -> HashMap { + let url_provider = match config.url_provider { + crate::config::UrlProvider::Lookahead => UrlProvider::LookaheadEntry, + crate::config::UrlProvider::UrlProvider => { + let mut m = HashMap::new(); + for (key, value) in config.url_map.expect("pubkey-url-map not present") { + m.insert(key, value); + } + UrlProvider::UrlMap(m) + } + }; // build managers from relay lookahead providers let mut map = HashMap::new(); for r_c in config.lookahead_providers_relays { @@ -65,12 +99,12 @@ pub fn lookahead_managers_from_config( head_event_receiver: Some(beacon_tx.subscribe()), relay_provider: Some(RelayLookaheadProvider::new( lookahead.clone(), - r_c.relay_urls, + r_c.relays, HashMap::new(), )), } .build_relay_provider(); - map.insert(r_c.chain_id, LookaheadManager::new(lookahead, provider)); + map.insert(r_c.chain_id, LookaheadManager::new(lookahead, provider, url_provider.clone())); } map } diff --git a/src/main.rs b/src/main.rs index 7c1fb9f..fb6d8d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,7 +45,7 @@ async fn main() -> Result<()> { Commands::Forward { port } => { let config = Config::from_file(&cli.config)?; let (beacon_tx, _beacon_rx) = broadcast::channel(16); - let client = MultiBeaconClient::from_endpoint_strs(&config.beacon_urls); + let client = MultiBeaconClient::from_endpoint_strs(&config.beacon_nodes); client.subscribe_to_head_events(beacon_tx.clone()).await; let listening_addr = format!("0.0.0.0:{}", port.unwrap_or(8000)); diff --git a/src/preconf/constraints.rs b/src/preconf/constraints.rs index 2307419..f5bb9b2 100644 --- a/src/preconf/constraints.rs +++ b/src/preconf/constraints.rs @@ -60,6 +60,5 @@ mod tests { let s = serde_json::to_string(&singed_constraints).unwrap(); let encode = alloy::primitives::hex::encode(s); - println!("{}", encode); } }