Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Remove leftover code from the import thread. Deprecate the ray config RAY_start_python_importer_thread #44036

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,10 @@ def __init__(self, worker):
# Deserialize an ActorHandle will call load_actor_class(). If a
# function closure captured an ActorHandle, the deserialization of the
# function will be:
# import_thread.py
# -> fetch_and_register_remote_function (acquire lock)
# -> _load_actor_class_from_gcs (acquire lock, too)
# So, the lock should be a reentrant lock.
self.lock = threading.RLock()
self.cv = threading.Condition(lock=self.lock)

self.execution_infos = {}
# This is the counter to keep track of how many keys have already
Expand Down Expand Up @@ -475,9 +473,6 @@ def _wait_for_function(self, function_descriptor, job_id: str, timeout=10):
job_id=job_id,
)
warning_sent = True
# Try importing in case the worker did not get notified, or the
# importer thread did not run.
self._worker.import_thread._do_importing()
time.sleep(0.001)

def export_actor_class(
Expand Down
13 changes: 0 additions & 13 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2507,19 +2507,6 @@ def disconnect(exiting_interpreter=False):
ray_actor._ActorClassMethodMetadata.reset_cache()


def start_import_thread():
"""Start the import thread if the worker is connected."""
worker = global_worker
worker.check_connected()

assert _mode() not in (
RESTORE_WORKER_MODE,
SPILL_WORKER_MODE,
), "import thread can not be used in IO workers."
if worker.import_thread and ray._raylet.Config.start_python_importer_thread():
worker.import_thread.start()


@contextmanager
def _changeproctitle(title, next_title):
if _mode() is not LOCAL_MODE:
Expand Down
2 changes: 0 additions & 2 deletions python/ray/includes/ray_config.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ cdef extern from "ray/common/ray_config.h" nogil:

c_bool gcs_grpc_based_pubsub() const

c_bool start_python_importer_thread() const

c_string REDIS_CA_CERT() const

c_string REDIS_CA_PATH() const
Expand Down
4 changes: 0 additions & 4 deletions python/ray/includes/ray_config.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ cdef class Config:
def record_ref_creation_sites():
return RayConfig.instance().record_ref_creation_sites()

@staticmethod
def start_python_importer_thread():
return RayConfig.instance().start_python_importer_thread()

@staticmethod
def REDIS_CA_CERT():
return RayConfig.instance().REDIS_CA_CERT()
Expand Down
26 changes: 0 additions & 26 deletions python/ray/tests/test_basic_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,32 +95,6 @@ def get_num_workers():
time.sleep(0.1)


def test_function_import_without_importer_thread(shutdown_only):
"""Test that without background importer thread, dependencies can still be
imported in workers."""
ray.init(
_system_config={
"start_python_importer_thread": False,
},
)

@ray.remote
def f():
import threading

assert threading.get_ident() == threading.main_thread().ident
# Make sure the importer thread is not running.
for thread in threading.enumerate():
assert "import" not in thread.name

@ray.remote
def g():
ray.get(f.remote())

ray.get(g.remote())
ray.get([g.remote() for _ in range(5)])


@pytest.mark.skipif(
sys.platform == "win32",
reason="Fork is only supported on *nix systems.",
Expand Down
4 changes: 1 addition & 3 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,7 @@ RAY_CONFIG(uint32_t, task_oom_retry_delay_base_ms, 1000)
/// Duration to wait between retrying to kill a task.
RAY_CONFIG(uint32_t, cancellation_retry_ms, 2000)

/// Whether to start a background thread to import Python dependencies eagerly.
/// When set to false, Python dependencies will still be imported, only when
/// they are needed.
/// DEPRECATED. No longer used anywhere.
RAY_CONFIG(bool, start_python_importer_thread, true)

/// Determines if forking in Ray actors / tasks are supported.
Expand Down
2 changes: 0 additions & 2 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,6 @@ WorkerPool::BuildProcessCommandArgs(const Language &language,
// Support forking in gRPC.
env.insert({"GRPC_ENABLE_FORK_SUPPORT", "True"});
env.insert({"GRPC_POLL_STRATEGY", "poll"});
// Make sure only the main thread is running in Python workers.
env.insert({"RAY_start_python_importer_thread", "0"});
}

return {std::move(worker_command_args), std::move(env)};
Expand Down
Loading