Skip to content

Commit

Permalink
chore: log detailed gossip results from bridge processes.
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Oct 31, 2023
1 parent beb8310 commit 9834940
Show file tree
Hide file tree
Showing 5 changed files with 480 additions and 144 deletions.
100 changes: 69 additions & 31 deletions portal-bridge/src/beacon_bridge.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
use std::cmp::Ordering;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::bail;
use jsonrpsee::http_client::HttpClient;
use serde_json::Value;
use ssz_types::VariableList;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tracing::{info, warn};

use crate::consensus_api::ConsensusApi;
use crate::constants::BEACON_GENESIS_TIME;
use crate::mode::BridgeMode;
use crate::stats::{BeaconSlotStats, StatsReporter};
use crate::utils::{
duration_until_next_update, expected_current_slot, read_test_assets_from_file, TestAssets,
};
use anyhow::bail;
use ethportal_api::types::consensus::fork::ForkName;
use ethportal_api::types::consensus::light_client::bootstrap::LightClientBootstrapCapella;
use ethportal_api::types::consensus::light_client::finality_update::LightClientFinalityUpdateCapella;
use ethportal_api::types::consensus::light_client::optimistic_update::LightClientOptimisticUpdateCapella;
use ethportal_api::types::consensus::light_client::update::{
LightClientUpdate, LightClientUpdateCapella,
};
use ethportal_api::types::content_key::beacon::{
LightClientFinalityUpdateKey, LightClientOptimisticUpdateKey,
};
use ethportal_api::types::content_value::beacon::{
ForkVersionedLightClientUpdate, LightClientUpdatesByRange,
};
Expand All @@ -20,19 +35,6 @@ use ethportal_api::BeaconNetworkApiClient;
use ethportal_api::{
BeaconContentKey, BeaconContentValue, LightClientBootstrapKey, LightClientUpdatesByRangeKey,
};
use jsonrpsee::http_client::HttpClient;
use serde_json::Value;
use ssz_types::VariableList;
use std::cmp::Ordering;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tracing::{info, warn};

use ethportal_api::types::content_key::beacon::{
LightClientFinalityUpdateKey, LightClientOptimisticUpdateKey,
};
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};

