Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(repo): Added panic hooks and reworked graceful shutdown #278

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions crates/fuel-streams-core/src/stream/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use async_nats::{
client::FlushErrorKind,
error,
jetstream::{
consumer::StreamErrorKind,
Expand Down Expand Up @@ -43,7 +42,4 @@ pub enum StreamError {

/// Failed to consume messages from stream
ConsumerMessages(#[from] error::Error<StreamErrorKind>),

/// failed to flush messages in the stream
StreamFlush(#[from] error::Error<FlushErrorKind>),
}
14 changes: 0 additions & 14 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,20 +301,6 @@ impl<S: Streamable> Stream<S> {
}
}

pub async fn flush_await(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx

&self,
client: &NatsClient,
) -> Result<(), StreamError> {
if client.is_connected() {
client
.nats_client
.flush()
.await
.map_err(StreamError::StreamFlush)?;
}
Ok(())
}

#[cfg(any(test, feature = "test-helpers"))]
pub async fn assert_has_stream(
&self,
Expand Down
87 changes: 55 additions & 32 deletions crates/fuel-streams-publisher/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,6 @@ impl Publisher {
})
}

pub async fn flush_await_all_streams(&self) -> anyhow::Result<()> {
let streams = [
self.streams.blocks.flush_await(&self.nats_client).boxed(),
self.streams
.transactions
.flush_await(&self.nats_client)
.boxed(),
];
try_join_all(streams).await?;
Ok(())
}

#[cfg(feature = "test-helpers")]
pub async fn default_with_publisher(
nats_client: &NatsClient,
Expand All @@ -189,6 +177,26 @@ impl Publisher {
&self.streams
}

fn set_panic_hook(&mut self) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This overrides the default panic hook, so if the publisher panics, we won't get the panic message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will, the first thing I am doing in printing the stack above!

let nats_client = self.nats_client.clone();
let fuel_service = Arc::clone(&self.fuel_service);
std::panic::set_hook(Box::new(move |panic_info| {
let payload = panic_info
.payload()
.downcast_ref::<&str>()
.unwrap_or(&"Unknown panic");
tracing::error!("Publisher panicked with a message: {:?}", payload);
let handle = tokio::runtime::Handle::current();
let nats_client = nats_client.clone();
let fuel_service = Arc::clone(&fuel_service);
handle.spawn(async move {
Publisher::flush_await_all_streams(&nats_client).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing streams after panic is unreliable. async may be in a compromised state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a usual practice to at least attempt some cleanup. Usually if we die, we might just end up in a really bad state. Also , note that both methods flush_await_all_streams and stop_fuel do not throw exceptions, i.e. they cannot fail whatsoever.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you can do sync cleanup but not grab the Tokio runtime. Here's an example:

use tokio::runtime::Handle;
use std::panic;

fn set_panic_hook() {
    panic::set_hook(Box::new(|panic_info| {
        println!("Panic occurred: {}", panic_info);
        // can panic
        let handle = Handle::current();
        // can panic
        handle.spawn(async {
            println!("Performing async cleanup in panic hook...");
            // can panic
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            println!("Async cleanup completed.");
        });
    }));
}

#[tokio::main]
async fn main() {
    set_panic_hook();
    panic!("This is a test panic!");
}
Panic occurred: panicked at src/main.rs:27:5:
This is a test panic!
Performing async cleanup in panic hook...
<EOF>

Run it several times and you'll see that you may not even make it to

Performing async cleanup in panic hook...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lostman that happens because the owner process, main dies, right? At that point, we're leveraging the auto-destructor from Tokio -- so graceful shutdown of processes still happens and we simply need our operations to be recoverable from panics.

Copy link
Member

@Jurshsmith Jurshsmith Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it more fine-grained, we could use CancellationToken & TaskTracker and similar APIs (like tokio-graceful-shutdown) but since our operations are supposed to be idempotent, I think this solution is quite ergonomic. wdyt?

Publisher::stop_fuel_service(&fuel_service).await;
std::process::exit(1);
});
}));
}

async fn publish_block_data(
&self,
result: ImporterResult,
Expand All @@ -201,35 +209,50 @@ impl Publisher {

async fn shutdown_services_with_timeout(&self) -> anyhow::Result<()> {
tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async {
tracing::info!("Flushing in-flight messages to nats ...");
match self.flush_await_all_streams().await {
Ok(_) => tracing::info!("Flushed in-flight messages to nats"),
Err(e) => tracing::error!(
"Flushing in-flight messages to nats failed: {:?}",
e
),
}

tracing::info!("Stopping fuel core ...");
match self
.fuel_service
.send_stop_signal_and_await_shutdown()
.await
{
Ok(state) => {
tracing::info!("Stopped fuel core. Status = {:?}", state)
}
Err(e) => tracing::error!("Stopping fuel core failed: {:?}", e),
}
Publisher::flush_await_all_streams(&self.nats_client).await;
Publisher::stop_fuel_service(&self.fuel_service).await;
})
.await?;

Ok(())
}

async fn flush_await_all_streams(nats_client: &NatsClient) {
tracing::info!("Flushing in-flight messages to nats ...");
match nats_client.nats_client.flush().await {
Ok(_) => {
tracing::info!("Flushed all streams successfully!");
}
Err(e) => {
tracing::error!("Failed to flush all streams: {:?}", e);
}
}
}

async fn stop_fuel_service(fuel_service: &Arc<FuelService>) {
if matches!(
fuel_service.state(),
fuel_core_services::State::Stopped
| fuel_core_services::State::Stopping
| fuel_core_services::State::StoppedWithError(_)
| fuel_core_services::State::NotStarted
) {
return;
}

tracing::info!("Stopping fuel core ...");
match fuel_service.send_stop_signal_and_await_shutdown().await {
Ok(state) => {
tracing::info!("Stopped fuel core. Status = {:?}", state)
}
Err(e) => tracing::error!("Stopping fuel core failed: {:?}", e),
}
}

pub async fn run(mut self) -> anyhow::Result<Self> {
let mut stop_handle = StopHandle::new();
stop_handle.spawn_signal_listener();
self.set_panic_hook();

let last_published_block = self
.streams
Expand Down
Loading