diff --git a/Cargo.lock b/Cargo.lock index 855c02a22b9c..ea4ffec0dbcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,9 +91,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "alloy-chains" @@ -161,7 +161,7 @@ dependencies = [ "alloy-transport", "futures", "futures-util", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -265,7 +265,7 @@ dependencies = [ "alloy-sol-types", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tracing", ] @@ -291,7 +291,7 @@ dependencies = [ "futures-utils-wasm", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -319,7 +319,7 @@ dependencies = [ "rand 0.8.5", "serde_json", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tracing", "url", ] @@ -341,7 +341,7 @@ dependencies = [ "getrandom 0.2.15", "hashbrown 0.15.2", "hex-literal", - "indexmap 2.6.0", + "indexmap 2.7.0", "itoa", "k256", "keccak-asm", @@ -390,7 +390,7 @@ dependencies = [ "schnellru", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", "url", @@ -523,7 +523,7 @@ dependencies = [ "alloy-serde", "serde", "serde_with", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -604,7 +604,7 @@ dependencies = [ "alloy-serde", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -642,7 +642,7 @@ dependencies = [ "auto_impl", "elliptic-curve", "k256", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -660,7 +660,7 @@ dependencies = [ "coins-bip39", "k256", "rand 0.8.5", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -686,7 +686,7 @@ dependencies = [ "alloy-sol-macro-input", "const-hex", "heck", - "indexmap 2.6.0", + "indexmap 2.7.0", "proc-macro-error2", "proc-macro2", "quote", @@ -745,7 +745,7 @@ dependencies = [ "futures-utils-wasm", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tower 0.5.1", "tracing", @@ -897,9 +897,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" +checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" [[package]] name = "aquamarine" @@ -1385,7 +1385,7 @@ dependencies = [ "bitflags 2.6.0", "boa_interner", "boa_macros", - "indexmap 2.6.0", + "indexmap 2.7.0", "num-bigint", "rustc-hash 2.1.0", ] @@ -1411,7 +1411,7 @@ dependencies = [ "fast-float", "hashbrown 0.14.5", "icu_normalizer", - "indexmap 2.6.0", + "indexmap 2.7.0", "intrusive-collections", "itertools 0.13.0", "num-bigint", @@ -1457,7 +1457,7 @@ dependencies = [ "boa_gc", "boa_macros", "hashbrown 0.14.5", - "indexmap 2.6.0", + "indexmap 2.7.0", "once_cell", "phf", "rustc-hash 2.1.0", @@ -1783,9 +1783,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.21" +version = "4.5.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" +checksum = "69371e34337c4c984bbe322360c2547210bf632eb2814bbe78a6e87a2935bd2b" dependencies = [ "clap_builder", "clap_derive", @@ -1793,9 +1793,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.21" +version = "4.5.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" +checksum = "6e24c1b4099818523236a8ca881d2b45db98dadfb4625cf6608c12069fcbbde1" dependencies = [ "anstream", "anstyle", @@ -2655,7 +2655,7 @@ dependencies = [ "revm", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "walkdir", ] @@ -2811,7 +2811,7 @@ dependencies = [ "reth-node-ethereum", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -2899,7 +2899,7 @@ dependencies = [ "reth-tracing", "reth-trie-db", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", ] @@ -3067,6 +3067,7 @@ dependencies = [ "reth-chainspec", "reth-discv4", "reth-network", + "reth-network-api", "reth-primitives", "reth-tracing", "secp256k1", @@ -3538,7 +3539,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.6.0", + "indexmap 2.7.0", "slab", "tokio", "tokio-util", @@ -3699,9 +3700,9 @@ dependencies = [ [[package]] name = "http" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" dependencies = [ "bytes", "fnv", @@ -4133,9 +4134,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "arbitrary", "equivalent", @@ -4162,7 +4163,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88" dependencies = [ "ahash", - "indexmap 2.6.0", + "indexmap 2.7.0", "is-terminal", "itoa", "log", @@ -4891,7 +4892,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b6f8152da6d7892ff1b7a1c0fa3f435e92b5918ad67035c3bb432111d9a29b" dependencies = [ "base64 0.22.1", - "indexmap 2.6.0", + "indexmap 2.7.0", "metrics", "metrics-util", "quanta", @@ -4923,7 +4924,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.15.2", - "indexmap 2.6.0", + "indexmap 2.7.0", "metrics", "ordered-float", "quanta", @@ -5342,7 +5343,7 @@ dependencies = [ "derive_more 1.0.0", "serde", "serde_with", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -5357,7 +5358,7 @@ dependencies = [ "alloy-sol-types", "serde", "serde_repr", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -5394,7 +5395,7 @@ dependencies = [ "op-alloy-consensus", "op-alloy-genesis", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tracing", "unsigned-varint", ] @@ -5434,7 +5435,7 @@ dependencies = [ "op-alloy-protocol", "serde", "snap", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -5514,9 +5515,9 @@ dependencies = [ [[package]] name = "parity-scale-codec" -version = "3.7.0" +version = "3.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be4817d39f3272f69c59fe05d0535ae6456c2dc2fa1ba02910296c7e0a5c590" +checksum = "306800abfa29c7f16596b5970a588435e3d5b3149683d00c12b699cc19f895ee" dependencies = [ "arbitrary", "arrayvec", @@ -5525,20 +5526,19 @@ dependencies = [ "bytes", "impl-trait-for-tuples", "parity-scale-codec-derive", - "rustversion", "serde", ] [[package]] name = "parity-scale-codec-derive" -version = "3.7.0" +version = "3.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8781a75c6205af67215f382092b6e0a4ff3734798523e69073d4bcd294ec767b" +checksum = "d830939c76d294956402033aee57a6da7b438f2294eb94864c37b0569053a42c" dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.90", + "syn 1.0.109", ] [[package]] @@ -6058,7 +6058,7 @@ dependencies = [ "rustc-hash 2.1.0", "rustls", "socket2", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -6077,7 +6077,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.3", + "thiserror 2.0.4", "tinyvec", "tracing", "web-time", @@ -6535,7 +6535,7 @@ dependencies = [ "reth-tokio-util", "reth-tracing", "schnellru", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tracing", @@ -6571,7 +6571,7 @@ dependencies = [ "reth-rpc-types-compat", "reth-tracing", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tower 0.4.13", "tracing", @@ -6625,7 +6625,7 @@ dependencies = [ "reth-execution-errors", "reth-primitives", "reth-storage-errors", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -6781,7 +6781,7 @@ dependencies = [ "reth-fs-util", "secp256k1", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tikv-jemallocator", "tracy-client", ] @@ -6926,7 +6926,7 @@ dependencies = [ "sysinfo", "tempfile", "test-fuzz", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -6983,7 +6983,7 @@ dependencies = [ "reth-trie-db", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tracing", ] @@ -7025,7 +7025,7 @@ dependencies = [ "schnellru", "secp256k1", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tracing", @@ -7050,7 +7050,7 @@ dependencies = [ "reth-network-peers", "reth-tracing", "secp256k1", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -7076,7 +7076,7 @@ dependencies = [ "secp256k1", "serde", "serde_with", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tracing", @@ -7115,7 +7115,7 @@ dependencies = [ "reth-testing-utils", "reth-tracing", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tokio-util", @@ -7192,7 +7192,7 @@ dependencies = [ "secp256k1", "sha2 0.10.8", "sha3", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tokio-util", @@ -7248,7 +7248,7 @@ dependencies = [ "reth-primitives-traits", "reth-trie", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", ] @@ -7275,7 +7275,7 @@ dependencies = [ "reth-prune", "reth-stages-api", "reth-tasks", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", ] @@ -7330,7 +7330,7 @@ dependencies = [ "reth-trie-parallel", "reth-trie-sparse", "revm-primitives", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -7376,7 +7376,7 @@ dependencies = [ "reth-execution-errors", "reth-fs-util", "reth-storage-errors", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -7409,7 +7409,7 @@ dependencies = [ "serde", "snap", "test-fuzz", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tokio-util", @@ -7438,7 +7438,7 @@ dependencies = [ "reth-primitives", "reth-primitives-traits", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -7504,7 +7504,7 @@ dependencies = [ "proptest-derive", "rustc-hash 2.1.0", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -7605,7 +7605,7 @@ dependencies = [ "reth-prune-types", "reth-storage-errors", "revm-primitives", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -7700,7 +7700,7 @@ dependencies = [ "reth-transaction-pool", "reth-trie-db", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", ] @@ -7727,7 +7727,7 @@ version = "1.1.2" dependencies = [ "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -7770,7 +7770,7 @@ dependencies = [ "rand 0.8.5", "reth-tracing", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tokio-util", @@ -7787,7 +7787,7 @@ dependencies = [ "criterion", "dashmap 6.1.0", "derive_more 1.0.0", - "indexmap 2.6.0", + "indexmap 2.7.0", "parking_lot", "pprof", "rand 0.8.5", @@ -7795,7 +7795,7 @@ dependencies = [ "reth-mdbx-sys", "smallvec", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tracing", ] @@ -7834,7 +7834,7 @@ dependencies = [ "reqwest", "reth-tracing", "serde_with", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -7894,7 +7894,7 @@ dependencies = [ "serial_test", "smallvec", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tokio-util", @@ -7919,7 +7919,7 @@ dependencies = [ "reth-network-types", "reth-tokio-util", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", ] @@ -7957,7 +7957,7 @@ dependencies = [ "secp256k1", "serde_json", "serde_with", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "url", ] @@ -7988,7 +7988,7 @@ dependencies = [ "reth-fs-util", "serde", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tracing", "zstd", ] @@ -8122,7 +8122,7 @@ dependencies = [ "serde", "shellexpand", "strum", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "toml", "tracing", @@ -8450,7 +8450,7 @@ dependencies = [ "reth-transaction-pool", "revm", "sha2 0.10.8", - "thiserror 2.0.3", + "thiserror 2.0.4", "tracing", ] @@ -8517,7 +8517,7 @@ dependencies = [ "reth-transaction-pool", "revm", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -8582,7 +8582,7 @@ dependencies = [ "reth-primitives", "revm-primitives", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", ] @@ -8757,7 +8757,7 @@ dependencies = [ "reth-tokio-util", "reth-tracing", "rustc-hash 2.1.0", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -8778,7 +8778,7 @@ dependencies = [ "serde", "serde_json", "test-fuzz", - "thiserror 2.0.3", + "thiserror 2.0.4", "toml", ] @@ -8864,7 +8864,7 @@ dependencies = [ "revm-primitives", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tower 0.4.13", @@ -8958,7 +8958,7 @@ dependencies = [ "reth-transaction-pool", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-util", "tower 0.4.13", @@ -8999,7 +8999,7 @@ dependencies = [ "reth-tokio-util", "reth-transaction-pool", "serde", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -9083,7 +9083,7 @@ dependencies = [ "schnellru", "serde", "serde_json", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tracing", @@ -9183,7 +9183,7 @@ dependencies = [ "reth-trie", "reth-trie-db", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -9210,7 +9210,7 @@ dependencies = [ "reth-static-file-types", "reth-testing-utils", "reth-tokio-util", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tracing", @@ -9316,7 +9316,7 @@ dependencies = [ "pin-project", "rayon", "reth-metrics", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", "tracing-futures", @@ -9400,7 +9400,7 @@ dependencies = [ "serde_json", "smallvec", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tokio-stream", "tracing", @@ -9515,7 +9515,7 @@ dependencies = [ "reth-trie", "reth-trie-common", "reth-trie-db", - "thiserror 2.0.3", + "thiserror 2.0.4", "tokio", "tracing", ] @@ -9541,7 +9541,7 @@ dependencies = [ "reth-trie", "reth-trie-common", "smallvec", - "thiserror 2.0.3", + "thiserror 2.0.4", ] [[package]] @@ -9561,9 +9561,9 @@ dependencies = [ [[package]] name = "revm-inspectors" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41bbeb6004cc4ed48d27756f0479011df91a6f5642a3abab9309eda5ce67c4ad" +checksum = "0b7f5f8a2deafb3c76f357bbf9e71b73bddb915c4994bbbe3208fbfbe8fc7f8e" dependencies = [ "alloy-primitives", "alloy-rpc-types-eth", @@ -10180,7 +10180,7 @@ version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ - "indexmap 2.6.0", + "indexmap 2.7.0", "itoa", "memchr", "ryu", @@ -10240,7 +10240,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.6.0", + "indexmap 2.7.0", "serde", "serde_derive", "serde_json", @@ -10484,9 +10484,9 @@ dependencies = [ [[package]] name = "soketto" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37468c595637c10857701c990f93a40ce0e357cedb0953d1c26c8d8027f9bb53" +checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" dependencies = [ "base64 0.22.1", "bytes", @@ -10775,11 +10775,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.3" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490" dependencies = [ - "thiserror-impl 2.0.3", + "thiserror-impl 2.0.4", ] [[package]] @@ -10795,9 +10795,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.3" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061" dependencies = [ "proc-macro2", "quote", @@ -10856,9 +10856,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.36" +version = "0.3.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" dependencies = [ "deranged", "itoa", @@ -10880,9 +10880,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" dependencies = [ "num-conv", "time-core", @@ -10934,9 +10934,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.1" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ "backtrace", "bytes", @@ -11042,7 +11042,7 @@ version = "0.22.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ - "indexmap 2.6.0", + "indexmap 2.7.0", "serde", "serde_spanned", "toml_datetime", diff --git a/crates/e2e-test-utils/Cargo.toml b/crates/e2e-test-utils/Cargo.toml index bedacbecd759..7cb8516816b8 100644 --- a/crates/e2e-test-utils/Cargo.toml +++ b/crates/e2e-test-utils/Cargo.toml @@ -14,6 +14,7 @@ workspace = true reth-chainspec.workspace = true reth-tracing.workspace = true reth-db = { workspace = true, features = ["test-utils"] } +reth-network-api.workspace = true reth-rpc-layer.workspace = true reth-rpc-server-types.workspace = true reth-rpc-eth-api.workspace = true @@ -23,7 +24,6 @@ reth-payload-builder-primitives.workspace = true reth-payload-primitives.workspace = true reth-primitives.workspace = true reth-provider.workspace = true -reth-network-api.workspace = true reth-network.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 2efc8d47f2d7..ce9d0b94612b 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -1,6 +1,7 @@ use futures_util::StreamExt; use reth_network_api::{ - test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, PeersInfo, + events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, + PeersInfo, }; use reth_network_peers::{NodeRecord, PeerId}; use reth_tokio_util::EventStream; @@ -28,7 +29,7 @@ where self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr()); match self.network_events.next().await { - Some(NetworkEvent::PeerAdded(_)) => (), + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (), ev => panic!("Expected a peer added event, got: {ev:?}"), } } @@ -42,7 +43,9 @@ where pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.network_events.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => { + NetworkEvent::ActivePeerSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; info!("Session established with peer: {:?}", peer_id); return Some(peer_id) } diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 624c43f5e1ba..e17cedef11fc 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -1,7 +1,5 @@ //! API related to listening for network events. -use std::{fmt, net::SocketAddr, sync::Arc}; - use reth_eth_wire_types::{ message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, @@ -13,26 +11,70 @@ use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; use reth_network_types::PeerAddr; use reth_tokio_util::EventStream; +use std::{ + fmt, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt}; -/// Provides event subscription for the network. -#[auto_impl::auto_impl(&, Arc)] -pub trait NetworkEventListenerProvider: Send + Sync { - /// Creates a new [`NetworkEvent`] listener channel. - fn event_listener(&self) -> EventStream; - /// Returns a new [`DiscoveryEvent`] stream. - /// - /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. - fn discovery_listener(&self) -> UnboundedReceiverStream; +/// A boxed stream of network peer events that provides a type-erased interface. +pub struct PeerEventStream(Pin + Send + Sync>>); + +impl fmt::Debug for PeerEventStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeerEventStream").finish_non_exhaustive() + } +} + +impl PeerEventStream { + /// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer + /// events [`PeerEvent`] + pub fn new(stream: S) -> Self + where + S: Stream + Send + Sync + 'static, + T: Into + 'static, + { + let mapped_stream = stream.map(Into::into); + Self(Box::pin(mapped_stream)) + } } -/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers. +impl Stream for PeerEventStream { + type Item = PeerEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_next(cx) + } +} + +/// Represents information about an established peer session. +#[derive(Debug, Clone)] +pub struct SessionInfo { + /// The identifier of the peer to which a session was established. + pub peer_id: PeerId, + /// The remote addr of the peer to which a session was established. + pub remote_addr: SocketAddr, + /// The client version of the peer to which a session was established. + pub client_version: Arc, + /// Capabilities the peer announced. + pub capabilities: Arc, + /// The status of the peer to which a session was established. + pub status: Arc, + /// Negotiated eth version of the session. + pub version: EthVersion, +} + +/// (Non-exhaustive) List of the different events emitted by the network that are of interest for +/// subscribers. /// /// This includes any event types that may be relevant to tasks, for metrics, keep track of peers /// etc. -#[derive(Debug)] -pub enum NetworkEvent { +#[derive(Debug, Clone)] +pub enum PeerEvent { /// Closed the peer session. SessionClosed { /// The identifier of the peer to which a session was closed. @@ -41,57 +83,65 @@ pub enum NetworkEvent { reason: Option, }, /// Established a new session with the given peer. - SessionEstablished { - /// The identifier of the peer to which a session was established. - peer_id: PeerId, - /// The remote addr of the peer to which a session was established. - remote_addr: SocketAddr, - /// The client version of the peer to which a session was established. - client_version: Arc, - /// Capabilities the peer announced - capabilities: Arc, - /// A request channel to the session task. - messages: PeerRequestSender, - /// The status of the peer to which a session was established. - status: Arc, - /// negotiated eth version of the session - version: EthVersion, - }, + SessionEstablished(SessionInfo), /// Event emitted when a new peer is added PeerAdded(PeerId), /// Event emitted when a new peer is removed PeerRemoved(PeerId), } +/// (Non-exhaustive) Network events representing peer lifecycle events and session requests. +#[derive(Debug)] +pub enum NetworkEvent { + /// Basic peer lifecycle event. + Peer(PeerEvent), + /// Session established with requests. + ActivePeerSession { + /// Session information + info: SessionInfo, + /// A request channel to the session task. + messages: PeerRequestSender, + }, +} + impl Clone for NetworkEvent { fn clone(&self) -> Self { match self { - Self::SessionClosed { peer_id, reason } => { - Self::SessionClosed { peer_id: *peer_id, reason: *reason } + Self::Peer(event) => Self::Peer(event.clone()), + Self::ActivePeerSession { info, messages } => { + Self::ActivePeerSession { info: info.clone(), messages: messages.clone() } } - Self::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => Self::SessionEstablished { - peer_id: *peer_id, - remote_addr: *remote_addr, - client_version: client_version.clone(), - capabilities: capabilities.clone(), - messages: messages.clone(), - status: status.clone(), - version: *version, - }, - Self::PeerAdded(peer) => Self::PeerAdded(*peer), - Self::PeerRemoved(peer) => Self::PeerRemoved(*peer), } } } +impl From> for PeerEvent { + fn from(event: NetworkEvent) -> Self { + match event { + NetworkEvent::Peer(peer_event) => peer_event, + NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info), + } + } +} + +/// Provides peer event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait NetworkPeersEvents: Send + Sync { + /// Creates a new peer event listener stream. + fn peer_events(&self) -> PeerEventStream; +} + +/// Provides event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait NetworkEventListenerProvider: NetworkPeersEvents { + /// Creates a new [`NetworkEvent`] listener channel. + fn event_listener(&self) -> EventStream>; + /// Returns a new [`DiscoveryEvent`] stream. + /// + /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. + fn discovery_listener(&self) -> UnboundedReceiverStream; +} + /// Events produced by the `Discovery` manager. #[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveryEvent { diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index bad6ecba5fad..89e21b9dd2db 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -44,7 +44,9 @@ use reth_eth_wire::{ use reth_fs_util::{self as fs, FsPathError}; use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_network_api::{ - test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, + events::{PeerEvent, SessionInfo}, + test_utils::PeersHandle, + EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, }; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::ReputationChangeKind; @@ -712,24 +714,26 @@ impl NetworkManager { self.update_active_connection_metrics(); - self.event_sender.notify(NetworkEvent::SessionEstablished { + let session_info = SessionInfo { peer_id, remote_addr, client_version, capabilities, - version, status, - messages, - }); + version, + }; + + self.event_sender + .notify(NetworkEvent::ActivePeerSession { info: session_info, messages }); } SwarmEvent::PeerAdded(peer_id) => { trace!(target: "net", ?peer_id, "Peer added"); - self.event_sender.notify(NetworkEvent::PeerAdded(peer_id)); + self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))); self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); } SwarmEvent::PeerRemoved(peer_id) => { trace!(target: "net", ?peer_id, "Peer dropped"); - self.event_sender.notify(NetworkEvent::PeerRemoved(peer_id)); + self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))); self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); } SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { @@ -772,7 +776,8 @@ impl NetworkManager { .saturating_sub(1) as f64, ); - self.event_sender.notify(NetworkEvent::SessionClosed { peer_id, reason }); + self.event_sender + .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason })); } SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { trace!( diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 7e0b000cf343..225b6332e0eb 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -4,6 +4,7 @@ use crate::{ }; use alloy_primitives::B256; use enr::Enr; +use futures::StreamExt; use parking_lot::Mutex; use reth_discv4::{Discv4, NatResolver}; use reth_discv5::Discv5; @@ -13,6 +14,7 @@ use reth_eth_wire::{ }; use reth_ethereum_forks::Head; use reth_network_api::{ + events::{NetworkPeersEvents, PeerEvent, PeerEventStream}, test_utils::{PeersHandle, PeersHandleProvider}, BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers, @@ -192,6 +194,17 @@ impl NetworkHandle { // === API Implementations === +impl NetworkPeersEvents for NetworkHandle { + /// Returns an event stream of peer-specific network events. + fn peer_events(&self) -> PeerEventStream { + let peer_events = self.inner.event_sender.new_listener().map(|event| match event { + NetworkEvent::Peer(peer_event) => peer_event, + NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info), + }); + PeerEventStream::new(peer_events) + } +} + impl NetworkEventListenerProvider for NetworkHandle { fn event_listener(&self) -> EventStream>> { self.inner.event_sender.new_listener() diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 08bf24b88532..a27df7e7202a 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -13,6 +13,7 @@ use pin_project::pin_project; use reth_chainspec::{Hardforks, MAINNET}; use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols}; use reth_network_api::{ + events::{PeerEvent, SessionInfo}, test_utils::{PeersHandle, PeersHandleProvider}, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers, }; @@ -641,7 +642,9 @@ impl NetworkEventStream { pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option)> { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)), + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => { + return Some((peer_id, reason)) + } _ => continue, } } @@ -652,7 +655,10 @@ impl NetworkEventStream { pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id), + NetworkEvent::ActivePeerSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + return Some(info.peer_id) + } _ => continue, } } @@ -667,7 +673,7 @@ impl NetworkEventStream { let mut peers = Vec::with_capacity(num); while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => { + NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } => { peers.push(peer_id); num -= 1; if num == 0 { @@ -680,18 +686,24 @@ impl NetworkEventStream { peers } - /// Ensures that the first two events are a [`NetworkEvent::PeerAdded`] and - /// [`NetworkEvent::SessionEstablished`], returning the [`PeerId`] of the established + /// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and + /// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established /// session. pub async fn peer_added_and_established(&mut self) -> Option { let peer_id = match self.inner.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => peer_id, + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id, _ => return None, }; match self.inner.next().await { - Some(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. }) => { - debug_assert_eq!(peer_id, peer_id2, "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"); + Some(NetworkEvent::ActivePeerSession { + info: SessionInfo { peer_id: peer_id2, .. }, + .. + }) => { + debug_assert_eq!( + peer_id, peer_id2, + "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}" + ); Some(peer_id) } _ => None, diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index a1097dacf550..2e6e2f08b65c 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -40,6 +40,7 @@ use reth_eth_wire::{ }; use reth_metrics::common::mpsc::UnboundedMeteredReceiver; use reth_network_api::{ + events::{PeerEvent, SessionInfo}, NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers, }; use reth_network_p2p::{ @@ -1050,55 +1051,81 @@ where } } + /// Handles session establishment and peer transactions initialization. + fn handle_peer_session( + &mut self, + info: SessionInfo, + messages: PeerRequestSender>, + ) { + let SessionInfo { peer_id, client_version, version, .. } = info; + + // Insert a new peer into the peerset. + let peer = PeerMetadata::::new( + messages, + version, + client_version, + self.config.max_transactions_seen_by_peer_history, + ); + let peer = match self.peers.entry(peer_id) { + Entry::Occupied(mut entry) => { + entry.insert(peer); + entry.into_mut() + } + Entry::Vacant(entry) => entry.insert(peer), + }; + + // Send a `NewPooledTransactionHashes` to the peer with up to + // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` + // transactions in the pool. + if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() { + trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled"); + return + } + + // Get transactions to broadcast + let pooled_txs = self.pool.pooled_transactions_max( + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, + ); + if pooled_txs.is_empty() { + trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast"); + return; + } + + // Build and send transaction hashes message + let mut msg_builder = PooledTransactionsHashesBuilder::new(version); + for pooled_tx in pooled_txs { + peer.seen_transactions.insert(*pooled_tx.hash()); + msg_builder.push_pooled(pooled_tx); + } + + debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes"); + let msg = msg_builder.build(); + self.network.send_transactions_hashes(peer_id, msg); + } + /// Handles a received event related to common network events. fn on_network_event(&mut self, event_result: NetworkEvent>) { match event_result { - NetworkEvent::SessionClosed { peer_id, .. } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { // remove the peer self.peers.remove(&peer_id); self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::SessionEstablished { - peer_id, client_version, messages, version, .. - } => { - // Insert a new peer into the peerset. - let peer = PeerMetadata::new( - messages, - version, - client_version, - self.config.max_transactions_seen_by_peer_history, - ); - let peer = match self.peers.entry(peer_id) { - Entry::Occupied(mut entry) => { - entry.insert(peer); - entry.into_mut() + NetworkEvent::ActivePeerSession { info, messages } => { + // process active peer session and broadcast available transaction from the pool + self.handle_peer_session(info, messages); + } + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; + // get messages from existing peer + let messages = match self.peers.get(&peer_id) { + Some(p) => p.request_tx.clone(), + None => { + debug!(target: "net::tx", ?peer_id, "No peer request sender found"); + return; } - Entry::Vacant(entry) => entry.insert(peer), }; - - // Send a `NewPooledTransactionHashes` to the peer with up to - // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` - // transactions in the pool. - if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() { - return - } - - let pooled_txs = self.pool.pooled_transactions_max( - SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, - ); - if pooled_txs.is_empty() { - // do not send a message if there are no transactions in the pool - return - } - - let mut msg_builder = PooledTransactionsHashesBuilder::new(version); - for pooled_tx in pooled_txs { - peer.seen_transactions.insert(*pooled_tx.hash()); - msg_builder.push_pooled(pooled_tx); - } - - let msg = msg_builder.build(); - self.network.send_transactions_hashes(peer_id, msg); + self.handle_peer_session(info, messages); } _ => {} } @@ -1987,27 +2014,12 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions + .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))) } - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2073,28 +2085,13 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions.on_network_event(ev); } - NetworkEvent::PeerAdded(_peer_id) => continue, - ev => { + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, + _ => { error!("unexpected event {ev:?}") } } @@ -2157,27 +2154,12 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions.on_network_event(ev); } - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2248,24 +2230,11 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }), - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { + transactions.on_network_event(ev); + } + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2495,17 +2464,18 @@ mod tests { network.handle().update_sync_state(SyncState::Idle); // mock a peer - let (tx, _rx) = mpsc::channel(1); - tx_manager.on_network_event(NetworkEvent::SessionEstablished { + let (tx, _rx) = mpsc::channel::(1); + let session_info = SessionInfo { peer_id, remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), client_version: Arc::from(""), capabilities: Arc::new(vec![].into()), - messages: PeerRequestSender::new(peer_id, tx), status: Arc::new(Default::default()), version: EthVersion::Eth68, - }); - + }; + let messages: PeerRequestSender = PeerRequestSender::new(peer_id, tx); + tx_manager + .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages }); let mut propagate = vec![]; let mut factory = MockTransactionFactory::default(); let eip1559_tx = Arc::new(factory.create_eip1559()); diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 0a17cbd563ed..77044f4b72d2 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -15,7 +15,10 @@ use reth_network::{ BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEventListenerProvider, NetworkManager, PeersConfig, }; -use reth_network_api::{NetworkInfo, Peers, PeersInfo}; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + NetworkInfo, Peers, PeersInfo, +}; use reth_network_p2p::{ headers::client::{HeadersClient, HeadersRequest}, sync::{NetworkSyncUpdater, SyncState}, @@ -59,13 +62,15 @@ async fn test_establish_connections() { let mut established = listener0.take(4); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionClosed { .. } | NetworkEvent::PeerRemoved(_) => { + NetworkEvent::Peer(PeerEvent::SessionClosed { .. } | PeerEvent::PeerRemoved(_)) => { panic!("unexpected event") } - NetworkEvent::SessionEstablished { peer_id, .. } => { - assert!(expected_connections.remove(&peer_id)) + NetworkEvent::ActivePeerSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let SessionInfo { peer_id, .. } = info; + assert!(expected_connections.remove(&peer_id)); } - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert!(expected_peers.remove(&peer_id)) } } @@ -496,11 +501,16 @@ async fn test_geth_disconnect() { handle.add_peer(geth_peer_id, geth_socket); match events.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => assert_eq!(peer_id, geth_peer_id), + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => { + assert_eq!(peer_id, geth_peer_id) + } _ => panic!("Expected a peer added event"), } - if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await { + if let Some(NetworkEvent::Peer(PeerEvent::SessionEstablished(session_info))) = + events.next().await + { + let SessionInfo { peer_id, .. } = session_info; assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session established event"); @@ -510,7 +520,9 @@ async fn test_geth_disconnect() { handle.disconnect_peer(geth_peer_id); // wait for a disconnect from geth - if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await { + if let Some(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. })) = + events.next().await + { assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session closed event"); diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs index 3f74db3d37f1..71152c29bb83 100644 --- a/crates/net/network/tests/it/session.rs +++ b/crates/net/network/tests/it/session.rs @@ -6,7 +6,10 @@ use reth_network::{ test_utils::{PeerConfig, Testnet}, NetworkEvent, NetworkEventListenerProvider, }; -use reth_network_api::{NetworkInfo, Peers}; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + NetworkInfo, Peers, +}; use reth_provider::test_utils::NoopProvider; #[tokio::test(flavor = "multi_thread")] @@ -28,10 +31,11 @@ async fn test_session_established_with_highest_version() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { + let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth68); } @@ -66,10 +70,11 @@ async fn test_session_established_with_different_capability() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { + let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth66); } diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index ebde61ef8ea1..c9911885ad87 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -7,7 +7,7 @@ use alloy_primitives::{PrimitiveSignature as Signature, U256}; use futures::StreamExt; use rand::thread_rng; use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEventListenerProvider}; -use reth_network_api::PeersInfo; +use reth_network_api::{events::PeerEvent, PeersInfo}; use reth_primitives::TransactionSigned; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool}; @@ -139,16 +139,17 @@ async fn test_sending_invalid_transactions() { // await disconnect for bad tx spam if let Some(ev) = peer1_events.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, .. } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { assert_eq!(peer_id, *peer0.peer_id()); } - NetworkEvent::SessionEstablished { .. } => { + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished { .. }) => { panic!("unexpected SessionEstablished event") } - NetworkEvent::PeerAdded(_) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(_)) => { panic!("unexpected PeerAdded event") } - NetworkEvent::PeerRemoved(_) => { + NetworkEvent::Peer(PeerEvent::PeerRemoved(_)) => { panic!("unexpected PeerRemoved event") } } diff --git a/docs/crates/network.md b/docs/crates/network.md index be2c7cb3b143..7e38ac5d6014 100644 --- a/docs/crates/network.md +++ b/docs/crates/network.md @@ -787,8 +787,24 @@ The `TransactionsManager.network_events` stream is the first to have all of its The events received in this channel are of type `NetworkEvent`: [File: crates/net/network/src/manager.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/manager.rs) + +```rust,ignore +pub enum NetworkEvent { + /// Basic peer lifecycle event. + Peer(PeerEvent), + /// Session established with requests. + ActivePeerSession { + /// Session information + info: SessionInfo, + /// A request channel to the session task. + messages: PeerRequestSender, + }, +} +``` + +and with ```rust,ignore -pub enum NetworkEvent { +pub enum PeerEvent { /// Closed the peer session. SessionClosed { /// The identifier of the peer to which a session was closed. @@ -797,29 +813,29 @@ pub enum NetworkEvent { reason: Option, }, /// Established a new session with the given peer. - SessionEstablished { - /// The identifier of the peer to which a session was established. - peer_id: PeerId, - /// Capabilities the peer announced - capabilities: Arc, - /// A request channel to the session task. - messages: PeerRequestSender, - /// The status of the peer to which a session was established. - status: Status, - }, + SessionEstablished(SessionInfo), /// Event emitted when a new peer is added PeerAdded(PeerId), /// Event emitted when a new peer is removed PeerRemoved(PeerId), } ``` +[File: crates/net/network-api/src/events.rs](https://github.com/paradigmxyz/reth/blob/c46b5fc1157d12184d1dceb4dc45e26cf74b2bc6/crates/net/network-api/src/events.rs) -They're handled with the `on_network_event` method, which responds to the two variants of the `NetworkEvent` enum in the following ways: +They're handled with the `on_network_event` method, which processes session events through both `NetworkEvent::Peer(PeerEvent::SessionClosed)`, `NetworkEvent::Peer(PeerEvent::SessionEstablished)`, and `NetworkEvent::ActivePeerSession` for initializing peer connections and transaction broadcasting. -**`NetworkEvent::SessionClosed`** +Variants of the `PeerEvent` enum are defined in the following ways: + +**`PeerEvent::PeerAdded`** +Adds a peer to the network node via network handle + +**`PeerEvent::PeerRemoved`** Removes the peer given by `NetworkEvent::SessionClosed.peer_id` from the `TransactionsManager.peers` map. -**`NetworkEvent::SessionEstablished`** +**`PeerEvent::SessionClosed`** +Closes the peer session after disconnection + +**`PeerEvent::SessionEstablished`** Begins by inserting a `Peer` into `TransactionsManager.peers` by `peer_id`, which is a struct of the following form: [File: crates/net/network/src/transactions.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/transactions.rs) @@ -840,33 +856,30 @@ After the `Peer` is added to `TransactionsManager.peers`, the hashes of all of t [File: crates/net/network/src/transactions.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/transactions.rs) ```rust,ignore -fn on_network_event(&mut self, event: NetworkEvent) { - match event { - NetworkEvent::SessionClosed { peer_id, .. } => { +fn on_network_event(&mut self, event_result: NetworkEvent) { + match event_result { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { // remove the peer self.peers.remove(&peer_id); + self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::SessionEstablished { peer_id, messages, .. } => { - // insert a new peer - self.peers.insert( - peer_id, - Peer { - transactions: LruCache::new( - NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(), - ), - request_tx: messages, - }, - ); - - // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the - // pool - let msg = NewPooledTransactionHashes(self.pool.pooled_transactions()); - self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { - peer_id, - msg, - }) + NetworkEvent::ActivePeerSession { info, messages } => { + // process active peer session and broadcast available transaction from the pool + self.handle_peer_session(info, messages); + } + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; + // get messages from existing peer + let messages = match self.peers.get(&peer_id) { + Some(p) => p.request_tx.clone(), + None => { + debug!(target: "net::tx", ?peer_id, "No peer request sender found"); + return; + } + }; + self.handle_peer_session(info, messages); } - _ => {} + _ => {} } } ``` diff --git a/examples/bsc-p2p/src/main.rs b/examples/bsc-p2p/src/main.rs index 9e83f34e92f3..cea87918322b 100644 --- a/examples/bsc-p2p/src/main.rs +++ b/examples/bsc-p2p/src/main.rs @@ -17,7 +17,10 @@ use reth_discv4::Discv4ConfigBuilder; use reth_network::{ EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; -use reth_network_api::PeersInfo; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + PeersInfo, +}; use reth_primitives::{ForkHash, ForkId}; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, @@ -78,10 +81,11 @@ async fn main() { // For the sake of the example we only print the session established event // with the chain specific details match evt { - NetworkEvent::SessionEstablished { status, client_version, peer_id, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { + let SessionInfo { status, client_version, peer_id, .. } = info; info!(peers=%net_handle.num_connected_peers() , %peer_id, chain = %status.chain, ?client_version, "Session established with a new peer."); } - NetworkEvent::SessionClosed { peer_id, reason } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => { info!(peers=%net_handle.num_connected_peers() , %peer_id, ?reason, "Session closed."); } diff --git a/examples/polygon-p2p/Cargo.toml b/examples/polygon-p2p/Cargo.toml index e18f32a64737..34536ed52d71 100644 --- a/examples/polygon-p2p/Cargo.toml +++ b/examples/polygon-p2p/Cargo.toml @@ -16,6 +16,7 @@ secp256k1 = { workspace = true, features = [ tokio.workspace = true reth-network.workspace = true reth-chainspec.workspace = true +reth-network-api.workspace = true reth-primitives.workspace = true serde_json.workspace = true reth-tracing.workspace = true diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index bcc17a24f8d2..bae5399d9cd6 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -15,6 +15,7 @@ use reth_network::{ config::NetworkMode, EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; +use reth_network_api::events::SessionInfo; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, Tracer, @@ -71,7 +72,8 @@ async fn main() { while let Some(evt) = events.next().await { // For the sake of the example we only print the session established event // with the chain specific details - if let NetworkEvent::SessionEstablished { status, client_version, .. } = evt { + if let NetworkEvent::ActivePeerSession { info, .. } = evt { + let SessionInfo { status, client_version, .. } = info; let chain = status.chain; info!(?chain, ?client_version, "Session established with a new peer."); }