Skip to content

Commit

Permalink
[Core] Fix runtime env race condition when uploading the same package…
Browse files Browse the repository at this point in the history
… concurrently (#47482)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Sep 4, 2024
1 parent 5463f5b commit 9781c6c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
13 changes: 10 additions & 3 deletions python/ray/_private/runtime_env/packaging.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import asyncio
import hashlib
import logging
Expand Down Expand Up @@ -578,18 +579,24 @@ def upload_package_if_needed(
return False

package_file = Path(_get_local_path(base_directory, pkg_uri))
# Make the temporary zip file name unique so that it doesn't conflict with
# concurrent upload_package_if_needed calls with the same pkg_uri.
# See https://github.com/ray-project/ray/issues/47471.
package_file = package_file.with_name(
f"{time.time_ns()}_{os.getpid()}_{package_file.name}"
)
create_package(
directory,
package_file,
include_parent_dir=include_parent_dir,
excludes=excludes,
)

upload_package_to_gcs(pkg_uri, package_file.read_bytes())

package_file_bytes = package_file.read_bytes()
# Remove the local file to avoid accumulating temporary zip files.
package_file.unlink()

upload_package_to_gcs(pkg_uri, package_file_bytes)

return True


Expand Down
21 changes: 20 additions & 1 deletion python/ray/tests/test_runtime_env_working_dir_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from pytest_lazyfixture import lazy_fixture

import ray
from ray._private.test_utils import wait_for_condition, check_local_files_gced
from ray._private.test_utils import (
wait_for_condition,
check_local_files_gced,
run_string_as_driver_nonblocking,
)
from ray.exceptions import GetTimeoutError

# This test requires you have AWS credentials set up (any AWS credentials will
Expand Down Expand Up @@ -204,6 +208,21 @@ def check(self):
assert not check_local_files_gced(cluster)


def test_two_concurrent_jobs_with_same_working_dir(call_ray_start, tmp_working_dir):
    """Test that uploading the same working dir concurrently works.

    Two driver scripts using the same ``working_dir`` runtime env are both
    launched before either one is waited on, and ``wait()`` must return 0
    for each of them.

    See https://github.com/ray-project/ray/issues/47471
    """

    # Interpolate the path with ``!r`` so the generated driver always contains
    # a valid Python string literal, even when the path holds backslashes
    # (e.g. Windows paths) or quote characters.
    script = f"""
import ray
ray.init(runtime_env={{"working_dir": {str(tmp_working_dir)!r}}})
"""
    # Launch both drivers first so their package uploads can overlap.
    job1 = run_string_as_driver_nonblocking(script)
    job2 = run_string_as_driver_nonblocking(script)
    # On failure, the driver's stderr lines become the assertion message.
    assert job1.wait() == 0, job1.stderr.readlines()
    assert job2.wait() == 0, job2.stderr.readlines()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down

0 comments on commit 9781c6c

Please sign in to comment.