Skip to content

Commit

Permalink
fix: correctly remove worker from pool
Browse files Browse the repository at this point in the history
Moved the logic to remove a worker from pool to worker_ctx, so it will
be done all instances. Previously, crashes would leave dead workers in
the pool.
  • Loading branch information
laktek committed May 16, 2023
1 parent 046b226 commit 502edf1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
12 changes: 0 additions & 12 deletions crates/base/src/edge_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ impl EdgeRuntime {
self.curr_user_opts.worker_timeout_ms,
memory_limit_rx,
halt_isolate_tx,
self.curr_user_opts.key,
self.curr_user_opts.pool_msg_tx.clone(),
);

// add a callback when a worker reaches its memory limit
Expand Down Expand Up @@ -335,16 +333,6 @@ impl EdgeRuntime {
};
let call = rt.block_on(future);

// send a shutdown message back to user worker pool (so it stops sending requets to the
// worker)
if let Some(k) = key {
if let Some(tx) = pool_msg_tx {
if tx.send(UserWorkerMsgs::Shutdown(k)).is_err() {
error!("failed to send the shutdown signal to user worker pool");
}
}
};

// send a message to halt the isolate
if halt_isolate_tx.send(call).is_err() {
error!("failed to send the halt execution signal");
Expand Down
14 changes: 14 additions & 0 deletions crates/base/src/worker_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ pub async fn create_worker(
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel::<Result<(), Error>>();
let (unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStream>();

let (worker_key, pool_msg_tx) = match init_opts.conf.clone() {
EdgeContextOpts::UserWorker(worker_opts) => (worker_opts.key, worker_opts.pool_msg_tx),
EdgeContextOpts::MainWorker(_opts) => (None, None),
};

let _handle: thread::JoinHandle<Result<(), Error>> = thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down Expand Up @@ -60,6 +65,15 @@ pub async fn create_worker(
);
}

// remove the worker from pool
if let Some(k) = worker_key {
if let Some(tx) = pool_msg_tx {
if tx.send(UserWorkerMsgs::Shutdown(k)).is_err() {
error!("failed to send the shutdown signal to user worker pool");
}
}
}

Ok(())
});

Expand Down

0 comments on commit 502edf1

Please sign in to comment.