Add manifest caching to iceberg connector
ZacBlanco committed Feb 20, 2025
1 parent 15afc5c commit 9a73101
Showing 20 changed files with 561 additions and 39 deletions.
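For orientation, here is a sketch of the catalog configuration this change targets, in the usual etc/catalog/iceberg.properties form. Only iceberg.io.manifest.cache.max-chunk-size appears verbatim in this diff; the cache-enabled property name is an assumption inferred from the manifestCachingEnabled field in IcebergConfig and may differ in the released connector.

# Hypothetical catalog configuration sketch (property names partly assumed)
iceberg.io.manifest.cache-enabled=true
# New in this commit: maximum buffer length used to cache manifest content
iceberg.io.manifest.cache.max-chunk-size=2MB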
HdfsCachedInputFile.java
@@ -0,0 +1,119 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import org.apache.iceberg.io.ByteBufferInputStream;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.io.IOUtil.readRemaining;

public class HdfsCachedInputFile
        implements InputFile
{
    private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);

    private final InputFile delegate;
    private final ManifestFileCacheKey cacheKey;
    private final ManifestFileCache cache;

    public HdfsCachedInputFile(InputFile delegate, ManifestFileCacheKey cacheKey, ManifestFileCache cache)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
        this.cache = requireNonNull(cache, "cache is null");
    }

    @Override
    public long getLength()
    {
        ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
        if (cachedContent != null) {
            return cachedContent.getLength();
        }
        return delegate.getLength();
    }

    @Override
    public SeekableInputStream newStream()
    {
        ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
        if (cachedContent != null) {
            return ByteBufferInputStream.wrap(cachedContent.getData());
        }

        long fileLength = delegate.getLength();
        if (fileLength <= cache.getMaxFileLength() && cache.isEnabled()) {
            try {
                ManifestFileCachedContent content = readFully(delegate, fileLength, cache.getBufferChunkSize());
                cache.put(cacheKey, content);
                cache.recordFileSize(content.getLength());
                return ByteBufferInputStream.wrap(content.getData());
            }
            catch (IOException e) {
                LOG.warn("Failed to cache input file. Falling back to direct read.", e);
            }
        }

        return delegate.newStream();
    }

    @Override
    public String location()
    {
        return delegate.location();
    }

    @Override
    public boolean exists()
    {
        return cache.getIfPresent(cacheKey) != null || delegate.exists();
    }

    private static ManifestFileCachedContent readFully(InputFile input, long fileLength, long chunkSize)
            throws IOException
    {
        try (SeekableInputStream stream = input.newStream()) {
            long totalBytesToRead = fileLength;
            List<ByteBuffer> buffers = new ArrayList<>(
                    ((int) (fileLength / chunkSize)) +
                            (fileLength % chunkSize == 0 ? 0 : 1));

            while (totalBytesToRead > 0) {
                int bytesToRead = (int) Math.min(chunkSize, totalBytesToRead);
                byte[] buf = new byte[bytesToRead];
                int bytesRead = readRemaining(stream, buf, 0, bytesToRead);
                totalBytesToRead -= bytesRead;

                if (bytesRead < bytesToRead) {
                    throw new IOException(
                            String.format(
                                    "Failed to read %d bytes: %d bytes in stream",
                                    fileLength, fileLength - totalBytesToRead));
                }
                else {
                    buffers.add(ByteBuffer.wrap(buf));
                }
            }
            return new ManifestFileCachedContent(buffers, fileLength);
        }
    }
}
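A minimal usage sketch of the wrapper above, assuming fileIO and manifestFileCache are the HdfsFileIO and ManifestFileCache instances wired up elsewhere in this commit, and using a made-up path: the first newStream() reads the whole file into chunked ByteBuffers and caches it (when caching is enabled and the file fits under the configured maximum length); later streams are served from memory, and a failed cache read falls back to the delegate.

// Sketch only; the path and variable names below are illustrative, not part of this commit.
InputFile raw = fileIO.newInputFile("hdfs://warehouse/tbl/metadata/snap-123.avro");
InputFile cached = new HdfsCachedInputFile(raw, new ManifestFileCacheKey(raw.location()), manifestFileCache);
try (SeekableInputStream in = cached.newStream()) {
    // first call populates the cache; subsequent newStream() calls read from memory
}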
HdfsFileIO.java
@@ -17,25 +17,30 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

import java.io.IOException;
import java.util.Optional;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class HdfsFileIO
        implements FileIO
{
    private final HdfsEnvironment environment;
    private final HdfsContext context;
    private final ManifestFileCache manifestFileCache;

-   public HdfsFileIO(HdfsEnvironment environment, HdfsContext context)
+   public HdfsFileIO(ManifestFileCache manifestFileCache, HdfsEnvironment environment, HdfsContext context)
    {
        this.environment = requireNonNull(environment, "environment is null");
        this.context = requireNonNull(context, "context is null");
        this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
    }
@@ -44,6 +49,33 @@ public InputFile newInputFile(String path)
        return new HdfsInputFile(new Path(path), environment, context);
    }

    @Override
    public InputFile newInputFile(String path, long length)
    {
        return new HdfsInputFile(new Path(path), environment, context, Optional.of(length));
    }

    protected InputFile newCachedInputFile(String path)
    {
        InputFile inputFile = new HdfsInputFile(new Path(path), environment, context);
        return manifestFileCache.isEnabled() ?
                new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(path), manifestFileCache) :
                inputFile;
    }

    @Override
    public InputFile newInputFile(ManifestFile manifest)
    {
        checkArgument(
                manifest.keyMetadata() == null,
                "Cannot decrypt manifest: %s (use EncryptingFileIO)",
                manifest.path());
        InputFile inputFile = new HdfsInputFile(new Path(manifest.path()), environment, context, Optional.of(manifest.length()));
        return manifestFileCache.isEnabled() ?
                new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(manifest.path()), manifestFileCache) :
                inputFile;
    }

    @Override
    public OutputFile newOutputFile(String path)
    {
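Passing manifest.length() through to HdfsInputFile is worthwhile because Iceberg already knows every manifest's length from the snapshot's manifest list, so the connector can size the HadoopInputFile up front and skip a file-status round trip. A hedged sketch of the call path (the loop and variable names are illustrative; snapshot is any Iceberg Snapshot):

// Illustrative only: ManifestFile entries come from the manifest list with
// their lengths attached, so getLength() below needs no extra namenode call.
for (ManifestFile manifest : snapshot.allManifests(fileIO)) {
    InputFile file = fileIO.newInputFile(manifest); // cached wrapper when enabled
    long length = file.getLength();
}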
HdfsInputFile.java
@@ -22,6 +22,8 @@
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;
@@ -32,25 +34,42 @@ public class HdfsInputFile
    private final InputFile delegate;
    private final HdfsEnvironment environment;
    private final String user;
    private final AtomicLong length;

-   public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
+   public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context, Optional<Long> length)
    {
        requireNonNull(path, "path is null");
        this.environment = requireNonNull(environment, "environment is null");
        this.length = new AtomicLong(length.orElse(-1L));
        requireNonNull(context, "context is null");
        try {
-           this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
+           if (this.length.get() < 0) {
+               this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
+           }
+           else {
+               this.delegate = HadoopInputFile.fromPath(path, this.length.get(), environment.getFileSystem(context, path), environment.getConfiguration(context, path));
+           }
        }
        catch (IOException e) {
            throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create input file: " + path, e);
        }
        this.user = context.getIdentity().getUser();
    }

    public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
    {
        this(path, environment, context, Optional.empty());
    }

    @Override
    public long getLength()
    {
-       return environment.doAs(user, delegate::getLength);
+       return length.updateAndGet(value -> {
+           if (value < 0) {
+               return environment.doAs(user, delegate::getLength);
+           }
+           return value;
+       });
    }

    @Override
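The rewritten getLength() memoizes the length in the AtomicLong, with -1 as the "not yet known" sentinel, so the doAs file-status lookup happens at most once per file when no length was supplied up front. A standalone sketch of the idiom (expensiveLookup is a placeholder, not part of this commit):

// Memoization sketch: the first caller pays for the lookup, later callers
// reuse the stored value. updateAndGet may retry the function under
// contention, which is harmless here because the lookup is idempotent.
AtomicLong cachedLength = new AtomicLong(-1L);
long length = cachedLength.updateAndGet(value -> value < 0 ? expensiveLookup() : value);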
HiveTableOperations.java
@@ -105,7 +105,7 @@ public class HiveTableOperations
    private final String tableName;
    private final Optional<String> owner;
    private final Optional<String> location;
-   private final FileIO fileIO;
+   private final HdfsFileIO fileIO;
    private final IcebergHiveTableOperationsConfig config;

    private TableMetadata currentMetadata;
@@ -121,10 +121,11 @@ public HiveTableOperations(
            HdfsEnvironment hdfsEnvironment,
            HdfsContext hdfsContext,
            IcebergHiveTableOperationsConfig config,
            ManifestFileCache manifestFileCache,
            String database,
            String table)
    {
-       this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
+       this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
                metastore,
                metastoreContext,
                config,
@@ -140,12 +141,13 @@ public HiveTableOperations(
            HdfsEnvironment hdfsEnvironment,
            HdfsContext hdfsContext,
            IcebergHiveTableOperationsConfig config,
            ManifestFileCache manifestFileCache,
            String database,
            String table,
            String owner,
            String location)
    {
-       this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
+       this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
                metastore,
                metastoreContext,
                config,
@@ -156,7 +158,7 @@
    }

    private HiveTableOperations(
-           FileIO fileIO,
+           HdfsFileIO fileIO,
            ExtendedHiveMetastore metastore,
            MetastoreContext metastoreContext,
            IcebergHiveTableOperationsConfig config,
@@ -409,7 +411,7 @@ private void refreshFromMetadataLocation(String newLocation)
                    config.getTableRefreshMaxRetryTime().toMillis(),
                    config.getTableRefreshBackoffScaleFactor())
                    .run(metadataLocation -> newMetadata.set(
-                           TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation))));
+                           TableMetadataParser.read(fileIO, fileIO.newCachedInputFile(metadataLocation))));
        }
        catch (RuntimeException e) {
            throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing", e);
@@ -93,6 +93,8 @@

import javax.inject.Singleton;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

@@ -208,6 +210,25 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean
        return statisticsFileCache;
    }

    @Singleton
    @Provides
    public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
    {
        Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate = CacheBuilder.newBuilder()
                .maximumWeight(config.getMaxManifestCacheSize())
                .<ManifestFileCacheKey, ManifestFileCachedContent>weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
                .expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration()))
                .recordStats()
                .build();
        ManifestFileCache manifestFileCache = new ManifestFileCache(
                delegate,
                config.getManifestCachingEnabled(),
                config.getManifestCacheMaxContentLength(),
                config.getManifestCacheMaxChunkSize().toBytes());
        exporter.export(generatedNameOf(ManifestFileCache.class, connectorId), manifestFileCache);
        return manifestFileCache;
    }

    @ForCachingHiveMetastore
    @Singleton
    @Provides
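The weigher makes eviction byte-based rather than entry-based: an entry weighs the sum of its chunk capacities, so maximumWeight bounds the total bytes the cache holds, not the number of manifests. A toy illustration of the same Guava pattern (types and values are made up):

// Sketch: the weigher returns bytes per entry, maximumWeight caps total bytes.
// Guava evicts whole entries until the total weight is back under the budget.
Cache<String, byte[]> cache = CacheBuilder.newBuilder()
        .maximumWeight(10 * 1024 * 1024)
        .weigher((String key, byte[] value) -> value.length)
        .build();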
IcebergConfig.java
@@ -67,10 +67,11 @@ public class IcebergConfig

    private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
    private String fileIOImpl = HadoopFileIO.class.getName();
-   private boolean manifestCachingEnabled;
+   private boolean manifestCachingEnabled = true;
    private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
    private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
    private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
    private DataSize manifestCacheMaxChunkSize = succinctDataSize(2, MEGABYTE);
    private int splitManagerThreads = Runtime.getRuntime().availableProcessors();
    private DataSize maxStatisticsFileCacheSize = succinctDataSize(256, MEGABYTE);
@@ -348,6 +349,20 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte
        return this;
    }

    public DataSize getManifestCacheMaxChunkSize()
    {
        return manifestCacheMaxChunkSize;
    }

    @Min(1024)
    @Config("iceberg.io.manifest.cache.max-chunk-size")
    @ConfigDescription("Maximum length of a buffer used to cache manifest file content. Only applicable to HIVE catalog.")
    public IcebergConfig setManifestCacheMaxChunkSize(DataSize manifestCacheMaxChunkSize)
    {
        this.manifestCacheMaxChunkSize = manifestCacheMaxChunkSize;
        return this;
    }

    @Min(0)
    public int getSplitManagerThreads()
    {