diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
index 67ab5a732..a08602007 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
@@ -1,10 +1,9 @@
 import argparse
+import datetime
 import logging
 import pathlib
-import shutil
 import sys
 import time
-import uuid
 from contextlib import closing
 
 import brotli
@@ -35,6 +34,19 @@
 logger.addHandler(logging_console_handler)
 
 
+def print_compression_job_status(job_row, current_time):
+    job_uncompressed_size = job_row["uncompressed_size"]
+    job_compressed_size = job_row["compressed_size"]
+    job_start_time = job_row["start_time"]
+    compression_ratio = float(job_uncompressed_size) / job_compressed_size
+    compression_speed = job_uncompressed_size / (current_time - job_start_time).total_seconds()
+    logger.info(
+        f"Compressed {pretty_size(job_uncompressed_size)} into "
+        f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}x). "
+        f"Speed: {pretty_size(compression_speed)}/s."
+    )
+
+
 def handle_job_update(db, db_cursor, job_id, no_progress_reporting):
     if no_progress_reporting:
         polling_query = (
@@ -42,16 +54,12 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting):
         )
     else:
         polling_query = (
-            f"SELECT status, status_msg, uncompressed_size, compressed_size "
+            f"SELECT start_time, status, status_msg, uncompressed_size, compressed_size "
             f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
         )
 
-    completion_query = (
-        f"SELECT duration, uncompressed_size, compressed_size "
-        f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
-    )
-
     job_last_uncompressed_size = 0
+
     while True:
         db_cursor.execute(polling_query)
         results = db_cursor.fetchall()
@@ -63,37 +71,26 @@
         job_row = results[0]
         job_status = job_row["status"]
-
-        if not no_progress_reporting:
-            job_uncompressed_size = job_row["uncompressed_size"]
-            job_compressed_size = job_row["compressed_size"]
-            if job_uncompressed_size > 0:
-                compression_ratio = float(job_uncompressed_size) / job_compressed_size
-                if job_last_uncompressed_size < job_uncompressed_size:
-                    logger.info(
-                        f"Compressed {pretty_size(job_uncompressed_size)} into "
-                        f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f})"
-                    )
-                    job_last_uncompressed_size = job_uncompressed_size
+        current_time = datetime.datetime.now()
 
         if CompressionJobStatus.SUCCEEDED == job_status:
             # All tasks in the job is done
-            speed = 0
             if not no_progress_reporting:
-                db_cursor.execute(completion_query)
-                job_row = db_cursor.fetchone()
-                if job_row["duration"] and job_row["duration"] > 0:
-                    speed = job_row["uncompressed_size"] / job_row["duration"]
-                logger.info(
-                    f"Compression finished. Runtime: {job_row['duration']}s. "
-                    f"Speed: {pretty_size(speed)}/s."
-                )
+                logger.info("Compression finished.")
+                print_compression_job_status(job_row, current_time)
             break  # Done
 
         if CompressionJobStatus.FAILED == job_status:
             # One or more tasks in the job has failed
             logger.error(f"Compression failed. {job_row['status_msg']}")
             break  # Done
-        if CompressionJobStatus.RUNNING == job_status or CompressionJobStatus.PENDING == job_status:
+
+        if CompressionJobStatus.RUNNING == job_status:
+            if not no_progress_reporting:
+                job_uncompressed_size = job_row["uncompressed_size"]
+                if job_last_uncompressed_size < job_uncompressed_size:
+                    print_compression_job_status(job_row, current_time)
+                    job_last_uncompressed_size = job_uncompressed_size
+        elif CompressionJobStatus.PENDING == job_status:
             pass  # Simply wait another iteration
         else:
             error_msg = f"Unhandled CompressionJobStatus: {job_status}"
diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
index cf696b240..ce88ad185 100644
--- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
+++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
@@ -1,6 +1,5 @@
 import datetime
 import json
-import logging
 import os
 import pathlib
 import subprocess
@@ -9,21 +8,69 @@ import yaml
 from celery.app.task import Task
 from celery.utils.log import get_task_logger
-from clp_py_utils.clp_config import Database, StorageEngine
+from clp_py_utils.clp_config import (
+    COMPRESSION_JOBS_TABLE_NAME,
+    COMPRESSION_TASKS_TABLE_NAME,
+    Database,
+    StorageEngine,
+)
 from clp_py_utils.clp_logging import set_logging_level
 from clp_py_utils.sql_adapter import SQL_Adapter
 
 from job_orchestration.executor.compress.celery import app
 from job_orchestration.scheduler.constants import CompressionTaskStatus
 from job_orchestration.scheduler.job_config import ClpIoConfig, PathsToCompress
-from job_orchestration.scheduler.scheduler_data import (
-    CompressionTaskFailureResult,
-    CompressionTaskSuccessResult,
-)
+from job_orchestration.scheduler.scheduler_data import CompressionTaskResult
 
 # Setup logging
 logger = get_task_logger(__name__)
 
 
+def update_compression_task_metadata(db_cursor, task_id, kv):
+    if not len(kv):
+        raise ValueError("Must specify at least one field to update")
+
+    field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()]
+    query = f"""
+    UPDATE {COMPRESSION_TASKS_TABLE_NAME}
+    SET {", ".join(field_set_expressions)}
+    WHERE id={task_id}
+    """
+    db_cursor.execute(query)
+
+
+def increment_compression_job_metadata(db_cursor, job_id, kv):
+    if not len(kv):
+        raise ValueError("Must specify at least one field to update")
+
+    field_set_expressions = [f"{k}={k}+{v}" for k, v in kv.items()]
+    query = f"""
+    UPDATE {COMPRESSION_JOBS_TABLE_NAME}
+    SET {", ".join(field_set_expressions)}
+    WHERE id={job_id}
+    """
+    db_cursor.execute(query)
+
+
+def update_tags(db_cursor, table_prefix, archive_id, tag_ids):
+    db_cursor.executemany(
+        f"INSERT INTO {table_prefix}archive_tags (archive_id, tag_id) VALUES (%s, %s)",
+        [(archive_id, tag_id) for tag_id in tag_ids],
+    )
+
+
+def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archive_stats):
+    if tag_ids is not None:
+        update_tags(db_cursor, table_prefix, archive_stats["id"], tag_ids)
+    increment_compression_job_metadata(
+        db_cursor,
+        job_id,
+        dict(
+            uncompressed_size=archive_stats["uncompressed_size"],
+            compressed_size=archive_stats["size"],
+        ),
+    )
+
+
 def make_clp_command(
     clp_home: pathlib.Path,
     archive_output_dir: pathlib.Path,
@@ -80,14 +127,6 @@ def make_clp_s_command(
     return compression_cmd
 
 
-def update_tags(db_conn, db_cursor, table_prefix, archive_id, tag_ids):
-    db_cursor.executemany(
-        f"INSERT INTO {table_prefix}archive_tags (archive_id, tag_id) VALUES (%s, %s)",
-        [(archive_id, tag_id) for tag_id in tag_ids],
-    )
-    db_conn.commit()
-
-
 def run_clp(
     clp_config: ClpIoConfig,
     clp_home: pathlib.Path,
@@ -98,6 +137,7 @@ def run_clp(
     task_id: int,
     tag_ids,
     paths_to_compress: PathsToCompress,
+    sql_adapter: SQL_Adapter,
     clp_metadata_db_connection_config,
 ):
     """
@@ -112,6 +152,7 @@
     :param task_id:
     :param tag_ids:
     :param paths_to_compress: PathToCompress
+    :param sql_adapter: SQL_Adapter
    :param clp_metadata_db_connection_config
     :return: tuple -- (whether compression was successful, output messages)
     """
@@ -169,45 +210,49 @@
     compression_successful = False
     proc = subprocess.Popen(compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file)
 
-    sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_connection_config))
-    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
-        db_conn.cursor(dictionary=True)
-    ) as db_cursor:
-        # Compute the total amount of data compressed
-        last_archive_stats = None
-        total_uncompressed_size = 0
-        total_compressed_size = 0
-        while True:
-            line = proc.stdout.readline()
-            if not line:
-                break
-            stats = json.loads(line.decode("ascii"))
-            if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]:
-                # We've started a new archive so add the previous archive's last
-                # reported size to the total
-                total_uncompressed_size += last_archive_stats["uncompressed_size"]
-                total_compressed_size += last_archive_stats["size"]
-                if tag_ids is not None:
-                    update_tags(
-                        db_conn,
-                        db_cursor,
-                        clp_metadata_db_connection_config["table_prefix"],
-                        last_archive_stats["id"],
-                        tag_ids,
-                    )
-            last_archive_stats = stats
-        if last_archive_stats is not None:
-            # Add the last archive's last reported size
+    # Compute the total amount of data compressed
+    last_archive_stats = None
+    total_uncompressed_size = 0
+    total_compressed_size = 0
+    while True:
+        line = proc.stdout.readline()
+        if not line:
+            break
+        stats = json.loads(line.decode("ascii"))
+        if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]:
+            # We've started a new archive so add the previous archive's last
+            # reported size to the total
             total_uncompressed_size += last_archive_stats["uncompressed_size"]
             total_compressed_size += last_archive_stats["size"]
-            if tag_ids is not None:
-                update_tags(
-                    db_conn,
+            with closing(sql_adapter.create_connection(True)) as db_conn, closing(
+                db_conn.cursor(dictionary=True)
+            ) as db_cursor:
+                update_job_metadata_and_tags(
                     db_cursor,
+                    job_id,
                     clp_metadata_db_connection_config["table_prefix"],
-                    last_archive_stats["id"],
                     tag_ids,
+                    last_archive_stats,
                 )
+                db_conn.commit()
+
+        last_archive_stats = stats
+
+    if last_archive_stats is not None:
+        # Add the last archive's last reported size
+        total_uncompressed_size += last_archive_stats["uncompressed_size"]
+        total_compressed_size += last_archive_stats["size"]
+        with closing(sql_adapter.create_connection(True)) as db_conn, closing(
+            db_conn.cursor(dictionary=True)
+        ) as db_cursor:
+            update_job_metadata_and_tags(
+                db_cursor,
+                job_id,
+                clp_metadata_db_connection_config["table_prefix"],
+                tag_ids,
+                last_archive_stats,
+            )
+            db_conn.commit()
 
     # Wait for compression to finish
     return_code = proc.wait()
@@ -225,13 +270,15 @@
     # Close stderr log file
     stderr_log_file.close()
 
+    worker_output = {
+        "total_uncompressed_size": total_uncompressed_size,
+        "total_compressed_size": total_compressed_size,
+    }
     if compression_successful:
-        return compression_successful, {
-            "total_uncompressed_size": total_uncompressed_size,
-            "total_compressed_size": total_compressed_size,
-        }
+        return CompressionTaskStatus.SUCCEEDED, worker_output
     else:
-        return compression_successful, {"error_message": f"See logs {stderr_log_path}"}
+        worker_output["error_message"] = f"See logs {stderr_log_path}"
+        return CompressionTaskStatus.FAILED, worker_output
@@ -256,9 +303,11 @@ def compress(
     clp_io_config = ClpIoConfig.parse_raw(clp_io_config_json)
     paths_to_compress = PathsToCompress.parse_raw(paths_to_compress_json)
 
+    sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_connection_config))
+
     start_time = datetime.datetime.now()
     logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.")
-    compression_successful, worker_output = run_clp(
+    compression_task_status, worker_output = run_clp(
         clp_io_config,
         pathlib.Path(clp_home_str),
         pathlib.Path(data_dir_str),
@@ -268,25 +317,37 @@
         task_id,
         tag_ids,
         paths_to_compress,
+        sql_adapter,
         clp_metadata_db_connection_config,
     )
     duration = (datetime.datetime.now() - start_time).total_seconds()
     logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.")
 
-    if compression_successful:
-        return CompressionTaskSuccessResult(
-            task_id=task_id,
-            status=CompressionTaskStatus.SUCCEEDED,
-            start_time=start_time,
-            duration=duration,
-            total_uncompressed_size=worker_output["total_uncompressed_size"],
-            total_compressed_size=worker_output["total_compressed_size"],
-        ).dict()
-    else:
-        return CompressionTaskFailureResult(
-            task_id=task_id,
-            status=CompressionTaskStatus.FAILED,
-            start_time=start_time,
-            duration=duration,
-            error_message=worker_output["error_message"],
-        ).dict()
+    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
+        db_conn.cursor(dictionary=True)
+    ) as db_cursor:
+        update_compression_task_metadata(
+            db_cursor,
+            task_id,
+            dict(
+                start_time=start_time,
+                status=compression_task_status,
+                partition_uncompressed_size=worker_output["total_uncompressed_size"],
+                partition_compressed_size=worker_output["total_compressed_size"],
+                duration=duration,
+            ),
+        )
+        if CompressionTaskStatus.SUCCEEDED == compression_task_status:
+            increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1))
+        db_conn.commit()
+
+    compression_task_result = CompressionTaskResult(
+        task_id=task_id,
+        status=compression_task_status,
+        duration=duration,
+    )
+
+    if CompressionTaskStatus.FAILED == compression_task_status:
+        compression_task_result.error_message = worker_output["error_message"]
+
+    return compression_task_result.dict()
diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
index 7280e366b..62b7a27fc 100644
--- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
@@ -27,8 +27,7 @@
 from job_orchestration.scheduler.job_config import ClpIoConfig
 from job_orchestration.scheduler.scheduler_data import (
     CompressionJob,
-    CompressionTaskFailureResult,
-    CompressionTaskSuccessResult,
+    CompressionTaskResult,
 )
 from pydantic import ValidationError
 
@@ -228,9 +227,7 @@ def poll_running_jobs(db_conn, db_cursor):
     jobs_to_delete = []
     for job_id, job in scheduled_jobs.items():
         job_success = True
-        num_tasks_completed = 0
-        uncompressed_size = 0
-        compressed_size = 0
+        duration = 0.0
         error_message = ""
 
         try:
@@ -241,49 +238,24 @@
             duration = (datetime.datetime.now() - job.start_time).total_seconds()
             # Check for finished jobs
             for task_result in returned_results:
-                if not task_result["status"] == CompressionTaskStatus.SUCCEEDED:
-                    task_result = CompressionTaskFailureResult.parse_obj(task_result)
+                task_result = CompressionTaskResult.parse_obj(task_result)
+                if task_result.status == CompressionTaskStatus.SUCCEEDED:
+                    logger.info(
+                        f"Compression task job-{job_id}-task-{task_result.task_id} completed in"
+                        f" {task_result.duration} second(s)."
+                    )
+                else:
                     job_success = False
                     error_message += f"task {task_result.task_id}: {task_result.error_message}\n"
-                    update_compression_task_metadata(
-                        db_cursor,
-                        task_result.task_id,
-                        dict(
-                            start_time=task_result.start_time,
-                            status=task_result.status,
-                            duration=task_result.duration,
-                        ),
-                    )
                     logger.error(
                         f"Compression task job-{job_id}-task-{task_result.task_id} failed with"
                         f" error: {task_result.error_message}."
                     )
-                else:
-                    task_result = CompressionTaskSuccessResult.parse_obj(task_result)
-                    num_tasks_completed += 1
-                    uncompressed_size += task_result.total_uncompressed_size
-                    compressed_size += task_result.total_compressed_size
-                    update_compression_task_metadata(
-                        db_cursor,
-                        task_result.task_id,
-                        dict(
-                            start_time=task_result.start_time,
-                            status=task_result.status,
-                            partition_uncompressed_size=task_result.total_uncompressed_size,
-                            partition_compressed_size=task_result.total_compressed_size,
-                            duration=task_result.duration,
-                        ),
-                    )
-                    logger.info(
-                        f"Compression task job-{job_id}-task-{task_result.task_id} completed in"
-                        f" {task_result.duration} second(s)."
-                    )
+
         except Exception as e:
             logger.error(f"Error while getting results for job {job_id}: {e}")
             job_success = False
 
-        db_conn.commit()
-
         if job_success:
             logger.info(f"Job {job_id} succeeded.")
             update_compression_job_metadata(
@@ -292,9 +264,6 @@
             dict(
                 status=CompressionJobStatus.SUCCEEDED,
                 duration=duration,
-                uncompressed_size=uncompressed_size,
-                compressed_size=compressed_size,
-                num_tasks_completed=num_tasks_completed,
             ),
         )
     else:
@@ -305,7 +274,6 @@
             dict(
                 status=CompressionJobStatus.FAILED,
                 status_msg=error_message,
-                num_tasks_completed=num_tasks_completed,
             ),
         )
         db_conn.commit()
diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
index 0c46cc544..87c1540e7 100644
--- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
+++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
@@ -18,8 +18,8 @@ class CompressionJob(BaseModel):
 class CompressionTaskResult(BaseModel):
     task_id: int
     status: int
-    start_time: datetime.datetime
     duration: float
+    error_message: Optional[str]
 
     @validator("status")
     def valid_status(cls, field):
@@ -29,15 +29,6 @@
         return field
 
 
-class CompressionTaskFailureResult(CompressionTaskResult):
-    error_message: str
-
-
-class CompressionTaskSuccessResult(CompressionTaskResult):
-    total_uncompressed_size: int
-    total_compressed_size: int
-
-
 class InternalJobState(Enum):
     WAITING_FOR_REDUCER = auto()
     WAITING_FOR_DISPATCH = auto()
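Illustrative note, not part of the patch: the metadata helpers added in fs_compression_task.py build their SQL from a plain kv dict, with update_compression_task_metadata() emitting column="value" assignments against the tasks table and increment_compression_job_metadata() emitting additive column=column+value assignments against the jobs table. The sketch below only mirrors that query-building logic so the resulting statement can be inspected without a database; the table name, job_id, and byte counts are assumed values for illustration, not taken from the patch.

# Standalone sketch of the UPDATE built by increment_compression_job_metadata();
# "compression_jobs", job_id, and the sizes are illustrative assumptions.
COMPRESSION_JOBS_TABLE_NAME = "compression_jobs"

job_id = 1
kv = dict(uncompressed_size=4096, compressed_size=512)

field_set_expressions = [f"{k}={k}+{v}" for k, v in kv.items()]
query = f"""
UPDATE {COMPRESSION_JOBS_TABLE_NAME}
SET {", ".join(field_set_expressions)}
WHERE id={job_id}
"""
print(query.strip())
# UPDATE compression_jobs
# SET uncompressed_size=uncompressed_size+4096, compressed_size=compressed_size+512
# WHERE id=1

Because the workers now write these per-task and per-job rows themselves, the scheduler and the compress.py polling loop only read the aggregated uncompressed_size/compressed_size columns, which is what print_compression_job_status() reports.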