
[Iceberg] Add manifest file caching for HMS-based deployments #24481

Open

ZacBlanco wants to merge 1 commit into master from upstream-iceberg-manifest-caching

Conversation

ZacBlanco
Contributor

@ZacBlanco ZacBlanco commented Feb 3, 2025

Description

Adds manifest file caching to the Iceberg connector for HMS-based deployments.

Motivation and Context

In order to optimize and plan Iceberg queries, we call the planFiles() API multiple times throughout the query optimization lifecycle. Each call requires reading and parsing metadata files, which usually live on an external filesystem such as S3. For large tables there can be hundreds of such files, typically ranging from a few kilobytes to a few megabytes in size. When they are not cached in memory within Presto, the repeated reads can add significant E2E query latency.
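For a rough picture of the shape this takes, here is a minimal sketch: a Guava cache keyed by manifest file path, weighted by the cached content size, with stats recording so hit/miss counts can be reported. The names below are illustrative only; the PR's actual classes (ManifestFileCache, ManifestFileCacheKey, ManifestFileCachedContent) appear further down in the review.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

import java.nio.ByteBuffer;
import java.util.List;

public final class ManifestCacheSketch
{
    private ManifestCacheSketch() {}

    // Build a Guava cache keyed by manifest file path, holding the fully read
    // manifest bytes (as chunks) and bounded by the total number of cached bytes.
    // recordStats() lets hit/miss counts be exported, e.g. through JMX.
    public static Cache<String, List<ByteBuffer>> newManifestCache(long maxWeightBytes)
    {
        Weigher<String, List<ByteBuffer>> byContentSize = (path, chunks) ->
                chunks.stream().mapToInt(ByteBuffer::remaining).sum();
        return CacheBuilder.newBuilder()
                .maximumWeight(maxWeightBytes)
                .weigher(byContentSize)
                .recordStats()
                .build();
    }
}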

Impact

TBD

Test Plan

TBD

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with their default values), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

== RELEASE NOTES ==

Iceberg Connector Changes
* Add manifest file caching for deployments which use the Hive metastore.
* Enable manifest caching by default.

@prestodb-ci prestodb-ci added the from:IBM (PR from IBM) label Feb 3, 2025
@ZacBlanco ZacBlanco force-pushed the upstream-iceberg-manifest-caching branch from 7db7896 to 666d248 on February 5, 2025 16:55
@ZacBlanco ZacBlanco marked this pull request as ready for review February 5, 2025 18:09
@ZacBlanco ZacBlanco requested review from hantangwangd and a team as code owners February 5, 2025 18:09
@ZacBlanco ZacBlanco requested a review from jaystarshot February 5, 2025 18:09
Member

@jaystarshot jaystarshot left a comment


Sorry, I may not have the correct context on this, but is it possible to add some tests too?

@@ -67,7 +67,7 @@ public class IcebergConfig
     private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
     private String fileIOImpl = HadoopFileIO.class.getName();
-    private boolean manifestCachingEnabled;
+    private boolean manifestCachingEnabled = true;
Member

Is this intended?

Contributor Author

@ZacBlanco ZacBlanco Feb 6, 2025


Yes, this is intentional. Performance is significantly worse with it disabled, and I don't think there are any known downsides to enabling it by default other than an increased memory footprint.

public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
{
    Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate = CacheBuilder.newBuilder()
            .maximumWeight(config.getManifestCachingEnabled() ? config.getMaxManifestCacheSize() : 0)
Member

If caching is disabled, I think we should not have any caching at all instead of adding it via a 0 weight.

Contributor Author

I have updated the PR to remove use of the 0 weight and bypass the cache entirely if it is not enabled.
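Roughly, the read path now takes this shape. The sketch below uses stand-in types (FileContentCache, FileReader) and is not the exact code in this PR; it only illustrates the "skip the cache entirely when disabled" idea.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

final class CacheBypassSketch
{
    // Stand-in for the manifest cache: lookups and population only happen when enabled.
    interface FileContentCache
    {
        boolean isEnabled();
        Optional<byte[]> get(String path);
        void put(String path, byte[] content);
    }

    interface FileReader
    {
        byte[] readAllBytes(String path) throws IOException;
    }

    static InputStream open(String path, FileContentCache cache, FileReader reader) throws IOException
    {
        if (!cache.isEnabled()) {
            // Disabled: no cache interaction at all, just read the underlying file.
            return new ByteArrayInputStream(reader.readAllBytes(path));
        }
        Optional<byte[]> cached = cache.get(path);
        if (cached.isPresent()) {
            return new ByteArrayInputStream(cached.get());
        }
        byte[] content = reader.readAllBytes(path);
        cache.put(path, content);              // populate on a miss
        return new ByteArrayInputStream(content);
    }
}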

Contributor

@aaneja aaneja left a comment


Can you post some metrics about cache hit ratios/eviction for a canonical read-heavy workload? Maybe like partitioned/unpartitioned TPC-DS?

long fileLength = delegate.getLength();
if (fileLength <= cache.getMaxFileLength() && cache.isEnabled()) {
    try {
        ManifestFileCachedContent content = readFully(delegate, fileLength);
Contributor

N00b question, but are the (Avro) manifest files always or mostly read fully and then deserialized? Or are range-reads supported?

Contributor Author

This is a good question. For the most part, when dealing with manifests, the files are read fully. However, there are cases where the full content is not used; for example, when reading partition specs in Avro format, you only need to read the file metadata.

However, in order to plan an entire query you need to read all of the (valid) manifest files fully. You won't really ever need just the partition specs, and the partition specs are going to be contained within one of those files anyway.

Additionally, when caching is enabled on catalogs other than HMS, this is the same approach the Iceberg library takes.

        implements InputFile
{
    private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);
    private static final long BUFFER_CHUNK_SIZE = 2 * 1024 * 1024;
Contributor

Is the default here 2 MB for a specific reason? Can we make this Integer.MAX_VALUE?

Contributor Author

@ZacBlanco ZacBlanco Feb 10, 2025


I think Integer.MAX_VALUE is too large. The Iceberg code uses around 4MB, I believe. There is a balance to strike in the chunk sizes: since a byte buffer internally stores a contiguous array of bytes, you need that much contiguous memory available. When you break the content into chunks, it's easier for the allocator to find smaller free blocks, and it puts less pressure on the GC when under load. I think going as high as 16MB would probably be OK. My gut says 2MB is probably fine, but I am open to tweaking it.
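To illustrate the trade-off, here's a sketch of chunked buffering (illustrative only, not the exact readFully in this PR): reading into fixed-size chunks means the largest single allocation is the chunk size, rather than the whole manifest.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class ChunkedReadSketch
{
    private static final int BUFFER_CHUNK_SIZE = 2 * 1024 * 1024;

    // Read the stream into a list of <=2MB buffers instead of one contiguous
    // array, so the allocator never has to find a single multi-megabyte block.
    static List<ByteBuffer> readFully(InputStream in, long fileLength) throws IOException
    {
        List<ByteBuffer> chunks = new ArrayList<>();
        long remaining = fileLength;
        while (remaining > 0) {
            int chunkSize = (int) Math.min(remaining, BUFFER_CHUNK_SIZE);
            byte[] chunk = in.readNBytes(chunkSize);
            if (chunk.length == 0) {
                break;                         // unexpected EOF; stop reading
            }
            chunks.add(ByteBuffer.wrap(chunk));
            remaining -= chunk.length;
        }
        return chunks;
    }
}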

Member

It's better for this to come from a config.

Member

Agree to set a slightly smaller value here.

@ZacBlanco
Contributor Author

ZacBlanco commented Feb 10, 2025

Can you post some metrics about cache hit ratios/eviction for a canonical read-heavy workload? Maybe like partitioned/unpartitioned TPC-DS?

I have not tested on a partitioned dataset yet, but on our internal unpartitioned TPC-DS SF1k dataset executed in the "ds_power" configuration (1 query at a time, q1 through q99), the cache hit rate was 96.8%. If I recall correctly, total cache hits were somewhere between 10-12k, while misses were just a few hundred. When testing locally on an SF10 dataset generated from the tpcds.sf10 schema, the hit rate was 99.7%.

@Provides
public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
{
    Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate = CacheBuilder.newBuilder()
Contributor

Should we just use Caffeine as the caching library, since iceberg-core already brings it in? It appears to have better performance and is recommended by the Guava team too.

Contributor Author

@ZacBlanco ZacBlanco Feb 11, 2025


I had the same thought. Caching performance would likely improve as well, because eviction decisions in Caffeine use global weight rather than the per-segment weight used in Guava. However, most of the Presto codebase uses Guava caches. Since Caffeine and Guava caches are different types, Caffeine would not be compatible with the current infrastructure, such as the CacheStatsMBean object. Additionally, we use Guava's SimpleForwardingCache, which is not available in Caffeine, so I would have to roll my own. Not a terrible amount of effort, but I think there's enough work there to push it into a separate PR.
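For reference, the Guava forwarding pattern mentioned here looks roughly like this. It's a sketch with simplified key/value types, not the actual ManifestFileCache in this PR: a thin wrapper over a delegate cache that layers extra bookkeeping on top.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.ForwardingCache;
import com.google.common.cache.Weigher;

import java.util.concurrent.atomic.AtomicLong;

final class ForwardingManifestCacheSketch
        extends ForwardingCache.SimpleForwardingCache<String, byte[]>
{
    private final AtomicLong cachedBytes = new AtomicLong();

    ForwardingManifestCacheSketch(long maxWeightBytes)
    {
        super(buildDelegate(maxWeightBytes));
    }

    private static Cache<String, byte[]> buildDelegate(long maxWeightBytes)
    {
        Weigher<String, byte[]> byLength = (path, content) -> content.length;
        return CacheBuilder.newBuilder()
                .maximumWeight(maxWeightBytes)
                .weigher(byLength)
                .recordStats()      // the delegate still exposes CacheStats for an MBean wrapper
                .build();
    }

    @Override
    public void put(String path, byte[] content)
    {
        cachedBytes.addAndGet(content.length);   // extra stat layered on top of the delegate
        super.put(path, content);
    }

    long getCachedBytes()
    {
        return cachedBytes.get();
    }
}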

@ZacBlanco
Contributor Author

ZacBlanco commented Feb 11, 2025

Some more concrete data on how much manifest caching improves planning times:

[Figure: analysis-timings — absolute analysis time comparison with and without manifest caching]

[Figure: manifest-caching-time-improvement — analysis time ratio of caching to no caching; a ratio of 1.0 means the analysis time was the same with and without caching, and lower is better]

Additionally, here's some raw data which includes all the cache statistics on the manifest cache. Unfortunately, we don't have data about the eviction counts.
(Attachment: full-tpcds-manifest-stats.json)

Here are the most pertinent, IMO:

  "cachestats.hitcount": 25801,
  "cachestats.hitrate": 0.9825209444021326,
  "cachestats.misscount": 459,
  "cachestats.size": 22,
  "filesizedistribution.alltime.avg": 11953.193899782134,
  "filesizedistribution.alltime.count": 459.0,
  "filesizedistribution.alltime.max": 18990,
  "filesizedistribution.alltime.maxerror": 0.0,
  "filesizedistribution.alltime.min": 4528,
  "filesizedistribution.alltime.p01": 4602,
  "filesizedistribution.alltime.p05": 6793,
  "filesizedistribution.alltime.p10": 7322,
  "filesizedistribution.alltime.p25": 8417,
  "filesizedistribution.alltime.p50": 12048,
  "filesizedistribution.alltime.p75": 14475,
  "filesizedistribution.alltime.p90": 18084,
  "filesizedistribution.alltime.p95": 18949,
  "filesizedistribution.alltime.p99": 18990,

One thing to note is that the cache is completely fresh for q1, q2, q3, etc., so we have higher query planning times at the beginning of the DS-power run while the cache is getting populated. You can see that once we've read most tables' metadata, the analysis time consistently starts dropping around q6/7/8.

@ZacBlanco ZacBlanco force-pushed the upstream-iceberg-manifest-caching branch 2 times, most recently from 2563391 to 2c9c425 on February 13, 2025 00:27
@steveburnett
Contributor

Nit, suggested rephrase of release note to follow the Order of changes phrasing in the Release Notes Guidelines:

== RELEASE NOTES ==

Iceberg Connector Changes
* Add manifest file caching for deployments which use the Hive metastore.
* Add enable by default for manifest file caching.

@hantangwangd
Member

A little unsure about this. Please correct me if I'm wrong: should we just implement the method Map<String, String> properties() for HdfsFileIO, so that we can utilize the Iceberg lib's manifest file cache even when configuring with our native Hive catalog? Or are there any other problems I didn't notice?

The reference code in the Iceberg lib can be found here. It seems the following code in HdfsFileIO could utilize the manifest file cache:

    public Map<String, String> properties()
    {
        return IcebergUtil.loadCachingProperties(icebergConfig);
    }

@ZacBlanco
Contributor Author

This is a good question. I initially was going to use this method but decided it would not work well. The reason we can't use the Iceberg library's caching code is that (1) there are no metrics available, so we can't track the hit/miss counts or report them in the query's runtime metrics (this is currently a limitation with non-Hive catalogs), and (2) we wouldn't be able to cache across queries, because the cache key in the Iceberg library is a single IO instance. In Presto's current implementation, we create a new IO instance for every new HiveTableOperations object. This is also compounded by the fact that the Iceberg library's manifest file cache uses weakKeys, which causes cache key comparisons to use identity rather than equality checks, so we have no way to re-use the cache between queries, which is a significant downside.
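As a standalone illustration of the weakKeys point (this is not code from this PR or the Iceberg library): Guava switches to identity comparison for keys when weakKeys() is set, so two equal-but-distinct key objects do not share a cache entry.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public final class WeakKeysIdentitySketch
{
    public static void main(String[] args)
    {
        Cache<String, String> cache = CacheBuilder.newBuilder()
                .weakKeys()                             // switches key comparison to identity (==)
                .build();

        String firstKey = new String("manifest-path");
        String secondKey = new String("manifest-path"); // equals(firstKey), but a distinct object

        cache.put(firstKey, "cached-content");

        System.out.println(cache.getIfPresent(firstKey));   // prints: cached-content
        System.out.println(cache.getIfPresent(secondKey));  // prints: null
    }
}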

@aaneja
Contributor

aaneja commented Feb 18, 2025

LGTM % tests

Member

@hantangwangd hantangwangd left a comment


Thanks for the explanation. LGTM, only a couple of nits and a small question.


@ZacBlanco ZacBlanco force-pushed the upstream-iceberg-manifest-caching branch from 2c9c425 to 28cf822 on February 20, 2025 18:57
@ZacBlanco ZacBlanco force-pushed the upstream-iceberg-manifest-caching branch from 28cf822 to 9a73101 on February 20, 2025 20:21
@ZacBlanco ZacBlanco requested a review from aaneja February 20, 2025 21:42