Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exex): subscribe to notifications explicitly #10573

Merged
merged 31 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dfcb2e1
feat(exex): subscribe to notifications explicitly
shekhirin Aug 27, 2024
eae2985
add head
shekhirin Aug 28, 2024
cf22e41
doc comment
shekhirin Aug 28, 2024
aff910e
more comments
shekhirin Aug 28, 2024
5725e0b
use derive_more for deref, deref mut
shekhirin Aug 28, 2024
342b15f
more comments
shekhirin Aug 28, 2024
1dd3a8b
comment subscribe with head for now
shekhirin Aug 29, 2024
6edafa7
fix test utils
shekhirin Aug 29, 2024
d33281e
fix doc links
shekhirin Aug 29, 2024
5c54005
test inactive functionality
shekhirin Aug 29, 2024
a07345d
update book
shekhirin Aug 29, 2024
515cd3b
Merge remote-tracking branch 'origin/main' into alexey/exexcontext-no…
shekhirin Sep 2, 2024
acc8310
use two streams and no state
shekhirin Sep 3, 2024
c8c89f4
remove unused imports and deps
shekhirin Sep 3, 2024
83cfcc1
fix tests
shekhirin Sep 3, 2024
689e538
update book
shekhirin Sep 3, 2024
6075bdb
add a todo comment about backfill
shekhirin Sep 3, 2024
7beb8d9
comment subscribe_with_head
shekhirin Sep 3, 2024
49feb0e
Revert "comment subscribe_with_head"
shekhirin Sep 3, 2024
47a0f32
Revert "Revert "comment subscribe_with_head""
shekhirin Sep 3, 2024
1e5f6c3
fix docs
shekhirin Sep 3, 2024
0e32dc0
use blocknumhash
shekhirin Sep 4, 2024
d4eb1bb
consume subscriber on subscribe
shekhirin Sep 4, 2024
5d0385a
no either type, just return separate stream types
shekhirin Sep 5, 2024
e3ddac4
revertme: init backfill job factory
shekhirin Sep 5, 2024
d6e7003
Revert "revertme: init backfill job factory"
shekhirin Sep 5, 2024
44e4094
include components into notifications structs
shekhirin Sep 6, 2024
3e4060a
Merge remote-tracking branch 'origin/main' into alexey/exexcontext-no…
shekhirin Sep 6, 2024
2f4ac1e
update book
shekhirin Sep 6, 2024
d357e2d
add backward compatibility method
shekhirin Sep 6, 2024
291929c
another backward compat method
shekhirin Sep 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion book/developers/exex/hello-world.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ reth = { git = "https://github.com/paradigmxyz/reth.git" } # Reth
reth-exex = { git = "https://github.com/paradigmxyz/reth.git" } # Execution Extensions
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git" } # Ethereum Node implementation
reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } # Logging

eyre = "0.6" # Easy error handling
futures-util = "0.3" # Stream utilities for consuming notifications
```

### Default Reth node
Expand Down Expand Up @@ -101,13 +103,14 @@ If you try running a node with an ExEx that exits, the node will exit as well.
Now, let's extend our simplest ExEx and start actually listening to new notifications, log them, and send events back to the main node

```rust,norun,noplayground,ignore
use futures_util::StreamExt;
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;

async fn my_exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
while let Some(notification) = ctx.notifications.next().await {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
10 changes: 7 additions & 3 deletions book/developers/exex/remote.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,15 @@ Don't forget to emit `ExExEvent::FinishedHeight`

```rust,norun,noplayground,ignore
// ...

use futures_util::StreamExt;
use reth_exex::{ExExContext, ExExEvent};

async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
Expand Down Expand Up @@ -332,6 +334,9 @@ fn main() -> eyre::Result<()> {
<summary>Click to expand</summary>

```rust,norun,noplayground,ignore
use std::sync::Arc;

use futures_util::StreamExt;
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
Expand All @@ -340,7 +345,6 @@ use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
Expand Down Expand Up @@ -381,7 +385,7 @@ async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
Expand Down
6 changes: 4 additions & 2 deletions book/developers/exex/tracking-state.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
task::{ready, Context, Poll},
};

use futures_util::StreamExt;
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
Expand All @@ -40,7 +41,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down Expand Up @@ -101,6 +102,7 @@ use std::{
task::{ready, Context, Poll},
};

use futures_util::StreamExt;
use reth::{api::FullNodeComponents, primitives::BlockNumber};
use reth_exex::{ExExContext, ExExEvent};
use reth_node_ethereum::EthereumNode;
Expand Down Expand Up @@ -130,7 +132,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
if let Some(reverted_chain) = notification.reverted_chain() {
this.transactions = this.transactions.saturating_sub(
reverted_chain
Expand Down
6 changes: 3 additions & 3 deletions book/installation/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ You can build Reth on Linux, macOS, Windows, and Windows WSL2.

## Dependencies

First, **install Rust** using [rustup](https://rustup.rs/):
First, **install Rust** using [rustup](https://rustup.rs/):

```bash
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Expand All @@ -32,7 +32,7 @@ operating system:

These are needed to build bindings for Reth's database.

The Minimum Supported Rust Version (MSRV) of this project is 1.81.0. If you already have a version of Rust installed,
The Minimum Supported Rust Version (MSRV) of this project is 1.80.0. If you already have a version of Rust installed,
you can check your version by running `rustc --version`. To update your version of Rust, run `rustup update`.

## Build Reth
Expand Down Expand Up @@ -147,7 +147,7 @@ _(Thanks to Sigma Prime for this section from [their Lighthouse book](https://li

### Bus error (WSL2)

In WSL 2 on Windows, the default virtual disk size is set to 1TB.
In WSL 2 on Windows, the default virtual disk size is set to 1TB.

You must increase the allocated disk size for your WSL2 instance before syncing reth.

Expand Down
4 changes: 2 additions & 2 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ reth-metrics.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-payload-builder.workspace = true
reth-primitives-traits.workspace = true
reth-primitives = { workspace = true, features = ["secp256k1"] }
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-prune-types.workspace = true
reth-revm.workspace = true
Expand All @@ -45,9 +45,9 @@ reth-db-api.workspace = true
reth-db-common.workspace = true
reth-evm-ethereum.workspace = true
reth-node-api.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }

secp256k1.workspace = true

Expand Down
12 changes: 6 additions & 6 deletions crates/exex/exex/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use reth_node_api::{FullNodeComponents, NodeTypesWithEngine};
use reth_node_core::node_config::NodeConfig;
use reth_primitives::Head;
use reth_tasks::TaskExecutor;
use tokio::sync::mpsc::{Receiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedSender;

use crate::{ExExEvent, ExExNotification};
use crate::{ExExEvent, ExExNotifications};

/// Captures the context that an `ExEx` has access to.
pub struct ExExContext<Node: FullNodeComponents> {
Expand All @@ -24,13 +24,13 @@ pub struct ExExContext<Node: FullNodeComponents> {
/// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what
/// blocks to receive notifications for.
pub events: UnboundedSender<ExExEvent>,
/// Channel to receive [`ExExNotification`]s.
/// Channel to receive [`ExExNotification`](crate::ExExNotification)s.
///
/// # Important
///
/// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the
/// node.
pub notifications: Receiver<ExExNotification>,
/// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is
/// considered delivered by the node.
pub notifications: ExExNotifications<Node>,

/// node components
pub components: Node,
Expand Down
Loading
Loading