Skip to content

Commit

Permalink
core, graph: track gas metrics in csv
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Feb 19, 2024
1 parent b1bf41c commit 256520d
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 57 deletions.
95 changes: 95 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
store.shard().to_string(),
);

let gas_metrics = GasMetrics::new(deployment.hash.clone(), self.metrics_registry.clone());
let gas_metrics = Arc::new(GasMetrics::new());

let unified_mapping_api_version = manifest.unified_mapping_api_version()?;
let triggers_adapter = chain.triggers_adapter(&deployment, &required_capabilities, unified_mapping_api_version).map_err(|e|
Expand All @@ -359,7 +359,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
deployment.hash.as_str(),
stopwatch_metrics.clone(),
gas_metrics.clone(),
gas_metrics,
));

let subgraph_metrics = Arc::new(SubgraphInstanceMetrics::new(
Expand Down
17 changes: 16 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ where
persisted_data_sources.extend(persisted_off_chain_data_sources);
store
.transact_block_operations(
block_ptr,
block_ptr.clone(),
block.timestamp(),
firehose_cursor,
mods,
Expand Down Expand Up @@ -607,6 +607,21 @@ where
return Err(BlockProcessingError::Canceled);
}

// Spawns a task to upload gas metrics to GCS, enabled via ENV_VARS.enable_gas_metrics.
// Logs errors without interrupting block processing.
if ENV_VARS.enable_gas_metrics {
match self.metrics.host.gas_metrics.flush_metrics_to_gcs(
&logger,
block_ptr,
self.inputs.deployment.id,
) {
Ok(()) => (),
Err(e) => {
error!(logger, "Failed to flush gas metrics to GCS"; "error" => e.to_string())
}
}
}

match needs_restart {
true => Ok(Action::Restart),
false => Ok(Action::Continue),
Expand Down
2 changes: 2 additions & 0 deletions graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ itertools = "0.12.0"
# Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`.
web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = ["arbitrary_precision"] }
serde_plain = "1.0.2"
csv = "1.3.0"
cloud-storage = { version = "0.11.1", features = ["global-client"] }

[dev-dependencies]
clap = { version = "3.2.25", features = ["derive", "env"] }
Expand Down
140 changes: 94 additions & 46 deletions graph/src/components/metrics/gas.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,112 @@
use super::MetricsRegistry;
use crate::prelude::DeploymentHash;
use prometheus::CounterVec;
use std::sync::Arc;
use crate::blockchain::BlockPtr;
use crate::components::store::DeploymentId;
use crate::env::ENV_VARS;
use crate::spawn;
use anyhow::Result;
use cloud_storage::{Bucket, Object};
use csv::Writer;
use slog::{error, info, Logger};
use std::sync::RwLock;
use std::{collections::HashMap, sync::Arc};

#[derive(Clone)]
/// Per-deployment gas accounting, accumulated in memory and periodically
/// flushed to GCS as CSV (see `flush_metrics_to_gcs`).
///
/// Both maps are keyed by host-fn/method name. They are wrapped in
/// `Arc<RwLock<..>>` so clones of `GasMetrics` share the same counters
/// across threads.
pub struct GasMetrics {
    // Total gas used per method since the last flush.
    pub gas_counter_map: Arc<RwLock<HashMap<String, u64>>>,
    // Total number of operations per method since the last flush.
    pub op_counter_map: Arc<RwLock<HashMap<String, u64>>>,
}

impl GasMetrics {
pub fn new(subgraph_id: DeploymentHash, registry: Arc<MetricsRegistry>) -> Self {
let gas_counter = registry
.global_deployment_counter_vec(
"deployment_gas",
"total gas used",
subgraph_id.as_str(),
&["method"],
)
.unwrap_or_else(|err| {
panic!(
"Failed to register deployment_gas prometheus counter for {}: {}",
subgraph_id, err
)
});

let op_counter = registry
.global_deployment_counter_vec(
"deployment_op_count",
"total number of operations",
subgraph_id.as_str(),
&["method"],
)
.unwrap_or_else(|err| {
panic!(
"Failed to register deployment_op_count prometheus counter for {}: {}",
subgraph_id, err
)
});
pub fn new() -> Self {
let gas_counter_map = Arc::new(RwLock::new(HashMap::new()));
let op_counter_map = Arc::new(RwLock::new(HashMap::new()));

GasMetrics {
gas_counter,
op_counter,
gas_counter_map,
op_counter_map,
}
}

pub fn mock() -> Self {
let subgraph_id = DeploymentHash::default();
Self::new(subgraph_id, Arc::new(MetricsRegistry::mock()))
// Converts the map to CSV and returns it as a String
fn map_to_csv(data: &HashMap<String, u64>) -> Result<String> {
let mut wtr = Writer::from_writer(vec![]);
for (key, value) in data {
wtr.serialize((key, value))?;
}
wtr.flush()?;
Ok(String::from_utf8(wtr.into_inner()?)?)
}

async fn write_csv_to_gcs(bucket_name: &str, path: &str, data: String) -> Result<()> {
let bucket = Bucket::read(bucket_name).await?;

let data_bytes = data.into_bytes();

let _ = Object::create(&bucket.name, data_bytes, path, "text/csv").await?;

Ok(())
}

/// Flushes gas and op metrics to GCS asynchronously, clearing metrics maps afterward.
///
/// Serializes metrics to CSV and spawns tasks for GCS upload, logging successes or errors.
/// Metrics are organized by block number and subgraph ID in the GCS bucket.
/// Returns `Ok(())` to indicate the upload process has started.
pub fn flush_metrics_to_gcs(
&self,
logger: &Logger,
block_ptr: BlockPtr,
subgraph_id: DeploymentId,
) -> Result<()> {
let logger = logger.clone();
let gas_data = Self::map_to_csv(&self.gas_counter_map.read().unwrap())?;
let op_data = Self::map_to_csv(&self.op_counter_map.read().unwrap())?;

spawn(async move {
let gas_file = format!("{}/gas/{}.csv", subgraph_id, block_ptr.number);
let op_file = format!("{}/op/{}.csv", subgraph_id, block_ptr.number);

let bucket = &ENV_VARS.gas_metrics_gcs_bucket;

match Self::write_csv_to_gcs(bucket, &gas_file, gas_data).await {
Ok(_) => {
info!(
logger,
"Wrote gas metrics to GCS for block {}", block_ptr.number
);
}
Err(e) => error!(logger, "Error writing gas metrics to GCS: {}", e),
}

match Self::write_csv_to_gcs(bucket, &op_file, op_data).await {
Ok(_) => {
info!(
logger,
"Wrote op metrics to GCS for block {}", block_ptr.number
);
}
Err(e) => error!(logger, "Error writing op metrics to GCS: {}", e),
}
});

// Clear the maps
self.gas_counter_map.write().unwrap().clear();
self.op_counter_map.write().unwrap().clear();

Ok(())
}

pub fn mock() -> Arc<Self> {
Arc::new(Self::new())
}

pub fn track_gas(&self, method: &str, gas_used: u64) {
self.gas_counter
.with_label_values(&[method])
.inc_by(gas_used as f64);
let mut map = self.gas_counter_map.write().unwrap(); //
let counter = map.entry(method.to_string()).or_insert(0);
*counter += gas_used;
}

pub fn track_operations(&self, method: &str, op_count: u64) {
self.op_counter
.with_label_values(&[method])
.inc_by(op_count as f64);
let mut map = self.op_counter_map.write().unwrap();
let counter = map.entry(method.to_string()).or_insert(0);
*counter += op_count;
}
}
4 changes: 2 additions & 2 deletions graph/src/components/subgraph/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub struct HostMetrics {
handler_execution_time: Box<HistogramVec>,
host_fn_execution_time: Box<HistogramVec>,
eth_call_execution_time: Box<HistogramVec>,
pub gas_metrics: GasMetrics,
pub gas_metrics: Arc<GasMetrics>,
pub stopwatch: StopwatchMetrics,
}

Expand All @@ -111,7 +111,7 @@ impl HostMetrics {
registry: Arc<MetricsRegistry>,
subgraph: &str,
stopwatch: StopwatchMetrics,
gas_metrics: GasMetrics,
gas_metrics: Arc<GasMetrics>,
) -> Self {
let handler_execution_time = registry
.new_deployment_histogram_vec(
Expand Down
Loading

0 comments on commit 256520d

Please sign in to comment.