
clp-package: Add support for printing real-time compression statistics. #388

Merged: 16 commits merged into y-scope:main on May 15, 2024

Conversation

@wraymo (Contributor) commented May 10, 2024

References

Description

For large jobs, users often experience extended waiting periods until completion. During that time, they have no idea about the compression status. This PR addresses this issue by printing out real-time statistics (compression ratio and speed) during compression. Specifically, this PR introduces the code changes listed below:

  • CompressionTaskFailureResult and CompressionTaskSuccessResult are removed.
  • The compression scheduler no longer updates the completion status of compression tasks.
  • Instead, each task updates its own completion status and, at the same time, updates the compression job's fields (i.e., compressed_size, uncompressed_size, and num_tasks_completed); a sketch of such an update follows this list.
  • In the user compression script, completion_query is removed and compression statistics are now printed promptly whenever they are updated.
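
As a rough illustration of the per-task update described above, a task could fold its results into the job row with a single statement. This is only a sketch: the table name (compression_jobs), the helper name, and the DB-API parameter style are assumptions, not the PR's actual code.

def update_job_progress(db_cursor, job_id, uncompressed_size, compressed_size):
    # Hypothetical helper: accumulate this task's sizes into the job row and
    # bump the completed-task count in one UPDATE.
    db_cursor.execute(
        "UPDATE compression_jobs"
        " SET uncompressed_size = uncompressed_size + %s,"
        "     compressed_size = compressed_size + %s,"
        "     num_tasks_completed = num_tasks_completed + 1"
        " WHERE id = %s",
        (uncompressed_size, compressed_size, job_id),
    )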

Validation performed

  • Built and started the package
  • Compressed the hive-24hrs dataset. It printed out statistics like this:
2024-05-10 03:22:52,628 [INFO] [/opt/clp/lib/python3/site-packages/clp_package_utils/scripts/native/compress.py] Compression job 6 submitted.
2024-05-10 03:23:02,159 [INFO] [/opt/clp/lib/python3/site-packages/clp_package_utils/scripts/native/compress.py] Compressed 159.87MB into 4.14MB (38.58). Speed: 48.95MB/s.
2024-05-10 03:23:04,165 [INFO] [/opt/clp/lib/python3/site-packages/clp_package_utils/scripts/native/compress.py] Compressed 928.14MB into 23.46MB (39.56). Speed: 176.05MB/s.
2024-05-10 03:23:04,667 [INFO] [/opt/clp/lib/python3/site-packages/clp_package_utils/scripts/native/compress.py] Compressed 1.41GB into 36.32MB (39.73). Speed: 249.91MB/s.
2024-05-10 03:23:05,670 [INFO] [/opt/clp/lib/python3/site-packages/clp_package_utils/scripts/native/compress.py] Compressed 1.66GB into 40.11MB (42.36). Speed: 250.69MB/s.
2024-05-10 03:23:06,673 [INFO] [/opt/clp/lib/python3/site-packages/clp_package_utils/scripts/native/compress.py] Compression finished. Compressed 1.99GB into 44.37MB (45.89). Speed: 261.66MB/s.

@kirkrodrigues (Member) commented:

Can you summarize the approach taken? I.e., at a high level, what is the code change?

@wraymo (Contributor, Author) commented May 10, 2024:

> Can you summarize the approach taken? I.e., at a high level, what is the code change?

Updated

@wraymo wraymo requested a review from kirkrodrigues May 13, 2024 20:55
Comment on lines 90 to 100
compression_ratio = float(job_uncompressed_size) / job_compressed_size
compression_speed = (
    job_uncompressed_size
    / (current_time - job_row["start_time"]).total_seconds()
)
logger.info(
    f"Compressed {pretty_size(job_uncompressed_size)} into "
    f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). "
    f"Speed: {pretty_size(compression_speed)}/s."
)
job_last_uncompressed_size = job_uncompressed_size
@kirkrodrigues (Member) commented:

Let's deduplicate this with the block on line 68.
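
One possible shape for that deduplication is a small helper shared by both call sites (a sketch only; the helper name and signature are not from the PR, while logger and pretty_size are the module-level names used in the snippet above):

def log_compression_progress(start_time, current_time, uncompressed_size, compressed_size):
    # Print the running compression ratio and average speed for a job.
    compression_ratio = float(uncompressed_size) / compressed_size
    compression_speed = uncompressed_size / (current_time - start_time).total_seconds()
    logger.info(
        f"Compressed {pretty_size(uncompressed_size)} into "
        f"{pretty_size(compressed_size)} ({compression_ratio:.2f}). "
        f"Speed: {pretty_size(compression_speed)}/s."
    )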

-    if not task_result["status"] == CompressionTaskStatus.SUCCEEDED:
-        task_result = CompressionTaskFailureResult.parse_obj(task_result)
+    task_result = CompressionTaskResult.parse_obj(task_result)
+    if not task_result.status == CompressionTaskStatus.SUCCEEDED:
@kirkrodrigues (Member) commented:

Suggested change
-    if not task_result.status == CompressionTaskStatus.SUCCEEDED:
+    if task_result.status != CompressionTaskStatus.SUCCEEDED:

@kirkrodrigues (Member) commented:

That said, this if-else would be simpler if we swapped the cases.

if not line:
    break
stats = json.loads(line.decode("ascii"))
if stats["id"] != last_archive_id:
@kirkrodrigues (Member) commented:

Hm, since an archive may print its stats multiple times (after every segment is created, in clp's case), we would still need the previous logic that keeps last_archive_stats, right?

@wraymo (Contributor, Author) commented May 14, 2024:

The previous logic was to check last_archive_stats['id']; if it's different from the current ID, we update everything. Isn't that the same as the current logic?

@kirkrodrigues (Member) commented:

Not exactly. In the previous logic, if last_archive_stats['id'] was different from the current ID, we would add uncompressed_size and size from last_archive_stats, but in the current code we're adding the values from stats.

To see why this is a problem, imagine clp creates two archives with two segments each, meaning it will print the archive stats 4 times, something like this:

  1. archive-1-seg-1: uncompressed_size = 10, size = 1
  2. archive-1-seg-2: uncompressed_size = 20, size = 2
  3. archive-2-seg-1: uncompressed_size = 5, size = 1
  4. archive-2-seg-2: uncompressed_size = 10, size = 2

In the current code, when we see the printout of (1), we will do total_uncompressed_size += 10, size += 1. When we see the printout of (3), we will do total_uncompressed_size += 5, size += 1. This will give us total_uncompressed_size = 15, size = 2. But it should be total_uncompressed_size = 30, size = 4.

@wraymo (Contributor, Author) commented May 15, 2024:

Oh, I see your point. uncompressed_size is cumulative. I thought we could accept the first printout and abandon the rest with the same ID.
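
For reference, the accumulation pattern described above (keep the most recent, cumulative stats per archive and fold them into the totals only when the archive ID changes, plus once at the end) could look roughly like this. It is a sketch only: proc is assumed to be the compression subprocess, and the uncompressed_size and size keys are the ones named in the discussion.

import json

last_archive_stats = None
total_uncompressed_size = 0
total_size = 0
while True:
    line = proc.stdout.readline()
    if not line:
        break
    stats = json.loads(line.decode("ascii"))
    if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]:
        # The previous archive is finished; its cumulative stats are final.
        total_uncompressed_size += last_archive_stats["uncompressed_size"]
        total_size += last_archive_stats["size"]
    # Stats within an archive are cumulative, so always keep the latest printout.
    last_archive_stats = stats
if last_archive_stats is not None:
    # Fold in the last archive's stats once the stream ends.
    total_uncompressed_size += last_archive_stats["uncompressed_size"]
    total_size += last_archive_stats["size"]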

-            ).dict()
-        else:
-            return CompressionTaskFailureResult(
+    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
@kirkrodrigues (Member) commented:

I still think we should try to keep our MySQL connections short, even if frequent; so I would prefer we open a connection only just before we perform a write (currently, this opens a connection before we start compression, which itself could take a long time depending on the dataset and config).

MySQL's blog says that it's capable of handling a lot of short connections, but its default concurrent connection limit is only 151. We should probably benchmark this for ourselves at some point (a long time ago, @haiqi96 had done some scalability benchmarking that showed MySQL struggled with 20 concurrent connections performing inserts), but for now, I think following their advice is the safer option.

@wraymo (Contributor, Author) commented:

Yeah, my original plan was to use short connections, but I noticed that in the compression scheduler, we maintain a long connection.

@kirkrodrigues (Member) commented:

In the scheduler's case, it's only maintaining one connection that it's using for polling (among other things), right? In theory we could make it use shorter connections, but there I'm not sure it will make much difference (we should still measure at some point though).

@wraymo (Contributor, Author) commented:

Got it. For the compression task, we read the output from the process and update the database. Do you think we should open a connection each time we get a new line?

@kirkrodrigues (Member) commented:

I think we could try opening a connection each time we need to update the archive's stats + tags (which would only be every time we finish an archive) and then once at the end of the task.
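
Along those lines, each write could open its own short-lived connection, roughly like this (a sketch: sql_adapter.create_connection(True) and closing come from the diff above, while update_job_progress is the hypothetical helper sketched in the description):

from contextlib import closing

def write_archive_stats(sql_adapter, job_id, archive_stats):
    # Open a connection only for the duration of this write, then close it.
    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
        db_conn.cursor()
    ) as db_cursor:
        update_job_progress(
            db_cursor,
            job_id,
            archive_stats["uncompressed_size"],
            archive_stats["size"],
        )
        db_conn.commit()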

@wraymo wraymo requested a review from kirkrodrigues May 15, 2024 15:11
Comment on lines 30 to 31
logger.error("Must specify at least one field to update")
raise ValueError
@kirkrodrigues (Member) commented:

Suggested change
-        logger.error("Must specify at least one field to update")
-        raise ValueError
+        raise ValueError("Must specify at least one field to update")

Comment on lines 44 to 45
logger.error("Must specify at least one field to update")
raise ValueError
@kirkrodrigues (Member) commented:

Suggested change
-        logger.error("Must specify at least one field to update")
-        raise ValueError
+        raise ValueError("Must specify at least one field to update")

@wraymo wraymo requested a review from kirkrodrigues May 15, 2024 18:12
@kirkrodrigues kirkrodrigues changed the title clp-package: Add support for printing real-time compression statistics clp-package: Add support for printing real-time compression statistics. May 15, 2024
@wraymo wraymo merged commit 69b1434 into y-scope:main May 15, 2024
2 checks passed