-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathexex.rs
148 lines (129 loc) · 5.33 KB
/
exex.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
//! Support for launching execution extensions.
use std::{fmt, fmt::Debug};
use futures::future;
use reth_chain_state::ForkChoiceSubscriptions;
use reth_chainspec::EthChainSpec;
use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle, Wal};
use reth_node_api::{FullNodeComponents, NodeTypes};
use reth_primitives::Head;
use reth_provider::CanonStateSubscriptions;
use reth_tracing::tracing::{debug, info};
use tracing::Instrument;
use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
/// Can launch execution extensions.
pub struct ExExLauncher<Node: FullNodeComponents> {
head: Head,
extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
components: Node,
config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
}
impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
/// Create a new `ExExLauncher` with the given extensions.
pub const fn new(
head: Head,
components: Node,
extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
) -> Self {
Self { head, extensions, components, config_container }
}
/// Launches all execution extensions.
///
/// Spawns all extensions and returns the handle to the exex manager if any extensions are
/// installed.
pub async fn launch(self) -> eyre::Result<Option<ExExManagerHandle>> {
let Self { head, extensions, components, config_container } = self;
if extensions.is_empty() {
// nothing to launch
return Ok(None)
}
let mut exex_handles = Vec::with_capacity(extensions.len());
let mut exexes = Vec::with_capacity(extensions.len());
for (id, exex) in extensions {
// create a new exex handle
let (handle, events, notifications) = ExExHandle::new(
id.clone(),
head,
components.provider().clone(),
components.block_executor().clone(),
);
exex_handles.push(handle);
// create the launch context for the exex
let context = ExExContext {
head,
config: config_container.config.clone(),
reth_config: config_container.toml_config.clone(),
components: components.clone(),
events,
notifications,
};
let executor = components.task_executor().clone();
exexes.push(async move {
debug!(target: "reth::cli", id, "spawning exex");
let span = reth_tracing::tracing::info_span!("exex", id);
// init the exex
let exex = exex.launch(context).instrument(span.clone()).await.unwrap();
// spawn it as a crit task
executor.spawn_critical(
"exex",
async move {
info!(target: "reth::cli", "ExEx started");
match exex.await {
Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
Err(err) => panic!("ExEx {id} crashed: {err}"),
}
}
.instrument(span),
);
});
}
future::join_all(exexes).await;
// spawn exex manager
debug!(target: "reth::cli", "spawning exex manager");
// todo(onbjerg): rm magic number
let exex_wal = Wal::new(
config_container
.config
.datadir
.clone()
.resolve_datadir(config_container.config.chain.chain())
.exex_wal(),
)?;
let exex_manager = ExExManager::new(
exex_handles,
1024,
exex_wal,
components.provider().finalized_block_stream(),
);
let exex_manager_handle = exex_manager.handle();
components.task_executor().spawn_critical("exex manager", async move {
exex_manager.await.expect("exex manager crashed");
});
// send notifications from the blockchain tree to exex manager
let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
let mut handle = exex_manager_handle.clone();
components.task_executor().spawn_critical(
"exex manager blockchain tree notifications",
async move {
while let Ok(notification) = canon_state_notifications.recv().await {
handle
.send_async(notification.into())
.await
.expect("blockchain tree notification could not be sent to exex manager");
}
},
);
info!(target: "reth::cli", "ExEx Manager started");
Ok(Some(exex_manager_handle))
}
}
impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ExExLauncher")
.field("head", &self.head)
.field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
.field("components", &"...")
.field("config_container", &self.config_container)
.finish()
}
}