Skip to content

Commit

Permalink
fix(indexer/rebuild): switch to Tokio for threading in rebuild.rs (#2436
Browse files Browse the repository at this point in the history
)

Replace std::thread with tokio::task::spawn_blocking for producing and applying updates. This improves asynchronous handling and integrates better with the Tokio runtime.

fixed this when rebuild:

```shell
2024-08-14T21:58:40.926122Z INFO rooch::commands::indexer::commands::rebuild: indexer rebuild started
thread '<unnamed>' panicked at crates/rooch-indexer/src/indexer_reader.rs:119:13:
there is no reactor running, must be called from the context of a Tokio 1.x runtime
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace
Indexer rebuild task finished(0 updates applied) in: 21.674956ms
Error: Produce updates error
```
  • Loading branch information
popcnt1 committed Aug 14, 2024
1 parent f23e0c1 commit 861d650
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions crates/rooch/src/commands/indexer/commands/rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, SyncSender};
use std::thread;
use std::time::Instant;

use anyhow::{Error, Result};
Expand Down Expand Up @@ -52,15 +51,16 @@ impl RebuildCommand {
let (indexer_store, indexer_reader, start_time) = self.init();
let (tx, rx) = mpsc::sync_channel(2);

let produce_updates_thread =
thread::spawn(move || produce_updates(tx, indexer_reader, input_path, batch_size));
let produce_updates_thread = tokio::task::spawn_blocking(move || {
produce_updates(tx, indexer_reader, input_path, batch_size)
});
let apply_updates_thread =
thread::spawn(move || apply_updates(rx, indexer_store, start_time));
tokio::task::spawn_blocking(move || apply_updates(rx, indexer_store, start_time));
let _ = produce_updates_thread
.join()
.await
.map_err(|_e| RoochError::from(Error::msg("Produce updates error".to_string())))?;
let _ = apply_updates_thread
.join()
.await
.map_err(|_e| RoochError::from(Error::msg("Apply updates error ".to_string())))?;

Ok(())
Expand Down

0 comments on commit 861d650

Please sign in to comment.