diff --git a/.gitignore b/.gitignore
index 641790fb..38f01166 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,4 @@ __pycache__/
 *_version.py
 tmp.*
 .deployed_resources.sh
+scripts/submit_jobs/job_attachment_job/outputs/
diff --git a/scripts/submit_jobs/job_attachment_job/files/test_input_file b/scripts/submit_jobs/job_attachment_job/files/test_input_file
new file mode 100644
index 00000000..5e87f2e1
--- /dev/null
+++ b/scripts/submit_jobs/job_attachment_job/files/test_input_file
@@ -0,0 +1 @@
+Hi! This is a nice test input file.
\ No newline at end of file
diff --git a/scripts/submit_jobs/job_attachment_job/template.json b/scripts/submit_jobs/job_attachment_job/template.json
new file mode 100644
index 00000000..f2da4464
--- /dev/null
+++ b/scripts/submit_jobs/job_attachment_job/template.json
@@ -0,0 +1,40 @@
+{
+    "specificationVersion": "jobtemplate-2023-09",
+    "name": "AssetsExample",
+    "parameterDefinitions": [
+        {
+            "name": "DataDir",
+            "type": "PATH",
+            "dataFlow": "INOUT",
+            "userInterface": {
+                "label": "Input/Output Directory",
+                "control": "CHOOSE_DIRECTORY"
+            }
+        },
+        {
+            "name": "StringToAppend",
+            "type": "STRING"
+        }
+    ],
+    "steps": [
+
+        {
+            "name": "AppendString",
+            "script": {
+                "actions": {
+                    "onRun": {
+                        "command": "{{ Task.File.runScript }}"
+                    }
+                },
+                "embeddedFiles": [
+                    {
+                        "name": "runScript",
+                        "type": "TEXT",
+                        "runnable": true,
+                        "data": "#!/usr/bin/env bash\n\n echo -n $(cat {{Param.DataDir}}/files/test_input_file){{Param.StringToAppend}} > {{Param.DataDir}}/output_file.txt\n"
+                    }
+                ]
+            }
+        }
+    ]
+}
diff --git a/test/e2e/linux/test_job_submissions.py b/test/e2e/linux/test_job_submissions.py
index 65f586dc..21aa31ad 100644
--- a/test/e2e/linux/test_job_submissions.py
+++ b/test/e2e/linux/test_job_submissions.py
@@ -5,10 +5,19 @@
 """
 import boto3
 import botocore.client
+import configparser
+import os
 import botocore.config
 import botocore.exceptions
 import pytest
 import logging
+from deadline.job_attachments._aws.deadline import get_queue
+from deadline.job_attachments import download
+from e2e.conftest import DeadlineResources
+from deadline.client.config import set_setting
+from deadline.client import api
+from typing import Dict, List, Optional
+import uuid
 
 from deadline_test_fixtures import Job, TaskStatus, PosixSessionUser, DeadlineClient
 
@@ -43,7 +52,7 @@ def test_success(
 
         # THEN
         LOG.info(f"Waiting for job {job.id} to complete")
-        job.wait_until_complete(client=deadline_client)
+        job.wait_until_complete(client=deadline_client, max_retries=20)
         LOG.info(f"Job result: {job}")
         assert job.task_run_status == TaskStatus.SUCCEEDED
 
@@ -92,7 +101,7 @@ def test_job_run_as_user(
         )
 
         # THEN
-        job.wait_until_complete(client=deadline_client)
+        job.wait_until_complete(client=deadline_client, max_retries=20)
 
         # Retrieve job output and verify whoami printed the queue's jobsRunAsUser
         job_logs = job.get_logs(
@@ -109,3 +118,95 @@
             f"I am: {job_run_as_user.user}" in full_log
         ), f"Expected message not found in Job logs. Logs are in CloudWatch log group: {job_logs.log_group_name}"
         assert job.task_run_status == TaskStatus.SUCCEEDED
+
+    def test_worker_uses_job_attachment_configuration(
+        self,
+        deadline_resources: DeadlineResources,
+        deadline_client: DeadlineClient,
+    ) -> None:
+        # Verify that the worker uses the correct job attachment configuration and writes the output to the correct location.
+
+        test_run_uuid: str = str(uuid.uuid4())
+
+        job_bundle_path: str = os.path.join(
+            os.path.dirname(__file__),
+            "..",
+            "..",
+            "..",
+            "scripts",
+            "submit_jobs",
+            "job_attachment_job",
+        )
+        job_parameters: List[Dict[str, str]] = [
+            {"name": "StringToAppend", "value": test_run_uuid},
+            {"name": "DataDir", "value": job_bundle_path},
+        ]
+        config = configparser.ConfigParser()
+
+        set_setting("defaults.farm_id", deadline_resources.farm.id, config)
+        set_setting("defaults.queue_id", deadline_resources.queue_a.id, config)
+
+        job_id: Optional[str] = api.create_job_from_job_bundle(
+            job_bundle_path,
+            job_parameters,
+            priority=99,
+            config=config,
+            queue_parameter_definitions=[],
+        )
+        assert job_id is not None
+
+        job_details = Job.get_job_details(
+            client=deadline_client,
+            farm=deadline_resources.farm,
+            queue=deadline_resources.queue_a,
+            job_id=job_id,
+        )
+        job = Job(
+            farm=deadline_resources.farm,
+            queue=deadline_resources.queue_a,
+            template={},
+            **job_details,
+        )
+        job.wait_until_complete(client=deadline_client, max_retries=20)
+
+        job_attachment_settings = get_queue(
+            farm_id=deadline_resources.farm.id,
+            queue_id=deadline_resources.queue_a.id,
+        ).jobAttachmentSettings
+
+        assert job_attachment_settings is not None
+
+        job_output_downloader = download.OutputDownloader(
+            s3_settings=job_attachment_settings,
+            farm_id=deadline_resources.farm.id,
+            queue_id=deadline_resources.queue_a.id,
+            job_id=job.id,
+            step_id=None,
+            task_id=None,
+        )
+
+        output_paths_by_root = job_output_downloader.get_output_paths_by_root()
+        # Set the root path that the output will be downloaded to. Assumes there is only one root path.
+        job_output_downloader.set_root_path(
+            list(output_paths_by_root.keys())[0],
+            os.path.abspath(os.path.join(job_bundle_path, "outputs", test_run_uuid)),
+        )
+        job_output_downloader.download_job_output()
+
+        with (
+            open(os.path.join(job_bundle_path, "files", "test_input_file"), "r") as input_file,
+            open(
+                os.path.join(job_bundle_path, "outputs", test_run_uuid, "output_file.txt"), "r"
+            ) as output_file,
+        ):
+            input_file_content: str = input_file.read()
+            output_file_content = output_file.read()
+
+            assert output_file_content == (input_file_content + test_run_uuid)
+
+        if os.path.exists(
+            os.path.join(job_bundle_path, "outputs", test_run_uuid, "output_file.txt")
+        ):
+            # Remove the output file and its directory; the existence check above guards the cleanup.
+            os.remove(os.path.join(job_bundle_path, "outputs", test_run_uuid, "output_file.txt"))
+            os.rmdir(os.path.join(job_bundle_path, "outputs", test_run_uuid))
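
Note for reviewers (not part of the patch): if you need to pull this job's attachment outputs by hand, for example while debugging a failed assertion, the short sketch below uses the same deadline-cloud calls the new test exercises (get_queue and download.OutputDownloader). The farm, queue, and job IDs and the local "outputs" directory are placeholders, so treat this as an illustrative sketch rather than part of the change.

# Illustrative sketch only: download a finished job's attachment outputs using
# the same APIs the new e2e test relies on. IDs below are placeholders.
import os

from deadline.job_attachments import download
from deadline.job_attachments._aws.deadline import get_queue

FARM_ID = "farm-00000000000000000000000000000000"    # placeholder
QUEUE_ID = "queue-00000000000000000000000000000000"  # placeholder
JOB_ID = "job-00000000000000000000000000000000"      # placeholder

# The queue's job attachment settings describe the S3 bucket/prefix used for transfers.
job_attachment_settings = get_queue(farm_id=FARM_ID, queue_id=QUEUE_ID).jobAttachmentSettings
assert job_attachment_settings is not None, "Queue has no job attachment settings configured"

downloader = download.OutputDownloader(
    s3_settings=job_attachment_settings,
    farm_id=FARM_ID,
    queue_id=QUEUE_ID,
    job_id=JOB_ID,
    step_id=None,  # as in the test, step/task are left unset to fetch the job's outputs
    task_id=None,
)

# Redirect the first output root to a local directory before downloading,
# mirroring the test's assumption that there is a single root path.
output_paths_by_root = downloader.get_output_paths_by_root()
downloader.set_root_path(
    list(output_paths_by_root.keys())[0],
    os.path.abspath("outputs"),
)
downloader.download_job_output()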