Skip to content

Commit

Permalink
mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
fbrv committed Jul 22, 2024
1 parent ec4748d commit b9c731f
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 37 deletions.
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,24 @@ To execute the forward service, use the following command:
Example `configuration.toml`:

```toml
beacon-urls = ["beacon-url-1", "beacon-url-2"]
url-provider = "lookahead"
beacon-nodes = ["beacon-url-1", "beacon-url-2"]

[[lookahead-providers-relays]]
chain-id = 1
relay-urls = ["relay-1", "relay-2"]
relays = ["relay-1", "relay-2"]

[[lookahead-providers-relays]]
chain-id = 2
relay-urls = ["relay-3"]
relays = ["relay-3"]

[url-mapping]
"bls-public-key-1" = "url-to-forward-request"
```

### Details
- url-provider: Specifies the source of the URL. It can be either lookahead or url-mapping.
- If set to **lookahead**, the URL is derived from the lookahead entry.
- If set to **url-mapping**, the URL is determined by looking up the public keys between the lookahead entry public key and the map provided in url-mapping.

Make sure to provide the necessary beacon and relay URLs in the configuration file.
5 changes: 3 additions & 2 deletions config.example.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
beacon-urls = ["https:://beacon-url"]
url-provider = "lookahead"
beacon-nodes = ["https:://beacon-url"]

[[lookahead-providers-relays]]
chain-id = 1
relay-urls = ["https://relay-url"]
relays = ["https://relay-url"]
20 changes: 16 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
use std::{fs, path::Path};

use eyre::{Result, WrapErr};
use hashbrown::HashMap;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum UrlProvider {
Lookahead,
UrlMapping,
}

#[derive(Debug, Deserialize)]
pub struct Config {
#[serde(rename = "lookahead-providers-relays")]
pub lookahead_providers_relays: Vec<LookaheadProvider>,
#[serde(rename = "beacon-urls")]
pub beacon_urls: Vec<String>,
#[serde(rename = "beacon-nodes")]
pub beacon_nodes: Vec<String>,
#[serde(rename = "url-provider")]
pub url_provider: UrlProvider,
#[serde(rename = "pubkey-url-map")]
pub url_map: Option<HashMap<String, String>>,
}

#[derive(Debug, Deserialize)]
pub struct LookaheadProvider {
#[serde(rename = "chain-id")]
pub chain_id: u16,
#[serde(rename = "relay-urls")]
pub relay_urls: Vec<String>,
#[serde(rename = "relays")]
pub relays: Vec<String>,
}

impl Config {
Expand Down
149 changes: 128 additions & 21 deletions src/forward_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl RpcForward {

fn router(shared_state: SharedState) -> Router {
Router::new()
.route("/:scan_id", post(scan_id_forward_request))
.route("/:chain_id", post(scan_id_forward_request))
.route("/", post(forward_request))
.layer(TraceLayer::new_for_http())
.with_state(Arc::new(shared_state))
Expand All @@ -107,27 +107,32 @@ async fn scan_id_forward_request(
body: Bytes,
) -> Result<impl IntoResponse, impl IntoResponse> {
if let Some(manager) = state.managers.get(&chain_id) {
if let Some(entry) = manager.get_next_elected_preconfer() {
match inner_forward_request(&state.client, &entry.url, body, headers).await {
Ok(res) => Ok(res),
Err(_) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error while forwarding request".to_string(),
)),
}
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("no preconfer has been elected for chain_id {}", chain_id),
))
match manager.get_url() {
None => Err((
StatusCode::BAD_REQUEST,
format!("no lookahead entry found for chain-id {}", chain_id),
)),
Some(url) => match url {
Ok(url) => match inner_forward_request(&state.client, &url, body, headers).await {
Ok(res) => Ok(res),
Err(_) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error while forwarding request".to_string(),
)),
},
Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
},
}
} else {
Err((StatusCode::BAD_REQUEST, format!("no lookahead provider found for id {}", chain_id)))
Err((
StatusCode::BAD_REQUEST,
format!("no lookahead provider found for chain-id {}", chain_id),
))
}
}

async fn forward_request(State(_state): State<Arc<SharedState>>) -> impl IntoResponse {
(StatusCode::BAD_REQUEST, "missing chain_id parameter")
(StatusCode::BAD_REQUEST, "missing chain-id parameter")
}

