Periodic L1 synchronization #96
Conversation
Force-pushed from 80a2329 to 6897e36: …server' into feature/l1-synchronization
solid work
kairos-server/src/lib.rs
Outdated
    .initialize(config.casper_rpc.to_string(), config.casper_contract_hash)
    .await
{
    panic!("Event manager failed to initialize: {}", e);
Let's propagate this error up.
There is no "up" this is basically before we start the server
Also, simply do a `map_err`.
`map_err` introduced in 326b147.
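For illustration, a minimal sketch of that shape (the error variant matches the PR's `L1SyncError`, but the surrounding function and stub types are assumptions, not the crate's actual definitions):

```rust
#[derive(Debug)]
enum L1SyncError {
    UnexpectedError(String),
}

struct EventManager;

impl EventManager {
    async fn initialize(&mut self, _rpc_url: String, _contract_hash: String) -> Result<(), String> {
        Ok(()) // stand-in for the real CES setup
    }
}

// Instead of panicking, map the failure into the service's error type and
// let the caller decide how to handle it.
async fn init_event_manager(rpc_url: String, contract_hash: String) -> Result<EventManager, L1SyncError> {
    let mut em = EventManager;
    em.initialize(rpc_url, contract_hash)
        .await
        .map_err(|e| L1SyncError::UnexpectedError(format!("event manager failed to initialize: {}", e)))?;
    Ok(em)
}
```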
kairos-server/src/l1_sync/service.rs
Outdated
}
SyncCommand::TriggerSync(completion_ack) => {
    em.process_new_events().await?;
    let _ = completion_ack.send(());
what happens on failure?
Fixed in f437b2e.
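A sketch of one way to surface that failure (not necessarily what f437b2e does): send the `Result` itself through the oneshot acknowledgement, so `trigger_sync` callers can observe errors. The `EventManager` stub is a stand-in for the PR's type.

```rust
use tokio::sync::oneshot;

struct EventManager;

impl EventManager {
    async fn process_new_events(&mut self) -> Result<(), String> {
        Ok(()) // stand-in for the real event processing
    }
}

// Acknowledge with the sync outcome instead of `()`.
async fn handle_trigger_sync(em: &mut EventManager, completion_ack: oneshot::Sender<Result<(), String>>) {
    // If the receiver was dropped there is nobody left to notify; ignore that case.
    let _ = completion_ack.send(em.process_new_events().await);
}
```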
kairos-server/src/lib.rs
Outdated
{
    panic!("Event manager failed to initialize: {}", e);
}
l1_sync::interval_trigger::run(l1_sync_service).await;
Why are there 3 steps to run this service? Just make `run` take the server config and do all of it in `run`, or introduce three parameters, i.e. `batch_state`, `contract_hash`, and the `casper_rpc` URL.
The service must be created and initialized, then the periodic trigger spawned; all of that happens in `run_l1_sync()`.
Btw, initialization is done directly now: 1b6cae2.
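Roughly the resulting shape, as a sketch with stand-in types (the real definitions live in kairos-server, and the signatures here are assumptions):

```rust
use std::sync::Arc;

struct ServerStateInner;
struct L1SyncService;

impl L1SyncService {
    // Construction and CES initialization happen in one fallible step now.
    async fn new(_state: Arc<ServerStateInner>) -> Result<Self, String> {
        Ok(L1SyncService)
    }
}

mod interval_trigger {
    use super::L1SyncService;
    use std::sync::Arc;

    pub async fn run(_service: Arc<L1SyncService>) {
        // periodic trigger loop (see service.rs)
    }
}

// Single entry point: create, initialize, and spawn the periodic trigger,
// so callers never deal with the intermediate steps.
pub async fn run_l1_sync(server_state: Arc<ServerStateInner>) -> Result<(), String> {
    let sync_service = Arc::new(L1SyncService::new(server_state).await?);
    tokio::spawn(interval_trigger::run(sync_service));
    Ok(())
}
```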
kairos-server/src/l1_sync/service.rs
Outdated
match handle_command(command, event_manager.clone()).await {
    Ok(()) => {}
    Err(L1SyncError::UnexpectedError(e)) => panic!("Unrecoverable error: {}", e),
    Err(e) => tracing::error!("Transient error: {}", e),
}
}
Can be replaced with `map_err` and then just match on the error type.
Done in 1fac715.
let result = sync_service.trigger_sync().await;

if let Err(e) = result {
    tracing::error!("Unable to trigger sync: {}", e);
}
Nit: use `map_err`.
Done in be86df8.
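The nit amounts to something like this (the service type is a stand-in, not the PR's definition):

```rust
struct L1SyncService;

impl L1SyncService {
    async fn trigger_sync(&self) -> Result<(), String> {
        Ok(())
    }
}

async fn tick(sync_service: &L1SyncService) {
    // Handle the failure inline instead of a separate `if let Err` branch.
    let _ = sync_service
        .trigger_sync()
        .await
        .map_err(|e| tracing::error!("Unable to trigger sync: {}", e));
}
```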
Force-pushed from 7a69feb to e5f9f8b
pub struct EventManager {
    next_event_id: u32,
    fetcher: Option<Fetcher>,
    schemas: Option<Schemas>,
    server_state: Arc<ServerStateInner>,
}

impl EventManager {
    pub fn new(server_state: Arc<ServerStateInner>) -> Self {
        EventManager {
            next_event_id: 0,
            fetcher: None,
            schemas: None,
            server_state,
        }
    }
I don't see where it makes sense to model the `fetcher` and `schemas` attributes as optionals and introduce a new type that basically sets up the user for using the API in a wrong way; moreover, it forces the user into a two-step setup process, ending up with these two possible use scenarios. (Notice that the cases `fetcher == Some && schemas == None` and `fetcher == None && schemas == Some` are not even possible.)
// correct
let mut event_manager = EventManager::new(); // fetcher == None && schemas == None
event_manager.initialize(); // fetcher == Some(..) && schemas == Some(..)
event_manager.process_new_events();

// wrong usage, but possible without any compiler warning
let mut event_manager = EventManager::new(); // fetcher == None && schemas == None
event_manager.process_new_events(); // This will fail; I should not even be able to call it,
                                    // and a strong indicator is the missing fetcher and
                                    // schemas, which could both be used to prevent the call.

// better: process_new_events is now just a function in the server crate and can belong
// semantically to the server. The ServerState contains all the information, like the
// `rpc_url`, `contract_hash`, `server_state`, even the `next_event_id`.
pub fn process_new_events<F>(server_state: &ServerState, callback: F) -> Result<(), Error>
where
    F: FnOnce() -> Result<(), Error>,
{
    // initialize the fetcher; could fail, hence the Result return type
    // initialize the schemas; could fail, hence the Result return type
    // call the callback, which injects any behavior a user might want and could
    // potentially fail, hence the Result return type of the callback
}
The latter is a single function call, which deals with all the details a user does not care about. The user only cares about what should happen when new events are observed, reducing the required knowledge to just implementing the callback logic.
I removed the partial-construction pattern, so `fetcher` and `schemas` are no longer optional: 50a3654.
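A sketch of what that looks like (the constructor signature is an assumption): construction is fallible, but a successfully built `EventManager` always carries its `fetcher` and `schemas`, so `process_new_events()` can no longer be called on a half-initialized value.

```rust
struct Fetcher;
struct Schemas;

struct EventManager {
    next_event_id: u32,
    fetcher: Fetcher,
    schemas: Schemas,
}

impl EventManager {
    // Fallible constructor replaces the old new() + initialize() pair.
    pub fn new(_rpc_url: &str, _contract_hash: &str) -> Result<Self, String> {
        let fetcher = Fetcher; // stand-in for the CES fetcher setup, which can fail
        let schemas = Schemas; // stand-in for fetching the event schemas
        Ok(EventManager {
            next_event_id: 0,
            fetcher,
            schemas,
        })
    }
}
```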
rpc_url: &str,
contract_hash: &str,
We can get the `rpc_url` from the server state, i.e. `server_state.server_config.casper_rpc`; same goes for the contract hash.
kairos-server/src/lib.rs
Outdated
@@ -38,6 +40,35 @@ pub fn app_router(state: ServerState) -> Router {
        .with_state(state)
}

pub async fn run_l1_sync(config: ServerConfig, server_state: Arc<ServerStateInner>) {
Don't take ownership here.
`config: ServerConfig` removed in 8c5cc97.
kairos-server/src/l1_sync/service.rs
Outdated
rpc_url: String,
contract_hash: String,
Same goes here as commented for the `EventManager`; both of these are in `server_state.server_config`.
    .enqueue_transaction(txn)
    .await
    .map_err(|e| L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e)))?;
self.next_event_id = i + 1;
This reads odd to me; I would move this outside of the for loop and set it to `fetch_events_count`. We are not benefiting from updating that value immediately.
I want to keep the `EventManager` state, `next_event_id`, in sync with enqueued transactions. In other words, if some transaction fails to batch, the manager does not consider the event as processed.
Update: the counter gets incremented with every successfully processed event.
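The trade-off, reduced to a sketch (hypothetical helper, not the PR's code): advancing the counter inside the loop means a failure at event `i` leaves `next_event_id == i`, so that event is retried on the next sync instead of being skipped.

```rust
fn process_range(
    next_event_id: &mut u32,
    fetch_events_count: u32,
    mut enqueue: impl FnMut(u32) -> Result<(), String>,
) -> Result<(), String> {
    for i in *next_event_id..fetch_events_count {
        enqueue(i)?; // may fail mid-range
        *next_event_id = i + 1; // only advance past successfully enqueued events
    }
    Ok(())
}
```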
// TODO: Parse full transaction data from event, then push it to Data Availability layer.

// TODO: Once we have ASN transaction, it should be converted and pushed into batch.
let recipient: Vec<u8> = "cafebabe".into();
why are we mocking the deposit here?
#121 got merged, so I introduced `Deposit` event parsing in ff7b6c6.
However, the `recipient` field is still mocked. It is not currently possible to obtain the depositor's public key from the event, as it contains an account hash (unnecessarily wrapped in `Key`)... This was already solved in the `kairos-tx` contract PR (validation that an explicitly given public key matches the caller's account hash), so maybe we should reconsider merging it before the demo?
please fix the mocked recipient.
Force-pushed from a639bde to 8c5cc97
NOTE: We get `amount`, but handling `recipient` is not possible yet.
This PR adds periodic L1 synchronization 🔃, based on casper-event-toolkit.

Details

The core component is `EventManager`, which allows tracking events from a specified smart contract. The process begins with the setup of CES-related data (schemas, toolkit fetcher, and the number of events) using `initialize()`. Subsequently, `process_new_events()` checks for new events based on its internal counter, `next_event_id`.

`L1SyncService` operates the event manager as a Tokio task and provides a safe API for shared access within an Axum server.

Future work

- Once […] is merged into `main`, we will be able to parse deposit data from events - currently, this data is mocked - and push it to the batch manager (trie).

Demo 🎥
l1-sync-demo5-cropped.mp4
If you want to reproduce it locally, please see the next section.
Test environment
I used dockerized NCTL with a bunch of shell scripts to run this demo - all available in this gist.
1. Clean up old resources and compile the example smart contract
2. Run NCTL and deploy WASM
At this point you have a contract hash that is ready to be used in the next steps.
3. Initialize Casper Event Standard
4. Call deposit/batch entrypoint to emit test events
Additional changes for demo
In order to record a nice and quick demo, I updated the subscriber to include relevant logs:
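Something along these lines (the filter targets are assumptions; the demo branch's actual string may differ):

```rust
use tracing_subscriber::EnvFilter;

// Raise the sync-related targets to DEBUG so the periodic synchronization
// activity shows up clearly in the demo output.
fn init_logging() {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("kairos_server=info,kairos_server::l1_sync=debug"))
        .init();
}
```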
I also reduced the synchronization interval from 30 to 10 seconds:
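For example (the constant name and the stand-in service type are assumptions; the PR's trigger lives in `interval_trigger`):

```rust
use std::sync::Arc;
use tokio::time::{interval, Duration};

struct L1SyncService; // stand-in for the PR's service type

impl L1SyncService {
    async fn trigger_sync(&self) -> Result<(), String> {
        Ok(())
    }
}

const SYNC_INTERVAL_SECS: u64 = 10; // was 30

async fn run(sync_service: Arc<L1SyncService>) {
    let mut ticker = interval(Duration::from_secs(SYNC_INTERVAL_SECS));
    loop {
        ticker.tick().await;
        if let Err(e) = sync_service.trigger_sync().await {
            tracing::error!("Unable to trigger sync: {}", e);
        }
    }
}
```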