reorganised code, so dane-less run also executes IO/S3 workflow; requires DANE branch: s3_util_extend
jblom committed Oct 30, 2023
1 parent 7a976e4 commit 449862a
Showing 10 changed files with 170 additions and 107 deletions.
3 changes: 2 additions & 1 deletion .dockerignore
@@ -8,4 +8,5 @@ __pycache__
/config.yml
pyproject.toml
poetry.lock
/tests
/tests
s3-creds.env
3 changes: 2 additions & 1 deletion .gitignore
@@ -8,4 +8,5 @@ __pycache__
!/tests/data/mp4s/test.mp4
!/tests/data/spectograms/*
!/tests/data/spectograms/README.md
/config.yml
/config.yml
s3-creds.env
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -7,6 +7,8 @@ services:
- ./config:/root/.DANE
container_name: visxp
command: --run-test-file # NOTE: comment this line to spin up the worker
env_file:
- s3-creds.env
logging:
options:
max-size: 20m
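For reference, a minimal s3-creds.env could look like the sketch below. The file is deliberately git- and docker-ignored (see the changes above); the exact variable names are an assumption here, with the standard AWS SDK credential variables being the most common convention:

# s3-creds.env (hypothetical example; keep this file out of version control)
AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>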
66 changes: 65 additions & 1 deletion main_data_processor.py
@@ -4,10 +4,19 @@
from dane.config import cfg
import hecate
import keyframe_extraction
from models import VisXPFeatureExtractionInput, OutputType, HecateOutput
from models import (
VisXPFeatureExtractionInput,
OutputType,
HecateOutput,
CallbackResponse,
)
from output_util import (
get_source_id,
get_base_output_dir,
generate_output_dirs,
delete_local_output,
delete_input_file,
transfer_output,
)
from provenance import generate_full_provenance_chain
import spectogram
@@ -83,3 +92,58 @@ def generate_input_for_feature_extraction(
return VisXPFeatureExtractionInput(
200, "Succesfully generated input for VisXP feature extraction", provenance
)


def apply_desired_io_on_output(
input_file: str,
proc_result: VisXPFeatureExtractionInput,
delete_input_on_completion: bool,
delete_output_on_completion: bool,
transfer_output_on_completion: bool,
) -> CallbackResponse:
# step 4: return an error response on failure
if proc_result.state != 200:
logger.error(f"Could not process the input properly: {proc_result.message}")
input_deleted = delete_input_file(input_file, delete_input_on_completion)
logger.info(f"Deleted input file of failed process: {input_deleted}")
# something went wrong inside the VisXP work processor, return that response here
return {"state": proc_result.state, "message": proc_result.message}

# step 5: process returned successfully, generate the output
source_id = get_source_id(input_file)
visxp_output_dir = get_base_output_dir(source_id)

# step 6: transfer the output to S3 (if configured so)
transfer_success = True
if transfer_output_on_completion:
transfer_success = transfer_output(source_id)

if (
not transfer_success
): # a failed transfer impedes the workflow, so return an error
return {
"state": 500,
"message": "Failed to transfer output to S3",
}

# step 7: clear the output files (if configured so)
delete_success = True
if delete_output_on_completion:
delete_success = delete_local_output(source_id)

if (
not delete_success
): # NOTE: just a warning for now, but one to keep an EYE out for
logger.warning(f"Could not delete output files: {visxp_output_dir}")

# step 8: clean the input file (if configured so)
if not delete_input_file(input_file, delete_input_on_completion):
return {
"state": 500,
"message": "Generated a transcript, but could not delete the input file",
}

return {
"state": 200,
"message": "Successfully generated VisXP data for the next worker",
}
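
A worker callback could tie these steps together roughly as follows. This is a sketch: the exact config keys under cfg.INPUT and cfg.OUTPUT are assumptions, as is the signature of generate_input_for_feature_extraction.

# Hypothetical call site inside the DANE worker callback; the config keys
# under cfg.INPUT/cfg.OUTPUT are assumptions, not confirmed by this commit
proc_result = generate_input_for_feature_extraction(input_file)
callback_response = apply_desired_io_on_output(
    input_file,
    proc_result,
    delete_input_on_completion=cfg.INPUT.DELETE_ON_COMPLETION,
    delete_output_on_completion=cfg.OUTPUT.DELETE_ON_COMPLETION,
    transfer_output_on_completion=cfg.OUTPUT.TRANSFER_ON_COMPLETION,
)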
43 changes: 24 additions & 19 deletions output_util.py
@@ -75,11 +75,7 @@ def _is_valid_visxp_output(output_dir: str) -> bool:
return os.path.exists(os.path.join(output_dir, OutputType.PROVENANCE.value))


# TODO arrange an S3 bucket to store the VisXP results in
# TODO finish implementation to whatever is needed for VisXP files
def transfer_output(source_id: str) -> bool:
output_dir = get_base_output_dir(source_id)
logger.info(f"Transferring {output_dir} to S3 (asset={source_id})")
def _validate_transfer_config() -> bool:
if any(
[
not x
@@ -94,22 +90,31 @@ def transfer_output(source_id: str) -> bool:
"TRANSFER_ON_COMPLETION configured without all the necessary S3 settings"
)
return False
return True
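
These checks correspond to keys under cfg.OUTPUT; in the worker's config.yml that would look roughly like the sketch below (all values are placeholders, only the key names appear in this commit):

OUTPUT:
    S3_ENDPOINT_URL: https://s3.example.com
    S3_BUCKET: my-visxp-bucket
    S3_FOLDER_IN_BUCKET: assets  # prefix for assets/<program ID>__<carrier ID>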


# TODO adapt, so it simply tars all S3_OUTPUT_TYPES and uploads this single file
def transfer_output(source_id: str) -> bool:
output_dir = get_base_output_dir(source_id)
logger.info(f"Transferring {output_dir} to S3 (asset={source_id})")
if not _validate_transfer_config():
return False

s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
for output_type in S3_OUTPUT_TYPES:
output_sub_dir = os.path.join(output_dir, output_type.value)
success = s3.transfer_to_s3(
cfg.OUTPUT.S3_BUCKET,
os.path.join(
cfg.OUTPUT.S3_FOLDER_IN_BUCKET, source_id, output_type.value
), # assets/<program ID>__<carrier ID>/spectograms|keyframes|provenance
obtain_files_to_upload_to_s3(output_sub_dir),
)
if not success:
logger.error(
f"Failed to upload output folder: {output_sub_dir}, aborting rest of upload"
)
return False
file_list = [os.path.join(output_dir, ot.value) for ot in S3_OUTPUT_TYPES]
tar_file = os.path.join(output_dir, "visxp_prep.tar.gz")

success = s3.transfer_to_s3(
cfg.OUTPUT.S3_BUCKET,
os.path.join(
cfg.OUTPUT.S3_FOLDER_IN_BUCKET, source_id
), # assets/<program ID>__<carrier ID>
file_list, # this list of subdirs will be compressed into the tar below
tar_file, # this file will be uploaded
)
if not success:
logger.error(f"Failed to upload: {tar_file}")
return False
return True


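The extended transfer_to_s3 from the s3_util_extend branch is assumed to compress the listed directories into the given tar file and upload that single archive. A rough, self-contained equivalent of that behaviour (not the actual DANE implementation, and assuming a boto3-style client) might look like this:

import logging
import os
import tarfile

import boto3  # assumed client; DANE's S3Store may be implemented differently

logger = logging.getLogger(__name__)


def tar_and_upload(bucket: str, key_prefix: str, dirs: list[str], tar_path: str) -> bool:
    # compress all output subdirs into a single .tar.gz archive
    with tarfile.open(tar_path, "w:gz") as tar:
        for d in dirs:
            tar.add(d, arcname=os.path.basename(d))
    # upload the archive as a single S3 object under the asset's prefix
    client = boto3.client("s3")
    try:
        client.upload_file(tar_path, bucket, f"{key_prefix}/{os.path.basename(tar_path)}")
        return True
    except Exception:
        logger.exception(f"Failed to upload {tar_path}")
        return False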
42 changes: 23 additions & 19 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -7,14 +7,14 @@ readme = "README.md"

[tool.poetry.dependencies]
python = "^3.10"
dane = "^0.3.8"
yacs = "^0.1.8"
pika = "^1.3.2"
requests = "^2.31.0"
opencv-python = "^4.8.0.76"
python_speech_features = { git = "https://github.com/jameslyons/python_speech_features.git"}
numpy = "^1.24.3"
ffmpeg-python = "^0.2.0"
dane = {git = "https://git@github.com/CLARIAH/DANE.git", rev = "s3_util_extend"}


[tool.poetry.group.dev.dependencies]
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,7 +1,8 @@
requests >= 2.31.0
pika >= 1.3.2
yacs >= 0.1.8
dane >= 0.3.8
# dane >= 0.3.8
dane @ git+https://github.com/CLARIAH/DANE.git@s3_util_extend
opencv-python >= 4.8.0.76
numpy >= 1.24.3
ffmpeg-python >= 0.2.0
Expand Down
12 changes: 12 additions & 0 deletions scripts/pre-commit-build.sh
@@ -0,0 +1,12 @@
#!/bin/sh

# This script needs to be run BEFORE you push changes to the main branch
# - it makes sure to link up the correct DANE version (main branch)

if poetry remove dane; then
echo "successfully uninstalled dane"
else
echo "already uninstalled"
fi

poetry add git+https://git@github.com/CLARIAH/DANE.git#s3_util_extend
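
After running it, the dane entry in pyproject.toml should again point at the pinned branch, i.e. something like the line this commit introduces:

dane = {git = "https://git@github.com/CLARIAH/DANE.git", rev = "s3_util_extend"}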