Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
add farmer metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ParthDesai committed Feb 6, 2024
1 parent b45c73e commit ddab0b2
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 3 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sdk/farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ lru = "0.12.2"
libmimalloc-sys = { version = "0.1.35", features = ["extended"] }
parking_lot = "0.12"
pin-project = "1"
prometheus-client = "0.22.0"
rayon = "1.7.0"
sdk-traits = { path = "../traits" }
sdk-utils = { path = "../utils" }
Expand All @@ -24,6 +25,7 @@ subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev =
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "e8ed377ecee4e1992ffb731d67cd04bf444e6382" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "e8ed377ecee4e1992ffb731d67cd04bf444e6382", default-features = false }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "e8ed377ecee4e1992ffb731d67cd04bf444e6382" }
subspace-metrics = { git = "https://github.com/subspace/subspace", rev = "e8ed377ecee4e1992ffb731d67cd04bf444e6382" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "e8ed377ecee4e1992ffb731d67cd04bf444e6382" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "e8ed377ecee4e1992ffb731d67cd04bf444e6382", features = ["parallel"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "e8ed377ecee4e1992ffb731d67cd04bf444e6382" }
Expand Down
117 changes: 114 additions & 3 deletions sdk/farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![feature(const_option)]

mod metrics;

use std::collections::HashMap;
use std::io;
use std::num::{NonZeroU8, NonZeroUsize};
Expand All @@ -21,13 +23,15 @@ pub use builder::{Builder, Config};
use derivative::Derivative;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use prometheus_client::registry::Registry;
use sdk_traits::Node;
use sdk_utils::{ByteSize, DestructorSet, PublicKey, TaskOutput};
use serde::{Deserialize, Serialize};
use subspace_core_primitives::crypto::kzg;
use subspace_core_primitives::{PieceIndex, Record, SectorIndex};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::piece_cache::PieceCache as FarmerPieceCache;
use subspace_farmer::single_disk_farm::farming::FarmingNotification;
use subspace_farmer::single_disk_farm::{
SectorPlottingDetails, SectorUpdate, SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmId,
SingleDiskFarmInfo, SingleDiskFarmOptions, SingleDiskFarmSummary,
Expand All @@ -41,6 +45,7 @@ use subspace_farmer::utils::{
};
use subspace_farmer::{Identity, KNOWN_PEERS_CACHE_SIZE};
use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed};
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::KnownPeersManager;
Expand All @@ -49,6 +54,8 @@ use tokio::sync::{mpsc, oneshot, watch, Mutex, Semaphore};
use tracing::{debug, error, info, warn};
use tracing_futures::Instrument;

use crate::metrics::FarmerMetrics;

/// Description of the farm
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[non_exhaustive]
Expand All @@ -72,6 +79,7 @@ impl FarmDescription {
}

mod builder {
use std::net::SocketAddr;
use std::num::{NonZeroU8, NonZeroUsize};

use derivative::Derivative;
Expand Down Expand Up @@ -192,6 +200,11 @@ mod builder {
/// Threads will be pinned to corresponding CPU cores at creation.
#[builder(default)]
pub replotting_thread_pool_size: Option<NonZeroUsize>,
/// Defines endpoints for the prometheus metrics server. It doesn't
/// start without at least one specified endpoint. Format:
/// 127.0.0.1:8080
#[builder(default)]
pub metrics_endpoints: Vec<SocketAddr>,
}

impl Builder {
Expand Down Expand Up @@ -397,8 +410,37 @@ impl Config {
replotting_thread_pool_size,
sector_downloading_concurrency,
sector_encoding_concurrency,
metrics_endpoints,
} = self;

let mut prometheus_metrics_registry = Registry::default();
let farmer_metrics = FarmerMetrics::new(&mut prometheus_metrics_registry);
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();

if metrics_endpoints_are_specified {
let (prometheus_worker_drop_sender, prometheus_worker_drop_receiver) =
oneshot::channel();
let prometheus_task = start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(prometheus_metrics_registry),
)
.context("Unable to start farmer prometheus worker")?;

let prometheus_worker_join_handle = sdk_utils::task_spawn(
"sdk-farmer-prometheus-worker",
future::select(prometheus_task, prometheus_worker_drop_receiver),
);

destructors.add_async_destructor({
async move {
let _ = prometheus_worker_drop_sender.send(());
prometheus_worker_join_handle.await.expect(
"awaiting worker should not fail except panic by the worker itself; qed",
);
}
})?;
}

let mut single_disk_farms = Vec::with_capacity(farms.len());
let mut farm_info = HashMap::with_capacity(farms.len());

Expand Down Expand Up @@ -623,22 +665,87 @@ impl Config {
})?;

let mut sector_plotting_handler_ids = vec![];
let mut farming_handler_ids = vec![];
for (disk_farm_index, single_disk_farm) in single_disk_farms.iter().enumerate() {
let readers_and_pieces = Arc::clone(&readers_and_pieces);
let span = tracing::info_span!("farm", %disk_farm_index);

// Collect newly plotted pieces
// TODO: Once we have replotting, this will have to be updated
sector_plotting_handler_ids.push(single_disk_farm.on_sector_update(Arc::new(
sector_plotting_handler_ids.push(single_disk_farm.on_sector_update(Arc::new({
let single_disk_farm_id = *single_disk_farm.id();
let farmer_metrics = farmer_metrics.clone();

move |(_plotted_sector, sector_update)| {
let _span_guard = span.enter();
let farmer_metrics = farmer_metrics.clone();

match sector_update {
SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => {
farmer_metrics.sector_plotting.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => {
farmer_metrics.sector_downloading.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => {
farmer_metrics
.observe_sector_downloading_time(&single_disk_farm_id, time);
farmer_metrics.sector_downloaded.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => {
farmer_metrics.sector_encoding.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => {
farmer_metrics.observe_sector_encoding_time(&single_disk_farm_id, time);
farmer_metrics.sector_encoded.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Writing) => {
farmer_metrics.sector_writing.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => {
farmer_metrics.observe_sector_writing_time(&single_disk_farm_id, time);
farmer_metrics.sector_written.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Finished {
plotted_sector: _,
old_plotted_sector: _,
time,
}) => {
farmer_metrics.observe_sector_plotting_time(&single_disk_farm_id, time);
farmer_metrics.sector_plotted.inc();
}
_ => {}
};

handler_on_sector_update(
sector_update,
disk_farm_index,
readers_and_pieces.clone(),
)
},
)));
}
})));

farming_handler_ids.push(single_disk_farm.on_farming_notification(Arc::new({
let single_disk_farm_id = *single_disk_farm.id();
let farmer_metrics = farmer_metrics.clone();

move |farming_notification| match farming_notification {
FarmingNotification::Auditing(auditing_details) => {
farmer_metrics
.observe_auditing_time(&single_disk_farm_id, &auditing_details.time);
}
FarmingNotification::Proving(proving_details) => {
farmer_metrics.observe_proving_time(
&single_disk_farm_id,
&proving_details.time,
proving_details.result,
);
}
FarmingNotification::NonFatalError(error) => {
farmer_metrics.note_farming_error(&single_disk_farm_id, error);
}
}
})));
}

let mut single_disk_farms_stream =
Expand Down Expand Up @@ -704,6 +811,10 @@ impl Config {
destructors.add_items_to_drop(handler_id)?;
}

for handler_id in farming_handler_ids.drain(..) {
destructors.add_items_to_drop(handler_id)?;
}

tracing::debug!("Started farmer");

Ok(Farmer {
Expand Down
Loading

0 comments on commit ddab0b2

Please sign in to comment.