Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexerService::try_loop_sync should not run if ckb has received stop signal #4351

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion util/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ ckb-notify = { path = "../../notify", version = "= 0.114.0-pre" }
ckb-store = { path = "../../store", version = "= 0.114.0-pre" }
ckb-stop-handler = { path = "../stop-handler", version = "= 0.114.0-pre" }
ckb-async-runtime = { path = "../runtime", version = "= 0.114.0-pre" }
ckb-channel = { path = "../channel", version = "= 0.114.0-pre" }
rhai = { version = "1.10.0", features = ["no_function", "no_float", "no_module", "sync"]}
serde_json = "1.0"
numext-fixed-uint = "0.1"
Expand Down
20 changes: 11 additions & 9 deletions util/indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use ckb_jsonrpc_types::{
};
use ckb_logger::{error, info};
use ckb_notify::NotifyController;
use ckb_stop_handler::{new_crossbeam_exit_rx, new_tokio_exit_rx, CancellationToken};
use ckb_stop_handler::{has_received_stop_signal, new_tokio_exit_rx, CancellationToken};
use ckb_store::ChainStore;
use ckb_types::{
core::{self, BlockNumber},
Expand Down Expand Up @@ -153,14 +153,10 @@ impl IndexerService {
}
}
}
let stop_rx = new_crossbeam_exit_rx();
loop {
ckb_channel::select! {
recv(stop_rx) -> _ =>{
info!("apply_init_tip received exit signal, exit now");
break;
},
default() => {},
if has_received_stop_signal() {
info!("apply_init_tip received exit signal, exit now");
break;
}

if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
Expand Down Expand Up @@ -197,6 +193,11 @@ impl IndexerService {
CustomFilters::new(self.block_filter.as_deref(), self.cell_filter.as_deref()),
);
loop {
if has_received_stop_signal() {
info!("try_loop_sync received exit signal, exit now");
break;
}

if let Some((tip_number, tip_hash)) = indexer.tip().expect("get tip should be OK") {
match self.get_block_by_number(tip_number + 1) {
Some(block) => {
Expand Down Expand Up @@ -237,12 +238,13 @@ impl IndexerService {
let poll_service = self.clone();
self.async_handle.spawn(async move {
let _initial_finished = initial_syncing.await;
info!("initial_syncing finished");
if stop.is_cancelled() {
info!("Indexer received exit signal, cancel new_block_watcher task, exit now");
return;
}

info!("initial_syncing finished");

let mut new_block_watcher = notify_controller
.watch_new_block(SUBSCRIBER_NAME.to_string())
.await;
Expand Down
4 changes: 2 additions & 2 deletions util/stop-handler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! TODO(doc): @keroro520

pub use stop_register::{
broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread,
wait_all_ckb_services_exit,
broadcast_exit_signals, has_received_stop_signal, new_crossbeam_exit_rx, new_tokio_exit_rx,
register_thread, wait_all_ckb_services_exit,
};

pub use tokio_util::sync::CancellationToken;
Expand Down
10 changes: 10 additions & 0 deletions util/stop-handler/src/stop_register.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ckb_channel::TrySendError;
use ckb_logger::{debug, info, trace, warn};
use ckb_util::Mutex;
use std::sync::atomic::AtomicBool;
use tokio_util::sync::CancellationToken;

struct CkbServiceHandles {
Expand Down Expand Up @@ -34,6 +35,9 @@ static CKB_HANDLES: once_cell::sync::Lazy<Mutex<CkbServiceHandles>> =
})
});

static RECEIVED_STOP_SIGNAL: once_cell::sync::Lazy<AtomicBool> =
once_cell::sync::Lazy::new(AtomicBool::default);

static TOKIO_EXIT: once_cell::sync::Lazy<CancellationToken> =
once_cell::sync::Lazy::new(CancellationToken::new);

Expand All @@ -52,9 +56,15 @@ pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> {
rx
}

/// Check if the ckb process has received stop signal
pub fn has_received_stop_signal() -> bool {
RECEIVED_STOP_SIGNAL.load(std::sync::atomic::Ordering::SeqCst)
}

/// Broadcast exit signals to all threads and all tokio tasks
pub fn broadcast_exit_signals() {
debug!("Received exit signal; broadcasting exit signal to all threads");
RECEIVED_STOP_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst);
TOKIO_EXIT.cancel();
CROSSBEAM_EXIT_SENDERS
.lock()
Expand Down
Loading