diff --git a/config/config.yml b/config/config.yml
index e6f44d5..2a50780 100644
--- a/config/config.yml
+++ b/config/config.yml
@@ -35,7 +35,9 @@ VISXP_PREP:
     SPECTOGRAM_WINDOW_SIZE_MS: 1000
     SPECTOGRAM_SAMPLERATE_HZ:
     - 24000
-    TEST_INPUT_FILE: /data/testob-take-2.mp4
+    TEST_INPUT_FILE: https://openbeelden.nl/files/13/66/1411058.1366653.WEEKNUMMER404-HRE000042FF_924200_1089200.mp4
+    # TEST_INPUT_FILE: s3://your-bucket/assets/your-test-video.mp4
+    # TEST_INPUT_FILE: /data/your-test-video.mp4
 INPUT:
     DELETE_ON_COMPLETION: False  # NOTE: set to True in production environment
 OUTPUT:
diff --git a/io_util.py b/io_util.py
index 9a06099..9776b4f 100644
--- a/io_util.py
+++ b/io_util.py
@@ -8,7 +8,7 @@
 
 from dane import Document
 from dane.config import cfg
-from dane.s3_util import S3Store
+from dane.s3_util import S3Store, parse_s3_uri, validate_s3_uri
 from models import OutputType, DownloadResult, Provenance
 
 
@@ -181,14 +181,15 @@
 def obtain_input_file(
     handler, doc: Document
 ) -> Tuple[Optional[DownloadResult], Optional[Provenance]]:
     # step 1: try to fetch the content via the configured DANE download worker
-    download_result = _fetch_downloaded_content(handler, doc)
+    download_result = _fetch_dane_download_result(handler, doc)
 
     # step 2: try to download the file if no DANE download worker was configured
     if download_result is None:
         logger.info(
             "The file was not downloaded by the DANE worker, downloading it myself..."
         )
-        download_result = _download_content(doc)
+        download_result = _download_dane_target_url(doc)
+
     if download_result:
         download_provenance = Provenance(
             activity_name="download",
@@ -204,43 +205,86 @@
 
 # https://www.openbeelden.nl/files/29/29494.29451.WEEKNUMMER243-HRE00015742.mp4
-def _download_content(doc: Document) -> Optional[DownloadResult]:
+def _download_dane_target_url(doc: Document) -> Optional[DownloadResult]:
     if not doc.target or "url" not in doc.target or not doc.target["url"]:
         logger.info("No url found in DANE doc")
         return None
+    return download_uri(doc.target["url"])
+
+
+def download_uri(uri: str) -> Optional[DownloadResult]:
+    logger.info(f"Trying to download {uri}")
+    if validate_s3_uri(uri):
+        logger.info("URI seems to be an s3 uri")
+        return s3_download(uri)
+    return http_download(uri)
+
+
+def _fetch_dane_download_result(handler, doc: Document) -> Optional[DownloadResult]:
+    logger.info("checking download worker output")
+    possibles = handler.searchResult(doc._id, DANE_DOWNLOAD_TASK_KEY)
+    logger.info(possibles)
+    # NOTE now MUST use the latest dane-beng-download-worker or dane-download-worker
+    if len(possibles) > 0 and "file_path" in possibles[0].payload:
+        return DownloadResult(
+            possibles[0].payload.get("file_path"),
+            possibles[0].payload.get("download_time", -1),
+            possibles[0].payload.get("mime_type", "unknown"),
+            possibles[0].payload.get("content_length", -1),
+        )
+    logger.error("No file_path found in download result")
+    return None
 
-    logger.info("downloading {}".format(doc.target["url"]))
-    fn = os.path.basename(urlparse(doc.target["url"]).path)
+
+# TODO test this!
+def http_download(url: str) -> Optional[DownloadResult]:
+    logger.info(f"Downloading {url}")
+    fn = os.path.basename(urlparse(url).path)
     # fn = unquote(fn)
     # fn = doc.target['url'][doc.target['url'].rfind('/') +1:]
     output_file = os.path.join(get_download_dir(), fn)
-    logger.info("saving to file {}".format(fn))
+    logger.info(f"Saving to file {fn}")
 
     # download if the file is not present (preventing unnecessary downloads)
     start_time = time()
     if not os.path.exists(output_file):
         with open(output_file, "wb") as file:
-            response = requests.get(doc.target["url"])
+            response = requests.get(url)
             file.write(response.content)
             file.close()
 
     download_time = time() - start_time
     return DownloadResult(
-        fn,  # NOTE or output_file? hmmm
+        output_file,  # the full path in the download dir, not just the file name
         download_time,  # TODO add mime_type and content_length
     )
 
 
-def _fetch_downloaded_content(handler, doc: Document) -> Optional[DownloadResult]:
-    logger.info("checking download worker output")
-    possibles = handler.searchResult(doc._id, DANE_DOWNLOAD_TASK_KEY)
-    logger.info(possibles)
-    # NOTE now MUST use the latest dane-beng-download-worker or dane-download-worker
-    if len(possibles) > 0 and "file_path" in possibles[0].payload:
+# e.g. s3://dane-asset-staging-gb/assets/2101608170158176431__NOS_JOURNAAL_-WON01513227.mp4
+def s3_download(s3_uri: str) -> Optional[DownloadResult]:
+    logger.info(f"Downloading {s3_uri}")
+    if not validate_s3_uri(s3_uri):
+        logger.error(f"Invalid S3 URI: {s3_uri}")
+        return None
+
+    # source_id = get_source_id(s3_uri)
+    start_time = time()
+    output_folder = get_download_dir()
+
+    # download the content into get_download_dir()
+    s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
+    bucket, object_name = parse_s3_uri(s3_uri)
+    logger.info(f"OBJECT NAME: {object_name}")
+    input_file_path = os.path.join(
+        get_download_dir(),
+        # source_id,
+        os.path.basename(object_name),  # i.e. the file name from the object key
+    )
+    success = s3.download_file(bucket, object_name, output_folder)
+    if success:
+        download_time = time() - start_time
         return DownloadResult(
-            possibles[0].payload.get("file_path"),
-            possibles[0].payload.get("download_time", -1),
-            possibles[0].payload.get("mime_type", "unknown"),
-            possibles[0].payload.get("content_length", -1),
+            input_file_path,
+            download_time,
         )
-    logger.error("No file_path found in download result")
+    logger.error("Failed to download input data from S3")
     return None
diff --git a/poetry.lock b/poetry.lock
index 1f33f1a..012ee8e 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -945,6 +945,29 @@ brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotl
 secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"]
 socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
 
+[[package]]
+name = "validators"
+version = "0.22.0"
+description = "Python Data Validation for Humans™"
+category = "main"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "validators-0.22.0-py3-none-any.whl", hash = "sha256:61cf7d4a62bbae559f2e54aed3b000cea9ff3e2fdbe463f51179b92c58c9585a"},
+    {file = "validators-0.22.0.tar.gz", hash = "sha256:77b2689b172eeeb600d9605ab86194641670cdb73b60afd577142a9397873370"},
+]
+
+[package.extras]
+docs-offline = ["myst-parser (>=2.0.0)", "pypandoc-binary (>=1.11)", "sphinx (>=7.1.1)"]
+docs-online = ["mkdocs (>=1.5.2)", "mkdocs-git-revision-date-localized-plugin (>=1.2.0)", "mkdocs-material (>=9.2.6)", "mkdocstrings[python] (>=0.22.0)", "pyaml (>=23.7.0)"]
+hooks = ["pre-commit (>=3.3.3)"]
+package = ["build (>=1.0.0)", "twine (>=4.0.2)"]
+runner = ["tox (>=4.11.1)"]
+sast = ["bandit[toml] (>=1.7.5)"]
+testing = ["pytest (>=7.4.0)"]
+tooling = ["black (>=23.7.0)", "pyright (>=1.1.325)", "ruff (>=0.0.287)"]
+tooling-extras = ["pyaml (>=23.7.0)", "pypandoc-binary (>=1.11)", "pytest (>=7.4.0)"]
+
 [[package]]
 name = "yacs"
 version = "0.1.8"
@@ -964,4 +987,4 @@ PyYAML = "*"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.10"
-content-hash = "dea97119dbf27e87d01d23149e594bb8eaf6492fe870c7bbab0fd4a0cc737a6a"
+content-hash = "7da9ac5d4e30f424485222b8d9500cf6ac8edf0c8e831f582cf63fe7135bd2ae"
diff --git a/pyproject.toml b/pyproject.toml
index 583fd19..c576ab0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,6 +15,7 @@ python_speech_features = { git = "https://github.com/jameslyons/python_speech_fe
 numpy = "^1.24.3"
 ffmpeg-python = "^0.2.0"
 dane = "^0.3.9"
+validators = "^0.22.0"
 
 
 [tool.poetry.group.dev.dependencies]
@@ -70,5 +71,6 @@ module = [
     'yaml',
     'yacs.*',
     'numpy',
+    'validators',
 ]
 ignore_missing_imports = true
diff --git a/requirements.txt b/requirements.txt
index 48fdcd2..7b58153 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,5 @@ dane >= 0.3.9
 opencv-python >= 4.8.0.76
 numpy >= 1.24.3
 ffmpeg-python >= 0.2.0
-python_speech_features @ git+https://github.com/jameslyons/python_speech_features
\ No newline at end of file
+python_speech_features @ git+https://github.com/jameslyons/python_speech_features
+validators >= 0.22.0
\ No newline at end of file
diff --git a/worker.py b/worker.py
index d99ea14..97067b0 100644
--- a/worker.py
+++ b/worker.py
@@ -3,11 +3,13 @@
 from pathlib import Path
 import sys
 from time import time
 
+import validators
 from base_util import validate_config
 from dane import Document, Task, Result
 from dane.base_classes import base_worker
 from dane.config import cfg
+from dane.s3_util import validate_s3_uri
 from models import CallbackResponse, Provenance
 from io_util import (
     get_base_output_dir,
@@ -15,6 +17,7 @@
     get_s3_output_file_uri,
     obtain_input_file,
     get_download_dir,
+    download_uri,
 )
 from pika.exceptions import ChannelClosedByBroker
 from main_data_processor import (
@@ -38,14 +41,24 @@
 # triggered by running: python worker.py --run-test-file
 def process_configured_input_file():
     logger.info("Triggered processing of configured VISXP_PREP.TEST_INPUT_PATH")
-    proc_result = generate_input_for_feature_extraction(cfg.VISXP_PREP.TEST_INPUT_FILE)
+    input_file_path = cfg.VISXP_PREP.TEST_INPUT_FILE
+    if validate_s3_uri(input_file_path) or validators.url(input_file_path):
+        logger.info("Input is a URI, continuing to download")
+        download_result = download_uri(input_file_path)
+        input_file_path = download_result.file_path if download_result else None
+
+    if not input_file_path:
+        logger.error("Input file path is empty, exiting")
+        sys.exit(1)
+
+    proc_result = generate_input_for_feature_extraction(input_file_path)
     if proc_result.provenance:
         logger.info(
             f"Successfully processed example file in {proc_result.provenance.processing_time_ms}ms"
         )
     logger.info("Result ok, now applying the desired IO on the results")
     validated_output: CallbackResponse = apply_desired_io_on_output(
-        cfg.VISXP_PREP.TEST_INPUT_FILE,
+        input_file_path,
         proc_result,
         cfg.INPUT.DELETE_ON_COMPLETION,
         cfg.OUTPUT.DELETE_ON_COMPLETION,
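
Review note (not part of the patch): the core of this change is that io_util.download_uri now dispatches on the URI scheme, so TEST_INPUT_FILE may be an s3:// URI, an HTTP(S) URL, or a local path. Below is a minimal sketch of how the new helpers compose, mirroring the check added to worker.process_configured_input_file; the file name and the sample S3 URI are hypothetical placeholders:

# smoke_test_download.py -- hypothetical, for local experimentation only
import validators
from dane.s3_util import validate_s3_uri

from io_util import download_uri

for uri in [
    "https://openbeelden.nl/files/13/66/1411058.1366653.WEEKNUMMER404-HRE000042FF_924200_1089200.mp4",
    "s3://your-bucket/assets/your-test-video.mp4",  # placeholder bucket/key
    "/data/your-test-video.mp4",  # local path, nothing to download
]:
    if validate_s3_uri(uri) or validators.url(uri):
        # s3:// URIs are routed to s3_download(); anything validators.url()
        # accepts is routed to http_download()
        result = download_uri(uri)
        print(result.file_path if result else f"download failed: {uri}")
    else:
        print(f"treating {uri} as a local file path")

Note that validators.url returns a falsy ValidationError object for invalid input rather than raising, which is what makes the `or` chain above safe for plain file paths.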