Skip to content

Commit

Permalink
add config_dict['config_hash'] output to write_cluster_config
Browse files Browse the repository at this point in the history
  • Loading branch information
cg505 committed Nov 7, 2024
1 parent 877d77f commit d833bcb
Showing 1 changed file with 120 additions and 0 deletions.
120 changes: 120 additions & 0 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import enum
import fnmatch
import functools
import hashlib
import os
import pathlib
import pprint
Expand Down Expand Up @@ -860,6 +861,7 @@ def write_cluster_config(
if dryrun:
# If dryrun, return the unfinished tmp yaml path.
config_dict['ray'] = tmp_yaml_path
config_dict['config_hash'] = _deterministic_yaml_hash(tmp_yaml_path)
return config_dict
_add_auth_to_cluster_config(cloud, tmp_yaml_path)

Expand All @@ -882,6 +884,11 @@ def write_cluster_config(
yaml_config = common_utils.read_yaml(tmp_yaml_path)
config_dict['cluster_name_on_cloud'] = yaml_config['cluster_name']

# Make sure to do this before we optimize file mounts. Optimization is
# non-deterministic, but everything else before this point should be
# deterministic.
config_dict['config_hash'] = _deterministic_yaml_hash(tmp_yaml_path)

# Optimization: copy the contents of source files in file_mounts to a
# special dir, and upload that as the only file_mount instead. Delay
# calling this optimization until now, when all source files have been
Expand Down Expand Up @@ -990,6 +997,119 @@ def get_ready_nodes_counts(pattern, output):
return ready_head, ready_workers


@timeline.event
def _deterministic_yaml_hash(yaml_path: str) -> str:
    """Hashes the cluster yaml and the contents of its file mounts.

    Two invocations of this function should return the same string if and only
    if the contents of the yaml are the same and the file contents of all the
    file_mounts specified in the yaml are the same.

    Limitations:
    - This function can be expensive if the file mounts are large. (E.g. a few
      seconds for ~1GB.)
    - Symbolic links are not explicitly handled. Some symbolic link changes may
      not be detected.
    - Entries inside a mounted directory that are not regular files (dangling
      symlinks, FIFOs, sockets, ...) contribute only their name to the hash.

    Implementation: We create a byte sequence that captures the state of the
    yaml file and all the files in the file mounts, then hash the byte
    sequence.

    The format of the byte sequence is:
    32 bytes - sha256 hash of the yaml file
    for each file mount (sorted by remote destination):
      file mount remote destination (UTF-8), \\0
      if the file mount source is a file:
        'file' encoded to UTF-8
        32 byte sha256 hash of the file contents
      if the file mount source is a directory:
        'dir' encoded to UTF-8
        for each directory and subdirectory within the file mount (starting
        from the root and descending recursively):
          path of the directory as yielded by os.walk (UTF-8), \\0
          name of each subdirectory within the directory (UTF-8), each
            terminated by \\0
          \\0
          for each file in the directory:
            name of the file (UTF-8), \\0
            32 bytes - sha256 hash of the file contents (omitted if the entry
              is not a readable regular file, e.g. a dangling symlink)
          \\0
      if the file mount source is something else or does not exist, nothing
      \\0\\0

    Rather than constructing the whole byte sequence, which may be quite large,
    we construct it incrementally by using hash.update() to add new bytes.

    Args:
        yaml_path: Path to the cluster yaml file to hash.

    Returns:
        The hex-encoded sha256 digest of the byte sequence described above.
    """

    # In python 3.11, hashlib.file_digest is available, but for <3.11 we have to
    # do it manually.
    # This implementation is simplified from the implementation in CPython.
    # Beware of f.read() as some files may be larger than memory.
    def _hash_file(path: str) -> bytes:
        """Returns the raw sha256 digest of the file at `path`."""
        with open(path, 'rb') as f:
            file_hash = hashlib.sha256()
            # Reuse one fixed 256 KiB buffer so memory stays bounded
            # regardless of the file size.
            buf = bytearray(2**18)
            view = memoryview(buf)
            while True:
                size = f.readinto(buf)
                if size == 0:
                    # EOF
                    break
                file_hash.update(view[:size])
            return file_hash.digest()

    config_hash = hashlib.sha256()

    config_hash.update(_hash_file(yaml_path))

    yaml_config = common_utils.read_yaml(yaml_path)
    # `file_mounts:` with an empty value is parsed as None rather than being
    # absent, so the .get() default alone is not enough.
    file_mounts = yaml_config.get('file_mounts') or {}
    # Remove the file mounts added by the newline.
    if '' in file_mounts:
        assert file_mounts[''] == '', file_mounts['']
        file_mounts.pop('')

    # Sort by destination so the hash does not depend on yaml key order.
    for dst, src in sorted(file_mounts.items()):
        expanded_src = os.path.expanduser(src)
        config_hash.update(dst.encode('utf-8') + b'\0')

        if os.path.isfile(expanded_src):
            config_hash.update('file'.encode('utf-8'))
            config_hash.update(_hash_file(expanded_src))

        elif os.path.isdir(expanded_src):
            config_hash.update('dir'.encode('utf-8'))

            for (dirpath, dirnames, filenames) in os.walk(expanded_src):
                config_hash.update(dirpath.encode('utf-8') + b'\0')

                # Note: inplace sort will also affect the traversal order of
                # os.walk. We need it so that the os.walk order is
                # deterministic.
                dirnames.sort()
                # This includes symlinks to directories. We will recurse into
                # all the directories here but not the symlinks. We don't hash
                # the link destination.
                for dirname in dirnames:
                    config_hash.update(dirname.encode('utf-8') + b'\0')
                config_hash.update(b'\0')

                filenames.sort()
                # This includes symlinks to files. We could hash the symlink
                # destination itself but instead just hash the destination
                # contents.
                for filename in filenames:
                    config_hash.update(filename.encode('utf-8') + b'\0')
                    file_path = os.path.join(dirpath, filename)
                    # os.walk also reports dangling symlinks and special
                    # files (FIFOs, sockets, ...) as "files". Opening those
                    # would raise or block, so only hash the contents of
                    # regular files; the name above is still recorded.
                    if os.path.isfile(file_path):
                        config_hash.update(_hash_file(file_path))
                    else:
                        logger.debug('Skipping contents of non-regular file '
                                     f'in file_mount: {file_path}')
                config_hash.update(b'\0')

        else:
            logger.debug(
                f'Unexpected file_mount that is not a file or dir: {src}')

        config_hash.update(b'\0\0')

    return config_hash.hexdigest()


def get_docker_user(ip: str, cluster_config_file: str) -> str:
"""Find docker container username."""
ssh_credentials = ssh_credential_from_yaml(cluster_config_file)
Expand Down

0 comments on commit d833bcb

Please sign in to comment.