Commit c958df6

Merge branch 'main' of github.com:mbari-org/fastapi-localtrack into main
danellecline committed Oct 6, 2023
2 parents 1abd069 + dcaf167 · commit c958df6
Showing 8 changed files with 191 additions and 158 deletions.
2 changes: 1 addition & 1 deletion bin/run_dev.sh
@@ -33,7 +33,7 @@ export $(grep -v '^#' $BASE_DIR/.env.dev | xargs)
# Replace ${HOME} with the actual home directory and export needed variables
DATA_DIR=${DATA_DIR/\$\{HOME\}/$HOME}
export DATABASE_DIR=${DATA_DIR}/sqlite_data # Path to local database
export MODEL_DIR=${DATA_DIR}/models # Path to local database
export MODEL_DIR=${DATA_DIR}/models # Path to models

echo "Fetch a few videos to serve in the default nginx/video directory"
mkdir -p ${DATA_DIR}/nginx/video
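For reference, the ${HOME} substitution and .env.dev export that run_dev.sh performs could be sketched in Python as follows; this is a hypothetical illustration of what the shell lines do, not code from the commit.

# Hypothetical Python equivalent of the shell setup above (illustration only)
import os
from pathlib import Path

def load_env_file(path: Path) -> None:
    # Export each non-comment KEY=VALUE pair, mirroring `export $(grep -v '^#' .env.dev | xargs)`
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if line and not line.startswith('#') and '=' in line:
            key, value = line.split('=', 1)
            os.environ[key] = value

load_env_file(Path('.env.dev'))  # hypothetical location of the env file
# Replace the literal ${HOME} placeholder with the real home directory
data_dir = os.environ['DATA_DIR'].replace('${HOME}', str(Path.home()))
os.environ['DATABASE_DIR'] = f'{data_dir}/sqlite_data'  # Path to local database
os.environ['MODEL_DIR'] = f'{data_dir}/models'  # Path to models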
3 changes: 2 additions & 1 deletion bin/run_test.sh
@@ -5,6 +5,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}" )" && pwd )"
BASE_DIR="$(cd "$(dirname "${SCRIPT_DIR}/../.." )" && pwd )"
cd $BASE_DIR
export PYTHONPATH=$BASE_DIR/src:$BASE_DIR/tests
export SQLALCHEMY_SILENCE_UBER_WARNING=1

# Run the development stack
./bin/run_dev.sh
@@ -20,7 +21,7 @@ export $(grep -v '^#' $BASE_DIR/.env.dev | xargs)
# Model tests
pytest -s -v tests/test_model.py::test_model_discovery
pytest -s -v tests/test_model.py::test_num_models
pytest -s -v tests/test_model.py::test_stats
pytest -s -v tests/test_model.py::test_status

# Health tests
pytest -s -v tests/test_health.py::test_health
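The new SQLALCHEMY_SILENCE_UBER_WARNING=1 export silences SQLAlchemy 1.4's "Deprecated API features detected!" banner during the test run. If it ever needs to be set from the test code rather than the shell, a minimal sketch (assuming a tests/conftest.py, and that it runs before sqlalchemy is first imported) would be:

# tests/conftest.py (hypothetical) -- set before sqlalchemy is imported anywhere
import os

# Same effect as the export added to run_test.sh; setdefault keeps any value
# already present in the environment.
os.environ.setdefault('SQLALCHEMY_SILENCE_UBER_WARNING', '1')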
30 changes: 27 additions & 3 deletions src/daemon/__main__.py
@@ -4,9 +4,10 @@

from dependency_injector.wiring import Provide, inject

from aiodocker import Docker, DockerError
from daemon.dispatcher import Dispatcher
from daemon.container import Container
from daemon.logger import create_logger_file, info
from daemon.logger import create_logger_file, info, err
import asyncio
import yaml
import os
@@ -17,6 +18,7 @@
log_path = Path(os.path.dirname(__file__)).parent.parent / 'logs'
create_logger_file(log_path, 'daemon')


@inject
def main(dispatcher: Dispatcher = Provide[Container.dispatcher]) -> None:
dispatcher.run()
@@ -49,6 +51,24 @@ def env_check() -> bool:
return True


async def docker_check(data: dict) -> bool:
# if running this on an arm64 machine, use the arm64 docker image
if os.uname().machine == 'arm64':
image_name = data['monitors']['docker']['strongsort_container_arm64']
else:
image_name = data['monitors']['docker']['strongsort_container']
async with Docker() as docker_aoi:
try:
return await docker_aoi.images.inspect(image_name)
except DockerError as e:
if e.status == 404:
await docker_aoi.pull(image_name)
else:
raise DockerError(e.status, f'Error retrieving {image_name} image.')

return False


async def run():
yaml_path = Path(os.path.dirname(__file__)).parent.parent / 'config.yml'
if not yaml_path.exists():
@@ -62,10 +82,14 @@ async def run():
return False

check_minio = await minio_check(data)
check_docker = await docker_check(data)

# Exit if we can't upload to minio
# Exit if we can't upload to minio or pull docker images
if not check_minio:
print(f"Could not upload to minio")
err(f"Could not upload to minio")
return False
if not check_docker:
err(f"Could not pull docker image")
return False

# Initialize the container
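The new docker_check reads the image names from the monitors.docker section of config.yml and pulls the image if it is not available locally. A minimal usage sketch, with a hypothetical config dict shaped like the keys the function reads (the image names below are assumptions for illustration only):

# Illustration only: exercising docker_check with a config dict shaped like config.yml
import asyncio

config = {
    'monitors': {
        'docker': {
            'strongsort_container': 'mbari/strongsort-yolov5:latest',        # assumed name
            'strongsort_container_arm64': 'mbari/strongsort-yolov5:latest',  # assumed name
        }
    }
}

async def check() -> None:
    ok = await docker_check(config)  # docker_check as defined in the diff above
    print(f'docker image available: {bool(ok)}')

asyncio.run(check())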
139 changes: 71 additions & 68 deletions src/daemon/docker_client.py
@@ -1,7 +1,7 @@
# fastapi-localtrack, Apache-2.0 license
# Filename: daemon/docker_client.py
# Description: Docker client that manages docker containers
import asyncio

import os
from datetime import datetime
from pathlib import Path
@@ -24,58 +24,68 @@ class DockerClient:

def __init__(self) -> None:
info('Initializing DockerClient')
self._instances = {}
self._runners = {}

async def check(self, database_path: Path) -> None:
"""
Check the status of any running jobs
Check the status of any running jobs and close them out
:param database_path:
:return:
"""

session_maker = init_db(database_path, reset=False)

jobs_to_remove = []
for job_id, instance in self._instances.items():
for job_id, runner in self._runners.items():

if runner.is_running():
info(f'Job {job_id} docker container {runner.container_name} is still running')
continue

if instance.is_complete():
info(f'Job {job_id} processing complete: {instance.is_successful()}')
if runner.is_successful():
info(f'Job {job_id} docker container {runner.container_name} processing complete')
jobs_to_remove.append(job_id)
await runner.fini()

# Update the job status and notify
with session_maker.begin() as db:
job = db.query(JobLocal).filter(JobLocal.id == job_id).first()
if instance.is_successful():
if job.media[0].metadata_b64:
metadata = json_b64_decode(job.media[0].metadata_b64)
else:
metadata = {}

update_media(db, job, job.media[0].name, Status.SUCCESS)
jobs_to_remove.append(job_id)

job.results, local_path, num_tracks, processing_time_secs = instance.get_results()

metadata['s3_path'] = job.results
metadata['num_tracks'] = num_tracks
metadata['processing_time_secs'] = processing_time_secs

update_media(db, job,
job.media[0].name,
Status.SUCCESS,
metadata_b64=json_b64_encode(metadata))

await notify(job, local_path)

if job.media[0].metadata_b64:
metadata = json_b64_decode(job.media[0].metadata_b64)
else:
update_media(db, job, job.media[0].name, Status.FAILED)
jobs_to_remove.append(job_id)
await notify(job, None)
metadata = {}
job.results, local_path, num_tracks, processing_time_secs = runner.get_results()
metadata['s3_path'] = job.results
metadata['num_tracks'] = num_tracks
metadata['processing_time_secs'] = processing_time_secs
update_media(db, job,
job.media[0].name,
Status.SUCCESS,
metadata_b64=json_b64_encode(metadata))
await notify(job, local_path)

if runner.failed():
warn(f'Job {job_id} docker container {runner.container_name} failed')
jobs_to_remove.append(job_id)
await runner.fini()
# Update the job status and notify
with session_maker.begin() as db:
job = db.query(JobLocal).filter(JobLocal.id == job_id).first()
if job.media[0].metadata_b64:
metadata = json_b64_decode(job.media[0].metadata_b64)
else:
metadata = {}
update_media(db, job,
job.media[0].name,
Status.FAILED,
metadata_b64=json_b64_encode(metadata))
await notify(job, local_path)

# Remove the instances
for job_id in jobs_to_remove:
if job_id in self._instances:
info(f'Removing instance for job {job_id}')
del self._instances[job_id]
if job_id in self._runners:
info(f'Removing runner for job {job_id} from the list of runners')
del self._runners[job_id]

async def process(self,
num_gpus: int,
@@ -113,7 +123,9 @@ async def process(self,
client = docker.from_env()

# Get all active docker containers
all_containers = client.containers.list(all=True, filters={'name': DEFAULT_CONTAINER_NAME})
all_containers = client.containers.list(all=True)
all_containers = [container for container in all_containers if
container.name.startswith(DEFAULT_CONTAINER_NAME)]

info(f'Found {len(all_containers)} active {DEFAULT_CONTAINER_NAME} docker containers')

@@ -136,26 +148,25 @@ args = job_data.args or DEFAULT_ARGS
args = job_data.args or DEFAULT_ARGS

info(f'Running job {job_data.id} with output {output_s3}')
instance = DockerRunner(image_name=job_data.engine,
job_id=job_data.id,
output_s3=output_s3,
video_url=job_data.media[0].name,
model_s3=job_data.model,
track_s3=s3_track_config,
args=args)
runner = DockerRunner(image_name=job_data.engine,
job_id=job_data.id,
output_s3=output_s3,
video_url=job_data.media[0].name,
model_s3=job_data.model,
track_s3=s3_track_config,
args=args)

with session_maker.begin() as db:
job = db.query(JobLocal).filter(JobLocal.id == job_data.id).first()
update_media(db, job, job.media[0].name, Status.RUNNING)

self._instances[job_data.id] = instance
self._runners[job_data.id] = runner

# Process the video asynchronously
asyncio.run_coroutine_threadsafe(instance.run(has_gpu), asyncio.get_event_loop())
await runner.run(has_gpu)
else:
err(f'No job found with id {job_id}')
else:
info(f'Already running {len(all_containers)} jobs. Waiting for one to finish')
info(f'Already running maximum allowed {len(all_containers)} jobs. Waiting for one to finish')

@staticmethod
def startup(database_path: Path) -> None:
@@ -195,27 +206,24 @@ def startup(database_path: Path) -> None:
job.media[0].status = Status.FAILED

# Get all active docker containers
all_containers = client.containers.list()
# Prune to only those that start with the default container name
all_containers = client.containers.list(all=True)
all_containers = [container for container in all_containers if
container.name.startswith(DEFAULT_CONTAINER_NAME)]

if len(all_containers) > 0:
# Should never get here unless something went wrong
for container in all_containers:
err(
f'Container {container.id} was running but the service was restarted. Stopping and removing it')
# Stop the container
try:
container = client.containers.get(container.id)
if container.status == 'running':
container.kill()
# Should never get here unless something went wrong
for container in all_containers:
err(
f'Container {container.id} was running but the service was restarted. Stopping and removing it')
# Stop the container
try:
container = client.containers.get(container.id)
if container.status == 'running':
container.stop()
info(f"Container {container.id} stopped successfully.")
container.remove()
info(f"Container {container.id} removed successfully.")
except Exception as e:
exception(e)
container.remove()
info(f"Container {container.id} removed successfully.")
except Exception as e:
exception(e)


async def notify(job: JobLocal, local_path: Path = None) -> None:
@@ -230,28 +238,23 @@ async def notify(job: JobLocal, local_path: Path = None) -> None:
warn("NOTIFY_URL environment variable not set. Skipping notification")
return

# Add any additional kwargs to the metadata
metadata = json_b64_decode(job.metadata_b64)
# Add job id to the metadata
metadata['job_id'] = job.id

if local_path and local_path.exists():
with local_path.open("rb") as file:
results = file.read()
form_data = {
"job_id": f"{job.id}",
"metadata": (None, json.dumps(metadata), 'application/json'),
"file": results
}
else:
err(f'No track tar file found for job {job.id}')
form_data = {
"job_id": f"{job.id}",
"metadata": (None, json.dumps(metadata), 'application/json'),
"file": None
}

info(f'Sending notification {metadata} to {notify_url}')
info(f'Sending notification for job {job.id} to {notify_url}')

# Send the multipart POST request
response = requests.post(notify_url, files=form_data)
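
Job and media metadata is carried as base64-encoded JSON (metadata_b64). A plausible sketch of the json_b64_encode / json_b64_decode helpers used throughout this file, assuming they simply base64-encode UTF-8 JSON (the real helpers live elsewhere in the repository):

# Plausible sketch of json_b64_encode / json_b64_decode (assumption: plain
# base64-encoded UTF-8 JSON; the actual implementation lives elsewhere in the repo)
import base64
import json

def json_b64_encode(data: dict) -> str:
    return base64.b64encode(json.dumps(data).encode('utf-8')).decode('utf-8')

def json_b64_decode(encoded: str) -> dict:
    return json.loads(base64.b64decode(encoded).decode('utf-8'))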
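Both process() and startup() now list every container and keep only those whose name starts with DEFAULT_CONTAINER_NAME, doing the prefix match in Python rather than through Docker's name filter (which matches substrings/regexes rather than exact prefixes). A standalone sketch of the same pattern with docker-py, using a hypothetical prefix:

# Standalone sketch of the prefix filtering and cleanup pattern used above (docker-py)
import docker

DEFAULT_CONTAINER_NAME = 'strongsort'  # hypothetical prefix, for illustration only

client = docker.from_env()
# Docker's name filter matches substrings, so do the exact prefix match in Python
containers = [c for c in client.containers.list(all=True)
              if c.name.startswith(DEFAULT_CONTAINER_NAME)]

for container in containers:
    if container.status == 'running':
        container.stop()   # graceful stop, as in the updated startup()
    container.remove()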
(diffs for the remaining 4 changed files not shown)
