diff --git a/dashboard/modules/job/tests/test_job_agent.py b/dashboard/modules/job/tests/test_job_agent.py
index b6e1a838c378..37d51161ed77 100644
--- a/dashboard/modules/job/tests/test_job_agent.py
+++ b/dashboard/modules/job/tests/test_job_agent.py
@@ -532,7 +532,6 @@ def test_agent_logs_not_streamed_to_drivers():
     err_str = proc.stderr.read().decode("ascii")
 
     print(out_str, err_str)
-
     assert "(raylet)" not in out_str
     assert "(raylet)" not in err_str
 
diff --git a/python/ray/_private/gcs_aio_client.py b/python/ray/_private/gcs_aio_client.py
index f794fd7c7eee..d5502c6dcea1 100644
--- a/python/ray/_private/gcs_aio_client.py
+++ b/python/ray/_private/gcs_aio_client.py
@@ -56,9 +56,7 @@ def __init__(
         self._nums_reconnect_retry = nums_reconnect_retry
 
     def _connect(self):
-        print("vct connecting")
         self._gcs_client._connect()
-        print("vct connected")
 
     @property
     def address(self):
diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py
index 181396049f4e..cdf4dd079734 100644
--- a/python/ray/_private/node.py
+++ b/python/ray/_private/node.py
@@ -648,11 +648,7 @@ def _init_gcs_client(self):
             last_ex = None
             try:
                 gcs_address = self.gcs_address
-                client = GcsClient(
-                    address=gcs_address,
-                    cluster_id=self._ray_params.cluster_id,
-                )
-                self.cluster_id = client.get_cluster_id()
+                client = GcsClient(address=gcs_address)
                 if self.head:
                     # Send a simple request to make sure GCS is alive
                     # if it's a head node.
@@ -668,26 +664,19 @@ def _init_gcs_client(self):
                 time.sleep(1)
 
         if self._gcs_client is None:
-            if hasattr(self, "_logs_dir"):
-                with open(os.path.join(self._logs_dir, "gcs_server.err")) as err:
-                    # Use " C " or " E " to exclude the stacktrace.
-                    # This should work for most cases, especitally
-                    # it's when GCS is starting. Only display last 10 lines of logs.
-                    errors = [e for e in err.readlines() if " C " in e or " E " in e][
-                        -10:
-                    ]
-                error_msg = "\n" + "".join(errors) + "\n"
-                raise RuntimeError(
-                    f"Failed to {'start' if self.head else 'connect to'} GCS. "
-                    f" Last {len(errors)} lines of error files:"
-                    f"{error_msg}."
-                    f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}"
-                    " for details"
-                )
-            else:
-                raise RuntimeError(
-                    f"Failed to {'start' if self.head else 'connect to'} GCS."
-                )
+            with open(os.path.join(self._logs_dir, "gcs_server.err")) as err:
+                # Use " C " or " E " to exclude the stacktrace.
+                # This should work for most cases, especitally
+                # it's when GCS is starting. Only display last 10 lines of logs.
+                errors = [e for e in err.readlines() if " C " in e or " E " in e][-10:]
+            error_msg = "\n" + "".join(errors) + "\n"
+            raise RuntimeError(
+                f"Failed to {'start' if self.head else 'connect to'} GCS. "
+                f" Last {len(errors)} lines of error files:"
+                f"{error_msg}."
+                f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}"
+                " for details"
+            )
 
         ray.experimental.internal_kv._initialize_internal_kv(self._gcs_client)
 
