feat: Switch to running deadline_vfs as os_user (#223)
* Switch to running deadline_vfs as os_user
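
In broad strokes, the change launches the VFS process via `sudo -u <os_user>` (visible in the vfs.py diff below). A minimal, illustrative sketch of that pattern; the helper name and sudoers setup are assumptions, not this repository's API:

```python
import subprocess

def run_as_user(os_user: str, args: list[str]) -> subprocess.CompletedProcess:
    # Requires a sudoers rule that lets the calling account run the target
    # command as os_user without a password prompt.
    return subprocess.run(["sudo", "-u", os_user, *args], check=True)
```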

Signed-off-by: Brian Axelson <baxelson@amazon.com>

* feat(job_attachments): enhance handling of S3 timeout errors and BotoCoreError (#206)

Improve error handling for S3 requests by
- adding "retries" configuration to the S3 client
- adding BotoCoreError handling to cover S3 timeout errors (e.g., ReadTimeoutError, ConnectTimeoutError)
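
A minimal sketch of this pattern, assuming standard boto3/botocore APIs; the wrapper function is illustrative, not this package's actual API:

```python
import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError

# "standard" retry mode retries transient failures, including connection timeouts.
s3 = boto3.client("s3", config=Config(retries={"max_attempts": 5, "mode": "standard"}))

def get_object_safely(bucket: str, key: str):
    try:
        return s3.get_object(Bucket=bucket, Key=key)
    except ClientError as e:
        # Service-side errors (access denied, missing key, throttling, ...)
        raise RuntimeError(f"S3 returned an error for {bucket}/{key}: {e}") from e
    except BotoCoreError as e:
        # Client-side errors, including ReadTimeoutError and ConnectTimeoutError
        raise RuntimeError(f"S3 request failed for {bucket}/{key}: {e}") from e
```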

Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* fix(job_attachments): Use files' last modification time to identify output files to be synced (#211)
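
A hedged illustration of the approach: treat any file under an output directory whose modification time (mtime) is at or after the session start as an output to sync. All names here are assumptions for demonstration:

```python
import os
from pathlib import Path

def find_output_files(output_root: Path, session_start: float) -> list[Path]:
    """Return files modified at or after session_start (an epoch timestamp)."""
    outputs = []
    for dirpath, _dirnames, filenames in os.walk(output_root):
        for name in filenames:
            path = Path(dirpath) / name
            # st_mtime is the last-modification time in seconds since the epoch
            if path.stat().st_mtime >= session_start:
                outputs.append(path)
    return outputs
```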

Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* chore(deps): update python-semantic-release requirement (#216)

Updates the requirements on [python-semantic-release](https://github.com/python-semantic-release/python-semantic-release) to permit the latest version.
- [Release notes](https://github.com/python-semantic-release/python-semantic-release/releases)
- [Changelog](https://github.com/python-semantic-release/python-semantic-release/blob/master/CHANGELOG.md)
- [Commits](python-semantic-release/python-semantic-release@v8.7.0...v9.2.2)

---
updated-dependencies:
- dependency-name: python-semantic-release
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* chore(release): 0.41.0 (#217)

Signed-off-by: client-software-ci <129794699+client-software-ci@users.noreply.github.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* chore(deps): update coverage[toml] requirement from ~=7.2 to ~=7.4 (#156)

Updates the requirements on [coverage[toml]](https://github.com/nedbat/coveragepy) to permit the latest version.
- [Release notes](https://github.com/nedbat/coveragepy/releases)
- [Changelog](https://github.com/nedbat/coveragepy/blob/master/CHANGES.rst)
- [Commits](nedbat/coveragepy@7.3.0...7.4.0)

---
updated-dependencies:
- dependency-name: coverage[toml]
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* fix: Make StorageProfileOperatingSystemFamily enum case-insensitive
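
One common way to make a string-valued Enum tolerate any casing is the `_missing_` hook; a sketch under assumed member values, not necessarily the exact implementation in this commit:

```python
from enum import Enum

class StorageProfileOperatingSystemFamily(str, Enum):
    WINDOWS = "windows"
    LINUX = "linux"
    MACOS = "macos"

    @classmethod
    def _missing_(cls, value):
        # Fall back to a case-insensitive match when the exact value misses.
        if isinstance(value, str):
            for member in cls:
                if member.value == value.lower():
                    return member
        return None

# StorageProfileOperatingSystemFamily("WINDOWS") now resolves to .WINDOWS
```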

Signed-off-by: Caden Marofke <marofke@amazon.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* ci: add gpg signing of build artifacts (#218)

Signed-off-by: Charles Moore <122481442+moorec-aws@users.noreply.github.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* feat!: prep for rootPathFormat becoming ALL UPPERS (#222)

** BREAKING CHANGE **
* The PathFormat enum's values went from all lowercase to all uppercase
* The source_path_root in the path mapping rules return value from sync_inputs went from all lowercase to all uppercase
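
A hedged before/after sketch of the enum change; member names and the call site are illustrative assumptions:

```python
from enum import Enum

class PathFormat(str, Enum):
    WINDOWS = "WINDOWS"  # value was "windows" before this change
    POSIX = "POSIX"      # value was "posix" before this change

# Any caller comparing these values against lowercase literals must be
# updated, e.g. `rule["source_path_format"] == "posix"` -> `== "POSIX"`.
```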

Signed-off-by: Morgan Epp <60796713+epmog@users.noreply.github.com>
Signed-off-by: Brian Axelson <baxelson@amazon.com>

* CR Feedback

Signed-off-by: Brian Axelson <baxelson@amazon.com>

* Cleaning up a few more 'executing the job' cases

Signed-off-by: Brian Axelson <baxelson@amazon.com>

---------

Signed-off-by: Brian Axelson <baxelson@amazon.com>
Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com>
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: client-software-ci <129794699+client-software-ci@users.noreply.github.com>
Signed-off-by: Caden Marofke <marofke@amazon.com>
Signed-off-by: Charles Moore <122481442+moorec-aws@users.noreply.github.com>
Signed-off-by: Morgan Epp <60796713+epmog@users.noreply.github.com>
Co-authored-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: client-software-ci <129794699+client-software-ci@users.noreply.github.com>
Co-authored-by: Caden Marofke <marofke@amazon.com>
Co-authored-by: Charles Moore <122481442+moorec-aws@users.noreply.github.com>
Co-authored-by: Morgan Epp <60796713+epmog@users.noreply.github.com>
7 people committed Mar 22, 2024
1 parent f76bb98 commit cf9c2d2
Showing 5 changed files with 201 additions and 89 deletions.
7 changes: 5 additions & 2 deletions src/deadline/job_attachments/asset_sync.py
@@ -422,6 +422,7 @@ def sync_inputs(
                 boto3_session=self.session,
                 session_dir=session_dir,
                 os_user=fs_permission_settings.os_user,  # type: ignore[union-attr]
+                os_group=getattr(fs_permission_settings, "os_group", ""),
                 os_env_vars=os_env_vars,  # type: ignore[arg-type]
                 cas_prefix=s3_settings.full_cas_prefix(),
             )
@@ -558,12 +559,14 @@ def sync_outputs(
         summary_stats = SummaryStatistics()
         return summary_stats

-    def cleanup_session(self, session_dir: Path, file_system: JobAttachmentsFileSystem):
+    def cleanup_session(
+        self, session_dir: Path, file_system: JobAttachmentsFileSystem, os_user: str
+    ):
         if file_system == JobAttachmentsFileSystem.COPIED.value:
             return
         try:
             VFSProcessManager.find_vfs()
             # Shutdown all running Deadline VFS processes since session is complete
-            VFSProcessManager.kill_all_processes(session_dir=session_dir)
+            VFSProcessManager.kill_all_processes(session_dir=session_dir, os_user=os_user)
         except VFSExecutableMissingError:
             logger.error("Virtual File System not found, no processes to kill.")
15 changes: 10 additions & 5 deletions src/deadline/job_attachments/download.py
@@ -882,7 +882,7 @@ def write_manifest_to_temp_file(manifest: BaseAssetManifest) -> str:


 def handle_existing_vfs(
-    manifest: BaseAssetManifest, session_dir: Path, mount_point: str
+    manifest: BaseAssetManifest, session_dir: Path, mount_point: str, os_user: str
 ) -> BaseAssetManifest:
     """
     Combines provided manifest with the input manifest of the running VFS at the
@@ -892,7 +892,7 @@ def handle_existing_vfs(
     Args:
         manifests (BaseAssetManifest): The manifest for the new inputs to be mounted
         mount_point (str): The local directory where the manifest is to be mounted
-
+        os_user: the user running the job.
     Returns:
         BaseAssetManifest : A single manifest containing the merged paths or the original manifest
     """
@@ -914,7 +914,9 @@
         download_logger.error(f"input manifest not found for mount at {mount_point}")
         return manifest

-    VFSProcessManager.kill_process_at_mount(session_dir=session_dir, mount_point=mount_point)
+    VFSProcessManager.kill_process_at_mount(
+        session_dir=session_dir, mount_point=mount_point, os_user=os_user
+    )

     return manifest

@@ -925,6 +927,7 @@ def mount_vfs_from_manifests(
     boto3_session: boto3.Session,
     session_dir: Path,
     os_user: str,
+    os_group: str,
     os_env_vars: dict[str, str],
     cas_prefix: Optional[str] = None,
 ) -> None:
@@ -936,7 +939,8 @@
         manifests_by_root: a map from each local root path to a corresponding list of tuples of manifest contents and their path.
         boto3_session: The boto3 session to use.
         session_dir: the directory that the session is going to use.
-        os_user: the user executing the job.
+        os_user: the user running the job.
+        os_group: the group of the user running the job
         os_env_vars: environment variables to set for launched subprocesses
         cas_prefix: The CAS prefix of the files.
@@ -951,7 +955,7 @@
         )

         final_manifest: BaseAssetManifest = handle_existing_vfs(
-            manifest=manifest, session_dir=session_dir, mount_point=mount_point
+            manifest=manifest, session_dir=session_dir, mount_point=mount_point, os_user=os_user
         )

         # Write out a temporary file with the contents of the newly merged manifest
@@ -964,6 +968,7 @@
             mount_point,
             os_user,
             os_env_vars,
+            os_group,
             cas_prefix,
         )
         vfs_manager.start(session_dir=session_dir)
138 changes: 95 additions & 43 deletions src/deadline/job_attachments/vfs.py
@@ -8,7 +8,6 @@
 import time
 from pathlib import Path
 import threading
-from signal import SIGTERM
 from typing import Dict, List, Union, Optional

 from .exceptions import (
@@ -26,6 +25,7 @@


 DEADLINE_VFS_PID_FILE_NAME = "vfs_pids.txt"
+DEADLINE_MANIFEST_GROUP_READ_PERMS = 0o640


 class VFSProcessManager(object):
@@ -44,6 +44,7 @@ class VFSProcessManager(object):
     _manifest_path: str
     _os_user: str
     _os_env_vars: Dict[str, str]
+    _os_group: Optional[str]
     _cas_prefix: Optional[str]

     def __init__(
@@ -54,6 +55,7 @@ def __init__(
         mount_point: str,
         os_user: str,
         os_env_vars: Dict[str, str],
+        os_group: Optional[str] = None,
         cas_prefix: Optional[str] = None,
     ):
         # TODO: Once Windows pathmapping is implemented we can remove this
@@ -69,47 +71,72 @@
         self._region = region
         self._manifest_path = manifest_path
         self._os_user = os_user
+        self._os_group = os_group
         self._os_env_vars = os_env_vars
         self._cas_prefix = cas_prefix

     @classmethod
-    def kill_all_processes(cls, session_dir: Path) -> None:
+    def kill_all_processes(cls, session_dir: Path, os_user: str) -> None:
         """
         Kill all existing VFS processes when outputs have been uploaded.
         :param session_dir: tmp directory for session
+        :param os_user: the user running the job.
         """
         log.info("Terminating all VFS processes.")
         try:
             pid_file_path = (session_dir / DEADLINE_VFS_PID_FILE_NAME).resolve()
             with open(pid_file_path, "r") as file:
                 for line in file.readlines():
                     line = line.strip()
-                    mount_point, pid, manifest_path = line.split(":")
-
-                    log.info(f"Sending SIGTERM to child processes of {pid} at {mount_point}.")
-                    subprocess.run(["/bin/pkill", "-P", pid])
-                    log.info(f"Sending SIGTERM to {pid}.")
-                    try:
-                        os.kill(int(pid), SIGTERM)
-                    except OSError as e:
-                        # This is raised when the VFS process has already terminated.
-                        # This shouldn't happen, but won't cause an error if ignored
-                        if e.errno == 3:
-                            log.error(f"No process found for {pid}")
+                    mount_point, _, _ = line.split(":")
+                    cls.shutdown_libfuse_mount(mount_point, os_user)
             os.remove(pid_file_path)
         except FileNotFoundError:
             log.warning(f"VFS pid file not found at {pid_file_path}")

     @classmethod
-    def kill_process_at_mount(cls, session_dir: Path, mount_point: str) -> bool:
+    def get_shutdown_args(cls, mount_path: str, os_user: str):
+        """
+        Return the argument list to provide the subprocess run command to shut down the mount
+        :param mount_path: path to mounted folder
+        :param os_user: the user running the job.
+        """
+        fusermount3_path = os.path.join(cls.find_vfs_link_dir(), "fusermount3")
+        if not os.path.exists(fusermount3_path):
+            log.warn(f"fusermount3 not found at {cls.find_vfs_link_dir()}")
+            return None
+        return ["sudo", "-u", os_user, fusermount3_path, "-u", mount_path]
+
+    @classmethod
+    def shutdown_libfuse_mount(cls, mount_path: str, os_user: str) -> bool:
+        """
+        Shut down the mount at the provided path using the fusermount3 unmount option
+        as the provided user
+        :param mount_path: path to mounted folder
+        """
+        log.info(f"Attempting to shut down {mount_path} as {os_user}")
+        shutdown_args = cls.get_shutdown_args(mount_path, os_user)
+        if not shutdown_args:
+            return False
+        try:
+            run_result = subprocess.run(shutdown_args, check=True)
+        except subprocess.CalledProcessError as e:
+            log.warn(f"Shutdown failed with error {e}")
+            # Don't reraise, check if mount is gone
+        log.info(f"Shutdown returns {run_result.returncode}")
+        return cls.wait_for_mount(mount_path, expected=False)
+
+    @classmethod
+    def kill_process_at_mount(cls, session_dir: Path, mount_point: str, os_user: str) -> bool:
         """
         Kill the VFS instance running at the given mount_point and modify the VFS pid tracking
         file to remove the entry.
         :param session_dir: tmp directory for session
         :param mount_point: local directory to search for
+        :param os_user: user to attempt shut down as
         """
-        if not os.path.ismount(mount_point):
+        if not cls.is_mount(mount_point):
             log.info(f"{mount_point} is not a mount, returning")
             return False
         log.info(f"Terminating deadline_vfs processes at {mount_point}.")
@@ -124,23 +151,9 @@ def kill_process_at_mount(cls, session_dir: Path, mount_point: str) -> bool:
                 if mount_point_found:
                     file.write(line)
                 else:
-                    mount_for_pid, pid, manifest_path = line.split(":")
+                    mount_for_pid, _, _ = line.split(":")
                     if mount_for_pid == mount_point:
-                        log.info(f"Sending SIGTERM to child processes of {pid}.")
-                        subprocess.run(["/bin/pkill", "-P", pid])
-                        log.info(f"Sending SIGTERM to {pid}.")
-                        try:
-                            os.kill(int(pid), SIGTERM)
-                        except OSError as e:
-                            # This is raised when the VFS process has already terminated.
-                            # This shouldn't happen, but won't cause an error if ignored
-                            if e.errno == 3:
-                                log.error(f"No process found for {pid}")
-                            else:
-                                log.error(
-                                    f"{e} when attempting to kill VFS process at {mount_for_pid}"
-                                )
-                                raise e
+                        cls.shutdown_libfuse_mount(mount_point, os_user)
                         mount_point_found = True
                     else:
                         file.write(line)
@@ -165,7 +178,7 @@ def get_manifest_path_for_mount(cls, session_dir: Path, mount_point: str) -> Opt
         with open(pid_file_path, "r") as file:
             for line in file.readlines():
                 line = line.strip()
-                mount_for_pid, pid, manifest_path = line.split(":")
+                mount_for_pid, _, manifest_path = line.split(":")
                 if mount_for_pid == mount_point:
                     if os.path.exists(manifest_path):
                         return Path(manifest_path)
@@ -178,22 +191,33 @@
         log.warning(f"No manifest found for mount {mount_point}")
         return None

-    def wait_for_mount(self, mount_wait_seconds=60) -> bool:
+    @classmethod
+    def is_mount(cls, path) -> bool:
+        """
+        os.path.ismount returns false for libfuse mounts owned by "other users",
+        use findmnt instead
+        """
+        return subprocess.run(["findmnt", path]).returncode == 0
+
+    @classmethod
+    def wait_for_mount(cls, mount_path, mount_wait_seconds=60, expected=True) -> bool:
         """
         After we've launched the VFS subprocess we need to wait
         for the OS to validate that the mount is in place before use
         :param mount_wait_seconds: Duration to wait for mount state
+        :param expected: Wait for the mount to exist or no longer exist
         """
-        log.info(f"Testing for mount at {self.get_mount_point()}")
+        log.info(f"Waiting for is_mount at {mount_path} to return {expected}..")
         wait_seconds = mount_wait_seconds
         while wait_seconds >= 0:
-            if os.path.ismount(self.get_mount_point()):
-                log.info(f"{self.get_mount_point()} is a mount, returning")
+            if cls.is_mount(mount_path) == expected:
+                log.info(f"is_mount on {mount_path} returns {expected}, returning")
                 return True
             wait_seconds -= 1
             if wait_seconds >= 0:
-                log.info(f"{self.get_mount_point()} not a mount, sleeping...")
+                log.info(f"is_mount on {mount_path} not {expected}, sleeping...")
                 time.sleep(1)
-        log.info(f"Failed to find mount at {self.get_mount_point()}")
+        log.info(f"Failed to find is_mount {expected} at {mount_path} after {mount_wait_seconds}")
         VFSProcessManager.print_log_end()
         return False

@@ -242,13 +266,21 @@ def build_launch_command(self, mount_point: Union[os.PathLike, str]) -> List:
         executable = VFSProcessManager.find_vfs_launch_script()
         if self._cas_prefix is None:
             command = [
-                "%s %s -f --clienttype=deadline --bucket=%s --manifest=%s --region=%s -oallow_other"
-                % (executable, mount_point, self._asset_bucket, self._manifest_path, self._region)
+                "sudo -u %s %s %s -f --clienttype=deadline --bucket=%s --manifest=%s --region=%s -oallow_other"
+                % (
+                    self._os_user,
+                    executable,
+                    mount_point,
+                    self._asset_bucket,
+                    self._manifest_path,
+                    self._region,
+                )
             ]
         else:
             command = [
-                "%s %s -f --clienttype=deadline --bucket=%s --manifest=%s --region=%s --casprefix=%s -oallow_other"
+                "sudo -u %s %s %s -f --clienttype=deadline --bucket=%s --manifest=%s --region=%s --casprefix=%s -oallow_other"
                 % (
+                    self._os_user,
                     executable,
                     mount_point,
                     self._asset_bucket,
@@ -391,6 +423,24 @@ def get_launch_environ(self) -> dict:

         return my_env

+    def set_manifest_owner(self) -> None:
+        """
+        Set the manifest path to be owned by _os_user
+        """
+        log.info(
+            f"Attempting to set group ownership on {self._manifest_path} for {self._os_user} to {self._os_group}"
+        )
+        if not os.path.exists(self._manifest_path):
+            log.error(f"Manifest not found at {self._manifest_path}")
+            return
+        if self._os_group is not None:
+            try:
+                shutil.chown(self._manifest_path, group=self._os_group)
+                os.chmod(self._manifest_path, DEADLINE_MANIFEST_GROUP_READ_PERMS)
+            except OSError as e:
+                log.error(f"Failed to set ownership with error {e}")
+                raise
+
     def start(self, session_dir: Path) -> None:
         """
         Start our VFS process
@@ -399,11 +449,13 @@ def start(self, session_dir: Path) -> None:
         self._run_path = VFSProcessManager.get_cwd()
         log.info(f"Using run_path {self._run_path}")
         log.info(f"Using mount_point {self._mount_point}")
+        self.set_manifest_owner()
         VFSProcessManager.create_mount_point(self._mount_point)
         start_command = self.build_launch_command(self._mount_point)
         launch_env = self.get_launch_environ()
         log.info(f"Launching VFS with command {start_command}")
         log.info(f"Launching with environment {launch_env}")
+        log.info(f"Launching as user {self._os_user}")

         try:

@@ -434,7 +486,7 @@ def read_output_thread(pipe, log):
             log.exception(f"Exception during launch with command {start_command} exception {e}")
             raise e
         log.info(f"Launched VFS as pid {self._vfs_proc.pid}")
-        if not self.wait_for_mount():
+        if not VFSProcessManager.wait_for_mount(self.get_mount_point()):
             log.error("Failed to mount, shutting down")
             raise VFSFailedToMountError

2 changes: 2 additions & 0 deletions test/unit/deadline_job_attachments/test_asset_sync.py
@@ -865,13 +865,15 @@ def test_cleanup_session_vfs_terminate_called(self, tmp_path):
         self.default_asset_sync.cleanup_session(
             session_dir=tmp_path,
             file_system=JobAttachmentsFileSystem.COPIED,
+            os_user="test-user",
         )

         mock_find_vfs.assert_not_called()

         self.default_asset_sync.cleanup_session(
             session_dir=tmp_path,
             file_system=JobAttachmentsFileSystem.VIRTUAL,
+            os_user="test-user",
         )

         mock_find_vfs.assert_called_once()