async fn inner_forward_request(
Expand All @@ -148,6 +153,7 @@ mod test {
time::Duration,
};

use alloy::rpc::types::beacon::{constants::BLS_PUBLIC_KEY_BYTES_LEN, BlsPublicKey};
use axum::{
extract::State,
http::HeaderMap,
Expand All @@ -163,7 +169,8 @@ mod test {

use crate::{
forward_service::{router, SharedState},
lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider},
lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider, UrlProvider},
preconf::election::{PreconferElection, SignedPreconferElection},
};

struct DummySharedState {
Expand All @@ -176,6 +183,7 @@ mod test {
let manager = LookaheadManager::new(
Lookahead { map: DashMap::new().into() },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
Expand All @@ -186,7 +194,7 @@ mod test {
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12001").send().await.unwrap();
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
assert_eq!(res.text().await.unwrap(), "missing chain_id parameter");
assert_eq!(res.text().await.unwrap(), "missing chain-id parameter");
Ok(())
}

Expand All @@ -196,6 +204,7 @@ mod test {
let manager = LookaheadManager::new(
Lookahead { map: DashMap::new().into() },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
Expand All @@ -206,7 +215,7 @@ mod test {
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12002/2").send().await.unwrap();
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
assert_eq!(res.text().await.unwrap(), "no lookahead provider found for id 2");
assert_eq!(res.text().await.unwrap(), "no lookahead provider found for chain-id 2");
Ok(())
}

Expand All @@ -218,7 +227,11 @@ mod test {
url: "http://not-a-valid-url".into(),
..Default::default()
});
let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None);
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
Expand Down Expand Up @@ -248,7 +261,11 @@ mod test {
url: "http://localhost:12004".into(),
..Default::default()
});
let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None);
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::LookaheadEntry,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
Expand Down Expand Up @@ -276,6 +293,96 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_url_map_request() -> Result<()> {
tokio::spawn(async move {
let dst = Arc::new(Mutex::new(DummySharedState { cnt: 0 }));
let router: Router = Router::new()
.route("/", post(handle_request))
.route("/cnt", get(counter))
.with_state(dst);
let listener = tokio::net::TcpListener::bind("localhost:12006").await.unwrap();
axum::serve(listener, router).await.unwrap();
});
tokio::spawn(async move {
let map = Arc::new(DashMap::new());
let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]);
let mut url_mapping = HashMap::new();
url_mapping.insert(signature.to_string(), "http://localhost:12006".into());
map.insert(0, LookaheadEntry {
url: "".into(),
election: SignedPreconferElection {
message: PreconferElection {
preconfer_pubkey: signature.clone(),
..Default::default()
},
..Default::default()
},
});
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::UrlMap(url_mapping),
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
let listener = tokio::net::TcpListener::bind("localhost:12007").await.unwrap();
axum::serve(listener, router).await.unwrap();
});
tokio::time::sleep(Duration::from_secs(1)).await;
for _ in 0..10 {
let mut headers = HeaderMap::new();
headers.insert("Content-Type", HeaderValue::from_str("application/json").unwrap());
let res = reqwest::Client::new()
.post("http://localhost:12007/1")
.body("dummy plain body")
.headers(headers)
.headers(HeaderMap::new())
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::OK);
}

let cnt_res = reqwest::get("http://localhost:12006/cnt").await.unwrap();
assert_eq!(StatusCode::OK, cnt_res.status());
assert_eq!(cnt_res.text().await.unwrap(), "10");
Ok(())
}

#[tokio::test]
async fn test_no_pubkey() -> Result<()> {
tokio::spawn(async move {
let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]);
let map = Arc::new(DashMap::new());
let mut provider = HashMap::new();
provider.insert(signature.to_string(), "http:://not-a-valid-http".into());
map.insert(0, LookaheadEntry { url: "".into(), ..Default::default() });
let manager = LookaheadManager::new(
Lookahead { map },
LookaheadProvider::None,
UrlProvider::UrlMap(provider),
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
let listener = tokio::net::TcpListener::bind("localhost:12008").await.unwrap();
axum::serve(listener, router).await.unwrap();
});
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12008/1").send().await.unwrap();
assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(
res.text().await.unwrap(),
format!(
"could not find key for pubkey {}",
BlsPublicKey::from([0u8; BLS_PUBLIC_KEY_BYTES_LEN]).to_string()
)
);
Ok(())
}

async fn handle_request(
State(state): State<Arc<Mutex<DummySharedState>>>,
headers: HeaderMap,
Expand Down
Loading

0 comments on commit b9c731f

Please sign in to comment.