From 0de68f5c11b23573967b32ab71470f009b54895d Mon Sep 17 00:00:00 2001 From: Waylon Jepsen Date: Sat, 4 Nov 2023 16:35:48 +0100 Subject: [PATCH 1/7] file output type --- Cargo.lock | 164 ++++++++++++++++++++++++++ arbiter-core/Cargo.toml | 4 + arbiter-core/src/data_collection.rs | 132 ++++++++++++++++++++- arbiter-core/src/tests/data_output.rs | 73 +++++++++++- 4 files changed, 364 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4ac237b..f97589f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,6 +93,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -257,6 +272,7 @@ dependencies = [ "cargo_metadata 0.18.1", "chrono", "crossbeam-channel", + "csv", "ethers", "futures", "futures-locks", @@ -348,6 +364,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -538,6 +576,27 @@ dependencies = [ "zeroize", ] +[[package]] +name = "brotli" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.5.0" @@ -1107,6 +1166,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.9.2" @@ -1716,6 +1796,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fast-float" version = "0.2.0" 
@@ -3183,6 +3269,35 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "parquet-format-safe" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1131c54b167dd4e4799ce762e1ab01549ebb94d5bdd13e6ec1b467491c378e1f" +dependencies = [ + "async-trait", + "futures", +] + +[[package]] +name = "parquet2" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579fe5745f02cef3d5f236bfed216fd4693e49e4e920a13475c6132233283bce" +dependencies = [ + "async-stream", + "brotli", + "flate2", + "futures", + "lz4", + "parquet-format-safe", + "seq-macro", + "snap", + "streaming-decompression", + "xxhash-rust", + "zstd 0.12.4", +] + [[package]] name = "password-hash" version = "0.4.2" @@ -3553,18 +3668,22 @@ checksum = "d1e50c63db77f846ac5119477422f0156f0a1826ceaae7d921f9a6d5ea5f7ca3" dependencies = [ "ahash 0.8.6", "arrow-format", + "base64 0.21.5", "bytemuck", "chrono", "dyn-clone", "either", "ethnum", + "fallible-streaming-iterator", "foreign_vec", + "futures", "getrandom", "hashbrown 0.14.2", "lexical-core", "lz4", "multiversion", "num-traits", + "parquet2", "polars-error", "rustc_version", "simdutf8", @@ -3610,6 +3729,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b6480520ebde0b20935b600483b865513891e36c04942cebdd19e4f338257b4" dependencies = [ "arrow-format", + "parquet2", "regex", "simdutf8", "thiserror", @@ -3622,9 +3742,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "666466a3b151047c76d99b4e4e5f5438895ef97848008cf49b06df8e3d2d395a" dependencies = [ "ahash 0.8.6", + "async-trait", "bytes", "chrono", "fast-float", + "futures", "home", "itoa", "lexical", @@ -3643,6 +3765,8 @@ dependencies = [ "ryu", "simdutf8", "smartstring", + "tokio", + "tokio-util", ] [[package]] @@ -4542,6 +4666,12 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" +[[package]] +name = "seq-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" + [[package]] name = "serde" version = "1.0.190" @@ -4762,6 +4892,12 @@ dependencies = [ "serde", ] +[[package]] +name = "snap" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" + [[package]] name = "socket2" version = "0.4.10" @@ -4846,6 +4982,15 @@ dependencies = [ "rand", ] +[[package]] +name = "streaming-decompression" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf6cc3b19bfb128a8ad11026086e31d3ce9ad23f8ea37354b31383a187c44cf3" +dependencies = [ + "fallible-streaming-iterator", +] + [[package]] name = "streaming-iterator" version = "0.1.9" @@ -5966,6 +6111,15 @@ dependencies = [ "zstd-safe 5.0.2+zstd.1.5.2", ] +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", +] + [[package]] name = "zstd" version = "0.13.0" @@ -5985,6 +6139,16 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" 
+dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-safe" version = "7.0.0" diff --git a/arbiter-core/Cargo.toml b/arbiter-core/Cargo.toml index d2f885d2..4ea17be9 100644 --- a/arbiter-core/Cargo.toml +++ b/arbiter-core/Cargo.toml @@ -44,6 +44,10 @@ thiserror = { version = "=1.0.50" } futures-util = { version = "=0.3.29" } tracing = "0.1.40" +# File types +csv = { version = "1.3.0" } +polars = { version = "0.34.2", features = ["parquet", "csv"] } + # Dependencies for the test build and development [dev-dependencies] arbiter-derive = { path = "../arbiter-derive" } diff --git a/arbiter-core/src/data_collection.rs b/arbiter-core/src/data_collection.rs index cccc1bdb..06ab1aa3 100644 --- a/arbiter-core/src/data_collection.rs +++ b/arbiter-core/src/data_collection.rs @@ -28,6 +28,11 @@ use ethers::{ providers::Middleware, types::{Filter, FilteredParams}, }; +use polars::{ + io::parquet::ParquetWriter, + prelude::{CsvWriter, DataFrame, NamedFrom, SerWriter}, + series::Series, +}; use serde::Serialize; use serde_json::Value; @@ -57,6 +62,24 @@ pub struct EventLogger { file_name: Option, decoder: FilterDecoder, receiver: Option>, + output_file_type: Option, +} + + +/// `OutputFileType` is an enumeration that represents the different types of file formats +/// that the `EventLogger` can output to. +#[derive(Debug, Clone, Copy, Serialize)] +pub enum OutputFileType { + /// * `JSON` - Represents the JSON file format. When this variant is used, the `EventLogger` + /// will output the logged events to a JSON file. + JSON, + /// * `CSV` - Represents the CSV (Comma Separated Values) file format. When this variant is used, + /// the `EventLogger` will output the logged events to a CSV file. + CSV, + /// * `Parquet` - Represents the Parquet file format. When this variant is used, the `EventLogger` + /// will output the logged events to a Parquet file. Parquet is a columnar storage file format + /// that is optimized for use with big data processing frameworks. + Parquet, } impl EventLogger { @@ -72,6 +95,7 @@ impl EventLogger { file_name: None, decoder: BTreeMap::new(), receiver: None, + output_file_type: None, } } @@ -141,6 +165,19 @@ impl EventLogger { self } + /// Sets the output file type for the `EventLogger`. + /// The default file type is JSON. + /// # Arguments + /// + /// * `file_type` - The file type that the event logs will be stored in. + /// + /// # Returns + /// + /// The `EventLogger` instance with the specified file type. + pub fn file_type(mut self, file_type: OutputFileType) -> Self { + self.output_file_type = Some(file_type); + self + } /// Executes the `EventLogger`. /// /// This function starts the event logging process. 
It first deletes the @@ -165,19 +202,102 @@ impl EventLogger { let receiver = self.receiver.unwrap(); let dir = self.directory.unwrap_or("./data".into()); let file_name = self.file_name.unwrap_or("output".into()); + let file_type = self.output_file_type.unwrap_or(OutputFileType::JSON); std::thread::spawn(move || { let mut logs: BTreeMap>> = BTreeMap::new(); while let Ok(broadcast) = receiver.recv() { match broadcast { Broadcast::StopSignal => { // create new directory with path - let output_dir = std::env::current_dir().unwrap().join(dir); + let output_dir = std::env::current_dir().unwrap().join(&dir); std::fs::create_dir_all(&output_dir).unwrap(); - let file_path = output_dir.join(format!("{}.json", file_name)); - let file = std::fs::File::create(file_path).unwrap(); - let writer = BufWriter::new(file); - serde_json::to_writer(writer, &logs).expect("Unable to write data"); - break; + // match the file output type and write to correct file using the right file type + match file_type { + OutputFileType::JSON => { + let file_path = output_dir.join(format!("{}.json", file_name)); + let file = std::fs::File::create(file_path).unwrap(); + let writer = BufWriter::new(file); + serde_json::to_writer(writer, &logs).expect("Unable to write data"); + } + OutputFileType::CSV => { + // 1. Flatten the BTreeMap + let mut contract_names = Vec::new(); + let mut event_names = Vec::new(); + let mut event_values = Vec::new(); + + for (contract, events) in &logs { + for (event, values) in events { + for value in values { + contract_names.push(contract.clone()); + event_names.push(event.clone()); + event_values.push(value.to_string()); + // Assuming Value has a suitable ToString implementation + } + } + } + + // 2. Convert the vectors into a DataFrame + let mut df = DataFrame::new(vec![ + Series::new("contract_name", contract_names), + Series::new("event_name", event_names), + Series::new("event_value", event_values), + ]) + .unwrap(); + + println!("{:?}", df); + + // 3. Write the DataFrame to a CSV file + + let file_path = output_dir.join(format!("{}.csv", file_name)); + let file = std::fs::File::create(file_path).unwrap_or_else(|_| { + panic!("Error creating csv file"); + }); + let mut writer = CsvWriter::new(file); + writer.finish(&mut df).unwrap_or_else(|_| { + panic!("Error writing to csv file"); + }); + // what should happen here is that we turn the logs into a polars data fram and then use the polars csv writer to write the data + } + OutputFileType::Parquet => { + // 1. Flatten the BTreeMap + let mut contract_names = Vec::new(); + let mut event_names = Vec::new(); + let mut event_values = Vec::new(); + + for (contract, events) in &logs { + for (event, values) in events { + for value in values { + contract_names.push(contract.clone()); + event_names.push(event.clone()); + event_values.push(value.to_string()); + // Assuming Value has a suitable ToString implementation + } + } + } + + // 2. Convert the vectors into a DataFrame + let mut df = DataFrame::new(vec![ + Series::new("contract_name", contract_names), + Series::new("event_name", event_names), + Series::new("event_value", event_values), + ]) + .unwrap_or_else(|_| { + panic!("Error creating DataFrame"); + }); + + println!("{:?}", df); + + // 3. 
Write the DataFrame to a parquet file + let file_path = output_dir.join(format!("{}.parquet", file_name)); + let file = std::fs::File::create(file_path).unwrap_or_else(|_| { + panic!("Error creating parquet file"); + }); + let writer = ParquetWriter::new(file); + writer.finish(&mut df).unwrap_or_else(|_| { + panic!("Error writing to parquet file"); + }); + } + } } Broadcast::Event(event) => { let ethers_logs = revm_logs_to_ethers_logs(event); diff --git a/arbiter-core/src/tests/data_output.rs b/arbiter-core/src/tests/data_output.rs index 85f035b9..fc8dc74b 100644 --- a/arbiter-core/src/tests/data_output.rs +++ b/arbiter-core/src/tests/data_output.rs @@ -1,7 +1,7 @@ -use tracing_test::traced_test; +use std::{fs, path::Path}; use super::*; -use crate::data_collection::EventLogger; +use crate::{data_collection::{EventLogger, OutputFileType}, middleware::errors::RevmMiddlewareError}; #[tokio::test] async fn data_capture() { @@ -38,5 +38,72 @@ async fn data_capture() { } let _ = env.stop(); - std::fs::remove_dir_all("./data").await.unwrap(); + // std::fs::remove_dir_all("./data").unwrap(); } + +#[tokio::test] +async fn csv_output() { + let (env, client) = startup_user_controlled().unwrap(); + let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap(); + + EventLogger::builder() + .add(arbx.events(), "arbx") + .add(arby.events(), "arby") + .add(lex.events(), "lex") + .file_type(OutputFileType::CSV) + .run() + .unwrap(); + + // Perform some operations that generate events... + generate_events(arbx, arby, lex, client.clone()).await.unwrap_or_else(|e| { + panic!("Error generating events: {}", e); + }); + + let _ = env.stop(); + assert!(Path::new("./data/output.csv").exists()); + // Check if the CSV file was created +} + +#[tokio::test] +async fn parquet_output() { + let (env, client) = startup_user_controlled().unwrap(); + let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap(); + + EventLogger::builder() + .add(arbx.events(), "arbx") + .add(arby.events(), "arby") + .add(lex.events(), "lex") + .file_type(OutputFileType::Parquet) + .run() + .unwrap(); + + // Perform some operations that generate events... 
+ generate_events(arbx, arby, lex, client.clone()).await.unwrap_or_else(|e| { + panic!("Error generating events: {}", e); + }); + + let _ = env.stop(); + + assert!(Path::new("./data/output.parquet").exists()); +} + +async fn generate_events(arbx: ArbiterToken, arby: ArbiterToken, lex: LiquidExchange, client: Arc) -> Result<(), RevmMiddlewareError>{ + for _ in 0..5 { + arbx.approve(client.address(), U256::from(1)) + .send() + .await + .unwrap() + .await?; + arby.approve(client.address(), U256::from(1)) + .send() + .await + .unwrap() + .await?; + lex.set_price(U256::from(10u128.pow(18))) + .send() + .await + .unwrap() + .await?; + } + Ok(()) +} \ No newline at end of file From 91a373a445ea8a147fe9999cc7f5dec82efae232 Mon Sep 17 00:00:00 2001 From: Kinrezc Date: Sat, 4 Nov 2023 19:29:19 -0400 Subject: [PATCH 2/7] consolidate tests into one function and wait 1 second for race condition on file creation --- arbiter-core/src/tests/data_output.rs | 61 +++++---------------------- 1 file changed, 10 insertions(+), 51 deletions(-) diff --git a/arbiter-core/src/tests/data_output.rs b/arbiter-core/src/tests/data_output.rs index fc8dc74b..3d199539 100644 --- a/arbiter-core/src/tests/data_output.rs +++ b/arbiter-core/src/tests/data_output.rs @@ -9,43 +9,14 @@ async fn data_capture() { let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap(); println!("Deployed contracts"); - let listener = EventLogger::builder() + // default_listener + EventLogger::builder() .add(arbx.events(), "arbx") .add(arby.events(), "arby") - .add(lex.events(), "lex"); - - listener.run().unwrap(); - - for _ in 0..5 { - arbx.approve(client.address(), U256::from(1)) - .send() - .await - .unwrap() - .await - .unwrap(); - arby.approve(client.address(), U256::from(1)) - .send() - .await - .unwrap() - .await - .unwrap(); - lex.set_price(U256::from(10u128.pow(18))) - .send() - .await - .unwrap() - .await - .unwrap(); - } - - let _ = env.stop(); - // std::fs::remove_dir_all("./data").unwrap(); -} - -#[tokio::test] -async fn csv_output() { - let (env, client) = startup_user_controlled().unwrap(); - let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap(); - + .add(lex.events(), "lex") + .run() + .unwrap(); + EventLogger::builder() .add(arbx.events(), "arbx") .add(arby.events(), "arby") @@ -54,21 +25,6 @@ async fn csv_output() { .run() .unwrap(); - // Perform some operations that generate events... - generate_events(arbx, arby, lex, client.clone()).await.unwrap_or_else(|e| { - panic!("Error generating events: {}", e); - }); - - let _ = env.stop(); - assert!(Path::new("./data/output.csv").exists()); - // Check if the CSV file was created -} - -#[tokio::test] -async fn parquet_output() { - let (env, client) = startup_user_controlled().unwrap(); - let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap(); - EventLogger::builder() .add(arbx.events(), "arbx") .add(arby.events(), "arby") @@ -77,14 +33,17 @@ async fn parquet_output() { .run() .unwrap(); - // Perform some operations that generate events... 
generate_events(arbx, arby, lex, client.clone()).await.unwrap_or_else(|e| { panic!("Error generating events: {}", e); }); let _ = env.stop(); + std::thread::sleep(std::time::Duration::from_secs(1)); + assert!(Path::new("./data/output.csv").exists()); assert!(Path::new("./data/output.parquet").exists()); + assert!(Path::new("./data/output.json").exists()); + std::fs::remove_dir_all("./data").unwrap(); } async fn generate_events(arbx: ArbiterToken, arby: ArbiterToken, lex: LiquidExchange, client: Arc) -> Result<(), RevmMiddlewareError>{ From c33d9f369fb5ddbecd92cf8cf065ad9913c44eb3 Mon Sep 17 00:00:00 2001 From: Kinrezc Date: Sat, 4 Nov 2023 19:29:49 -0400 Subject: [PATCH 3/7] run formatter --- arbiter-core/src/data_collection.rs | 40 +++++++++++++++------------ arbiter-core/src/tests/data_output.rs | 26 +++++++++++------ 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/arbiter-core/src/data_collection.rs b/arbiter-core/src/data_collection.rs index 06ab1aa3..2eed6768 100644 --- a/arbiter-core/src/data_collection.rs +++ b/arbiter-core/src/data_collection.rs @@ -65,20 +65,21 @@ pub struct EventLogger { output_file_type: Option, } - -/// `OutputFileType` is an enumeration that represents the different types of file formats -/// that the `EventLogger` can output to. +/// `OutputFileType` is an enumeration that represents the different types of +/// file formats that the `EventLogger` can output to. #[derive(Debug, Clone, Copy, Serialize)] pub enum OutputFileType { - /// * `JSON` - Represents the JSON file format. When this variant is used, the `EventLogger` - /// will output the logged events to a JSON file. + /// * `JSON` - Represents the JSON file format. When this variant is used, + /// the `EventLogger` will output the logged events to a JSON file. JSON, - /// * `CSV` - Represents the CSV (Comma Separated Values) file format. When this variant is used, - /// the `EventLogger` will output the logged events to a CSV file. + /// * `CSV` - Represents the CSV (Comma Separated Values) file format. When + /// this variant is used, the `EventLogger` will output the logged events + /// to a CSV file. CSV, - /// * `Parquet` - Represents the Parquet file format. When this variant is used, the `EventLogger` - /// will output the logged events to a Parquet file. Parquet is a columnar storage file format - /// that is optimized for use with big data processing frameworks. + /// * `Parquet` - Represents the Parquet file format. When this variant is + /// used, the `EventLogger` will output the logged events to a Parquet + /// file. Parquet is a columnar storage file format that is optimized for + /// use with big data processing frameworks. Parquet, } @@ -168,11 +169,11 @@ impl EventLogger { /// Sets the output file type for the `EventLogger`. /// The default file type is JSON. /// # Arguments - /// + /// /// * `file_type` - The file type that the event logs will be stored in. - /// + /// /// # Returns - /// + /// /// The `EventLogger` instance with the specified file type. 
pub fn file_type(mut self, file_type: OutputFileType) -> Self { self.output_file_type = Some(file_type); @@ -211,7 +212,8 @@ impl EventLogger { // create new directory with path let output_dir = std::env::current_dir().unwrap().join(&dir); std::fs::create_dir_all(&output_dir).unwrap(); - // match the file output type and write to correct file using the right file type + // match the file output type and write to correct file using the right file + // type match file_type { OutputFileType::JSON => { let file_path = output_dir.join(format!("{}.json", file_name)); @@ -231,7 +233,8 @@ impl EventLogger { contract_names.push(contract.clone()); event_names.push(event.clone()); event_values.push(value.to_string()); - // Assuming Value has a suitable ToString implementation + // Assuming Value has a suitable + // ToString implementation } } } @@ -256,7 +259,9 @@ impl EventLogger { writer.finish(&mut df).unwrap_or_else(|_| { panic!("Error writing to csv file"); }); - // what should happen here is that we turn the logs into a polars data fram and then use the polars csv writer to write the data + // what should happen here is that we turn the + // logs into a polars data fram and then use the + // polars csv writer to write the data } OutputFileType::Parquet => { // 1. Flatten the BTreeMap @@ -270,7 +275,8 @@ impl EventLogger { contract_names.push(contract.clone()); event_names.push(event.clone()); event_values.push(value.to_string()); - // Assuming Value has a suitable ToString implementation + // Assuming Value has a suitable + // ToString implementation } } } diff --git a/arbiter-core/src/tests/data_output.rs b/arbiter-core/src/tests/data_output.rs index 3d199539..6f5a9c0b 100644 --- a/arbiter-core/src/tests/data_output.rs +++ b/arbiter-core/src/tests/data_output.rs @@ -1,7 +1,10 @@ use std::{fs, path::Path}; use super::*; -use crate::{data_collection::{EventLogger, OutputFileType}, middleware::errors::RevmMiddlewareError}; +use crate::{ + data_collection::{EventLogger, OutputFileType}, + middleware::errors::RevmMiddlewareError, +}; #[tokio::test] async fn data_capture() { @@ -10,13 +13,13 @@ async fn data_capture() { println!("Deployed contracts"); // default_listener - EventLogger::builder() + EventLogger::builder() .add(arbx.events(), "arbx") .add(arby.events(), "arby") .add(lex.events(), "lex") .run() .unwrap(); - + EventLogger::builder() .add(arbx.events(), "arbx") .add(arby.events(), "arby") @@ -33,9 +36,11 @@ async fn data_capture() { .run() .unwrap(); - generate_events(arbx, arby, lex, client.clone()).await.unwrap_or_else(|e| { - panic!("Error generating events: {}", e); - }); + generate_events(arbx, arby, lex, client.clone()) + .await + .unwrap_or_else(|e| { + panic!("Error generating events: {}", e); + }); let _ = env.stop(); @@ -46,7 +51,12 @@ async fn data_capture() { std::fs::remove_dir_all("./data").unwrap(); } -async fn generate_events(arbx: ArbiterToken, arby: ArbiterToken, lex: LiquidExchange, client: Arc) -> Result<(), RevmMiddlewareError>{ +async fn generate_events( + arbx: ArbiterToken, + arby: ArbiterToken, + lex: LiquidExchange, + client: Arc, +) -> Result<(), RevmMiddlewareError> { for _ in 0..5 { arbx.approve(client.address(), U256::from(1)) .send() @@ -65,4 +75,4 @@ async fn generate_events(arbx: ArbiterToken, arby: ArbiterToken< .await?; } Ok(()) -} \ No newline at end of file +} From 9606227d8f4b7947df3e577006a87a0d995f543f Mon Sep 17 00:00:00 2001 From: Kinrezc Date: Sat, 4 Nov 2023 19:31:46 -0400 Subject: [PATCH 4/7] fix spelling --- arbiter-core/src/data_collection.rs 
| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbiter-core/src/data_collection.rs b/arbiter-core/src/data_collection.rs index 2eed6768..822826c6 100644 --- a/arbiter-core/src/data_collection.rs +++ b/arbiter-core/src/data_collection.rs @@ -260,7 +260,7 @@ impl EventLogger { panic!("Error writing to csv file"); }); // what should happen here is that we turn the - // logs into a polars data fram and then use the + // logs into a polars data frame and then use the // polars csv writer to write the data } OutputFileType::Parquet => { From 60a28fbe385b0d694fc534fbf28a950c8306b751 Mon Sep 17 00:00:00 2001 From: Kinrezc Date: Sat, 4 Nov 2023 19:32:52 -0400 Subject: [PATCH 5/7] fmt --- arbiter-core/src/data_collection.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arbiter-core/src/data_collection.rs b/arbiter-core/src/data_collection.rs index 822826c6..a3f50334 100644 --- a/arbiter-core/src/data_collection.rs +++ b/arbiter-core/src/data_collection.rs @@ -260,7 +260,8 @@ impl EventLogger { panic!("Error writing to csv file"); }); // what should happen here is that we turn the - // logs into a polars data frame and then use the + // logs into a polars data frame and then use + // the // polars csv writer to write the data } OutputFileType::Parquet => { From 74a18c739da6adfb0d945c89cbc13056629f48b3 Mon Sep 17 00:00:00 2001 From: Waylon Jepsen Date: Mon, 6 Nov 2023 16:27:24 +0100 Subject: [PATCH 6/7] unused imports --- arbiter-core/src/tests/data_output.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbiter-core/src/tests/data_output.rs b/arbiter-core/src/tests/data_output.rs index 6f5a9c0b..0fcb86eb 100644 --- a/arbiter-core/src/tests/data_output.rs +++ b/arbiter-core/src/tests/data_output.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use super::*; use crate::{ From cc97ad5b0e2c83b7979156fcfb04f2cd1f0b7459 Mon Sep 17 00:00:00 2001 From: Waylon Jepsen Date: Tue, 7 Nov 2023 17:34:03 +0100 Subject: [PATCH 7/7] abstract data_frame flattening merge conflicts nits --- .gitignore | 1 + Cargo.lock | 10 +-- Cargo.toml | 2 +- arbiter-core/data/output.json | 2 +- arbiter-core/src/data_collection.rs | 106 ++++++++++---------------- arbiter-core/src/tests/data_output.rs | 5 +- 6 files changed, 53 insertions(+), 73 deletions(-) diff --git a/.gitignore b/.gitignore index c69e40f9..e5d62e4a 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,5 @@ arbiter/ # mdbook book +arbiter-core/data example_fork/test.json \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 8c187fdb..92cc5c67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -383,7 +383,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -4688,18 +4688,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.191" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a834c4821019838224821468552240d4d95d14e751986442c816572d39a080c9" +checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.191" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46fa52d5646bce91b680189fe5b1c049d2ea38dabb4e2e7c8d00ca12cfbfbcfd" +checksum = 
"d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 960b1823..e046d5b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ arbiter-core = { version = "0.7.2", path = "arbiter-core" } # Command line and config clap = { version = "=4.4.7", features = ["derive"] } -serde = { version = "=1.0.191", features = ["derive"] } +serde = { version = "=1.0.192", features = ["derive"] } serde_json = { version = "=1.0.108" } config = { version = "=0.13.3" } ethers = { version = "=2.0.10" } diff --git a/arbiter-core/data/output.json b/arbiter-core/data/output.json index 02f8098b..7f9aed4c 100644 --- a/arbiter-core/data/output.json +++ b/arbiter-core/data/output.json @@ -1 +1 @@ -{"events":{"arbx":{"ApprovalFilter":[{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"}]},"arby":{"ApprovalFilter":[{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"}]},"lex":{"PriceChangeFilter":[{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"}]}},"metadata":{"name":"test"}} \ No newline at end of file 
+{"events":{"arbx":{"ApprovalFilter":[{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"}]},"arby":{"ApprovalFilter":[{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"},{"owner":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","spender":"0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5","amount":"0x1"}]},"lex":{"PriceChangeFilter":[{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"},{"price":"0xde0b6b3a7640000"}]}},"metadata":null} \ No newline at end of file diff --git a/arbiter-core/src/data_collection.rs b/arbiter-core/src/data_collection.rs index 2a413763..ab2003ce 100644 --- a/arbiter-core/src/data_collection.rs +++ b/arbiter-core/src/data_collection.rs @@ -238,38 +238,21 @@ impl EventLogger { let file_path = output_dir.join(format!("{}.json", file_name)); let file = std::fs::File::create(file_path).unwrap(); let writer = BufWriter::new(file); - serde_json::to_writer(writer, &logs).expect("Unable to write data"); - } - OutputFileType::CSV => { - // 1. Flatten the BTreeMap - let mut contract_names = Vec::new(); - let mut event_names = Vec::new(); - let mut event_values = Vec::new(); - for (contract, events) in &logs { - for (event, values) in events { - for value in values { - contract_names.push(contract.clone()); - event_names.push(event.clone()); - event_values.push(value.to_string()); - // Assuming Value has a suitable - // ToString implementation - } - } + #[derive(Serialize, Clone)] + struct OutputData { + events: BTreeMap>>, + metadata: Option, } - - // 2. Convert the vectors into a DataFrame - let mut df = DataFrame::new(vec![ - Series::new("contract_name", contract_names), - Series::new("event_name", event_names), - Series::new("event_value", event_values), - ]) - .unwrap(); - - println!("{:?}", df); - - // 3. 
Write the DataFrame to a CSV file - + let data = OutputData { + events: events.clone(), + metadata: metadata.clone(), + }; + serde_json::to_writer(writer, &data).expect("Unable to write data"); + } + OutputFileType::CSV => { + let mut df = flatten_to_data_frame(events.clone()); + // Write the DataFrame to a CSV file let file_path = output_dir.join(format!("{}.csv", file_name)); let file = std::fs::File::create(file_path).unwrap_or_else(|_| { panic!("Error creating csv file"); @@ -278,42 +261,10 @@ impl EventLogger { writer.finish(&mut df).unwrap_or_else(|_| { panic!("Error writing to csv file"); }); - // what should happen here is that we turn the - // logs into a polars data frame and then use - // the - // polars csv writer to write the data } OutputFileType::Parquet => { - // 1. Flatten the BTreeMap - let mut contract_names = Vec::new(); - let mut event_names = Vec::new(); - let mut event_values = Vec::new(); - - for (contract, events) in &logs { - for (event, values) in events { - for value in values { - contract_names.push(contract.clone()); - event_names.push(event.clone()); - event_values.push(value.to_string()); - // Assuming Value has a suitable - // ToString implementation - } - } - } - - // 2. Convert the vectors into a DataFrame - let mut df = DataFrame::new(vec![ - Series::new("contract_name", contract_names), - Series::new("event_name", event_names), - Series::new("event_value", event_values), - ]) - .unwrap_or_else(|_| { - panic!("Error creating DataFrame"); - }); - - println!("{:?}", df); - - // 3. Write the DataFrame to a parquet file + let mut df = flatten_to_data_frame(events.clone()); + // Write the DataFrame to a parquet file let file_path = output_dir.join(format!("{}.parquet", file_name)); let file = std::fs::File::create(file_path).unwrap_or_else(|_| { panic!("Error creating parquet file"); @@ -367,6 +318,33 @@ impl EventLogger { } } +fn flatten_to_data_frame(events: BTreeMap>>) -> DataFrame { + // 1. Flatten the BTreeMap + let mut contract_names = Vec::new(); + let mut event_names = Vec::new(); + let mut event_values = Vec::new(); + + for (contract, events) in &events { + for (event, values) in events { + for value in values { + contract_names.push(contract.clone()); + event_names.push(event.clone()); + event_values.push(value.to_string()); + } + } + } + + // 2. Convert the vectors into a DataFrame + let df = DataFrame::new(vec![ + Series::new("contract_name", contract_names), + Series::new("event_name", event_names), + Series::new("event_value", event_values), + ]) + .unwrap(); + println!("{:?}", df); + + df +} struct EventTransmuted { /// The event filter's state pub filter: Filter, diff --git a/arbiter-core/src/tests/data_output.rs b/arbiter-core/src/tests/data_output.rs index a0410494..214b8566 100644 --- a/arbiter-core/src/tests/data_output.rs +++ b/arbiter-core/src/tests/data_output.rs @@ -1,4 +1,5 @@ use std::path::Path; + use serde::Serialize; use super::*; @@ -24,6 +25,7 @@ async fn data_capture() { .add(arby.events(), "arby") .add(lex.events(), "lex") .run() + .unwrap(); let metadata = MockMetadata { name: "test".to_string(), @@ -64,7 +66,7 @@ async fn data_capture() { assert!(Path::new("./data/output.csv").exists()); assert!(Path::new("./data/output.parquet").exists()); assert!(Path::new("./data/output.json").exists()); - std::fs::remove_dir_all("./data").unwrap(); + // std::fs::remove_dir_all("./data").unwrap(); } async fn generate_events( @@ -91,5 +93,4 @@ async fn generate_events( .await?; } Ok(()) - }
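
The end state of the series is one shared flattening step feeding both the CSV and Parquet arms: every logged serde_json::Value is stringified into an `event_value` column alongside `contract_name` and `event_name`. Below is a minimal, self-contained sketch of that pipeline, using only the polars calls the patches themselves rely on (`Series::new`, `DataFrame::new`, `CsvWriter`, `SerWriter::finish`) and assuming polars is built with the `csv` and `parquet` features as in the Cargo.toml change. The sample event map is illustrative, not taken from the test suite:

use std::collections::BTreeMap;
use std::fs::File;

use polars::prelude::{CsvWriter, DataFrame, NamedFrom, SerWriter};
use polars::series::Series;
use serde_json::{json, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-in for the logger's nested `BTreeMap<contract, BTreeMap<event, Vec<Value>>>`;
    // the entry here is made up for illustration.
    let mut events: BTreeMap<String, BTreeMap<String, Vec<Value>>> = BTreeMap::new();
    events
        .entry("lex".to_string())
        .or_default()
        .entry("PriceChangeFilter".to_string())
        .or_default()
        .push(json!({ "price": "0xde0b6b3a7640000" }));

    // Flatten the nested map into three parallel columns, as
    // `flatten_to_data_frame` does in the final patch.
    let mut contract_names = Vec::new();
    let mut event_names = Vec::new();
    let mut event_values = Vec::new();
    for (contract, per_event) in &events {
        for (event, values) in per_event {
            for value in values {
                contract_names.push(contract.clone());
                event_names.push(event.clone());
                event_values.push(value.to_string());
            }
        }
    }

    let mut df = DataFrame::new(vec![
        Series::new("contract_name", contract_names),
        Series::new("event_name", event_names),
        Series::new("event_value", event_values),
    ])?;

    // Mirror the `OutputFileType::CSV` arm; the Parquet arm is identical
    // except it swaps in `ParquetWriter::new(file).finish(&mut df)`.
    CsvWriter::new(File::create("output.csv")?).finish(&mut df)?;
    Ok(())
}

Stringifying each Value keeps the writers simple but collapses typed event fields into a single text column; that trade-off, plus routing all three formats through one flattening helper, is what the final "abstract data_frame flattening" patch settles on. From the caller's side, the only confirmed surface change is the builder's `.file_type(OutputFileType::CSV)` (or `Parquet`), with JSON remaining the default and output landing at `./data/output.<ext>` unless overridden.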