@@ -1075,7 +1064,6 @@ def start_raylet(
             self._ray_params.node_manager_port,
             self._raylet_socket_name,
             self._plasma_store_socket_name,
-            self.cluster_id,
             self._ray_params.worker_path,
             self._ray_params.setup_worker_path,
             self._ray_params.storage,
diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py
index 538fe09a505d..d383f76370a4 100644
--- a/python/ray/_private/parameter.py
+++ b/python/ray/_private/parameter.py
@@ -127,7 +127,6 @@ class RayParams:
         env_vars: Override environment variables for the raylet.
         session_name: The name of the session of the ray cluster.
         webui: The url of the UI.
-        cluster_id: The cluster ID.
     """
 
     def __init__(
@@ -189,7 +188,6 @@ def __init__(
         env_vars: Optional[Dict[str, str]] = None,
         session_name: Optional[str] = None,
         webui: Optional[str] = None,
-        cluster_id: Optional[str] = None,
     ):
         self.redis_address = redis_address
         self.gcs_address = gcs_address
@@ -251,7 +249,6 @@ def __init__(
         self._enable_object_reconstruction = enable_object_reconstruction
         self.labels = labels
         self._check_usage()
-        self.cluster_id = cluster_id
 
         # Set the internal config options for object reconstruction.
         if enable_object_reconstruction:
diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py
index 996a2289cfa2..96671793928b 100644
--- a/python/ray/_private/services.py
+++ b/python/ray/_private/services.py
@@ -1357,7 +1357,6 @@ def start_raylet(
     node_manager_port: int,
     raylet_name: str,
     plasma_store_name: str,
-    cluster_id: str,
     worker_path: str,
     setup_worker_path: str,
     storage: str,
@@ -1539,7 +1538,6 @@ def start_raylet(
             f"--session-name={session_name}",
             f"--temp-dir={temp_dir}",
             f"--webui={webui}",
-            f"--cluster-id={cluster_id}",
         ]
     )
 
@@ -1645,7 +1643,6 @@ def start_raylet(
         f"--gcs-address={gcs_address}",
         f"--session-name={session_name}",
         f"--labels={labels_json_str}",
-        f"--cluster-id={cluster_id}",
     ]
 
     if is_head_node:
diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index 19e85bc97401..6b053b43e49d 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -1581,7 +1581,7 @@ def init(
                 spawn_reaper=False,
                 connect_only=True,
             )
-        except (ConnectionError, RuntimeError):
+        except ConnectionError:
             if gcs_address == ray._private.utils.read_ray_address(_temp_dir):
                 logger.info(
                     "Failed to connect to the default Ray cluster address at "
@@ -1590,7 +1590,7 @@ def init(
                     "address to connect to, run `ray stop` or restart Ray with "
                     "`ray start`."
                 )
-            raise ConnectionError
+            raise
 
     # Log a message to find the Ray address that we connected to and the
     # dashboard URL.
@@ -2262,7 +2262,6 @@ def connect(
         runtime_env_hash,
         startup_token,
         session_name,
-        node.cluster_id,
         "" if mode != SCRIPT_MODE else entrypoint,
         worker_launch_time_ms,
         worker_launched_time_ms,
diff --git a/python/ray/_private/workers/default_worker.py b/python/ray/_private/workers/default_worker.py
index 6b49b685f707..6000b4bd6fe2 100644
--- a/python/ray/_private/workers/default_worker.py
+++ b/python/ray/_private/workers/default_worker.py
@@ -17,12 +17,6 @@
 parser = argparse.ArgumentParser(
     description=("Parse addresses for the worker to connect to.")
 )
-parser.add_argument(
-    "--cluster-id",
-    required=True,
-    type=str,
-    help="the auto-generated ID of the cluster",
-)
 parser.add_argument(
     "--node-ip-address",
     required=True,
@@ -213,7 +207,6 @@
         gcs_address=args.gcs_address,
         session_name=args.session_name,
         webui=args.webui,
-        cluster_id=args.cluster_id,
     )
     node = ray._private.node.Node(
         ray_params,
diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd
index a465c15f952f..4099ac64645e 100644
--- a/python/ray/_raylet.pxd
+++ b/python/ray/_raylet.pxd
@@ -102,6 +102,7 @@ cdef class ObjectRef(BaseID):
 
     cdef CObjectID native(self)
 
+
 cdef class ActorID(BaseID):
     cdef CActorID data
 
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index f4f92fe8f612..b4832b9302dd 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -129,9 +129,8 @@ from ray.includes.common cimport (
 )
 from ray.includes.unique_ids cimport (
     CActorID,
-    CClusterID,
-    CNodeID,
     CObjectID,
+    CNodeID,
     CPlacementGroupID,
     ObjectIDIndexType,
 )
@@ -2336,35 +2335,16 @@ cdef class GcsClient:
         shared_ptr[CPythonGcsClient] inner
         object address
         object _nums_reconnect_retry
-        CClusterID cluster_id
 
-    def __cinit__(self, address, nums_reconnect_retry=5, cluster_id=None):
+    def __cinit__(self, address, nums_reconnect_retry=5):
         cdef GcsClientOptions gcs_options = GcsClientOptions.from_gcs_address(address)
         self.inner.reset(new CPythonGcsClient(dereference(gcs_options.native())))
         self.address = address
         self._nums_reconnect_retry = nums_reconnect_retry
-        cdef c_string c_cluster_id
-        if cluster_id is None:
-            self.cluster_id = CClusterID.Nil()
-        else:
-            c_cluster_id = cluster_id
-            self.cluster_id = CClusterID.FromHex(c_cluster_id)
-        self._connect(5)
+        self._connect()
 
-    def _connect(self, timeout_s=None):
-        cdef:
-            int64_t timeout_ms = round(1000 * timeout_s) if timeout_s else -1
-            size_t num_retries = self._nums_reconnect_retry
-        with nogil:
-            status = self.inner.get().Connect(self.cluster_id, timeout_ms, num_retries)
-
-        check_status(status)
-        if self.cluster_id.IsNil():
-            self.cluster_id = self.inner.get().GetClusterId()
-            assert not self.cluster_id.IsNil()
-
-    def get_cluster_id(self):
-        return self.cluster_id.Hex().decode()
+    def _connect(self):
+        check_status(self.inner.get().Connect())
 
     @property
     def address(self):
@@ -2864,7 +2844,7 @@ cdef class CoreWorker:
                   node_ip_address, node_manager_port, raylet_ip_address,
                   local_mode, driver_name, stdout_file, stderr_file,
                   serialized_job_config, metrics_agent_port, runtime_env_hash,
-                  startup_token, session_name, cluster_id, entrypoint,
+                  startup_token, session_name, entrypoint,
                   worker_launch_time_ms, worker_launched_time_ms):
         self.is_local_mode = local_mode
 
@@ -2916,7 +2896,6 @@ cdef class CoreWorker:
         options.runtime_env_hash = runtime_env_hash
         options.startup_token = startup_token
         options.session_name = session_name
-        options.cluster_id = CClusterID.FromHex(cluster_id)
         options.entrypoint = entrypoint
         options.worker_launch_time_ms = worker_launch_time_ms
         options.worker_launched_time_ms = worker_launched_time_ms
diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py
index 34db56f511f9..9f22f283b54f 100644
--- a/python/ray/autoscaler/_private/monitor.py
+++ b/python/ray/autoscaler/_private/monitor.py
@@ -150,7 +150,6 @@ def __init__(
             gcs_channel
         )
         worker = ray._private.worker.global_worker
-        # TODO: eventually plumb ClusterID through to here
         gcs_client = GcsClient(address=self.gcs_address)
 
         if monitor_ip:
diff --git a/python/ray/autoscaler/v2/BUILD b/python/ray/autoscaler/v2/BUILD
index 839f29587e74..ddc043706ab8 100644
--- a/python/ray/autoscaler/v2/BUILD
+++ b/python/ray/autoscaler/v2/BUILD
@@ -80,7 +80,7 @@ py_test(
 
 py_test(
     name = "test_sdk",
-    size = "medium",
+    size = "small",
     srcs = ["tests/test_sdk.py"],
     tags = ["team:core", "exclusive"],
     deps = ["//:ray_lib", ":conftest"],
diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd
index 1f8c25ee525a..66c54b12f285 100644
--- a/python/ray/includes/common.pxd
+++ b/python/ray/includes/common.pxd
@@ -12,7 +12,6 @@ from ray.includes.optional cimport (
 from ray.includes.unique_ids cimport (
     CActorID,
     CJobID,
-    CClusterID,
     CWorkerID,
     CObjectID,
     CTaskID,
@@ -368,10 +367,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
     cdef cppclass CPythonGcsClient "ray::gcs::PythonGcsClient":
         CPythonGcsClient(const CGcsClientOptions &options)
 
-        CRayStatus Connect(
-            const CClusterID &cluster_id,
-            int64_t timeout_ms,
-            size_t num_retries)
+        CRayStatus Connect()
+
         CRayStatus CheckAlive(
             const c_vector[c_string] &raylet_addresses,
             int64_t timeout_ms,
@@ -408,7 +405,6 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
         CRayStatus GetClusterStatus(
             int64_t timeout_ms,
             c_string &serialized_reply)
-        CClusterID GetClusterId()
         CRayStatus DrainNode(
             const c_string &node_id,
             int32_t reason,
@@ -416,6 +412,7 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
             int64_t timeout_ms,
             c_bool &is_accepted)
 
+
 cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil:
     unordered_map[c_string, double] PythonGetResourcesTotal(
         const CGcsNodeInfo& node_info)
diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd
index 1f52bbea0af0..3afb811405d2 100644
--- a/python/ray/includes/libcoreworker.pxd
+++ b/python/ray/includes/libcoreworker.pxd
@@ -13,7 +13,6 @@ from libcpp.vector cimport vector as c_vector
 
 from ray.includes.unique_ids cimport (
     CActorID,
-    CClusterID,
     CNodeID,
     CJobID,
     CTaskID,
@@ -360,7 +359,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
         c_bool connect_on_start
         int runtime_env_hash
         int startup_token
-        CClusterID cluster_id
         c_string session_name
         c_string entrypoint
         int64_t worker_launch_time_ms
diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd
index cdb9b58d9188..2fb14e6322c0 100644
--- a/python/ray/includes/unique_ids.pxd
+++ b/python/ray/includes/unique_ids.pxd
@@ -7,9 +7,6 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
         @staticmethod
         T FromBinary(const c_string &binary)
 
-        @staticmethod
-        T FromHex(const c_string &hex)
-
         @staticmethod
         const T Nil()
 
@@ -157,17 +154,6 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
 
         CTaskID TaskId() const
 
-    cdef cppclass CClusterID "ray::ClusterID"(CUniqueID):
-
-        @staticmethod
-        CClusterID FromHex(const c_string &hex_str)
-
-        @staticmethod
-        CClusterID FromRandom()
-
-        @staticmethod
-        const CClusterID Nil()
-
     cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID):
 
         @staticmethod
diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD
index 0dec9a638c94..1ee484fa9bcb 100644
--- a/python/ray/serve/BUILD
+++ b/python/ray/serve/BUILD
@@ -677,7 +677,7 @@ py_test(
 
 py_test(
     name = "test_callback",
-    size = "medium",
+    size = "small",
     srcs = serve_tests_srcs,
     tags = ["exclusive", "team:serve"],
     deps = [":serve_lib"],
diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 7c78c3c3da16..6e962187372e 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -211,6 +211,7 @@ py_test_module_list(
     "test_unhandled_error.py",
     "test_utils.py",
     "test_widgets.py",
+    "test_node_labels.py",
   ],
   size = "small",
   tags = ["exclusive", "small_size_python_tests", "team:core"],
@@ -221,7 +222,6 @@ py_test_module_list(
   files = [
     "test_gcs_ha_e2e.py",
     "test_memory_pressure.py",
-    "test_node_labels.py",
   ],
   size = "medium",
   tags = ["exclusive", "team:core", "xcommit"],
diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py
index 861349774ec7..0b6b7210157e 100644
--- a/python/ray/tests/test_autoscaler.py
+++ b/python/ray/tests/test_autoscaler.py
@@ -3514,9 +3514,6 @@ class FaultyAutoscaler:
             def __init__(self, *args, **kwargs):
                 raise AutoscalerInitFailException
 
-        prev_port = os.environ.get("RAY_GCS_SERVER_PORT")
-        os.environ["RAY_GCS_SERVER_PORT"] = "12345"
-        ray.init()
         with patch("ray._private.utils.publish_error_to_driver") as mock_publish:
             with patch.multiple(
                 "ray.autoscaler._private.monitor",
@@ -3524,17 +3521,11 @@ def __init__(self, *args, **kwargs):
                 _internal_kv_initialized=Mock(return_value=False),
             ):
                 monitor = Monitor(
-                    address="localhost:12345",
-                    autoscaling_config="",
-                    log_dir=self.tmpdir,
+                    address="here:12345", autoscaling_config="", log_dir=self.tmpdir
                 )
                 with pytest.raises(AutoscalerInitFailException):
                     monitor.run()
                 mock_publish.assert_called_once()
-        if prev_port is not None:
-            os.environ["RAY_GCS_SERVER_PORT"] = prev_port
-        else:
-            del os.environ["RAY_GCS_SERVER_PORT"]
 
     def testInitializeSDKArguments(self):
         # https://github.com/ray-project/ray/issues/23166
diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py
index fcd260a77307..869c9ab648fe 100644
--- a/python/ray/tests/test_gcs_fault_tolerance.py
+++ b/python/ray/tests/test_gcs_fault_tolerance.py
@@ -433,16 +433,13 @@ def test_gcs_aio_client_reconnect(
     passed = [False]
 
     async def async_kv_get():
+        gcs_aio_client = gcs_utils.GcsAioClient(
+            address=gcs_address, nums_reconnect_retry=20 if auto_reconnect else 0
+        )
         if not auto_reconnect:
             with pytest.raises(Exception):
-                gcs_aio_client = gcs_utils.GcsAioClient(
-                    address=gcs_address, nums_reconnect_retry=0
-                )
                 await gcs_aio_client.internal_kv_get(b"a", None)
         else:
-            gcs_aio_client = gcs_utils.GcsAioClient(
-                address=gcs_address, nums_reconnect_retry=20
-            )
             assert await gcs_aio_client.internal_kv_get(b"a", None) == b"b"
         return True
 
diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py
index 7118873288bf..55821eeebf1b 100644
--- a/python/ray/tests/test_ray_init_2.py
+++ b/python/ray/tests/test_ray_init_2.py
@@ -271,7 +271,7 @@ def verify():
         return True
 
     try:
-        wait_for_condition(verify, timeout=15, retry_interval_ms=2000)
+        wait_for_condition(verify, timeout=10, retry_interval_ms=2000)
     finally:
         proc.terminate()
         proc.wait()
diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD
index 820c80aacb54..981217503d43 100644
--- a/python/ray/train/BUILD
+++ b/python/ray/train/BUILD
@@ -326,14 +326,6 @@ py_test(
     deps = [":train_lib", ":conftest"]
 )
 
-py_test(
-    name = "test_gpu_2",
-    size = "medium",
-    srcs = ["tests/test_gpu_2.py"],
-    tags = ["team:ml", "exclusive", "gpu_only"],
-    deps = [":train_lib", ":conftest"]
-)
-
 py_test(
     name = "test_gpu_auto_transfer",
     size = "medium",
diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py
index a5ac24e90c3e..7ce50a9a153a 100644
--- a/python/ray/train/tests/test_gpu.py
+++ b/python/ray/train/tests/test_gpu.py
@@ -3,6 +3,7 @@
 
 from unittest.mock import patch
 import pytest
+import numpy as np
 import torch
 import torchvision
 from torch.nn.parallel import DistributedDataParallel
@@ -358,6 +359,35 @@ def train_fn():
     assert isinstance(exc_info.value.__cause__, RayTaskError)
 
 
+@pytest.mark.parametrize("use_gpu", (True, False))
+def test_torch_iter_torch_batches_auto_device(ray_start_4_cpus_2_gpus, use_gpu):
+    """
+    Tests that iter_torch_batches in TorchTrainer worker function uses the
+    default device.
+    """
+
+    def train_fn():
+        dataset = train.get_dataset_shard("train")
+        for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"):
+            assert str(batch["data"].device) == "cpu"
+
+        # Autodetect
+        for batch in dataset.iter_torch_batches(dtypes=torch.float):
+            assert str(batch["data"].device) == str(train.torch.get_device())
+
+    dataset = ray.data.from_numpy(np.array([[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]).T)
+    # Test that this works outside a Train function
+    for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"):
+        assert str(batch["data"].device) == "cpu"
+
+    trainer = TorchTrainer(
+        train_fn,
+        scaling_config=ScalingConfig(num_workers=2, use_gpu=use_gpu),
+        datasets={"train": dataset},
+    )
+    trainer.fit()
+
+
 if __name__ == "__main__":
     import sys
 
diff --git a/python/ray/train/tests/test_gpu_2.py b/python/ray/train/tests/test_gpu_2.py
deleted file mode 100644
index 1889a7d3349e..000000000000
--- a/python/ray/train/tests/test_gpu_2.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import pytest
-import numpy as np
-import torch
-
-import ray
-import ray.data
-from ray import tune
-
-import ray.train as train
-from ray.air.config import ScalingConfig
-from ray.train.examples.pytorch.torch_linear_example import LinearDataset
-from ray.train.torch.torch_trainer import TorchTrainer
-
-
-class LinearDatasetDict(LinearDataset):
-    """Modifies the LinearDataset to return a Dict instead of a Tuple."""
-
-    def __getitem__(self, index):
-        return {"x": self.x[index, None], "y": self.y[index, None]}
-
-
-class NonTensorDataset(LinearDataset):
-    """Modifies the LinearDataset to also return non-tensor objects."""
-
-    def __getitem__(self, index):
-        return {"x": self.x[index, None], "y": 2}
-
-
-# Currently in DataParallelTrainers we only report metrics from rank 0.
-# For testing purposes here, we need to be able to report from all
-# workers.
-class TorchTrainerPatchedMultipleReturns(TorchTrainer):
-    def _report(self, training_iterator) -> None:
-        for results in training_iterator:
-            tune.report(results=results)
-
-
-@pytest.mark.parametrize("use_gpu", (True, False))
-def test_torch_iter_torch_batches_auto_device(ray_start_4_cpus_2_gpus, use_gpu):
-    """
-    Tests that iter_torch_batches in TorchTrainer worker function uses the
-    default device.
-    """
-
-    def train_fn():
-        dataset = train.get_dataset_shard("train")
-        for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"):
-            assert str(batch["data"].device) == "cpu"
-
-        # Autodetect
-        for batch in dataset.iter_torch_batches(dtypes=torch.float):
-            assert str(batch["data"].device) == str(train.torch.get_device())
-
-    dataset = ray.data.from_numpy(np.array([[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]).T)
-    # Test that this works outside a Train function
-    for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"):
-        assert str(batch["data"].device) == "cpu"
-
-    trainer = TorchTrainer(
-        train_fn,
-        scaling_config=ScalingConfig(num_workers=2, use_gpu=use_gpu),
-        datasets={"train": dataset},
-    )
-    trainer.fit()
-
-
-if __name__ == "__main__":
-    import sys
-
-    import pytest
-
-    sys.exit(pytest.main(["-v", "-x", "-s", __file__]))
diff --git a/python/ray/tune/tests/test_progress_reporter.py b/python/ray/tune/tests/test_progress_reporter.py
index 87855f66ca3c..d1e8a0c93119 100644
--- a/python/ray/tune/tests/test_progress_reporter.py
+++ b/python/ray/tune/tests/test_progress_reporter.py
@@ -689,9 +689,7 @@ def testEndToEndReporting(self):
                 if os.environ.get("TUNE_NEW_EXECUTION") == "0":
                     assert EXPECTED_END_TO_END_START in output
                 assert EXPECTED_END_TO_END_END in output
-                for line in output.splitlines():
-                    if "(raylet)" in line:
-                        assert "cluster ID" in line, "Unexpected raylet log messages"
+                assert "(raylet)" not in output, "Unexpected raylet log messages"
             except Exception:
                 print("*** BEGIN OUTPUT ***")
                 print(output)
diff --git a/src/mock/ray/gcs/gcs_client/gcs_client.h b/src/mock/ray/gcs/gcs_client/gcs_client.h
index cf232a712f51..e7b687d04e7d 100644
--- a/src/mock/ray/gcs/gcs_client/gcs_client.h
+++ b/src/mock/ray/gcs/gcs_client/gcs_client.h
@@ -31,10 +31,7 @@ namespace gcs {
 
 class MockGcsClient : public GcsClient {
  public:
-  MOCK_METHOD(Status,
-              Connect,
-              (instrumented_io_context & io_service, const ClusterID &cluster_id),
-              (override));
+  MOCK_METHOD(Status, Connect, (instrumented_io_context & io_service), (override));
   MOCK_METHOD(void, Disconnect, (), (override));
   MOCK_METHOD((std::pair<std::string, int>), GetGcsServerAddress, (), (const, override));
   MOCK_METHOD(std::string, DebugString, (), (const, override));
diff --git a/src/ray/common/id.h b/src/ray/common/id.h
index bca6c66492ef..7c5f430830ab 100644
--- a/src/ray/common/id.h
+++ b/src/ray/common/id.h
@@ -395,7 +395,6 @@ std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id);
     type() : UniqueID() {}                                                               \
     static type FromRandom() { return type(UniqueID::FromRandom()); }                    \
     static type FromBinary(const std::string &binary) { return type(binary); }           \
-    static type FromHex(const std::string &hex) { return type(UniqueID::FromHex(hex)); } \
     static type Nil() { return type(UniqueID::Nil()); }                                  \
     static constexpr size_t Size() { return kUniqueIDSize; }                             \
                                                                                          \
@@ -415,6 +414,27 @@ std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id);
 // Restore the compiler alignment to default (8 bytes).
 #pragma pack(pop)
 
+struct SafeClusterID {
+ private:
+  mutable absl::Mutex m_;
+  ClusterID id_ GUARDED_BY(m_);
+
+ public:
+  SafeClusterID(const ClusterID &id) : id_(id) {}
+
+  const ClusterID load() const {
+    absl::MutexLock l(&m_);
+    return id_;
+  }
+
+  ClusterID exchange(const ClusterID &newId) {
+    absl::MutexLock l(&m_);
+    ClusterID old = id_;
+    id_ = newId;
+    return old;
+  }
+};
+
 template <typename T>
 BaseID<T>::BaseID() {
   // Using const_cast to directly change data is dangerous. The cached
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index c9ac1752063f..65737cac0793 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -229,7 +229,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
 
   gcs_client_ = std::make_shared<gcs::GcsClient>(options_.gcs_options, GetWorkerID());
 
-  RAY_CHECK_OK(gcs_client_->Connect(io_service_, options_.cluster_id));
+  RAY_CHECK_OK(gcs_client_->Connect(io_service_));
   RegisterToGcs(options_.worker_launch_time_ms, options_.worker_launched_time_ms);
 
   // Initialize the task state event buffer.
diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h
index 05623bb25d36..be1b5e002775 100644
--- a/src/ray/core_worker/core_worker_options.h
+++ b/src/ray/core_worker/core_worker_options.h
@@ -92,7 +92,6 @@ struct CoreWorkerOptions {
         metrics_agent_port(-1),
         connect_on_start(true),
         runtime_env_hash(0),
-        cluster_id(ClusterID::Nil()),
         session_name(""),
         entrypoint(""),
         worker_launch_time_ms(-1),
@@ -183,8 +182,6 @@ struct CoreWorkerOptions {
   /// may not have the same pid as the process the worker pool
   /// starts (due to shim processes).
   StartupToken startup_token{0};
-  /// Cluster ID associated with the core worker.
-  ClusterID cluster_id;
   /// The function to allocate a new object for the memory store.
   /// This allows allocating the objects in the language frontend's memory.
   /// For example, for the Java worker, we can allocate the objects in the JVM heap
diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc
index 4fcc2fad191f..dfc4b2c5cfd9 100644
--- a/src/ray/gcs/gcs_client/gcs_client.cc
+++ b/src/ray/gcs/gcs_client/gcs_client.cc
@@ -14,8 +14,6 @@
 
 #include "ray/gcs/gcs_client/gcs_client.h"
 
-#include <chrono>
-#include <thread>
 #include <utility>
 
 #include "ray/common/ray_config.h"
@@ -83,10 +81,9 @@ void GcsSubscriberClient::PubsubCommandBatch(
 GcsClient::GcsClient(const GcsClientOptions &options, UniqueID gcs_client_id)
     : options_(options), gcs_client_id_(gcs_client_id) {}
 
-Status GcsClient::Connect(instrumented_io_context &io_service,
-                          const ClusterID &cluster_id) {
+Status GcsClient::Connect(instrumented_io_context &io_service) {
   // Connect to gcs service.
-  client_call_manager_ = std::make_unique<rpc::ClientCallManager>(io_service, cluster_id);
+  client_call_manager_ = std::make_unique<rpc::ClientCallManager>(io_service);
   gcs_rpc_client_ = std::make_shared<rpc::GcsRpcClient>(
       options_.gcs_address_, options_.gcs_port_, *client_call_manager_);
 
@@ -146,7 +143,9 @@ std::pair<std::string, int> GcsClient::GetGcsServerAddress() const {
   return gcs_rpc_client_->GetAddress();
 }
 
-PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(options) {
+PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(options) {}
+
+Status PythonGcsClient::Connect() {
   channel_ =
       rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_);
   kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_);
@@ -154,59 +153,27 @@ PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(opt
   node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_);
   job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_);
   autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_);
+  return Status::OK();
 }
 
-namespace {
 Status HandleGcsError(rpc::GcsStatus status) {
-  RAY_CHECK_NE(status.code(), static_cast<int>(StatusCode::OK));
+  RAY_CHECK(status.code() != static_cast<int>(StatusCode::OK));
   return Status::Invalid(status.message() +
                          " [GCS status code: " + std::to_string(status.code()) + "]");
 }
-}  // namespace
 
-Status PythonGcsClient::Connect(const ClusterID &cluster_id,
-                                int64_t timeout_ms,
-                                size_t num_retries) {
-  if (cluster_id.IsNil()) {
-    size_t tries = num_retries + 1;
-    RAY_CHECK(tries > 0) << "Expected positive retries, but got " << tries;
-
-    RAY_LOG(DEBUG) << "Retrieving cluster ID from GCS server.";
-    rpc::GetClusterIdRequest request;
-    rpc::GetClusterIdReply reply;
-
-    Status connect_status;
-    for (; tries > 0; tries--) {
-      grpc::ClientContext context;
-      PrepareContext(context, timeout_ms);
-      connect_status =
-          GrpcStatusToRayStatus(node_info_stub_->GetClusterId(&context, request, &reply));
-
-      if (connect_status.ok()) {
-        cluster_id_ = ClusterID::FromBinary(reply.cluster_id());
-        RAY_LOG(DEBUG) << "Received cluster ID from GCS server: " << cluster_id_;
-        RAY_CHECK(!cluster_id_.IsNil());
-        break;
-      } else if (!connect_status.IsGrpcError()) {
-        return HandleGcsError(reply.status());
-      }
-      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-    }
-    RAY_RETURN_NOT_OK(connect_status);
-  } else {
-    cluster_id_ = cluster_id;
-    RAY_LOG(DEBUG) << "Client initialized with provided cluster ID: " << cluster_id_;
+void GrpcClientContextWithTimeoutMs(grpc::ClientContext &context, int64_t timeout_ms) {
+  if (timeout_ms != -1) {
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(timeout_ms));
   }
-
-  RAY_CHECK(!cluster_id_.IsNil()) << "Unexpected nil cluster ID.";
-  return Status::OK();
 }
 
 Status PythonGcsClient::CheckAlive(const std::vector<std::string> &raylet_addresses,
                                    int64_t timeout_ms,
                                    std::vector<bool> &result) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::CheckAliveRequest request;
   for (const auto &address : raylet_addresses) {
@@ -232,7 +199,7 @@ Status PythonGcsClient::InternalKVGet(const std::string &ns,
                                       int64_t timeout_ms,
                                       std::string &value) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::InternalKVGetRequest request;
   request.set_namespace_(ns);
@@ -259,7 +226,7 @@ Status PythonGcsClient::InternalKVMultiGet(
     int64_t timeout_ms,
     std::unordered_map<std::string, std::string> &result) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::InternalKVMultiGetRequest request;
   request.set_namespace_(ns);
@@ -291,7 +258,7 @@ Status PythonGcsClient::InternalKVPut(const std::string &ns,
                                       int64_t timeout_ms,
                                       int &added_num) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::InternalKVPutRequest request;
   request.set_namespace_(ns);
@@ -318,7 +285,7 @@ Status PythonGcsClient::InternalKVDel(const std::string &ns,
                                       int64_t timeout_ms,
                                       int &deleted_num) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::InternalKVDelRequest request;
   request.set_namespace_(ns);
@@ -343,7 +310,7 @@ Status PythonGcsClient::InternalKVKeys(const std::string &ns,
                                        int64_t timeout_ms,
                                        std::vector<std::string> &results) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::InternalKVKeysRequest request;
   request.set_namespace_(ns);
@@ -367,7 +334,7 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns,
                                          int64_t timeout_ms,
                                          bool &exists) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::InternalKVExistsRequest request;
   request.set_namespace_(ns);
@@ -390,7 +357,7 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri,
                                          int expiration_s,
                                          int64_t timeout_ms) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::PinRuntimeEnvURIRequest request;
   request.set_uri(uri);
@@ -418,7 +385,7 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri,
 Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms,
                                        std::vector<rpc::GcsNodeInfo> &result) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::GetAllNodeInfoRequest request;
   rpc::GetAllNodeInfoReply reply;
@@ -438,7 +405,7 @@ Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms,
 Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,
                                       std::vector<rpc::JobTableData> &result) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::GetAllJobInfoRequest request;
   rpc::GetAllJobInfoReply reply;
@@ -460,7 +427,7 @@ Status PythonGcsClient::RequestClusterResourceConstraint(
     const std::vector<std::unordered_map<std::string, double>> &bundles,
     const std::vector<int64_t> &count_array) {
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::autoscaler::RequestClusterResourceConstraintRequest request;
   rpc::autoscaler::RequestClusterResourceConstraintReply reply;
@@ -491,7 +458,7 @@ Status PythonGcsClient::GetClusterStatus(int64_t timeout_ms,
   rpc::autoscaler::GetClusterStatusRequest request;
   rpc::autoscaler::GetClusterStatusReply reply;
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   grpc::Status status = autoscaler_stub_->GetClusterStatus(&context, request, &reply);
 
@@ -517,7 +484,7 @@ Status PythonGcsClient::DrainNode(const std::string &node_id,
   rpc::autoscaler::DrainNodeReply reply;
 
   grpc::ClientContext context;
-  PrepareContext(context, timeout_ms);
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   grpc::Status status = autoscaler_stub_->DrainNode(&context, request, &reply);
 
@@ -549,10 +516,7 @@ Status PythonCheckGcsHealth(const std::string &gcs_address,
   auto channel = rpc::GcsRpcClient::CreateGcsChannel(gcs_address, gcs_port);
   auto stub = rpc::NodeInfoGcsService::NewStub(channel);
   grpc::ClientContext context;
-  if (timeout_ms != -1) {
-    context.set_deadline(std::chrono::system_clock::now() +
-                         std::chrono::milliseconds(timeout_ms));
-  }
+  GrpcClientContextWithTimeoutMs(context, timeout_ms);
   rpc::CheckAliveRequest request;
   rpc::CheckAliveReply reply;
   grpc::Status status = stub->CheckAlive(&context, request, &reply);
diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h
index 5773a0229b0b..318ef69d6763 100644
--- a/src/ray/gcs/gcs_client/gcs_client.h
+++ b/src/ray/gcs/gcs_client/gcs_client.h
@@ -84,8 +84,7 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
   /// \param instrumented_io_context IO execution service.
   ///
   /// \return Status
-  virtual Status Connect(instrumented_io_context &io_service,
-                         const ClusterID &cluster_id = ClusterID::Nil());
+  virtual Status Connect(instrumented_io_context &io_service);
 
   /// Disconnect with GCS Service. Non-thread safe.
   virtual void Disconnect();
@@ -192,7 +191,7 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
 class RAY_EXPORT PythonGcsClient {
  public:
   explicit PythonGcsClient(const GcsClientOptions &options);
-  Status Connect(const ClusterID &cluster_id, int64_t timeout_ms, size_t num_retries);
+  Status Connect();
 
   Status CheckAlive(const std::vector<std::string> &raylet_addresses,
                     int64_t timeout_ms,
@@ -242,20 +241,7 @@ class RAY_EXPORT PythonGcsClient {
                    int64_t timeout_ms,
                    bool &is_accepted);
 
-  const ClusterID &GetClusterId() const { return cluster_id_; }
-
  private:
-  void PrepareContext(grpc::ClientContext &context, int64_t timeout_ms) {
-    if (timeout_ms != -1) {
-      context.set_deadline(std::chrono::system_clock::now() +
-                           std::chrono::milliseconds(timeout_ms));
-    }
-    if (!cluster_id_.IsNil()) {
-      context.AddMetadata(kClusterIdKey, cluster_id_.Hex());
-    }
-  }
-
-  ClusterID cluster_id_;
   GcsClientOptions options_;
   std::unique_ptr<rpc::InternalKVGcsService::Stub> kv_stub_;
   std::unique_ptr<rpc::RuntimeEnvGcsService::Stub> runtime_env_stub_;
diff --git a/src/ray/object_manager/test/ownership_based_object_directory_test.cc b/src/ray/object_manager/test/ownership_based_object_directory_test.cc
index a326b2c9eb65..3b7624a604a3 100644
--- a/src/ray/object_manager/test/ownership_based_object_directory_test.cc
+++ b/src/ray/object_manager/test/ownership_based_object_directory_test.cc
@@ -101,8 +101,7 @@ class MockGcsClient : public gcs::GcsClient {
     return *node_accessor_;
   }
 
-  MOCK_METHOD2(Connect,
-               Status(instrumented_io_context &io_service, const ClusterID &cluster_id));
+  MOCK_METHOD1(Connect, Status(instrumented_io_context &io_service));
 
   MOCK_METHOD0(Disconnect, void());
 };
diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc
index bd74db30f2a3..65d5f672c5aa 100644
--- a/src/ray/raylet/main.cc
+++ b/src/ray/raylet/main.cc
@@ -69,8 +69,6 @@ DEFINE_int32(ray_debugger_external, 0, "Make Ray debugger externally accessible.
 DEFINE_int64(object_store_memory, -1, "The initial memory of the object store.");
 DEFINE_string(node_name, "", "The user-provided identifier or name for this node.");
 DEFINE_string(session_name, "", "Session name (ClusterID) of the cluster.");
-DEFINE_string(cluster_id, "", "ID of the cluster, separate from observability.");
-
 #ifdef __linux__
 DEFINE_string(plasma_directory,
               "/dev/shm",
@@ -155,10 +153,6 @@ int main(int argc, char *argv[]) {
   const std::string session_name = FLAGS_session_name;
   const bool is_head_node = FLAGS_head;
   const std::string labels_json_str = FLAGS_labels;
-
-  RAY_CHECK_NE(FLAGS_cluster_id, "") << "Expected cluster ID.";
-  ray::ClusterID cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id);
-  RAY_LOG(INFO) << "Setting cluster ID to: " << cluster_id;
   gflags::ShutDownCommandLineFlags();
 
   // Configuration for the node manager.
@@ -177,7 +171,7 @@ int main(int argc, char *argv[]) {
   ray::gcs::GcsClientOptions client_options(FLAGS_gcs_address);
   gcs_client = std::make_shared<ray::gcs::GcsClient>(client_options);
 
-  RAY_CHECK_OK(gcs_client->Connect(main_service, cluster_id));
+  RAY_CHECK_OK(gcs_client->Connect(main_service));
   std::unique_ptr<ray::raylet::Raylet> raylet;
 
   RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig(
diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h
index b0f998aa9e8d..98c1e519d3c5 100644
--- a/src/ray/rpc/client_call.h
+++ b/src/ray/rpc/client_call.h
@@ -194,7 +194,7 @@ class ClientCallManager {
                              const ClusterID &cluster_id = ClusterID::Nil(),
                              int num_threads = 1,
                              int64_t call_timeout_ms = -1)
-      : cluster_id_(cluster_id),
+      : cluster_id_(ClusterID::Nil()),
         main_service_(main_service),
         num_threads_(num_threads),
         shutdown_(false),
@@ -249,7 +249,7 @@ class ClientCallManager {
     }
 
     auto call = std::make_shared<ClientCallImpl<Reply>>(
-        callback, cluster_id_, std::move(stats_handle), method_timeout_ms);
+        callback, cluster_id_.load(), std::move(stats_handle), method_timeout_ms);
     // Send request.
     // Find the next completion queue to wait for response.
     call->response_reader_ = (stub.*prepare_async_function)(
@@ -267,6 +267,14 @@ class ClientCallManager {
     return call;
   }
 
+  void SetClusterId(const ClusterID &cluster_id) {
+    auto old_id = cluster_id_.exchange(ClusterID::Nil());
+    if (!old_id.IsNil() && (old_id != cluster_id)) {
+      RAY_LOG(FATAL) << "Expected cluster ID to be Nil or " << cluster_id << ", but got"
+                     << old_id;
+    }
+  }
+
   /// Get the main service of this rpc.
   instrumented_io_context &GetMainService() { return main_service_; }
 
@@ -320,7 +328,7 @@ class ClientCallManager {
 
   /// UUID of the cluster. Potential race between creating a ClientCall object
   /// and setting the cluster ID.
-  ClusterID cluster_id_;
+  SafeClusterID cluster_id_;
 
   /// The main event loop, to which the callback functions will be posted.
   instrumented_io_context &main_service_;
diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h
index 94b0ca4d1b9e..7c602cebda0a 100644
--- a/src/ray/rpc/gcs_server/gcs_rpc_client.h
+++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h
@@ -117,7 +117,7 @@ class Executor {
         }                                                                                \
         delete executor;                                                                 \
       } else {                                                                           \
-        /* In case of GCS failure, we queue the request and these requests will be */    \
+        /* In case of GCS failure, we queue the request and these requets will be */     \
         /* executed once GCS is back. */                                                 \
         gcs_is_down_ = true;                                                             \
         auto request_bytes = request.ByteSizeLong();                                     \