pub struct BeaconBridge {
pub api: ConsensusApi,
Expand Down Expand Up @@ -67,11 +69,14 @@ impl BeaconBridge {
.into_beacon_assets()
.expect("Error parsing beacon test assets.");

// test files have no slot number data, so report all gossiped content at height 0.
let slot_stats = Arc::new(Mutex::new(BeaconSlotStats::new(0)));
for asset in assets.0.into_iter() {
BeaconBridge::gossip_beacon_content(
Arc::clone(&self.portal_clients),
asset.content_key,
asset.content_value,
slot_stats.clone(),
)
.await
.expect("Error serving beacon data in test mode.");
Expand Down Expand Up @@ -135,12 +140,15 @@ impl BeaconBridge {
// Serve LightClientBootstrap data
let api_clone = api.clone();
let portal_clients_clone = Arc::clone(&portal_clients);
let slot_stats = Arc::new(Mutex::new(BeaconSlotStats::new(finalized_slot)));

let slot_stats_clone = slot_stats.clone();
let bootstrap_result = tokio::spawn(async move {
Self::serve_light_client_bootstrap(
api_clone,
portal_clients_clone,
&finalized_block_root,
slot_stats_clone,
)
.await
.or_else(|err| {
Expand All @@ -154,24 +162,32 @@ impl BeaconBridge {
let api_clone = api.clone();
let portal_clients_clone = Arc::clone(&portal_clients);

let slot_stats_clone = slot_stats.clone();
let update_result = tokio::spawn(async move {
Self::serve_light_client_update(api_clone, portal_clients_clone, current_period)
.await
.or_else(|err| {
warn!("Failed to serve light client update: {err}");
Ok::<u64, ()>(current_period)
})
.expect("always return the original or new period")
Self::serve_light_client_update(
api_clone,
portal_clients_clone,
current_period,
slot_stats_clone,
)
.await
.or_else(|err| {
warn!("Failed to serve light client update: {err}");
Ok::<u64, ()>(current_period)
})
.expect("always return the original or new period")
});

// Serve `LightClientFinalityUpdate` data
let api_clone = api.clone();
let portal_clients_clone = Arc::clone(&portal_clients);
let slot_stats_clone = slot_stats.clone();
let finalized_slot = tokio::spawn(async move {
Self::serve_light_client_finality_update(
api_clone,
portal_clients_clone,
finalized_slot,
slot_stats_clone,
)
.await
.or_else(|err| {
Expand All @@ -182,8 +198,11 @@ impl BeaconBridge {
});

// Serve `LightClientOptimisticUpdate` data
tokio::spawn(async move {
if let Err(err) = Self::serve_light_client_optimistic_update(api, portal_clients).await
let slot_stats_clone = slot_stats.clone();
let optimistic_update = tokio::spawn(async move {
if let Err(err) =
Self::serve_light_client_optimistic_update(api, portal_clients, slot_stats_clone)
.await
{
warn!("Failed to serve light client optimistic update: {err}");
}
Expand All @@ -196,6 +215,14 @@ impl BeaconBridge {
let finalized_slot = finalized_slot
.await
.expect("finality update task is never cancelled");
optimistic_update
.await
.expect("optimistic update task is never cancelled");
if let Ok(stats) = slot_stats.lock() {
stats.report();
} else {
warn!("Error displaying beacon gossip stats. Unable to acquire lock.");
};
(new_period, new_finalized_block_root, finalized_slot)
}

Expand All @@ -204,6 +231,7 @@ impl BeaconBridge {
api: ConsensusApi,
portal_clients: Arc<Vec<HttpClient>>,
finalized_block_root: &str,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<String> {
let response = api.get_beacon_block_root("finalized".to_owned()).await?;
let response: Value = serde_json::from_str(&response)?;
Expand Down Expand Up @@ -234,7 +262,7 @@ impl BeaconBridge {
});

// Return the latest finalized block root if we successfully gossiped the latest bootstrap.
Self::gossip_beacon_content(portal_clients, content_key, content_value)
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats)
.await
.map(|_| latest_finalized_block_root)
}
Expand All @@ -243,6 +271,7 @@ impl BeaconBridge {
api: ConsensusApi,
portal_clients: Arc<Vec<HttpClient>>,
current_period: u64,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<u64> {
let now = SystemTime::now();
let expected_current_period = expected_current_slot(BEACON_GENESIS_TIME, now) / (32 * 256);
Expand Down Expand Up @@ -282,14 +311,15 @@ impl BeaconBridge {
);

// Update the current known period if we successfully gossiped the latest data.
Self::gossip_beacon_content(portal_clients, content_key, content_value).await?;
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;

Ok(expected_current_period)
}

async fn serve_light_client_optimistic_update(
api: ConsensusApi,
portal_clients: Arc<Vec<HttpClient>>,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<()> {
let data = api.get_lc_optimistic_update().await?;
let update: Value = serde_json::from_str(&data)?;
Expand All @@ -304,14 +334,14 @@ impl BeaconBridge {
LightClientOptimisticUpdateKey::new(update.signature_slot),
);
let content_value = BeaconContentValue::LightClientOptimisticUpdate(update.into());

Self::gossip_beacon_content(portal_clients, content_key, content_value).await
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await
}

async fn serve_light_client_finality_update(
api: ConsensusApi,
portal_clients: Arc<Vec<HttpClient>>,
finalized_slot: u64,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<u64> {
let data = api.get_lc_finality_update().await?;
let update: Value = serde_json::from_str(&data)?;
Expand Down Expand Up @@ -343,7 +373,7 @@ impl BeaconBridge {
);
let content_value = BeaconContentValue::LightClientFinalityUpdate(update.into());

Self::gossip_beacon_content(portal_clients, content_key, content_value).await?;
Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?;

Ok(new_finalized_slot)
}
Expand All @@ -353,11 +383,19 @@ impl BeaconBridge {
portal_clients: Arc<Vec<HttpClient>>,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
slot_stats: Arc<Mutex<BeaconSlotStats>>,
) -> anyhow::Result<()> {
let mut results = vec![];
for client in portal_clients.as_ref() {
client
.gossip(content_key.clone(), content_value.clone())
.await?;
let result = client
.trace_gossip(content_key.clone(), content_value.clone())
.await;
results.push(result);
}
if let Ok(mut data) = slot_stats.lock() {
data.update(content_key, results.into());
} else {
warn!("Error updating beacon gossip stats. Unable to acquire lock.");
}
Ok(())
}
Expand Down
Loading

0 comments on commit 9834940

Please sign in to comment.