Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: move ExEx book examples #11616

Merged
merged 4 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 10 additions & 339 deletions book/developers/exex/remote.md

Large diffs are not rendered by default.

138 changes: 2 additions & 136 deletions book/developers/exex/tracking-state.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,63 +19,7 @@ because you can't access variables inside the function to assert the state of yo
</div>

```rust,norun,noplayground,ignore
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};

use futures_util::StreamExt;
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;

struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
}

impl<Node: FullNodeComponents> Future for MyExEx<Node> {
type Output = eyre::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};

if let Some(committed_chain) = notification.committed_chain() {
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
}

Poll::Ready(Ok(()))
}
}

fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", |ctx| async move { Ok(MyExEx { ctx }) })
.launch()
.await?;

handle.wait_for_node_exit().await
})
}
{{#include ../../sources/exex/tracking-state/src/bin/1.rs}}
```

For those who are not familiar with how async Rust works on a lower level, that may seem scary,
Expand All @@ -96,85 +40,7 @@ With all that done, we're now free to add more fields to our `MyExEx` struct, an
Our ExEx will count the number of transactions in each block and log it to the console.

```rust,norun,noplayground,ignore
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};

use futures_util::StreamExt;
use reth::{api::FullNodeComponents, primitives::BlockNumber};
use reth_exex::{ExExContext, ExExEvent};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;

struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
/// First block that was committed since the start of the ExEx.
first_block: Option<BlockNumber>,
/// Total number of transactions committed.
transactions: u64,
}

impl<Node: FullNodeComponents> MyExEx<Node> {
fn new(ctx: ExExContext<Node>) -> Self {
Self {
ctx,
first_block: None,
transactions: 0,
}
}
}

impl<Node: FullNodeComponents> Future for MyExEx<Node> {
type Output = eyre::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
if let Some(reverted_chain) = notification.reverted_chain() {
this.transactions = this.transactions.saturating_sub(
reverted_chain
.blocks_iter()
.map(|b| b.body.len() as u64)
.sum(),
);
}

if let Some(committed_chain) = notification.committed_chain() {
this.first_block.get_or_insert(committed_chain.first().number);

this.transactions += committed_chain
.blocks_iter()
.map(|b| b.body.len() as u64)
.sum::<u64>();

this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}

if let Some(first_block) = this.first_block {
info!(%first_block, transactions = %this.transactions, "Total number of transactions");
}
}

Poll::Ready(Ok(()))
}
}

fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", |ctx| async move { Ok(MyExEx::new(ctx)) })
.launch()
.await?;

handle.wait_for_node_exit().await
})
}
{{#include ../../sources/exex/tracking-state/src/bin/2.rs}}
```

As you can see, we added two fields to our ExEx struct:
Expand Down
2 changes: 2 additions & 0 deletions book/sources/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[workspace]
members = [
"exex/hello-world",
"exex/remote",
"exex/tracking-state",
]

# Explicitly set the resolver to version 2, which is the default for packages with edition >= 2021
Expand Down
52 changes: 52 additions & 0 deletions book/sources/exex/remote/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[package]
name = "remote-exex"
version = "0.1.0"
edition = "2021"

[dependencies]
# reth
reth = { git = "https://github.com/paradigmxyz/reth.git" }
reth-exex = { git = "https://github.com/paradigmxyz/reth.git", features = ["serde"] }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git"}
reth-node-api = { git = "https://github.com/paradigmxyz/reth.git"}
reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" }

# async
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
futures-util = "0.3"

# grpc
tonic = "0.11"
prost = "0.12"
bincode = "1"

# misc
eyre = "0.6"

[build-dependencies]
tonic-build = "0.11"

[[bin]]
name = "exex_1"
path = "src/exex_1.rs"

[[bin]]
name = "exex_2"
path = "src/exex_2.rs"

[[bin]]
name = "exex_3"
path = "src/exex_3.rs"

[[bin]]
name = "exex_4"
path = "src/exex_4.rs"

[[bin]]
name = "exex"
path = "src/exex.rs"

[[bin]]
name = "consumer"
path = "src/consumer.rs"
4 changes: 4 additions & 0 deletions book/sources/exex/remote/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/exex.proto")?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for this to work in the CI, we also need to install protoc. I think it's not a big deal as it's quite lightweight, wdyt @mattsse ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactoring the job and adding protoc in #11618

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need to run on ci?
yeah I guess if we want to do have well maintained bindings then we're kinda forced to do this. sounds reasonable

Ok(())
}
13 changes: 13 additions & 0 deletions book/sources/exex/remote/proto/exex.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package exex;

service RemoteExEx {
rpc Subscribe(SubscribeRequest) returns (stream ExExNotification) {}
}

message SubscribeRequest {}

message ExExNotification {
bytes data = 1;
}
32 changes: 32 additions & 0 deletions book/sources/exex/remote/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use remote_exex::proto::{remote_ex_ex_client::RemoteExExClient, SubscribeRequest};
use reth_exex::ExExNotification;
use reth_tracing::{tracing::info, RethTracer, Tracer};

#[tokio::main]
async fn main() -> eyre::Result<()> {
let _ = RethTracer::new().init()?;

let mut client = RemoteExExClient::connect("http://[::1]:10000")
.await?
.max_encoding_message_size(usize::MAX)
.max_decoding_message_size(usize::MAX);

let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner();
while let Some(notification) = stream.message().await? {
let notification: ExExNotification = bincode::deserialize(&notification.data)?;

match notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
}

Ok(())
}
87 changes: 87 additions & 0 deletions book/sources/exex/remote/src/exex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use futures_util::TryStreamExt;
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};

struct ExExService {
notifications: Arc<broadcast::Sender<ExExNotification>>,
}

#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;

async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (tx, rx) = mpsc::channel(1);

let mut notifications = self.notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications.recv().await {
let proto_notification = proto::ExExNotification {
data: bincode::serialize(&notification).expect("failed to serialize"),
};
tx.send(Ok(proto_notification))
.await
.expect("failed to send notification to client");

info!("Notification sent to the gRPC client");
}
});

Ok(Response::new(ReceiverStream::new(rx)))
}
}

async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.try_next().await? {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}

info!("Notification sent to the gRPC server");
let _ = notifications.send(notification);
}

Ok(())
}

// ANCHOR: snippet
Copy link
Collaborator

@shekhirin shekhirin Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this? and in other places too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to show only the relevant sections of the files at the book references, and allow the reader to expand the complete file only if they want to. Another way to do this is to use line numbers of the snippets instead of using these // ANCHOR: snippet comments, but then we'd have to change the line numbers if the examples are changed. Or I could just show the complete files at all references. Which one do you prefer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i see! that's great, I think these anchors are fine

fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);

let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());

let handle = builder
.node(EthereumNode::default())
.install_exex("remote-exex", |ctx| async move { Ok(remote_exex(ctx, notifications)) })
.launch()
.await?;

handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});

handle.wait_for_node_exit().await
})
}
// ANCHOR_END: snippet
40 changes: 40 additions & 0 deletions book/sources/exex/remote/src/exex_1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_node_ethereum::EthereumNode;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};

struct ExExService {}

#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;

async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (_tx, rx) = mpsc::channel(1);

Ok(Response::new(ReceiverStream::new(rx)))
}
}

fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {}))
.serve("[::1]:10000".parse().unwrap());

let handle = builder.node(EthereumNode::default()).launch().await?;

handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});

handle.wait_for_node_exit().await
})
}
Loading
Loading