From 9781c6ca57935b0f7f99bac605c7e3f3332af15a Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 4 Sep 2024 15:28:12 -0700 Subject: [PATCH] [Core] Fix runtime env race condition when uploading the same package concurrently (#47482) Signed-off-by: Jiajun Yao --- python/ray/_private/runtime_env/packaging.py | 13 +++++++++--- .../tests/test_runtime_env_working_dir_4.py | 21 ++++++++++++++++++- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/runtime_env/packaging.py b/python/ray/_private/runtime_env/packaging.py index 7e386dba8607..091384c96921 100644 --- a/python/ray/_private/runtime_env/packaging.py +++ b/python/ray/_private/runtime_env/packaging.py @@ -1,3 +1,4 @@ +import time import asyncio import hashlib import logging @@ -578,18 +579,24 @@ def upload_package_if_needed( return False package_file = Path(_get_local_path(base_directory, pkg_uri)) + # Make the temporary zip file name unique so that it doesn't conflict with + # concurrent upload_package_if_needed calls with the same pkg_uri. + # See https://github.com/ray-project/ray/issues/47471. + package_file = package_file.with_name( + f"{time.time_ns()}_{os.getpid()}_{package_file.name}" + ) create_package( directory, package_file, include_parent_dir=include_parent_dir, excludes=excludes, ) - - upload_package_to_gcs(pkg_uri, package_file.read_bytes()) - + package_file_bytes = package_file.read_bytes() # Remove the local file to avoid accumulating temporary zip files. package_file.unlink() + upload_package_to_gcs(pkg_uri, package_file_bytes) + return True diff --git a/python/ray/tests/test_runtime_env_working_dir_4.py b/python/ray/tests/test_runtime_env_working_dir_4.py index 810cf33ab6e6..15479f89298e 100644 --- a/python/ray/tests/test_runtime_env_working_dir_4.py +++ b/python/ray/tests/test_runtime_env_working_dir_4.py @@ -6,7 +6,11 @@ from pytest_lazyfixture import lazy_fixture import ray -from ray._private.test_utils import wait_for_condition, check_local_files_gced +from ray._private.test_utils import ( + wait_for_condition, + check_local_files_gced, + run_string_as_driver_nonblocking, +) from ray.exceptions import GetTimeoutError # This test requires you have AWS credentials set up (any AWS credentials will @@ -204,6 +208,21 @@ def check(self): assert not check_local_files_gced(cluster) +def test_two_concurrent_jobs_with_same_working_dir(call_ray_start, tmp_working_dir): + """Test that uploading the same working dir concurrently works + https://github.com/ray-project/ray/issues/47471 + """ + + script = f""" +import ray +ray.init(runtime_env={{"working_dir": "{str(tmp_working_dir)}"}}) +""" + job1 = run_string_as_driver_nonblocking(script) + job2 = run_string_as_driver_nonblocking(script) + assert job1.wait() == 0, job1.stderr.readlines() + assert job2.wait() == 0, job2.stderr.readlines() + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))