
Commit

Merge pull request #35 from beeldengeluid/skip-dl-worker
now also allowing s3 input
jblom authored Nov 7, 2023
2 parents 58ccbb8 + 42ea582 commit e9103a2
Showing 6 changed files with 110 additions and 25 deletions.
4 changes: 3 additions & 1 deletion config/config.yml
@@ -35,7 +35,9 @@ VISXP_PREP:
    SPECTOGRAM_WINDOW_SIZE_MS: 1000
    SPECTOGRAM_SAMPLERATE_HZ:
        - 24000
    TEST_INPUT_FILE: /data/testob-take-2.mp4
    TEST_INPUT_FILE: https://openbeelden.nl/files/13/66/1411058.1366653.WEEKNUMMER404-HRE000042FF_924200_1089200.mp4
    # TEST_INPUT_FILE: s3://your-bucket/assets/your-test-video.mp4
    # TEST_INPUT_FILE: /data/your-test-video.mp4
INPUT:
    DELETE_ON_COMPLETION: False # NOTE: set to True in production environment
OUTPUT:
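
The config above now documents three accepted forms for TEST_INPUT_FILE: an HTTP(S) URL, an S3 URI, and a plain local path. A rough sketch of how each form is routed after this change, reusing validate_s3_uri from dane.s3_util and the new validators dependency; the describe_input helper is purely hypothetical, and the s3_download()/http_download() names in the comments are the helpers introduced in the io_util.py diff below:

# Illustrative sketch only, not committed code: routing of the three TEST_INPUT_FILE forms.
import validators
from dane.s3_util import validate_s3_uri

def describe_input(path: str) -> str:
    if validate_s3_uri(path):  # s3://bucket/key -> fetched by s3_download()
        return "download from S3"
    if validators.url(path):  # http(s)://... -> fetched by http_download()
        return "download over HTTP"
    return "use as a local file path"  # e.g. /data/your-test-video.mp4
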
84 changes: 64 additions & 20 deletions io_util.py
@@ -8,7 +8,7 @@

from dane import Document
from dane.config import cfg
from dane.s3_util import S3Store
from dane.s3_util import S3Store, parse_s3_uri, validate_s3_uri
from models import OutputType, DownloadResult, Provenance


@@ -181,14 +181,15 @@ def obtain_input_file(
    handler, doc: Document
) -> Tuple[Optional[DownloadResult], Optional[Provenance]]:
    # step 1: try to fetch the content via the configured DANE download worker
    download_result = _fetch_downloaded_content(handler, doc)
    download_result = _fetch_dane_download_result(handler, doc)

    # step 2: try to download the file if no DANE download worker was configured
    if download_result is None:
        logger.info(
            "The file was not downloaded by the DANE worker, downloading it myself..."
        )
        download_result = _download_content(doc)
        download_result = _download_dane_target_url(doc)

    if download_result:
        download_provenance = Provenance(
            activity_name="download",
@@ -204,43 +205,86 @@


# https://www.openbeelden.nl/files/29/29494.29451.WEEKNUMMER243-HRE00015742.mp4
def _download_content(doc: Document) -> Optional[DownloadResult]:
def _download_dane_target_url(doc: Document) -> Optional[DownloadResult]:
    if not doc.target or "url" not in doc.target or not doc.target["url"]:
        logger.info("No url found in DANE doc")
        return None
    return download_uri(doc.target["url"])


def download_uri(uri: str) -> Optional[DownloadResult]:
    logger.info(f"Trying to download {uri}")
    if validate_s3_uri(uri):
        logger.info("URI seems to be an s3 uri")
        return s3_download(uri)
    return http_download(uri)


def _fetch_dane_download_result(handler, doc: Document) -> Optional[DownloadResult]:
    logger.info("checking download worker output")
    possibles = handler.searchResult(doc._id, DANE_DOWNLOAD_TASK_KEY)
    logger.info(possibles)
    # NOTE now MUST use the latest dane-beng-download-worker or dane-download-worker
    if len(possibles) > 0 and "file_path" in possibles[0].payload:
        return DownloadResult(
            possibles[0].payload.get("file_path"),
            possibles[0].payload.get("download_time", -1),
            possibles[0].payload.get("mime_type", "unknown"),
            possibles[0].payload.get("content_length", -1),
        )
    logger.error("No file_path found in download result")
    return None

logger.info("downloading {}".format(doc.target["url"]))
fn = os.path.basename(urlparse(doc.target["url"]).path)

# TODO test this!
def http_download(url: str) -> Optional[DownloadResult]:
logger.info(f"Downloading {url}")
fn = os.path.basename(urlparse(url).path)
# fn = unquote(fn)
# fn = doc.target['url'][doc.target['url'].rfind('/') +1:]
output_file = os.path.join(get_download_dir(), fn)
logger.info("saving to file {}".format(fn))
logger.info(f"Saving to file {fn}")

# download if the file is not present (preventing unnecessary downloads)
start_time = time()
if not os.path.exists(output_file):
with open(output_file, "wb") as file:
response = requests.get(doc.target["url"])
response = requests.get(url)
file.write(response.content)
file.close()
download_time = time() - start_time
return DownloadResult(
fn, # NOTE or output_file? hmmm
output_file, # NOTE or output_file? hmmm
download_time, # TODO add mime_type and content_length
)


def _fetch_downloaded_content(handler, doc: Document) -> Optional[DownloadResult]:
    logger.info("checking download worker output")
    possibles = handler.searchResult(doc._id, DANE_DOWNLOAD_TASK_KEY)
    logger.info(possibles)
    # NOTE now MUST use the latest dane-beng-download-worker or dane-download-worker
    if len(possibles) > 0 and "file_path" in possibles[0].payload:
# e.g. s3://dane-asset-staging-gb/assets/2101608170158176431__NOS_JOURNAAL_-WON01513227.mp4
def s3_download(s3_uri: str) -> Optional[DownloadResult]:
    logger.info(f"Downloading {s3_uri}")
    if not validate_s3_uri(s3_uri):
        logger.error(f"Invalid S3 URI: {s3_uri}")
        return None

    # source_id = get_source_id(s3_uri)
    start_time = time()
    output_folder = get_download_dir()

    # TODO download the content into get_download_dir()
    s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
    bucket, object_name = parse_s3_uri(s3_uri)
    logger.info(f"OBJECT NAME: {object_name}")
    input_file_path = os.path.join(
        get_download_dir(),
        # source_id,
        os.path.basename(object_name), # i.e. visxp_prep__<source_id>.tar.gz
    )
    success = s3.download_file(bucket, object_name, output_folder)
    if success:
        download_time = time() - start_time
        return DownloadResult(
            possibles[0].payload.get("file_path"),
            possibles[0].payload.get("download_time", -1),
            possibles[0].payload.get("mime_type", "unknown"),
            possibles[0].payload.get("content_length", -1),
            input_file_path,
            download_time,
        )
    logger.error("No file_path found in download result")
    logger.error("Failed to download input data from S3")
    return None
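
The DownloadResult used throughout this file is defined in models.py, which this commit does not touch. Judging from the positional arguments and payload keys above, it is roughly the following dataclass; the field names and defaults are inferred, so treat this as an assumption rather than the actual model:

# Assumed shape of models.DownloadResult, inferred from how io_util.py constructs it.
from dataclasses import dataclass

@dataclass
class DownloadResult:
    file_path: str  # local path of the downloaded file
    download_time: float = -1  # seconds spent downloading
    mime_type: str = "unknown"  # e.g. "video/mp4"
    content_length: int = -1  # bytes, -1 when unknown
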
25 changes: 24 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -15,6 +15,7 @@ python_speech_features = { git = "https://github.com/jameslyons/python_speech_fe
numpy = "^1.24.3"
ffmpeg-python = "^0.2.0"
dane = "^0.3.9"
validators = "^0.22.0"


[tool.poetry.group.dev.dependencies]
@@ -70,5 +71,6 @@ module = [
'yaml',
'yacs.*',
'numpy',
'validators',
]
ignore_missing_imports = true
3 changes: 2 additions & 1 deletion requirements.txt
@@ -5,4 +5,5 @@ dane >= 0.3.9
opencv-python >= 4.8.0.76
numpy >= 1.24.3
ffmpeg-python >= 0.2.0
python_speech_features @ git+https://github.com/jameslyons/python_speech_features
validators >= 0.22.0
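
The new validators dependency is what lets worker.py test an arbitrary input string with a plain boolean check: validators.url() returns True for a well-formed URL and a falsy error object (not an exception) otherwise. A quick illustration, assuming the behaviour of validators 0.22:

# Assumed behaviour of validators.url() in validators ~0.22: truthy for URLs, falsy otherwise.
import validators

print(bool(validators.url("https://www.openbeelden.nl/files/29/29494.29451.WEEKNUMMER243-HRE00015742.mp4")))  # True
print(bool(validators.url("/data/your-test-video.mp4")))  # False: a falsy validation error object is returned, nothing is raised
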
17 changes: 15 additions & 2 deletions worker.py
@@ -3,18 +3,21 @@
from pathlib import Path
import sys
from time import time
import validators

from base_util import validate_config
from dane import Document, Task, Result
from dane.base_classes import base_worker
from dane.config import cfg
from dane.s3_util import validate_s3_uri
from models import CallbackResponse, Provenance
from io_util import (
    get_base_output_dir,
    get_source_id,
    get_s3_output_file_uri,
    obtain_input_file,
    get_download_dir,
    download_uri,
)
from pika.exceptions import ChannelClosedByBroker
from main_data_processor import (
Expand All @@ -38,14 +41,24 @@
# triggered by running: python worker.py --run-test-file
def process_configured_input_file():
    logger.info("Triggered processing of configured VISXP_PREP.TEST_INPUT_FILE")
    proc_result = generate_input_for_feature_extraction(cfg.VISXP_PREP.TEST_INPUT_FILE)
    input_file_path = cfg.VISXP_PREP.TEST_INPUT_FILE
    if validate_s3_uri(input_file_path) or validators.url(input_file_path):
        logger.info("Input is a URI, continuing to download")
        download_result = download_uri(input_file_path)
        input_file_path = download_result.file_path if download_result else None

    if not input_file_path:
        logger.error("input file empty")
        sys.exit()

    proc_result = generate_input_for_feature_extraction(input_file_path)
    if proc_result.provenance:
        logger.info(
            f"Successfully processed example file in {proc_result.provenance.processing_time_ms}ms"
        )
        logger.info("Result ok, now applying the desired IO on the results")
        validated_output: CallbackResponse = apply_desired_io_on_output(
            cfg.VISXP_PREP.TEST_INPUT_FILE,
            input_file_path,
            proc_result,
            cfg.INPUT.DELETE_ON_COMPLETION,
            cfg.OUTPUT.DELETE_ON_COMPLETION,
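
Taken together with the config.yml change, the local test run can now start from a remote file instead of a pre-mounted one. To exercise it, point the config at an S3 object (the placeholder bucket and key below come from the commented example in config.yml) and use the existing test entry point; this assumes OUTPUT.S3_ENDPOINT_URL and whatever S3 credentials the environment provides are configured, since s3_download() builds its S3Store from cfg.OUTPUT.S3_ENDPOINT_URL.

In config/config.yml, under VISXP_PREP:

TEST_INPUT_FILE: s3://your-bucket/assets/your-test-video.mp4

then run the test entry point, which now downloads the URI before processing it:

python worker.py --run-test-file
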
