diff --git a/src/ray/common/asio/instrumented_io_context.h b/src/ray/common/asio/instrumented_io_context.h index 8a72bc5f7209..737193543462 100644 --- a/src/ray/common/asio/instrumented_io_context.h +++ b/src/ray/common/asio/instrumented_io_context.h @@ -33,6 +33,15 @@ class instrumented_io_context : public boost::asio::io_context { bool running() { return is_running_.load(); } + bool run_if_stopped(std::function callback) { + if (!is_running_.exchange(true)) { + callback(); + boost::asio::io_context::run(); + return true; + } + return false; + } + void run() { is_running_.store(true); boost::asio::io_context::run(); diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 1b2dbc49e361..b19e76645184 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -112,9 +112,8 @@ Status GcsClient::Connect(instrumented_io_context &io_service, // Run the IO service here to make the above call synchronous. // If it is already running, then wait for our particular callback // to be processed. - if (!io_service.running()) { - temporary_start.set_value(true); - io_service.run(); + if (io_service.run_if_stopped( + [&temporary_start]() { temporary_start.set_value(true); })) { io_service.restart(); } else { temporary_start.set_value(false);