clp-package: Add support for printing real-time compression statistics. #388

Merged · 16 commits · May 15, 2024
@@ -1,10 +1,9 @@
import argparse
import datetime
import logging
import pathlib
import shutil
import sys
import time
import uuid
from contextlib import closing

import brotli
@@ -35,23 +34,32 @@
logger.addHandler(logging_console_handler)


def print_compression_job_status(job_row, current_time):
job_uncompressed_size = job_row["uncompressed_size"]
job_compressed_size = job_row["compressed_size"]
job_start_time = job_row["start_time"]
compression_ratio = float(job_uncompressed_size) / job_compressed_size
compression_speed = job_uncompressed_size / (current_time - job_start_time).total_seconds()
logger.info(
f"Compressed {pretty_size(job_uncompressed_size)} into "
f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}x). "
f"Speed: {pretty_size(compression_speed)}/s."
)


def handle_job_update(db, db_cursor, job_id, no_progress_reporting):
if no_progress_reporting:
polling_query = (
f"SELECT status, status_msg FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
)
else:
polling_query = (
f"SELECT status, status_msg, uncompressed_size, compressed_size "
f"SELECT start_time, status, status_msg, uncompressed_size, compressed_size "
f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
)

completion_query = (
f"SELECT duration, uncompressed_size, compressed_size "
f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
)

job_last_uncompressed_size = 0

while True:
db_cursor.execute(polling_query)
results = db_cursor.fetchall()
@@ -63,37 +71,26 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting):

job_row = results[0]
job_status = job_row["status"]

if not no_progress_reporting:
job_uncompressed_size = job_row["uncompressed_size"]
job_compressed_size = job_row["compressed_size"]
if job_uncompressed_size > 0:
compression_ratio = float(job_uncompressed_size) / job_compressed_size
if job_last_uncompressed_size < job_uncompressed_size:
logger.info(
f"Compressed {pretty_size(job_uncompressed_size)} into "
f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f})"
)
job_last_uncompressed_size = job_uncompressed_size
current_time = datetime.datetime.now()

if CompressionJobStatus.SUCCEEDED == job_status:
# All tasks in the job are done
speed = 0
if not no_progress_reporting:
db_cursor.execute(completion_query)
job_row = db_cursor.fetchone()
if job_row["duration"] and job_row["duration"] > 0:
speed = job_row["uncompressed_size"] / job_row["duration"]
logger.info(
f"Compression finished. Runtime: {job_row['duration']}s. "
f"Speed: {pretty_size(speed)}/s."
)
logger.info("Compression finished.")
print_compression_job_status(job_row, current_time)
break # Done
if CompressionJobStatus.FAILED == job_status:
# One or more tasks in the job have failed
logger.error(f"Compression failed. {job_row['status_msg']}")
break # Done
if CompressionJobStatus.RUNNING == job_status or CompressionJobStatus.PENDING == job_status:

if CompressionJobStatus.RUNNING == job_status:
if not no_progress_reporting:
job_uncompressed_size = job_row["uncompressed_size"]
if job_last_uncompressed_size < job_uncompressed_size:
print_compression_job_status(job_row, current_time)
job_last_uncompressed_size = job_uncompressed_size
elif CompressionJobStatus.PENDING == job_status:
pass # Simply wait another iteration
else:
error_msg = f"Unhandled CompressionJobStatus: {job_status}"
@@ -1,6 +1,5 @@
import datetime
import json
import logging
import os
import pathlib
import subprocess
@@ -9,21 +8,69 @@
import yaml
from celery.app.task import Task
from celery.utils.log import get_task_logger
from clp_py_utils.clp_config import Database, StorageEngine
from clp_py_utils.clp_config import (
COMPRESSION_JOBS_TABLE_NAME,
COMPRESSION_TASKS_TABLE_NAME,
Database,
StorageEngine,
)
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.compress.celery import app
from job_orchestration.scheduler.constants import CompressionTaskStatus
from job_orchestration.scheduler.job_config import ClpIoConfig, PathsToCompress
from job_orchestration.scheduler.scheduler_data import (
CompressionTaskFailureResult,
CompressionTaskSuccessResult,
)
from job_orchestration.scheduler.scheduler_data import CompressionTaskResult

# Setup logging
logger = get_task_logger(__name__)


def update_compression_task_metadata(db_cursor, task_id, kv):
if not len(kv):
raise ValueError("Must specify at least one field to update")

field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()]
query = f"""
UPDATE {COMPRESSION_TASKS_TABLE_NAME}
SET {", ".join(field_set_expressions)}
WHERE id={task_id}
"""
db_cursor.execute(query)


def increment_compression_job_metadata(db_cursor, job_id, kv):
if not len(kv):
raise ValueError("Must specify at least one field to update")

field_set_expressions = [f"{k}={k}+{v}" for k, v in kv.items()]
query = f"""
UPDATE {COMPRESSION_JOBS_TABLE_NAME}
SET {", ".join(field_set_expressions)}
WHERE id={job_id}
"""
db_cursor.execute(query)


def update_tags(db_cursor, table_prefix, archive_id, tag_ids):
db_cursor.executemany(
f"INSERT INTO {table_prefix}archive_tags (archive_id, tag_id) VALUES (%s, %s)",
[(archive_id, tag_id) for tag_id in tag_ids],
)


def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archive_stats):
if tag_ids is not None:
update_tags(db_cursor, table_prefix, archive_stats["id"], tag_ids)
increment_compression_job_metadata(
db_cursor,
job_id,
dict(
uncompressed_size=archive_stats["uncompressed_size"],
compressed_size=archive_stats["size"],
),
)


def make_clp_command(
clp_home: pathlib.Path,
archive_output_dir: pathlib.Path,
@@ -80,14 +127,6 @@ def make_clp_s_command(
return compression_cmd


def update_tags(db_conn, db_cursor, table_prefix, archive_id, tag_ids):
db_cursor.executemany(
f"INSERT INTO {table_prefix}archive_tags (archive_id, tag_id) VALUES (%s, %s)",
[(archive_id, tag_id) for tag_id in tag_ids],
)
db_conn.commit()


def run_clp(
clp_config: ClpIoConfig,
clp_home: pathlib.Path,
@@ -98,6 +137,7 @@ def run_clp(
task_id: int,
tag_ids,
paths_to_compress: PathsToCompress,
sql_adapter: SQL_Adapter,
clp_metadata_db_connection_config,
):
"""
@@ -112,6 +152,7 @@ def run_clp(
:param task_id:
:param tag_ids:
:param paths_to_compress: PathToCompress
:param sql_adapter: SQL_Adapter
:param clp_metadata_db_connection_config
:return: tuple -- (whether compression was successful, output messages)
"""
@@ -169,45 +210,49 @@
compression_successful = False
proc = subprocess.Popen(compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file)

sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_connection_config))
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
# Compute the total amount of data compressed
last_archive_stats = None
total_uncompressed_size = 0
total_compressed_size = 0
while True:
line = proc.stdout.readline()
if not line:
break
stats = json.loads(line.decode("ascii"))
if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]:
# We've started a new archive so add the previous archive's last
# reported size to the total
total_uncompressed_size += last_archive_stats["uncompressed_size"]
total_compressed_size += last_archive_stats["size"]
if tag_ids is not None:
update_tags(
db_conn,
db_cursor,
clp_metadata_db_connection_config["table_prefix"],
last_archive_stats["id"],
tag_ids,
)
last_archive_stats = stats
if last_archive_stats is not None:
# Add the last archive's last reported size
# Compute the total amount of data compressed
last_archive_stats = None
total_uncompressed_size = 0
total_compressed_size = 0
while True:
line = proc.stdout.readline()
if not line:
break
stats = json.loads(line.decode("ascii"))
if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]:
# We've started a new archive so add the previous archive's last
# reported size to the total
total_uncompressed_size += last_archive_stats["uncompressed_size"]
total_compressed_size += last_archive_stats["size"]
if tag_ids is not None:
update_tags(
db_conn,
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
update_job_metadata_and_tags(
db_cursor,
job_id,
clp_metadata_db_connection_config["table_prefix"],
last_archive_stats["id"],
tag_ids,
last_archive_stats,
)
db_conn.commit()

last_archive_stats = stats

if last_archive_stats is not None:
# Add the last archive's last reported size
total_uncompressed_size += last_archive_stats["uncompressed_size"]
total_compressed_size += last_archive_stats["size"]
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
update_job_metadata_and_tags(
db_cursor,
job_id,
clp_metadata_db_connection_config["table_prefix"],
tag_ids,
last_archive_stats,
)
db_conn.commit()

# Wait for compression to finish
return_code = proc.wait()
@@ -225,13 +270,15 @@ def run_clp(
# Close stderr log file
stderr_log_file.close()

worker_output = {
"total_uncompressed_size": total_uncompressed_size,
"total_compressed_size": total_compressed_size,
}
if compression_successful:
return compression_successful, {
"total_uncompressed_size": total_uncompressed_size,
"total_compressed_size": total_compressed_size,
}
return CompressionTaskStatus.SUCCEEDED, worker_output
else:
return compression_successful, {"error_message": f"See logs {stderr_log_path}"}
worker_output["error_message"] = f"See logs {stderr_log_path}"
return CompressionTaskStatus.FAILED, worker_output


@app.task(bind=True)
@@ -256,9 +303,11 @@ def compress(
clp_io_config = ClpIoConfig.parse_raw(clp_io_config_json)
paths_to_compress = PathsToCompress.parse_raw(paths_to_compress_json)

sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_connection_config))

start_time = datetime.datetime.now()
logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.")
compression_successful, worker_output = run_clp(
compression_task_status, worker_output = run_clp(
clp_io_config,
pathlib.Path(clp_home_str),
pathlib.Path(data_dir_str),
@@ -268,25 +317,37 @@
task_id,
tag_ids,
paths_to_compress,
sql_adapter,
clp_metadata_db_connection_config,
)
duration = (datetime.datetime.now() - start_time).total_seconds()
logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.")

if compression_successful:
return CompressionTaskSuccessResult(
task_id=task_id,
status=CompressionTaskStatus.SUCCEEDED,
start_time=start_time,
duration=duration,
total_uncompressed_size=worker_output["total_uncompressed_size"],
total_compressed_size=worker_output["total_compressed_size"],
).dict()
else:
return CompressionTaskFailureResult(
task_id=task_id,
status=CompressionTaskStatus.FAILED,
start_time=start_time,
duration=duration,
error_message=worker_output["error_message"],
).dict()
with closing(sql_adapter.create_connection(True)) as db_conn, closing(

Member:

I still think we should try to keep our MySQL connections short, even if frequent, so I would prefer we open a connection only just before we perform a write (currently, this opens a connection before we start compression, which itself could take a long time depending on the dataset and config).

MySQL's blog says it's capable of handling a lot of short connections, but its default concurrent-connection limit is only 151. We should probably benchmark this for ourselves at some point (a long time ago, @haiqi96 did some scalability benchmarking that showed MySQL struggled with 20 concurrent connections performing inserts), but for now, I think following their advice is the safer option.

Contributor Author:

Yeah, my original plan was to use short connections, but I noticed that the compression scheduler maintains a long connection.

Member:

In the scheduler's case, it's only maintaining one connection, which it uses for polling (among other things), right? In theory we could make it use shorter connections, but I'm not sure it would make much difference there (we should still measure at some point, though).

Contributor Author:

Got it. For the compression task, we read the output from the process and update the database. Do you think we should open a connection each time we get a new line?

Member:

I think we could try opening a connection each time we need to update an archive's stats and tags (which would only be every time we finish an archive) and then once more at the end of the task.
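
A minimal sketch of the short-lived-connection pattern being suggested here (illustrative only, not part of this diff; it reuses `sql_adapter` and `update_job_metadata_and_tags` from the code above, and the `report_archive_stats` wrapper is a hypothetical name):

```python
from contextlib import closing


def report_archive_stats(sql_adapter, job_id, table_prefix, tag_ids, archive_stats):
    # Open a connection only for the duration of this write; it closes as soon as the
    # `with` block exits, so nothing is held open while clp compresses the next archive.
    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
        db_conn.cursor(dictionary=True)
    ) as db_cursor:
        update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archive_stats)
        db_conn.commit()
```

Calling such a helper once per finished archive, plus once for the task's final metadata update, keeps each connection open only for the few statements it needs.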

db_conn.cursor(dictionary=True)
) as db_cursor:
update_compression_task_metadata(
db_cursor,
task_id,
dict(
start_time=start_time,
status=compression_task_status,
partition_uncompressed_size=worker_output["total_uncompressed_size"],
partition_compressed_size=worker_output["total_compressed_size"],
duration=duration,
),
)
if CompressionTaskStatus.SUCCEEDED == compression_task_status:
increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1))
db_conn.commit()

compression_task_result = CompressionTaskResult(
task_id=task_id,
status=compression_task_status,
duration=duration,
)

if CompressionTaskStatus.FAILED == compression_task_status:
compression_task_result.error_message = worker_output["error_message"]

return compression_task_result.dict()