Skip to content

Commit

Permalink
graph: Track bytes read and writes for DIPS metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Mar 7, 2024
1 parent 186ddff commit 25827fc
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 13 deletions.
2 changes: 1 addition & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ where
) {
Ok(()) => (),
Err(e) => {
error!(logger, "Failed to gas metrics to object store"; "error" => e.to_string())
error!(logger, "Failed to upload DIPS metrics"; "error" => e.to_string())
}
}
}
Expand Down
135 changes: 124 additions & 11 deletions graph/src/components/metrics/gas.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::blockchain::BlockPtr;
use crate::components::store::DeploymentId;
use crate::components::store::{DeploymentId, Entity};
use crate::data::store::Id;
use crate::env::ENV_VARS;
use crate::schema::EntityType;
use crate::spawn;
use crate::util::cache_weight::CacheWeight;
use anyhow::{anyhow, Result};
use csv::Writer;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::path::Path;
use object_store::ObjectStore;
use serde::Serialize;
use slog::{error, info, Logger};
use std::sync::RwLock;
use std::{collections::HashMap, sync::Arc};
Expand All @@ -15,29 +19,53 @@ use url::Url;
pub struct GasMetrics {
pub gas_counter: Arc<RwLock<HashMap<String, u64>>>,
pub op_counter: Arc<RwLock<HashMap<String, u64>>>,
pub read_bytes_counter: Arc<RwLock<HashMap<(EntityType, Id), u64>>>,
pub write_bytes_counter: Arc<RwLock<HashMap<(EntityType, Id), u64>>>,
}

