Skip to content

Commit

Permalink
feat(JobAttachments)!: Add 'last seen on S3' cache (#172)
Browse files Browse the repository at this point in the history
Breaking changes:

- mostly, this is a significant change in behaviour
  (i.e. the addition of the S3 check cache) and warrants
  a new version
- the S3AssetUploader.upload_assets public function was changed
  to accommodate the S3 cache path. Customers that use custom
  Python scripts that use the S3AssetUploader may need to update
  their scripts to specify the cache path
- list_object_threshold was removed from the CLI config

Signed-off-by: Caden Marofke <marofke@amazon.com>
  • Loading branch information
marofke authored Feb 8, 2024
1 parent 278e4f6 commit 99ebaea
Show file tree
Hide file tree
Showing 21 changed files with 903 additions and 480 deletions.
8 changes: 5 additions & 3 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from .. import api
from ..exceptions import DeadlineOperationError, CreateJobWaiterCanceled
from ..config import get_setting, set_setting
from ..config import get_setting, set_setting, config_file
from ..job_bundle import deadline_yaml_dump
from ..job_bundle.loader import (
read_yaml_or_json,
Expand Down Expand Up @@ -378,7 +378,7 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
asset_groups=asset_groups,
total_input_files=total_input_files,
total_input_bytes=total_input_bytes,
hash_cache_dir=os.path.expanduser(os.path.join("~", ".deadline", "cache")),
hash_cache_dir=config_file.get_cache_directory(),
on_preparing_to_submit=hashing_progress_callback,
)
api.get_deadline_cloud_library_telemetry_client(config=config).record_hashing_summary(
Expand Down Expand Up @@ -409,7 +409,9 @@ def _default_update_upload_progress(upload_metadata: Dict[str, str]) -> bool:
upload_progress_callback = _default_update_upload_progress

upload_summary, attachment_settings = asset_manager.upload_assets(
manifests, upload_progress_callback
manifests=manifests,
on_uploading_assets=upload_progress_callback,
s3_check_cache_dir=config_file.get_cache_directory(),
)
api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
upload_summary
Expand Down
13 changes: 9 additions & 4 deletions src/deadline/client/config/config_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

__all__ = [
"get_config_file_path",
"get_cache_directory",
"read_config",
"write_config",
"get_setting_default",
Expand Down Expand Up @@ -38,6 +39,7 @@

# The default directory within which to save the history of created jobs.
DEFAULT_JOB_HISTORY_DIR = os.path.join("~", ".deadline", "job_history", "{aws_profile_name}")
DEFAULT_CACHE_DIR = os.path.join("~", ".deadline", "cache")

_TRUE_VALUES = {"yes", "on", "true", "1"}
_FALSE_VALUES = {"no", "off", "false", "0"}
Expand Down Expand Up @@ -107,10 +109,6 @@
"telemetry.opt_out": {"default": "false"},
"telemetry.identifier": {"default": ""},
"defaults.job_attachments_file_system": {"default": "COPIED", "depend": "defaults.farm_id"},
"settings.list_object_threshold": {
"default": "100",
"description": "If the number of files to be uploaded are bigger than this threshold, it switches to call list-objects S3 API from head-object call to check if files have already been uploaded.",
},
"settings.multipart_upload_chunk_size": {
"default": "8388608", # 8 MB (Default chunk size for multipart upload)
"description": "The chunk size to use when uploading files in multi-parts.",
Expand All @@ -137,6 +135,13 @@ def get_config_file_path() -> Path:
return Path(os.environ.get(CONFIG_FILE_PATH_ENV_VAR) or CONFIG_FILE_PATH).expanduser()


def get_cache_directory() -> str:
"""
Get the cache directory.
"""
return os.path.expanduser(DEFAULT_CACHE_DIR)


def _should_read_config(config_file_path: Path) -> bool:
global __config_file_path
global __config_mtime
Expand Down
7 changes: 4 additions & 3 deletions src/deadline/client/ui/dialogs/submit_job_progress_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def _update_hash_progress(hashing_metadata: ProgressReportMetadata) -> bool:
asset_groups=asset_groups,
total_input_files=total_input_files,
total_input_bytes=total_input_bytes,
hash_cache_dir=os.path.expanduser(os.path.join("~", ".deadline", "cache")),
hash_cache_dir=config_file.get_cache_directory(),
on_preparing_to_submit=_update_hash_progress,
)

Expand Down Expand Up @@ -334,8 +334,9 @@ def _update_upload_progress(upload_metadata: ProgressReportMetadata) -> bool:
upload_summary, attachment_settings = cast(
S3AssetManager, self._asset_manager
).upload_assets(
manifests,
_update_upload_progress,
manifests=manifests,
on_uploading_assets=_update_upload_progress,
s3_check_cache_dir=config_file.get_cache_directory(),
)

logger.info("Finished uploading job attachments files.")
Expand Down
18 changes: 1 addition & 17 deletions src/deadline/job_attachments/_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

import datetime
import os
from hashlib import shake_256
from pathlib import Path
from typing import Optional, Tuple, Union
from typing import Tuple, Union
import uuid


Expand All @@ -15,13 +14,9 @@
"_human_readable_file_size",
"_get_unique_dest_dir_name",
"_get_bucket_and_object_key",
"_get_default_hash_cache_db_file_dir",
"_is_relative_to",
]

CONFIG_ROOT = ".deadline"
COMPONENT_NAME = "job_attachments"


def _join_s3_paths(root: str, *args: str):
return "/".join([root, *args])
Expand Down Expand Up @@ -81,17 +76,6 @@ def _get_bucket_and_object_key(s3_path: str) -> Tuple[str, str]:
return bucket, key


def _get_default_hash_cache_db_file_dir() -> Optional[str]:
"""
Gets the expected directory for the hash cache database file based on OS environment variables.
If a directory cannot be found, defaults to the working directory.
"""
default_path = os.environ.get("HOME")
if default_path:
default_path = os.path.join(default_path, CONFIG_ROOT, COMPONENT_NAME)
return default_path


def _is_relative_to(path1: Union[Path, str], path2: Union[Path, str]) -> bool:
"""
Determines if path1 is relative to path2. This function is to support
Expand Down
36 changes: 26 additions & 10 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
)

from .fus3 import Fus3ProcessManager
from .exceptions import AssetSyncError, Fus3ExecutableMissingError
from .exceptions import AssetSyncError, Fus3ExecutableMissingError, JobAttachmentsS3ClientError
from .models import (
Attachments,
JobAttachmentsFileSystem,
Expand Down Expand Up @@ -426,15 +426,31 @@ def sync_inputs(
f"Virtual File System not found, falling back to {JobAttachmentsFileSystem.COPIED} for JobAttachmentsFileSystem."
)

download_summary_statistics = download_files_from_manifests(
s3_bucket=s3_settings.s3BucketName,
manifests_by_root=merged_manifests_by_root,
cas_prefix=s3_settings.full_cas_prefix(),
fs_permission_settings=fs_permission_settings,
session=self.session,
on_downloading_files=on_downloading_files,
logger=self.logger,
)
try:
download_summary_statistics = download_files_from_manifests(
s3_bucket=s3_settings.s3BucketName,
manifests_by_root=merged_manifests_by_root,
cas_prefix=s3_settings.full_cas_prefix(),
fs_permission_settings=fs_permission_settings,
session=self.session,
on_downloading_files=on_downloading_files,
logger=self.logger,
)
except JobAttachmentsS3ClientError as exc:
if exc.status_code == 404:
raise JobAttachmentsS3ClientError(
action=exc.action,
status_code=exc.status_code,
bucket_name=exc.bucket_name,
key_or_prefix=exc.key_or_prefix,
message=(
"This can happen if the S3 check cache on the submitting machine is out of date. "
"Please delete the cache file from the submitting machine, usually located in the "
"home directory (~/.deadline/cache/s3_check_cache.db) and try submitting again."
),
) from exc
else:
raise

return (
download_summary_statistics.convert_to_summary_statistics(),
Expand Down
15 changes: 15 additions & 0 deletions src/deadline/job_attachments/caches/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from .cache_db import CacheDB, CONFIG_ROOT, COMPONENT_NAME
from .hash_cache import HashCache, HashCacheEntry
from .s3_check_cache import S3CheckCache, S3CheckCacheEntry

__all__ = [
"CacheDB",
"CONFIG_ROOT",
"COMPONENT_NAME",
"HashCache",
"HashCacheEntry",
"S3CheckCache",
"S3CheckCacheEntry",
]
96 changes: 96 additions & 0 deletions src/deadline/job_attachments/caches/cache_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

"""
Module for defining a local cache file.
"""

import logging
import os
from abc import ABC
from threading import Lock
from typing import Optional

from ..exceptions import JobAttachmentsError

CONFIG_ROOT = ".deadline"
COMPONENT_NAME = "job_attachments"

logger = logging.getLogger("Deadline")


class CacheDB(ABC):
"""
Abstract base class for connecting to a local SQLite cache database.
This class is intended to always be used with a context manager to properly
close the connection to the cache database.
"""

def __init__(
self, cache_name: str, table_name: str, create_query: str, cache_dir: Optional[str] = None
) -> None:
if not cache_name or not table_name or not create_query:
raise JobAttachmentsError("Constructor strings for CacheDB cannot be empty.")
self.cache_name: str = cache_name
self.table_name: str = table_name
self.create_query: str = create_query

try:
# SQLite is included in Python installers, but might not exist if building python from source.
import sqlite3 # noqa

self.enabled = True
except ImportError:
logger.warn(f"SQLite was not found, {cache_name} will not be used.")
self.enabled = False
return

if cache_dir is None:
cache_dir = self.get_default_cache_db_file_dir()
if cache_dir is None:
raise JobAttachmentsError(
f"No default cache path found. Please provide a directory for {self.cache_name}."
)
os.makedirs(cache_dir, exist_ok=True)
self.cache_dir: str = os.path.join(cache_dir, f"{self.cache_name}.db")
self.db_lock = Lock()

def __enter__(self):
"""Called when entering the context manager."""
if self.enabled:
import sqlite3

try:
self.db_connection: sqlite3.Connection = sqlite3.connect(
self.cache_dir, check_same_thread=False
)
except sqlite3.OperationalError as oe:
raise JobAttachmentsError(
f"Could not access cache file in {self.cache_dir}"
) from oe

try:
self.db_connection.execute(f"SELECT * FROM {self.table_name}")
except Exception:
# DB file doesn't have our table, so we need to create it
logger.info(
f"No cache entries for the current library version were found. Creating a new cache for {self.cache_name}"
)
self.db_connection.execute(self.create_query)
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
"""Called when exiting the context manager."""
if self.enabled:
self.db_connection.close()

@classmethod
def get_default_cache_db_file_dir(cls) -> Optional[str]:
"""
Gets the expected directory for the cache database file based on OS environment variables.
If a directory cannot be found, defaults to the working directory.
"""
default_path = os.environ.get("HOME")
if default_path:
default_path = os.path.join(default_path, CONFIG_ROOT, COMPONENT_NAME)
return default_path
91 changes: 91 additions & 0 deletions src/deadline/job_attachments/caches/hash_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

"""
Module for accessing the local file hash cache.
"""

import logging
from dataclasses import dataclass
from typing import Any, Dict, Optional

from .cache_db import CacheDB
from ..asset_manifests.hash_algorithms import HashAlgorithm


logger = logging.getLogger("Deadline")


@dataclass
class HashCacheEntry:
"""Represents an entry in the local hash-cache database"""

file_path: str
hash_algorithm: HashAlgorithm
file_hash: str
last_modified_time: str

def to_dict(self) -> Dict[str, Any]:
return {
"file_path": self.file_path,
"hash_algorithm": self.hash_algorithm.value,
"file_hash": self.file_hash,
"last_modified_time": self.last_modified_time,
}


class HashCache(CacheDB):
"""
Class used to store and retrieve entries in the local file hash cache.
This class is intended to always be used with a context manager to properly
close the connection to the hash cache database.
This class also automatically locks when doing writes, so it can be called
by multiple threads.
"""

CACHE_NAME = "hash_cache"
CACHE_DB_VERSION = 2

def __init__(self, cache_dir: Optional[str] = None) -> None:
table_name: str = f"hashesV{self.CACHE_DB_VERSION}"
create_query: str = f"CREATE TABLE hashesV{self.CACHE_DB_VERSION}(file_path text primary key, hash_algorithm text secondary key, file_hash text, last_modified_time timestamp)"
super().__init__(
cache_name=self.CACHE_NAME,
table_name=table_name,
create_query=create_query,
cache_dir=cache_dir,
)

def get_entry(
self, file_path_key: str, hash_algorithm: HashAlgorithm
) -> Optional[HashCacheEntry]:
"""
Returns an entry from the hash cache, if it exists.
"""
if not self.enabled:
return None

with self.db_lock, self.db_connection:
entry_vals = self.db_connection.execute(
f"SELECT * FROM {self.table_name} WHERE file_path=? AND hash_algorithm=?",
[file_path_key, hash_algorithm.value],
).fetchone()
if entry_vals:
return HashCacheEntry(
file_path=entry_vals[0],
hash_algorithm=HashAlgorithm(entry_vals[1]),
file_hash=entry_vals[2],
last_modified_time=str(entry_vals[3]),
)
else:
return None

def put_entry(self, entry: HashCacheEntry) -> None:
"""Inserts or replaces an entry into the hash cache database after acquiring the lock."""
if self.enabled:
with self.db_lock, self.db_connection:
self.db_connection.execute(
f"INSERT OR REPLACE INTO {self.table_name} VALUES(:file_path, :hash_algorithm, :file_hash, :last_modified_time)",
entry.to_dict(),
)
Loading

0 comments on commit 99ebaea

Please sign in to comment.