Skip to content

Commit

Permalink
[core] remove import thread (#36293)
Browse files Browse the repository at this point in the history
Why are these changes needed?
import thread is no longer needed, remove it.
  • Loading branch information
scv119 authored Jun 13, 2023
1 parent 07a5bf0 commit b737a02
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 213 deletions.
3 changes: 0 additions & 3 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ def export_key(self, key):
# One optimization is that we can use importer counter since
# it's sure keys before this counter has been allocated.
with self._export_lock:
self._num_exported = max(
self._num_exported, self._worker.import_thread.num_imported
)
while True:
self._num_exported += 1
holder = make_export_key(
Expand Down
187 changes: 0 additions & 187 deletions python/ray/_private/import_thread.py

This file was deleted.

18 changes: 0 additions & 18 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from typing_extensions import Literal, Protocol

import ray
import ray._private.import_thread as import_thread
import ray._private.node
import ray._private.parameter
import ray._private.profiling as profiling
Expand Down Expand Up @@ -2248,21 +2247,6 @@ def connect(
" and will be removed in the future."
)

# Setup import thread and start the import thread
# if the worker has job_id initialized.
# Otherwise, defer the start up of
# import thread until job_id is initialized.
# (python/ray/_raylet.pyx maybe_initialize_job_config)
if mode not in (RESTORE_WORKER_MODE, SPILL_WORKER_MODE):
worker.import_thread = import_thread.ImportThread(
worker, mode, worker.threads_stopped
)
if (
worker.current_job_id != JobID.nil()
and ray._raylet.Config.start_python_importer_thread()
):
worker.import_thread.start()

# If this is a driver running in SCRIPT_MODE, start a thread to print error
# messages asynchronously in the background. Ideally the scheduler would
# push messages to the driver's worker service, but we ran into bugs when
Expand Down Expand Up @@ -2317,8 +2301,6 @@ def disconnect(exiting_interpreter=False):
worker.gcs_error_subscriber.close()
if hasattr(worker, "gcs_log_subscriber"):
worker.gcs_log_subscriber.close()
if hasattr(worker, "import_thread"):
worker.import_thread.join_import_thread()
if hasattr(worker, "listener_thread"):
worker.listener_thread.join()
if hasattr(worker, "logger_thread"):
Expand Down
3 changes: 0 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2102,9 +2102,6 @@ def maybe_initialize_job_config():
print(job_id_magic_token, end="")
print(job_id_magic_token, file=sys.stderr, end="")

# Only start import thread after job_config is initialized
ray._private.worker.start_import_thread()

job_config_initialized = True


Expand Down
3 changes: 1 addition & 2 deletions python/ray/tests/test_basic_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,9 @@ def get_kv_metrics():
b'cluster' b'CLUSTER_METADATA'
b'tracing' b'tracing_startup_hook'
b'fun' b'IsolatedExports:01000000:\x00\x00\x00\x00\x00\x00\x00\x01'
b'fun' b'RemoteFunction:01000000:'
"""
# !!!If you want to increase this number, please let ray-core knows this!!!
assert freqs["internal_kv_get"] == 4
assert freqs["internal_kv_get"] == 3


@pytest.mark.skipif(sys.platform == "win32", reason="Fails on Windows.")
Expand Down

0 comments on commit b737a02

Please sign in to comment.