[Iceberg] Session property for target split size
ZacBlanco committed Feb 5, 2025
1 parent 81e31fe commit a9f8332
Showing 15 changed files with 215 additions and 56 deletions.
12 changes: 9 additions & 3 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -357,9 +357,9 @@ connector using a WITH clause:

The following table properties are available, which are specific to the Presto Iceberg connector:

======================================= =============================================================== ============
======================================= =============================================================== =========================
Property Name                           Description                                                     Default
======================================= =============================================================== ============
======================================= =============================================================== =========================
``format``                              Optionally specifies the format of table data files,            ``PARQUET``
                                        either ``PARQUET`` or ``ORC``.

@@ -388,7 +388,10 @@ Property Name Description

``metrics_max_inferred_column``         Optionally specifies the maximum number of columns for which    ``100``
                                        metrics are collected.
======================================= =============================================================== ============

``read.split.target-size``              The target size for an individual split when generating splits  ``134217728`` (128MB)
                                        for a table scan. Must be specified in bytes.
======================================= =============================================================== =========================
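
For example, a sketch (schema, table, and column names are hypothetical) of setting the split target at table creation time through the ``WITH`` clause:

.. code-block:: sql

    CREATE TABLE iceberg.test_schema.events (c1 integer, c2 varchar)
    WITH ("read.split.target-size" = 268435456);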

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
and a file system location of ``s3://test_bucket/test_schema/test_table``:
@@ -421,6 +424,9 @@ Property Name Description
``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property
                                                      ``iceberg.rows-for-metadata-optimization-threshold`` in the current
                                                      session.
``iceberg.target_split_size``                         Overrides the target split size, in bytes, for all tables in a
                                                      query. Set to 0 to use the value in each Iceberg table's
                                                      ``read.split.target-size`` property.
===================================================== ======================================================================
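
A sketch of the session override (assuming the catalog is named ``iceberg``), which applies to every table scanned by a query:

.. code-block:: sql

    SET SESSION iceberg.target_split_size = 268435456; -- 256MB splits for this session
    SET SESSION iceberg.target_split_size = 0;         -- defer to each table's setting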

Caching Support
IcebergAbstractMetadata.java
@@ -179,6 +179,7 @@
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;

public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
@@ -611,6 +612,7 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));
properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable));

return properties.build();
}
@@ -1007,6 +1009,9 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
case COMMIT_RETRIES:
updateProperties.set(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(entry.getValue()));
break;
case SPLIT_SIZE:
updateProperties.set(TableProperties.SPLIT_SIZE, entry.getValue().toString());
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Updating property " + entry.getKey() + " is not supported currently");
}
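The new SPLIT_SIZE case in setTableProperties above is what lets ALTER TABLE ... SET PROPERTIES change the split target on an existing table. A sketch (schema and table names are hypothetical):

    ALTER TABLE iceberg.test_schema.events
    SET PROPERTIES ("read.split.target-size" = 268435456);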
IcebergSessionProperties.java
@@ -40,6 +40,7 @@
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;

public final class IcebergSessionProperties
@@ -65,6 +66,7 @@ public final class IcebergSessionProperties
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
public static final String STATISTICS_KLL_SKETCH_K_PARAMETER = "statistics_kll_sketch_k_parameter";
public static final String TARGET_SPLIT_SIZE = "target_split_size";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -189,6 +191,11 @@ public IcebergSessionProperties(
.add(integerProperty(STATISTICS_KLL_SKETCH_K_PARAMETER,
"The K parameter for the Apache DataSketches KLL sketch when computing histogram statistics",
icebergConfig.getStatisticsKllSketchKParameter(),
false))
.add(longProperty(
TARGET_SPLIT_SIZE,
"The target split size. Set to 0 to use the iceberg table's read.split.target-size property",
0L,
false));

nessieConfig.ifPresent((config) -> propertiesBuilder
@@ -323,4 +330,9 @@ public static int getStatisticsKllSketchKParameter(ConnectorSession session)
{
return session.getProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, Integer.class);
}

public static Long getTargetSplitSize(ConnectorSession session)
{
return session.getProperty(TARGET_SPLIT_SIZE, Long.class);
}
}
IcebergSplitManager.java
@@ -47,6 +47,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getMetadataColumnConstraints;
import static com.facebook.presto.iceberg.IcebergUtil.getNonMetadataColumnConstraints;
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
@@ -95,7 +96,7 @@ public ConnectorSplitSource getSplits(
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
.fromSnapshotExclusive(fromSnapshot)
.toSnapshot(toSnapshot);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize());
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, getTargetSplitSize(session, scan).toBytes());
}
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
@@ -117,7 +118,7 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
IcebergSplitSource splitSource = new IcebergSplitSource(
session,
tableScan,
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
TableScanUtil.splitFiles(tableScan.planFiles(), getTargetSplitSize(session, tableScan).toBytes()),
getMinimumAssignedSplitWeight(session),
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
return splitSource;
IcebergSplitSource.java
@@ -41,6 +41,7 @@
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -131,7 +132,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
PartitionSpecParser.toJson(spec),
partitionData.map(PartitionData::toJson),
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / getTargetSplitSize(session, tableScan).toBytes(), minimumAssignedSplitWeight), 1.0)),
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
Optional.empty(),
getDataSequenceNumber(task.file()));
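The SplitWeight.fromProportion arithmetic above sizes each split's weight as the fraction of a full target split it represents, clamped to [minimumAssignedSplitWeight, 1.0]. A standalone sketch with hypothetical numbers:

    public class SplitWeightExample
    {
        public static void main(String[] args)
        {
            double minimumAssignedSplitWeight = 0.05; // connector-configured floor
            long taskLength = 33_554_432L;            // a 32MB file scan task
            long targetSplitSize = 134_217_728L;      // 128MB, from session or table

            // Same clamping as the connector's split sources.
            double proportion = Math.min(Math.max((double) taskLength / targetSplitSize, minimumAssignedSplitWeight), 1.0);
            System.out.println(proportion); // 0.25 -- four such splits weigh as one full split
        }
    }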
IcebergTableProperties.java
@@ -29,6 +29,7 @@
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Locale.ENGLISH;
@@ -44,6 +45,7 @@ public class IcebergTableProperties
public static final String METADATA_PREVIOUS_VERSIONS_MAX = "metadata_previous_versions_max";
public static final String METADATA_DELETE_AFTER_COMMIT = "metadata_delete_after_commit";
public static final String METRICS_MAX_INFERRED_COLUMN = "metrics_max_inferred_column";
public static final String TARGET_SPLIT_SIZE = TableProperties.SPLIT_SIZE;
private static final String DEFAULT_FORMAT_VERSION = "2";

private final List<PropertyMetadata<?>> tableProperties;
@@ -112,6 +114,10 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
"The maximum number of columns for which metrics are collected",
icebergConfig.getMetricsMaxInferredColumn(),
false))
.add(longProperty(TARGET_SPLIT_SIZE,
"Desired size of split to generate during query scan planning",
TableProperties.SPLIT_SIZE_DEFAULT,
false))
.build();

columnProperties = ImmutableList.of(stringProperty(
@@ -177,4 +183,9 @@ public static Integer getMetricsMaxInferredColumn(Map<String, Object> tablePrope
{
return (Integer) tableProperties.get(METRICS_MAX_INFERRED_COLUMN);
}

public static Long getTargetSplitSize(Map<String, Object> tableProperties)
{
return (Long) tableProperties.get(TableProperties.SPLIT_SIZE);
}
}
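For reference, the Presto table property above deliberately reuses Iceberg's own key and default, so values round-trip into the Iceberg table metadata unchanged. A small sketch printing the Iceberg constants (assumes iceberg-core on the classpath):

    import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
    import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;

    public class SplitSizeConstants
    {
        public static void main(String[] args)
        {
            System.out.println(SPLIT_SIZE);         // read.split.target-size
            System.out.println(SPLIT_SIZE_DEFAULT); // 134217728 (128MB)
        }
    }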
IcebergUtil.java
@@ -49,6 +49,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.units.DataSize;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
@@ -61,6 +62,7 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
@@ -130,7 +132,6 @@
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
import static com.facebook.presto.iceberg.FileFormat.PARQUET;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION;
@@ -196,6 +197,8 @@
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
@@ -856,10 +859,10 @@ public static long getDataSequenceNumber(ContentFile<?> file)
* @param requestedSchema If provided, only delete files with this schema will be provided
*/
public static CloseableIterable<DeleteFile> getDeleteFiles(Table table,
long snapshot,
TupleDomain<IcebergColumnHandle> filter,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
long snapshot,
TupleDomain<IcebergColumnHandle> filter,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
{
Expression filterExpression = toIcebergExpression(filter);
CloseableIterable<FileScanTask> fileTasks = table.newScan().useSnapshot(snapshot).filter(filterExpression).planFiles();
@@ -1035,9 +1038,9 @@ private static class DeleteFilesIterator
private DeleteFile currentFile;

private DeleteFilesIterator(Map<Integer, PartitionSpec> partitionSpecsById,
CloseableIterator<FileScanTask> fileTasks,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
CloseableIterator<FileScanTask> fileTasks,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
{
this.partitionSpecsById = partitionSpecsById;
this.fileTasks = fileTasks;
@@ -1151,6 +1154,9 @@ public static Map<String, String> populateTableProperties(ConnectorTableMetadata

Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));

propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));

return propertiesBuilder.build();
}

@@ -1221,8 +1227,8 @@ public static Optional<PartitionData> partitionDataFromStructLike(PartitionSpec

/**
* Get the metadata location for target {@link Table},
* considering iceberg table properties {@code WRITE_METADATA_LOCATION}
* */
* considering iceberg table properties {@code WRITE_METADATA_LOCATION}
*/
public static String metadataLocation(Table icebergTable)
{
String metadataLocation = icebergTable.properties().get(TableProperties.WRITE_METADATA_LOCATION);
@@ -1237,8 +1243,8 @@ public static String metadataLocation(Table icebergTable)

/**
* Get the data location for target {@link Table},
* considering iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH} and {@code WRITE_FOLDER_STORAGE_LOCATION}
* */
* considering iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH} and {@code WRITE_FOLDER_STORAGE_LOCATION}
*/
public static String dataLocation(Table icebergTable)
{
Map<String, String> properties = icebergTable.properties();
@@ -1254,4 +1260,23 @@ public static String dataLocation(Table icebergTable)
}
return dataLocation;
}

public static Long getSplitSize(Table table)
{
return Long.parseLong(table.properties()
.getOrDefault(SPLIT_SIZE,
String.valueOf(SPLIT_SIZE_DEFAULT)));
}

public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize)
{
return Optional.of(DataSize.succinctBytes(sessionValueProperty))
.filter(size -> !size.equals(DataSize.succinctBytes(0)))
.orElse(DataSize.succinctBytes(icebergScanTargetSplitSize));
}

public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?> scan)
{
return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize());
}
}
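A sketch (hypothetical harness, not part of this commit) of the precedence getTargetSplitSize implements: a non-zero session value wins, while the 0 default defers to the scan's table-derived size:

    import com.facebook.presto.iceberg.IcebergUtil;
    import io.airlift.units.DataSize;

    public class TargetSplitSizeResolution
    {
        public static void main(String[] args)
        {
            long tableScanSize = 134_217_728L; // from read.split.target-size, 128MB

            // Session property left at its 0 default: the table's value is used.
            DataSize fromTable = IcebergUtil.getTargetSplitSize(0L, tableScanSize);
            System.out.println(fromTable.toBytes()); // 134217728

            // A non-zero session value overrides the table setting for the query.
            DataSize overridden = IcebergUtil.getTargetSplitSize(268_435_456L, tableScanSize);
            System.out.println(overridden.toBytes()); // 268435456
        }
    }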
ChangelogSplitSource.java
@@ -17,6 +17,7 @@
import com.facebook.presto.iceberg.FileFormat;
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.iceberg.IcebergSplit;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.iceberg.PartitionData;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
@@ -144,7 +145,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
PartitionSpecParser.toJson(spec),
partitionData.map(PartitionData::toJson),
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / IcebergUtil.getTargetSplitSize(session, tableScan).toBytes(), minimumAssignedSplitWeight), 1.0)),
ImmutableList.of(),
Optional.of(new ChangelogSplitInfo(fromIcebergChangelogOperation(changeTask.operation()),
changeTask.changeOrdinal(),
