From edf38b83afb58bb9bc99bd95eba4981eab593a3b Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 15 Mar 2024 15:32:29 +0800 Subject: [PATCH] Remove leftover code from the import thread. Deprecate the ray config. Signed-off-by: Ruiyang Wang --- python/ray/_private/function_manager.py | 5 ----- python/ray/_private/worker.py | 13 ------------- python/ray/includes/ray_config.pxd | 2 -- python/ray/includes/ray_config.pxi | 4 ---- python/ray/tests/test_basic_4.py | 26 ------------------------- src/ray/common/ray_config_def.h | 4 +--- src/ray/raylet/worker_pool.cc | 2 -- 7 files changed, 1 insertion(+), 55 deletions(-) diff --git a/python/ray/_private/function_manager.py b/python/ray/_private/function_manager.py index 0a25f8cfdfc6d..ee414d0640aed 100644 --- a/python/ray/_private/function_manager.py +++ b/python/ray/_private/function_manager.py @@ -92,12 +92,10 @@ def __init__(self, worker): # Deserialize an ActorHandle will call load_actor_class(). If a # function closure captured an ActorHandle, the deserialization of the # function will be: - # import_thread.py # -> fetch_and_register_remote_function (acquire lock) # -> _load_actor_class_from_gcs (acquire lock, too) # So, the lock should be a reentrant lock. self.lock = threading.RLock() - self.cv = threading.Condition(lock=self.lock) self.execution_infos = {} # This is the counter to keep track of how many keys have already @@ -475,9 +473,6 @@ def _wait_for_function(self, function_descriptor, job_id: str, timeout=10): job_id=job_id, ) warning_sent = True - # Try importing in case the worker did not get notified, or the - # importer thread did not run. - self._worker.import_thread._do_importing() time.sleep(0.001) def export_actor_class( diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index faf532f0351b4..3b376e194fccd 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -2507,19 +2507,6 @@ def disconnect(exiting_interpreter=False): ray_actor._ActorClassMethodMetadata.reset_cache() -def start_import_thread(): - """Start the import thread if the worker is connected.""" - worker = global_worker - worker.check_connected() - - assert _mode() not in ( - RESTORE_WORKER_MODE, - SPILL_WORKER_MODE, - ), "import thread can not be used in IO workers." - if worker.import_thread and ray._raylet.Config.start_python_importer_thread(): - worker.import_thread.start() - - @contextmanager def _changeproctitle(title, next_title): if _mode() is not LOCAL_MODE: diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 88ea85d5c007e..6081c327c0683 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -61,8 +61,6 @@ cdef extern from "ray/common/ray_config.h" nogil: c_bool gcs_grpc_based_pubsub() const - c_bool start_python_importer_thread() const - c_string REDIS_CA_CERT() const c_string REDIS_CA_PATH() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index c517e819b866e..fc9b222b81e72 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -97,10 +97,6 @@ cdef class Config: def record_ref_creation_sites(): return RayConfig.instance().record_ref_creation_sites() - @staticmethod - def start_python_importer_thread(): - return RayConfig.instance().start_python_importer_thread() - @staticmethod def REDIS_CA_CERT(): return RayConfig.instance().REDIS_CA_CERT() diff --git a/python/ray/tests/test_basic_4.py b/python/ray/tests/test_basic_4.py index 3918239b840fc..47dae1fdeafb5 100644 --- a/python/ray/tests/test_basic_4.py +++ b/python/ray/tests/test_basic_4.py @@ -95,32 +95,6 @@ def get_num_workers(): time.sleep(0.1) -def test_function_import_without_importer_thread(shutdown_only): - """Test that without background importer thread, dependencies can still be - imported in workers.""" - ray.init( - _system_config={ - "start_python_importer_thread": False, - }, - ) - - @ray.remote - def f(): - import threading - - assert threading.get_ident() == threading.main_thread().ident - # Make sure the importer thread is not running. - for thread in threading.enumerate(): - assert "import" not in thread.name - - @ray.remote - def g(): - ray.get(f.remote()) - - ray.get(g.remote()) - ray.get([g.remote() for _ in range(5)]) - - @pytest.mark.skipif( sys.platform == "win32", reason="Fork is only supported on *nix systems.", diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 3ead71ebc18fb..072cf2b295968 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -427,9 +427,7 @@ RAY_CONFIG(uint32_t, task_oom_retry_delay_base_ms, 1000) /// Duration to wait between retrying to kill a task. RAY_CONFIG(uint32_t, cancellation_retry_ms, 2000) -/// Whether to start a background thread to import Python dependencies eagerly. -/// When set to false, Python dependencies will still be imported, only when -/// they are needed. +/// DEPRECATED. No longer used anywhere. RAY_CONFIG(bool, start_python_importer_thread, true) /// Determines if forking in Ray actors / tasks are supported. diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 388c84e6b0e0c..521232dc51ed1 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -422,8 +422,6 @@ WorkerPool::BuildProcessCommandArgs(const Language &language, // Support forking in gRPC. env.insert({"GRPC_ENABLE_FORK_SUPPORT", "True"}); env.insert({"GRPC_POLL_STRATEGY", "poll"}); - // Make sure only the main thread is running in Python workers. - env.insert({"RAY_start_python_importer_thread", "0"}); } return {std::move(worker_command_args), std::move(env)};