From 861d650a6dc955ac6426ef01f3b533d5a951255f Mon Sep 17 00:00:00 2001 From: popcnt <142196625+popcnt1@users.noreply.github.com> Date: Thu, 15 Aug 2024 06:34:26 +0800 Subject: [PATCH] fix(indexer/rebuild): switch to Tokio for threading in rebuild.rs (#2436) Replace std::thread with tokio::task::spawn_blocking for producing and applying updates. This improves asynchronous handling and integrates better with the Tokio runtime. fixed this when rebuild: ```shell 2024-08-14T21:58:40.926122Z INFO rooch::commands::indexer::commands::rebuild: indexer rebuild started thread '' panicked at crates/rooch-indexer/src/indexer_reader.rs:119:13: there is no reactor running, must be called from the context of a Tokio 1.x runtime note: run with RUST_BACKTRACE=1 environment variable to display a backtrace Indexer rebuild task finished(0 updates applied) in: 21.674956ms Error: Produce updates error ``` --- .../rooch/src/commands/indexer/commands/rebuild.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/rooch/src/commands/indexer/commands/rebuild.rs b/crates/rooch/src/commands/indexer/commands/rebuild.rs index f75424fbe4..fb606f1262 100644 --- a/crates/rooch/src/commands/indexer/commands/rebuild.rs +++ b/crates/rooch/src/commands/indexer/commands/rebuild.rs @@ -7,7 +7,6 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::mpsc; use std::sync::mpsc::{Receiver, SyncSender}; -use std::thread; use std::time::Instant; use anyhow::{Error, Result}; @@ -52,15 +51,16 @@ impl RebuildCommand { let (indexer_store, indexer_reader, start_time) = self.init(); let (tx, rx) = mpsc::sync_channel(2); - let produce_updates_thread = - thread::spawn(move || produce_updates(tx, indexer_reader, input_path, batch_size)); + let produce_updates_thread = tokio::task::spawn_blocking(move || { + produce_updates(tx, indexer_reader, input_path, batch_size) + }); let apply_updates_thread = - thread::spawn(move || apply_updates(rx, indexer_store, start_time)); + tokio::task::spawn_blocking(move || apply_updates(rx, indexer_store, start_time)); let _ = produce_updates_thread - .join() + .await .map_err(|_e| RoochError::from(Error::msg("Produce updates error".to_string())))?; let _ = apply_updates_thread - .join() + .await .map_err(|_e| RoochError::from(Error::msg("Apply updates error ".to_string())))?; Ok(())