Skip to content

Commit

Permalink
mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
fbrv committed Jul 22, 2024
1 parent ec4748d commit 4e4dbe1
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 26 deletions.
12 changes: 12 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
use std::{fs, path::Path};

use eyre::{Result, WrapErr};
use hashbrown::HashMap;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum UrlProvider {
Lookahead,
UrlProvider,
}

#[derive(Debug, Deserialize)]
pub struct Config {
#[serde(rename = "lookahead-providers-relays")]
pub lookahead_providers_relays: Vec<LookaheadProvider>,
#[serde(rename = "beacon-urls")]
pub beacon_urls: Vec<String>,
#[serde(rename = "url-provider")]
pub url_provider: UrlProvider,
#[serde(rename = "pubkey-url-map")]
pub url_map: Option<HashMap<String, String>>,
}

#[derive(Debug, Deserialize)]
Expand Down
148 changes: 127 additions & 21 deletions src/forward_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl RpcForward {

fn router(shared_state: SharedState) -> Router {
Router::new()
.route("/:scan_id", post(scan_id_forward_request))
.route("/:chain_id", post(scan_id_forward_request))
.route("/", post(forward_request))
.layer(TraceLayer::new_for_http())
.with_state(Arc::new(shared_state))
Expand All @@ -107,27 +107,32 @@ async fn scan_id_forward_request(
body: Bytes,
) -> Result<impl IntoResponse, impl IntoResponse> {
if let Some(manager) = state.managers.get(&chain_id) {
if let Some(entry) = manager.get_next_elected_preconfer() {
match inner_forward_request(&state.client, &entry.url, body, headers).await {
Ok(res) => Ok(res),
Err(_) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error while forwarding request".to_string(),
)),
}
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("no preconfer has been elected for chain_id {}", chain_id),
))
match manager.get_url() {
None => Err((
StatusCode::BAD_REQUEST,
format!("no lookahead entry found for chain-id {}", chain_id),
)),
Some(url) => match url {
Ok(url) => match inner_forward_request(&state.client, &url, body, headers).await {
Ok(res) => Ok(res),
Err(_) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error while forwarding request".to_string(),
)),
},
Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
},
}
} else {
Err((StatusCode::BAD_REQUEST, format!("no lookahead provider found for id {}", chain_id)))
Err((
StatusCode::BAD_REQUEST,
format!("no lookahead provider found for chain-id {}", chain_id),
))
}
}

async fn forward_request(State(_state): State<Arc<SharedState>>) -> impl IntoResponse {
(StatusCode::BAD_REQUEST, "missing chain_id parameter")
(StatusCode::BAD_REQUEST, "missing chain-id parameter")
}

async fn inner_forward_request(
Expand All @@ -148,6 +153,7 @@ mod test {
time::Duration,
};

use alloy::rpc::types::beacon::{constants::BLS_PUBLIC_KEY_BYTES_LEN, BlsPublicKey};
use axum::{
extract::State,
http::HeaderMap,
Expand All @@ -163,7 +169,8 @@ mod test {

use crate::{
forward_service::{router, SharedState},
lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider},
lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider, UrlProvider},
preconf::election::{PreconferElection, SignedPreconferElection},
};

struct DummySharedState {
Expand All @@ -176,6 +183,7 @@ mod test {
let manager = LookaheadManager::new(
Lookahead { map: DashMap::new().into() },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
Expand All @@ -186,7 +194,7 @@ mod test {
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12001").send().await.unwrap();
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
assert_eq!(res.text().await.unwrap(), "missing chain_id parameter");
assert_eq!(res.text().await.unwrap(), "missing chain-id parameter");
Ok(())
}

Expand All @@ -196,6 +204,7 @@ mod test {
let manager = LookaheadManager::new(
Lookahead { map: DashMap::new().into() },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
Expand All @@ -206,7 +215,7 @@ mod test {
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12002/2").send().await.unwrap();
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
assert_eq!(res.text().await.unwrap(), "no lookahead provider found for id 2");
assert_eq!(res.text().await.unwrap(), "no lookahead provider found for chain-id 2");
Ok(())
}

Expand All @@ -218,7 +227,36 @@ mod test {
url: "http://not-a-valid-url".into(),
..Default::default()
});
let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None);
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
let listener = tokio::net::TcpListener::bind("localhost:12003").await.unwrap();
axum::serve(listener, router).await.unwrap();
});
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12003/1").send().await.unwrap();
assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
Ok(())
}

#[tokio::test]
async fn test_no_pubkey() -> Result<()> {
tokio::spawn(async move {
let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]);
let map = Arc::new(DashMap::new());
let mut provider = HashMap::new();
provider.insert(signature.to_string(), "http:://not-a-valid-http".into());
map.insert(0, LookaheadEntry { url: "".into(), ..Default::default() });
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::UrlMap(provider),
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
Expand All @@ -228,6 +266,13 @@ mod test {
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12003/1").send().await.unwrap();
assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(
res.text().await.unwrap(),
format!(
"could not find key for pubkey {}",
BlsPublicKey::from([0u8; BLS_PUBLIC_KEY_BYTES_LEN]).to_string()
)
);
Ok(())
}

Expand All @@ -248,7 +293,11 @@ mod test {
url: "http://localhost:12004".into(),
..Default::default()
});
let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None);
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
Expand Down Expand Up @@ -276,6 +325,63 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_url_map_request() -> Result<()> {
tokio::spawn(async move {
let dst = Arc::new(Mutex::new(DummySharedState { cnt: 0 }));
let router: Router = Router::new()
.route("/", post(handle_request))
.route("/cnt", get(counter))
.with_state(dst);
let listener = tokio::net::TcpListener::bind("localhost:12006").await.unwrap();
axum::serve(listener, router).await.unwrap();
});
tokio::spawn(async move {
let map = Arc::new(DashMap::new());
let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]);
let mut url_mapping = HashMap::new();
url_mapping.insert(signature.to_string(), "http://localhost:12006".into());
map.insert(0, LookaheadEntry {
url: "".into(),
election: SignedPreconferElection {
message: PreconferElection {
preconfer_pubkey: signature.clone(),
..Default::default()
},
..Default::default()
},
});
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::UrlMap(url_mapping),
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
let listener = tokio::net::TcpListener::bind("localhost:12007").await.unwrap();
axum::serve(listener, router).await.unwrap();
});
tokio::time::sleep(Duration::from_secs(1)).await;
for _ in 0..10 {
let mut headers = HeaderMap::new();
headers.insert("Content-Type", HeaderValue::from_str("application/json").unwrap());
let res = reqwest::Client::new()
.post("http://localhost:12007/1")
.body("dummy plain body")
.headers(headers)
.headers(HeaderMap::new())
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::OK);
}

