Skip to content

Commit

Permalink
Move getLatestMetadataLocation to IcebergUtil
Browse files Browse the repository at this point in the history
  • Loading branch information
oskar-szwajkowski authored and wendigo committed Mar 6, 2024
1 parent 23569fc commit e62ba8b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
Expand Down Expand Up @@ -69,6 +73,7 @@
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -94,6 +99,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Maps.immutableEntry;
import static com.google.common.collect.Streams.mapWithIndex;
import static io.airlift.slice.Slices.utf8Slice;
Expand All @@ -103,6 +109,8 @@
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnMetadata;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
Expand Down Expand Up @@ -163,6 +171,7 @@
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
import static org.apache.iceberg.util.PropertyUtil.propertyAsBoolean;

public final class IcebergUtil
Expand Down Expand Up @@ -846,4 +855,43 @@ public static Map<String, Integer> columnNameToPositionInSchema(Schema schema)
(column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
}

public static String getLatestMetadataLocation(TrinoFileSystem fileSystem, String location)
{
List<Location> latestMetadataLocations = new ArrayList<>();
String metadataDirectoryLocation = format("%s/%s", stripTrailingSlash(location), METADATA_FOLDER_NAME);
try {
int latestMetadataVersion = -1;
FileIterator fileIterator = fileSystem.listFiles(Location.of(metadataDirectoryLocation));
while (fileIterator.hasNext()) {
FileEntry fileEntry = fileIterator.next();
Location fileLocation = fileEntry.location();
String fileName = fileLocation.fileName();
if (fileName.endsWith(METADATA_FILE_EXTENSION)) {
int versionNumber = parseVersion(fileName);
if (versionNumber > latestMetadataVersion) {
latestMetadataVersion = versionNumber;
latestMetadataLocations.clear();
latestMetadataLocations.add(fileLocation);
}
else if (versionNumber == latestMetadataVersion) {
latestMetadataLocations.add(fileLocation);
}
}
}
if (latestMetadataLocations.isEmpty()) {
throw new TrinoException(ICEBERG_INVALID_METADATA, "No versioned metadata file exists at location: " + metadataDirectoryLocation);
}
if (latestMetadataLocations.size() > 1) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format(
"More than one latest metadata file found at location: %s, latest metadata files are %s",
metadataDirectoryLocation,
latestMetadataLocations));
}
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking table location: " + location, e);
}
return getOnlyElement(latestMetadataLocations).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
Expand All @@ -35,17 +33,13 @@

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FILE_EXTENSION;
import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME;
import static io.trino.plugin.iceberg.IcebergUtil.parseVersion;
import static io.trino.plugin.iceberg.IcebergUtil.getLatestMetadataLocation;
import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_FOUND;
Expand Down Expand Up @@ -180,45 +174,6 @@ private static String getMetadataLocation(TrinoFileSystem fileSystem, String loc
.orElseGet(() -> getLatestMetadataLocation(fileSystem, location));
}

public static String getLatestMetadataLocation(TrinoFileSystem fileSystem, String location)
{
List<Location> latestMetadataLocations = new ArrayList<>();
String metadataDirectoryLocation = format("%s/%s", stripTrailingSlash(location), METADATA_FOLDER_NAME);
try {
int latestMetadataVersion = -1;
FileIterator fileIterator = fileSystem.listFiles(Location.of(metadataDirectoryLocation));
while (fileIterator.hasNext()) {
FileEntry fileEntry = fileIterator.next();
Location fileLocation = fileEntry.location();
String fileName = fileLocation.fileName();
if (fileName.endsWith(METADATA_FILE_EXTENSION)) {
int versionNumber = parseVersion(fileName);
if (versionNumber > latestMetadataVersion) {
latestMetadataVersion = versionNumber;
latestMetadataLocations.clear();
latestMetadataLocations.add(fileLocation);
}
else if (versionNumber == latestMetadataVersion) {
latestMetadataLocations.add(fileLocation);
}
}
}
if (latestMetadataLocations.isEmpty()) {
throw new TrinoException(ICEBERG_INVALID_METADATA, "No versioned metadata file exists at location: " + metadataDirectoryLocation);
}
if (latestMetadataLocations.size() > 1) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format(
"More than one latest metadata file found at location: %s, latest metadata files are %s",
metadataDirectoryLocation,
latestMetadataLocations));
}
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking table location: " + location, e);
}
return getOnlyElement(latestMetadataLocations).toString();
}

private static void validateMetadataLocation(TrinoFileSystem fileSystem, Location location)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups;
import static io.trino.plugin.iceberg.IcebergUtil.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.iceberg.procedure.RegisterTableProcedure.getLatestMetadataLocation;
import static io.trino.plugin.iceberg.IcebergUtil.getLatestMetadataLocation;
import static io.trino.spi.predicate.Domain.multipleValues;
import static io.trino.spi.predicate.Domain.singleValue;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME;
import static io.trino.plugin.iceberg.procedure.RegisterTableProcedure.getLatestMetadataLocation;
import static io.trino.plugin.iceberg.IcebergUtil.getLatestMetadataLocation;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
Expand Down

0 comments on commit e62ba8b

Please sign in to comment.