impl GasMetrics {
/// Creates a `GasMetrics` instance with all four counters empty.
pub fn new() -> Self {
    GasMetrics {
        gas_counter: Arc::new(RwLock::new(HashMap::new())),
        op_counter: Arc::new(RwLock::new(HashMap::new())),
        read_bytes_counter: Arc::new(RwLock::new(HashMap::new())),
        write_bytes_counter: Arc::new(RwLock::new(HashMap::new())),
    }
}

// Converts the map to CSV and returns it as a String
fn map_to_csv(data: &HashMap<String, u64>) -> Result<String> {
fn serialize_to_csv<T: Serialize, U: Serialize, I: IntoIterator<Item = T>>(
data: I,
column_names: U,
) -> Result<String> {
let mut wtr = Writer::from_writer(vec![]);
for (key, value) in data {
wtr.serialize((key, value))?;
wtr.serialize(column_names)?;
for record in data {
wtr.serialize(record)?;
}
wtr.flush()?;
Ok(String::from_utf8(wtr.into_inner()?)?)
}

/// Serializes a `method -> count` map to a two-column CSV with the given
/// column headers. Row order follows `HashMap` iteration order.
fn map_to_csv(data: &HashMap<String, u64>, column_names: (&str, &str)) -> Result<String> {
    // `HashMap::iter` already yields `(&K, &V)` tuples, which serialize as
    // two-column records — the previous identity `.map` was redundant.
    Self::serialize_to_csv(data.iter(), column_names)
}

/// Serializes per-entity byte counters to a three-column CSV
/// (entity type, entity id, byte count) with the given headers.
fn entity_stats_to_csv(
    data: &HashMap<(EntityType, Id), u64>,
    column_names: (&str, &str, &str),
) -> Result<String> {
    let rows = data
        .iter()
        .map(|((entity_type, id), bytes)| (entity_type.typename(), id.to_string(), bytes));
    Self::serialize_to_csv(rows, column_names)
}

async fn write_csv_to_store(bucket: &str, path: &str, data: String) -> Result<()> {
let data_bytes = data.into_bytes();

Expand All @@ -64,15 +92,27 @@ impl GasMetrics {
subgraph_id: DeploymentId,
) -> Result<()> {
let logger = logger.clone();
let gas_data = Self::map_to_csv(&self.gas_counter.read().unwrap())?;
let op_data = Self::map_to_csv(&self.op_counter.read().unwrap())?;
let gas_data = Self::map_to_csv(&self.gas_counter.read().unwrap(), ("method", "gas"))?;
let op_data = Self::map_to_csv(&self.op_counter.read().unwrap(), ("method", "count"))?;
let read_bytes_data = Self::entity_stats_to_csv(
&self.read_bytes_counter.read().unwrap(),
("entity", "id", "bytes"),
)?;
let write_bytes_data = Self::entity_stats_to_csv(
&self.write_bytes_counter.read().unwrap(),
("entity", "id", "bytes"),
)?;
let bucket = &ENV_VARS.dips_metrics_object_store_url;

match bucket {
Some(bucket) => {
spawn(async move {
let gas_file = format!("{}/{}/gas.csv", subgraph_id, block_ptr.number);
let op_file = format!("{}/{}/op.csv", subgraph_id, block_ptr.number);
let read_bytes_file =
format!("{}/{}/read_bytes.csv", subgraph_id, block_ptr.number);
let write_bytes_file =
format!("{}/{}/write_bytes.csv", subgraph_id, block_ptr.number);

match Self::write_csv_to_store(bucket, &gas_file, gas_data).await {
Ok(_) => {
Expand All @@ -95,14 +135,47 @@ impl GasMetrics {
}
Err(e) => error!(logger, "Error writing op metrics to object-store: {}", e),
}

match Self::write_csv_to_store(bucket, &read_bytes_file, read_bytes_data).await
{
Ok(_) => {
info!(
logger,
"Wrote read bytes metrics to object-store for block {}",
block_ptr.number
);
}
Err(e) => {
error!(
logger,
"Error writing read bytes metrics to object-store: {}", e
)
}
}

match Self::write_csv_to_store(bucket, &write_bytes_file, write_bytes_data)
.await
{
Ok(_) => {
info!(
logger,
"Wrote write bytes metrics to object-store for block {}",
block_ptr.number
);
}
Err(e) => {
error!(
logger,
"Error writing write bytes metrics to object-store: {}", e
)
}
}
});
}
None => return Err(anyhow!("Failed to parse gas metrics object store URL"))?,
None => return Err(anyhow!("Failed to parse object store URL"))?,
}

// Clear the maps
self.gas_counter.write().unwrap().clear();
self.op_counter.write().unwrap().clear();
self.reset_counters();

Ok(())
}
Expand All @@ -111,6 +184,14 @@ impl GasMetrics {
Arc::new(Self::new())
}

/// Clears every accumulated counter: gas, op, read bytes, and write bytes.
pub fn reset_counters(&self) {
    // The string-keyed and entity-keyed maps have different types, so they
    // are cleared in two homogeneous groups.
    for counter in [&self.gas_counter, &self.op_counter] {
        counter.write().unwrap().clear();
    }
    for counter in [&self.read_bytes_counter, &self.write_bytes_counter] {
        counter.write().unwrap().clear();
    }
}

pub fn track_gas(&self, method: &str, gas_used: u64) {
let mut map = self.gas_counter.write().unwrap(); //
let counter = map.entry(method.to_string()).or_insert(0);
Expand All @@ -122,4 +203,36 @@ impl GasMetrics {
let counter = map.entry(method.to_string()).or_insert(0);
*counter += op_count;
}

/// Adds the entity's cache weight to the read-bytes counter keyed by
/// (`entity_type`, entity id).
pub fn track_entity_read(&self, entity_type: &EntityType, entity: &Entity) {
    let key = (entity_type.clone(), entity.id());
    *self
        .read_bytes_counter
        .write()
        .unwrap()
        .entry(key)
        .or_default() += entity.weight() as u64;
}

/// Adds the entity's cache weight to the write-bytes counter keyed by
/// (`entity_type`, entity id).
pub fn track_entity_write(&self, entity_type: &EntityType, entity: &Entity) {
    let key = (entity_type.clone(), entity.id());
    *self
        .write_bytes_counter
        .write()
        .unwrap()
        .entry(key)
        .or_default() += entity.weight() as u64;
}

/// Adds the cache weight of every entity in the slice to the read-bytes
/// counter. The write lock is taken once for the whole batch.
pub fn track_entity_read_batch(&self, entity_type: &EntityType, entities: &[Entity]) {
    let mut counters = self.read_bytes_counter.write().unwrap();
    for entity in entities {
        let key = (entity_type.clone(), entity.id());
        *counters.entry(key).or_default() += entity.weight() as u64;
    }
}

/// Adds the cache weight of every entity in the slice to the write-bytes
/// counter. The write lock is taken once for the whole batch.
pub fn track_entity_write_batch(&self, entity_type: &EntityType, entities: &[Entity]) {
    let mut counters = self.write_bytes_counter.write().unwrap();
    for entity in entities {
        let key = (entity_type.clone(), entity.id());
        *counters.entry(key).or_default() += entity.weight() as u64;
    }
}
}
2 changes: 1 addition & 1 deletion graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub struct EnvVars {
pub subgraph_settings: Option<String>,
/// Whether to prefer substreams blocks streams over firehose when available.
pub prefer_substreams_block_streams: bool,
/// Set by the flag `GRAPH_ENABLE_GAS_METRICS`. Whether to enable
/// Set by the flag `GRAPH_ENABLE_DIPS_METRICS`. Whether to enable
/// DIPS metrics (gas and entity read/write tracking). Off by default.
pub enable_dips_metrics: bool,
/// Set by the env var `GRAPH_HISTORY_BLOCKS_OVERRIDE`. Defaults to None
Expand Down
20 changes: 20 additions & 0 deletions graph/src/runtime/gas/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ mod ops;
mod saturating;
mod size_of;
use crate::components::metrics::gas::GasMetrics;
use crate::components::store::Entity;
use crate::prelude::{CheapClone, ENV_VARS};
use crate::runtime::DeterministicHostError;
use crate::schema::EntityType;
pub use combinators::*;
pub use costs::DEFAULT_BASE_COST;
pub use costs::*;
Expand Down Expand Up @@ -135,6 +137,24 @@ impl GasCounter {
self.consume_host_fn_inner(amount, Some(method))
}

/// Forwards a single entity read to the DIPS metrics, but only when the
/// `enable_dips_metrics` flag is set.
pub fn track_entity_read(&self, entity_type: &EntityType, entity: &Entity) {
    if !ENV_VARS.enable_dips_metrics {
        return;
    }
    self.metrics.track_entity_read(entity_type, entity);
}

/// Forwards a batch of entity reads to the DIPS metrics, but only when the
/// `enable_dips_metrics` flag is set.
pub fn track_entity_reads(&self, entity_type: &EntityType, entities: &[Entity]) {
    if !ENV_VARS.enable_dips_metrics {
        return;
    }
    self.metrics.track_entity_read_batch(entity_type, entities);
}

/// Forwards a single entity write to the DIPS metrics, but only when the
/// `enable_dips_metrics` flag is set.
pub fn track_entity_write(&self, entity_type: &EntityType, entity: &Entity) {
    if !ENV_VARS.enable_dips_metrics {
        return;
    }
    self.metrics.track_entity_write(entity_type, entity);
}

pub fn get(&self) -> Gas {
Gas(self.counter.load(SeqCst))
}
Expand Down
8 changes: 8 additions & 0 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ impl HostExports {
);
poi_section.end();

gas.track_entity_write(&entity_type, &entity);

state.entity_cache.set(key, entity)?;

Ok(())
Expand Down Expand Up @@ -394,6 +396,10 @@ impl HostExports {
"store_get",
)?;

if let Some(ref entity) = result {
gas.track_entity_read(&entity_type, entity);
}

Ok(result)
}

Expand Down Expand Up @@ -421,6 +427,8 @@ impl HostExports {
"store_load_related",
)?;

gas.track_entity_reads(&entity_type, &result);

Ok(result)
}

Expand Down

0 comments on commit 25827fc

Please sign in to comment.