Skip to content

Commit

Permalink
fix: Fix the issue of actor migration panic caused by the in-place sc…
Browse files Browse the repository at this point in the history
…ale-down (#20316)

Signed-off-by: Shanicky Chen <peng@risingwave-labs.com>
  • Loading branch information
shanicky authored Jan 27, 2025
1 parent 6bf7184 commit ef07a49
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
8 changes: 5 additions & 3 deletions src/meta/src/barrier/context/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,13 @@ impl GlobalBarrierWorkerContextImpl {
.collect();

if expired_worker_slots.is_empty() {
debug!("no expired worker slots, skipping.");
info!("no expired worker slots, skipping.");
return self.resolve_graph_info(None).await;
}

debug!("start migrate actors.");
info!("start migrate actors.");
let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);
info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
.intersection(&active_worker_slots)
Expand Down Expand Up @@ -535,6 +535,8 @@ impl GlobalBarrierWorkerContextImpl {
warn!(?changed, "get worker changed or timed out. Retry migrate");
}

info!("migration plan {:?}", plan);

mgr.catalog_controller.migrate_actors(plan).await?;

info!("migrate actors succeed.");
Expand Down
16 changes: 11 additions & 5 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1125,12 +1125,17 @@ impl CatalogController {
.insert(*actor_id);
}

let expired_workers: HashSet<_> = plan.keys().map(|k| k.worker_id() as WorkerId).collect();
let expired_or_changed_workers: HashSet<_> =
plan.keys().map(|k| k.worker_id() as WorkerId).collect();

let mut actor_migration_plan = HashMap::new();
for (worker, fragment) in actor_locations {
if expired_workers.contains(&worker) {
for (_, actors) in fragment {
if expired_or_changed_workers.contains(&worker) {
for (fragment_id, actors) in fragment {
debug!(
"worker {} expired or changed, migrating fragment {}",
worker, fragment_id
);
let worker_slot_to_actor: HashMap<_, _> = actors
.iter()
.enumerate()
Expand All @@ -1140,8 +1145,9 @@ impl CatalogController {
.collect();

for (worker_slot, actor) in worker_slot_to_actor {
actor_migration_plan
.insert(actor, plan[&worker_slot].worker_id() as WorkerId);
if let Some(target) = plan.get(&worker_slot) {
actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
}
}
}
}
Expand Down

0 comments on commit ef07a49

Please sign in to comment.