fix: separate async status check from instance run
danellecline committed Oct 5, 2023
1 parent 266d5d7 commit 26fd20c
Showing 5 changed files with 167 additions and 92 deletions.
1 change: 1 addition & 0 deletions .env.dev
@@ -14,6 +14,7 @@ MINIO_SECRET_KEY=ReplaceMePassword
MINIO_LIVE_ACCESS_KEY=!G7caZ8AJykNKhHPA
# External to container for testing
MINIO_ENDPOINT_URL=http://localhost:7000
MINIO_EXTERNAL_ENDPOINT_URL=http://localhost:7000
ROOT_BUCKET_NAME=localtrack
MODEL_PREFIX=models
TRACK_PREFIX=tracks
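The one addition here is MINIO_EXTERNAL_ENDPOINT_URL, which for now duplicates the internal endpoint (both point at localhost:7000) but lets code running outside the container network be configured separately from in-network code. A minimal sketch of how the two endpoints might be consumed with boto3; the helper name, the `external` switch, and the choice of credential variables are assumptions for illustration, not the repo's actual wiring:

import os
import boto3

def s3_client(external: bool = False):
    """Build an S3 client against MinIO from the .env values.

    `external` is a hypothetical switch: host-side tests would use the
    externally mapped endpoint, in-container code the internal one.
    """
    endpoint = os.environ["MINIO_EXTERNAL_ENDPOINT_URL" if external
                          else "MINIO_ENDPOINT_URL"]
    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=os.environ["MINIO_LIVE_ACCESS_KEY"],   # assumed key variable
        aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
    )

# e.g. list track tar files from the host during testing
s3 = s3_client(external=True)
s3.list_objects_v2(Bucket=os.environ["ROOT_BUCKET_NAME"],
                   Prefix=os.environ["TRACK_PREFIX"])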
4 changes: 2 additions & 2 deletions src/app/job/database.py
@@ -78,7 +78,7 @@ def update_media(db: Session, job: Job, video_name: str, status: str, metadata_b
:param status: The status of the video
:param metadata_b64: The metadata to pass to the job
"""
info(f'Updating media {video_name} to {status}')
info(f'Updating media {video_name} in job {job.id} {job.name} to {status}')

media = [m for m in job.media if m.name == video_name]

@@ -95,7 +95,7 @@ def update_media(db: Session, job: Job, video_name: str, status: str, metadata_b
media.metadata_b64 = metadata_b64

else:
info(f'A new media {video_name} was added to job {job.name}')
info(f'A new media {video_name} was added to job {job.id} {job.name}')
new_media = Media(name=video_name,
status=status,
job=job,
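update_media carries per-video metadata in the metadata_b64 column, and the docker_client changes below encode and decode it with json_b64_encode / json_b64_decode. Those helpers are not part of this diff, so the following is only a guess at their shape, assuming base64-wrapped JSON:

import base64
import json

def json_b64_encode(payload: dict) -> str:
    # Serialize to JSON, then base64-encode so the blob fits in a plain
    # string column such as Media.metadata_b64.
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")

def json_b64_decode(blob: str) -> dict:
    # Inverse of json_b64_encode.
    return json.loads(base64.b64decode(blob).decode("utf-8"))

# Round trip with the kind of metadata check() attaches to a finished job
meta = {"s3_path": "s3://localtrack/tracks/example.tar.gz",
        "num_tracks": 3,
        "processing_time_secs": 42.0}
assert json_b64_decode(json_b64_encode(meta)) == meta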
100 changes: 69 additions & 31 deletions src/daemon/docker_client.py
@@ -22,6 +22,61 @@

class DockerClient:

def __init__(self) -> None:
info('Initializing DockerClient')
self._instances = {}

async def check(self, database_path: Path) -> None:
"""
Check the status of any running jobs
:param database_path:
:return:
"""

session_maker = init_db(database_path, reset=False)

jobs_to_remove = []
for job_id, instance in self._instances.items():

if instance.is_complete():
info(f'Job {job_id} processing complete: {instance.is_successful()}')

# Update the job status and notify
with session_maker.begin() as db:
job = db.query(JobLocal).filter(JobLocal.id == job_id).first()
if instance.is_successful():
if job.media[0].metadata_b64:
metadata = json_b64_decode(job.media[0].metadata_b64)
else:
metadata = {}

update_media(db, job, job.media[0].name, Status.SUCCESS)
jobs_to_remove.append(job_id)

job.results, local_path, num_tracks, processing_time_secs = instance.get_results()

metadata['s3_path'] = job.results
metadata['num_tracks'] = num_tracks
metadata['processing_time_secs'] = processing_time_secs

update_media(db, job,
job.media[0].name,
Status.SUCCESS,
metadata_b64=json_b64_encode(metadata))

await notify(job, local_path)

else:
update_media(db, job, job.media[0].name, Status.FAILED)
jobs_to_remove.append(job_id)
await notify(job, None)

# Remove the instances
for job_id in jobs_to_remove:
if job_id in self._instances:
info(f'Removing instance for job {job_id}')
del self._instances[job_id]
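This check() sweep is the "separate async status check" from the commit title: completion handling that previously ran inline after each instance.run() (visible as removed code further down in process()) now polls self._instances on its own. The daemon module that drives it is among the five changed files but its diff is not rendered on this page, so the loop below is only a plausible sketch of how process() and check() might be called together; the function name, interval, and argument values are assumptions:

import asyncio
from pathlib import Path

async def daemon_loop(client: "DockerClient", database_path: Path) -> None:
    # Hypothetical driver: queue new jobs, then reap finished ones.
    while True:
        await client.process(num_gpus=1,
                             database_path=database_path,
                             root_bucket="localtrack",    # ROOT_BUCKET_NAME
                             track_prefix="tracks",       # TRACK_PREFIX
                             s3_track_config="...")       # placeholder value
        await client.check(database_path)                 # reap completed instances
        await asyncio.sleep(15)                           # assumed polling interval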

async def process(self,
num_gpus: int,
database_path: Path,
@@ -30,11 +85,16 @@ async def process(self,
s3_track_config: str) -> ClientResponse:
"""
Process any jobs that are queued. This function is called by the daemon module
:param num_gpus: The number of GPUs available
:param database_path: The path to the database
:param root_bucket: The root bucket for the track tar files
:param track_prefix: The prefix for the track tar files
:param s3_track_config: The s3 track config
"""
# Two files per GPU
if num_gpus > 0:
has_gpu = True
max_concurrent_jobs = num_gpus*2
max_concurrent_jobs = num_gpus * 2
else:
has_gpu = False
max_concurrent_jobs = 2
@@ -85,37 +145,14 @@ async def process(self,
job = db.query(JobLocal).filter(JobLocal.id == job_data.id).first()
update_media(db, job, job.media[0].name, Status.RUNNING)

self._instances[job_data.id] = instance

# Process the video asynchronously
asyncio.run_coroutine_threadsafe(instance.run(has_gpu), asyncio.get_event_loop())
info(f'Processing complete: {instance.is_successful()}')

# Update the job status and notify
with session_maker.begin() as db:
job = db.query(JobLocal).filter(JobLocal.id == job_data.id).first()
if instance.is_successful():
if job.media[0].metadata_b64:
metadata = json_b64_decode(job.media[0].metadata_b64)
else:
metadata = {}

update_media(db, job, job.media[0].name, Status.SUCCESS)

job.results, local_path, num_tracks, processing_time_secs = instance.get_results()

metadata['s3_path'] = job.results
metadata['num_tracks'] = num_tracks
metadata['processing_time_secs'] = processing_time_secs

update_media(db, job,
job.media[0].name,
Status.SUCCESS,
metadata_b64=json_b64_encode(metadata))

await notify(job, local_path)

else:
update_media(db, job, job.media[0].name, Status.FAILED)
await notify(job, None)
else:
err(f'No job found with id {job_id}')
else:
info(f'Already running {len(all_containers)} jobs. Waiting for one to finish')
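The removed block above is the old inline handling: instance.run(has_gpu) used to be awaited and its SUCCESS/FAILED outcome processed on the spot. After this commit the run is only scheduled (the commit uses asyncio.run_coroutine_threadsafe) and completion is picked up later by check(). A stripped-down illustration of that fire-and-forget-then-poll pattern, using asyncio.create_task and a toy stand-in class rather than the repo's real instance object (all names below are placeholders):

import asyncio

class ToyInstance:
    # Mimics only the polling API used in this diff: run / is_complete / is_successful.
    def __init__(self) -> None:
        self._done = False
        self._ok = False

    async def run(self, has_gpu: bool) -> None:
        await asyncio.sleep(1)            # stand-in for the container run
        self._done, self._ok = True, True

    def is_complete(self) -> bool:
        return self._done

    def is_successful(self) -> bool:
        return self._ok

async def main() -> None:
    instances = {"job-1": ToyInstance()}
    # Schedule the run instead of awaiting it, so the queueing pass returns immediately.
    asyncio.create_task(instances["job-1"].run(has_gpu=False))
    # A later check pass polls for completion, mirroring DockerClient.check().
    while not instances["job-1"].is_complete():
        await asyncio.sleep(0.2)
    print("job-1 succeeded:", instances["job-1"].is_successful())

asyncio.run(main())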

@staticmethod
def startup(database_path: Path) -> None:
@@ -157,7 +194,8 @@ def startup(database_path: Path) -> None:
# Get all active docker containers
all_containers = client.containers.list()
# Prune to only those that start with the default container name
all_containers = [container for container in all_containers if container.name.startswith(DEFAULT_CONTAINER_NAME)]
all_containers = [container for container in all_containers if
container.name.startswith(DEFAULT_CONTAINER_NAME)]

if len(all_containers) > 0:
# Should never get here unless something went wrong
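The startup hunk only re-wraps the container-pruning list comprehension, but it shows the pattern: list the active containers and keep those whose name carries the daemon's prefix, which should be empty unless a previous run left something behind. A docker-py sketch of that check plus a guessed cleanup step; the prefix value and the stop/remove handling are assumptions, since the diff cuts off before showing what startup() actually does with stale containers:

import docker

DEFAULT_CONTAINER_NAME = "track-daemon"   # placeholder; the real constant lives in the repo

client = docker.from_env()
stale = [c for c in client.containers.list()
         if c.name.startswith(DEFAULT_CONTAINER_NAME)]

for container in stale:
    # Assumed cleanup: stop and remove anything left over from a crashed run.
    container.stop(timeout=10)
    container.remove()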
(Diffs for the remaining 2 changed files are not shown here.)