let cnt_res = reqwest::get("http://localhost:12006/cnt").await.unwrap();
assert_eq!(StatusCode::OK, cnt_res.status());
assert_eq!(cnt_res.text().await.unwrap(), "10");
Ok(())
}
async fn handle_request(
State(state): State<Arc<Mutex<DummySharedState>>>,
headers: HeaderMap,
Expand Down
42 changes: 38 additions & 4 deletions src/lookahead/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy::rpc::types::beacon::events::HeadEvent;
use dashmap::DashMap;
use eyre::{bail, Result};
use eyre::{bail, ContextCompat, Result};
use hashbrown::HashMap;
use tokio::sync::broadcast;

Expand All @@ -16,17 +16,29 @@ enum LookaheadProviderManager {
Running,
}

#[derive(Debug, Clone)]
pub enum UrlProvider {
LookaheadEntry,
UrlMap(HashMap<String, String>),
}

/// Manages the lookahead for preconfer elections.
pub struct LookaheadManager {
lookahead: Lookahead,
provider_manager: Option<LookaheadProviderManager>,
url_provider: UrlProvider,
}

impl LookaheadManager {
pub fn new(lookahead: Lookahead, lookahead_provider: LookaheadProvider) -> Self {
pub fn new(
lookahead: Lookahead,
lookahead_provider: LookaheadProvider,
url_provider: UrlProvider,
) -> Self {
Self {
lookahead,
provider_manager: Some(LookaheadProviderManager::Initialized(lookahead_provider)),
url_provider,
}
}

Expand All @@ -47,16 +59,38 @@ impl LookaheadManager {
}
}

pub fn get_next_elected_preconfer(&self) -> Option<LookaheadEntry> {
fn get_next_elected_preconfer(&self) -> Option<LookaheadEntry> {
self.lookahead.get_next_elected_preconfer()
}

pub fn get_url(&self) -> Option<Result<String>> {
self.get_next_elected_preconfer().map(|entry| match &self.url_provider {
UrlProvider::LookaheadEntry => Ok(entry.url),
UrlProvider::UrlMap(m) => {
let pubkey = entry.election.preconfer_pubkey().to_string();
m.get(&pubkey)
.cloned()
.wrap_err(format!("could not find key for pubkey {}", pubkey))
}
})
}
}

/// BBuilds a map of lookahead managers from the configuration, keyed by the chain-id.
pub fn lookahead_managers_from_config(
config: Config,
beacon_tx: broadcast::Sender<HeadEvent>,
) -> HashMap<u16, LookaheadManager> {
let url_provider = match config.url_provider {
crate::config::UrlProvider::Lookahead => UrlProvider::LookaheadEntry,
crate::config::UrlProvider::UrlProvider => {
let mut m = HashMap::new();
for (key, value) in config.url_map.expect("pubkey-url-map not present") {
m.insert(key, value);
}
UrlProvider::UrlMap(m)
}
};
// build managers from relay lookahead providers
let mut map = HashMap::new();
for r_c in config.lookahead_providers_relays {
Expand All @@ -70,7 +104,7 @@ pub fn lookahead_managers_from_config(
)),
}
.build_relay_provider();
map.insert(r_c.chain_id, LookaheadManager::new(lookahead, provider));
map.insert(r_c.chain_id, LookaheadManager::new(lookahead, provider, url_provider.clone()));
}
map
}
1 change: 0 additions & 1 deletion src/preconf/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,5 @@ mod tests {

let s = serde_json::to_string(&singed_constraints).unwrap();
let encode = alloy::primitives::hex::encode(s);
println!("{}", encode);
}
}

0 comments on commit 4e4dbe1

Please sign in to comment.