Skip to content

Commit

Permalink
Merge branch 'mainline' into natmatn/vfs_cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
natmatn authored Feb 20, 2024
2 parents d8462f6 + 1acfffc commit 64b8f88
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 117 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies = [
"requests ~= 2.31",
"boto3 >= 1.28.80",
"deadline == 0.37.*",
"openjd-sessions == 0.2.*",
"openjd-sessions == 0.5.*",
# tomli became tomllib in standard library in Python 3.11
"tomli == 2.0.* ; python_version<'3.11'",
"typing_extensions ~= 4.8",
Expand Down
5 changes: 1 addition & 4 deletions scripts/create_service_resources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ then

echo "Creating Amazon Deadline Cloud Farm $farm_name"
FARM_ID=$(aws deadline create-farm --display-name $farm_name | jq -r ".farmId")
echo "Created Farm: ${FARM_ID}"
fi

if [ "${QUEUE_ID_1:-}" == "" ]
Expand All @@ -47,7 +48,6 @@ then
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"jobRunAsUser": {
"posix": {
"user": "jobuser",
Expand All @@ -66,7 +66,6 @@ EOF
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"roleArn": "$queue_1_iam_role",
"jobRunAsUser": {
"posix": {
Expand Down Expand Up @@ -112,7 +111,6 @@ then
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"jobRunAsUser": {
"posix": {
"user": "jobuser",
Expand All @@ -131,7 +129,6 @@ EOF
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"roleArn": "$queue_2_iam_role",
"jobRunAsUser": {
"posix": {
Expand Down
5 changes: 3 additions & 2 deletions scripts/submit_jobs/sleep/sleep_job.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{
"name": "duration",
"type": "INT",
"default": 45,
"default": 60,
"minValue": 10,
"maxValue": 600
}
Expand All @@ -17,7 +17,8 @@
"actions": {
"onRun": {
"command": "{{ Task.File.runScript }}",
"args": ["{{ Param.duration }}"]
"args": ["{{ Param.duration }}"],
"timeout": 45
}
},
"embeddedFiles": [
Expand Down
1 change: 1 addition & 0 deletions src/deadline_worker_agent/installer/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ fi
# Provision log directory
echo "Provisioning log directory (/var/log/amazon/deadline)"
mkdir -p /var/log/amazon/deadline
chmod 755 /var/log/amazon
chown -R "${wa_user}:${wa_user}" /var/log/amazon/deadline
chmod -R 750 /var/log/amazon/deadline
echo "Done provisioning log directory (/var/log/amazon/deadline)"
Expand Down
2 changes: 1 addition & 1 deletion src/deadline_worker_agent/scheduler/session_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def cleanup_session_user_processes(user: SessionUser):
raise NotImplementedError("Windows not supported")

# Check that the session user isn't the current user (agent user)
current_user = subprocess.check_output(["/usr/bin/whoami"], text=True)
current_user = subprocess.check_output(["/usr/bin/whoami"], text=True).strip()
if current_user == user.user:
logger.info(
f"Skipping cleaning up processes because the session user matches the agent user '{current_user}'"
Expand Down
4 changes: 2 additions & 2 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
JobEntityUnsupportedSchemaError,
StepDetailsError,
)
from ..sessions.job_entities.job_details import parameters_data_to_list
from ..sessions.job_entities.job_details import parameters_from_api_response

if TYPE_CHECKING:
from ..sessions.job_entities import JobEntities
Expand Down Expand Up @@ -374,7 +374,7 @@ def dequeue(self) -> SessionActionDefinition | None:
except (ValueError, RuntimeError) as e:
raise StepDetailsError(action_id, str(e)) from e
task_parameters_data: dict = action_definition.get("parameters", {})
task_parameters = parameters_data_to_list(task_parameters_data)
task_parameters = parameters_from_api_response(task_parameters_data)

next_action = RunStepTaskAction(
id=action_id,
Expand Down
10 changes: 5 additions & 5 deletions src/deadline_worker_agent/sessions/actions/run_step_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from concurrent.futures import Executor
from typing import Any, TYPE_CHECKING

from openjd.sessions import Parameter
from openjd.model import TaskParameterSet

from .openjd_action import OpenjdAction

Expand All @@ -26,14 +26,14 @@ class RunStepTaskAction(OpenjdAction):
The environment details
task_id : str
The unique task identifier
task_parameter_values : list[Parameter]
task_parameter_values : TaskParameterSet
The task parameter values
"""

step_id: str
task_id: str
_details: StepDetails
_task_parameter_values: list[Parameter]
_task_parameter_values: TaskParameterSet

def __init__(
self,
Expand All @@ -42,7 +42,7 @@ def __init__(
step_id: str,
details: StepDetails,
task_id: str,
task_parameter_values: list[Parameter],
task_parameter_values: TaskParameterSet,
) -> None:
super(RunStepTaskAction, self).__init__(
id=id,
Expand Down Expand Up @@ -79,6 +79,6 @@ def start(self, *, session: Session, executor: Executor) -> None:

def human_readable(self) -> str:
param_str = ", ".join(
f"{param.name}={repr(param.value)}" for param in self._task_parameter_values
f"{name}={repr(param.value)}" for name, param in self._task_parameter_values.items()
)
return f"step[{self.step_id}].run({param_str})"
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from typing import Any, cast

from openjd.model import parse_model, SchemaVersion, UnsupportedSchema
from openjd.model import parse_model, TemplateSpecificationVersion, UnsupportedSchema
from openjd.model.v2023_09 import Environment as Environment_2023_09
from openjd.sessions import EnvironmentModel

Expand Down Expand Up @@ -44,9 +44,12 @@ def from_boto(cls, environment_details_data: EnvironmentDetailsData) -> Environm
If the environment's Open Job Description schema version is not supported
"""

schema_version = SchemaVersion(environment_details_data["schemaVersion"])
schema_version = TemplateSpecificationVersion(environment_details_data["schemaVersion"])

if schema_version == SchemaVersion.v2023_09:
if schema_version in (
TemplateSpecificationVersion.JOBTEMPLATE_v2023_09,
TemplateSpecificationVersion.ENVIRONMENT_v2023_09,
):
environment = parse_model(
model=Environment_2023_09, obj=environment_details_data["template"]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,15 @@
from dataclasses import dataclass
from typing import Any, cast

from openjd.sessions import Parameter, ParameterType
from deadline.job_attachments.models import JobAttachmentsFileSystem

from ...api_models import (
FloatParameter,
IntParameter,
JobAttachmentDetailsData,
PathParameter,
StringParameter,
)
from .job_entity_type import JobEntityType
from .validation import Field, validate_object


def parameters_data_to_list(
params: dict[str, StringParameter | PathParameter | IntParameter | FloatParameter | str]
) -> list[Parameter]:
result = list[Parameter]()
for name, value in params.items():
# TODO: Change to the correct type once typing information is available
# in the task_run action details.
if isinstance(value, str):
# old style for the API - TODO remove this once the assign API is updated
result.append(Parameter(ParameterType.STRING, name, value))
elif "string" in value:
value = cast(StringParameter, value)
result.append(Parameter(ParameterType.STRING, name, value["string"]))
elif "int" in value:
value = cast(IntParameter, value)
result.append(Parameter(ParameterType.INT, name, value["int"]))
elif "float" in value:
value = cast(FloatParameter, value)
result.append(Parameter(ParameterType.FLOAT, name, value["float"]))
elif "path" in value:
value = cast(PathParameter, value)
result.append(Parameter(ParameterType.PATH, name, value["path"]))
else:
# TODO - PATH parameter types
raise ValueError(f"Parameter {name} -- unknown form in API response: {str(value)}")
return result


@dataclass(frozen=True)
class JobAttachmentManifestProperties:
"""Information used to facilitate the transfer of input/output job attachments and mapping of
Expand Down
49 changes: 26 additions & 23 deletions src/deadline_worker_agent/sessions/job_entities/job_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
from typing import Any, cast
import os

from openjd.model import SchemaVersion, UnsupportedSchema
from openjd.model import (
JobParameterValues,
ParameterValue,
ParameterValueType,
SpecificationRevision,
TemplateSpecificationVersion,
UnsupportedSchema,
)
from openjd.sessions import (
Parameter,
ParameterType,
PathFormat,
PosixSessionUser,
)
Expand All @@ -29,31 +34,27 @@
from .validation import Field, validate_object


def parameters_data_to_list(
def parameters_from_api_response(
params: dict[str, StringParameter | PathParameter | IntParameter | FloatParameter | str]
) -> list[Parameter]:
result = list[Parameter]()
) -> dict[str, ParameterValue]:
result = dict[str, ParameterValue]()
for name, value in params.items():
# TODO: Change to the correct type once typing information is available
# in the task_run action details.
if isinstance(value, str):
# old style for the API - TODO remove this once the assign API is updated
result.append(Parameter(ParameterType.STRING, name, value))
elif "string" in value:
print(name, value)
if "string" in value:
value = cast(StringParameter, value)
result.append(Parameter(ParameterType.STRING, name, value["string"]))
param_value = ParameterValue(type=ParameterValueType.STRING, value=value["string"])
elif "int" in value:
value = cast(IntParameter, value)
result.append(Parameter(ParameterType.INT, name, value["int"]))
param_value = ParameterValue(type=ParameterValueType.INT, value=value["int"])
elif "float" in value:
value = cast(FloatParameter, value)
result.append(Parameter(ParameterType.FLOAT, name, value["float"]))
param_value = ParameterValue(type=ParameterValueType.FLOAT, value=value["float"])
elif "path" in value:
value = cast(PathParameter, value)
result.append(Parameter(ParameterType.PATH, name, value["path"]))
param_value = ParameterValue(type=ParameterValueType.PATH, value=value["path"])
else:
# TODO - PATH parameter types
raise ValueError(f"Parameter {name} -- unknown form in API response: {str(value)}")
result[name] = param_value
return result


Expand Down Expand Up @@ -153,13 +154,13 @@ class JobDetails:
log_group_name: str
"""The name of the log group for the session"""

schema_version: SchemaVersion
schema_version: SpecificationRevision
"""The Open Job Description schema version"""

job_attachment_settings: JobAttachmentSettings | None = None
"""The job attachment settings of the job's queue"""

parameters: list[Parameter] = field(default_factory=list)
parameters: JobParameterValues = field(default_factory=dict)
"""The job's parameters"""

job_run_as_user: JobRunAsUser | None = None
Expand Down Expand Up @@ -187,7 +188,7 @@ def from_boto(cls, job_details_data: JobDetailsData) -> JobDetails:
"""

job_parameters_data: dict = job_details_data.get("parameters", {})
job_parameters = parameters_data_to_list(job_parameters_data)
job_parameters = parameters_from_api_response(job_parameters_data)
path_mapping_rules: list[OPENJDPathMappingRule] = []
path_mapping_rules_data = job_details_data.get("pathMappingRules", None)
if path_mapping_rules_data:
Expand All @@ -209,10 +210,12 @@ def from_boto(cls, job_details_data: JobDetailsData) -> JobDetails:
or None
)

schema_version = SchemaVersion(job_details_data["schemaVersion"])
given_schema_version = TemplateSpecificationVersion(job_details_data["schemaVersion"])

if schema_version != SchemaVersion.v2023_09:
raise UnsupportedSchema(schema_version.value)
if given_schema_version == TemplateSpecificationVersion.JOBTEMPLATE_v2023_09:
schema_version = SpecificationRevision.v2023_09
else:
raise UnsupportedSchema(given_schema_version.value)

return JobDetails(
parameters=job_parameters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, cast

from openjd.model import parse_model, SchemaVersion, UnsupportedSchema
from openjd.model import parse_model, TemplateSpecificationVersion, UnsupportedSchema
from openjd.model.v2023_09 import StepTemplate as StepTemplate_2023_09

from ...api_models import StepDetailsData
Expand Down Expand Up @@ -53,9 +53,9 @@ def from_boto(cls, step_details_data: StepDetailsData) -> StepDetails:
If the step's Open Job Description schema version is not supported
"""

schema_version = SchemaVersion(step_details_data["schemaVersion"])
schema_version = TemplateSpecificationVersion(step_details_data["schemaVersion"])

if schema_version == SchemaVersion.v2023_09:
if schema_version == TemplateSpecificationVersion.JOBTEMPLATE_v2023_09:
# Jan 23, 2024: Forwards compatibility. The 'template' field is changing from a StepScript to
# a StepTemplate. Remove the StepScript case after the transition is complete.
details_data = step_details_data["template"]
Expand Down
15 changes: 10 additions & 5 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@
from .actions import SessionActionDefinition
from .job_entities import JobAttachmentDetails, JobDetails

from openjd.model import TaskParameterSet
from openjd.sessions import (
ActionState,
ActionStatus,
EnvironmentIdentifier,
EnvironmentModel,
Parameter,
LOG as OPENJD_LOG,
PathMappingRule,
PosixSessionUser,
StepScriptModel,
Session as OPENJDSession,
SessionUser,
)
from openjd.sessions import Session as OPENJDSession
from openjd.sessions import LOG as OPENJD_LOG

from deadline.job_attachments.asset_sync import AssetSync
from deadline.job_attachments.asset_sync import logger as ASSET_SYNC_LOGGER
Expand Down Expand Up @@ -79,6 +79,7 @@
ActionState.CANCELED: "CANCELED",
ActionState.FAILED: "FAILED",
ActionState.SUCCESS: "SUCCEEDED",
ActionState.TIMEOUT: "FAILED",
}
TIME_DELTA_ZERO = timedelta()

Expand Down Expand Up @@ -966,7 +967,11 @@ def _action_updated_impl(
assert self._stop.is_set(), "current_action is None or stopping"
return

is_unsuccessful = action_status.state in (ActionState.FAILED, ActionState.CANCELED)
is_unsuccessful = action_status.state in (
ActionState.FAILED,
ActionState.CANCELED,
ActionState.TIMEOUT,
)

if (
action_status.state == ActionState.SUCCESS
Expand Down Expand Up @@ -1145,7 +1150,7 @@ def run_task(
self,
*,
step_script: StepScriptModel,
task_parameter_values: list[Parameter],
task_parameter_values: TaskParameterSet,
) -> None:
self._session.run_task(
step_script=step_script,
Expand Down
Loading

0 comments on commit 64b8f88

Please sign in to comment.