diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index a1df11f2a4126..7098574349191 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -36,6 +36,7 @@ public class IcebergConfig private CatalogType catalogType = HIVE; private String catalogWarehouse; private int catalogCacheSize = 10; + private int maxPartitionsPerWriter = 100; private List hadoopConfigResources = ImmutableList.of(); @NotNull @@ -119,4 +120,18 @@ public IcebergConfig setHadoopConfigResources(String files) } return this; } + + @Min(1) + public int getMaxPartitionsPerWriter() + { + return maxPartitionsPerWriter; + } + + @Config("iceberg.max-partitions-per-writer") + @ConfigDescription("Maximum number of partitions per writer") + public IcebergConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter) + { + this.maxPartitionsPerWriter = maxPartitionsPerWriter; + return this; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java index 6599b4cdd354e..776d279713170 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java @@ -77,7 +77,7 @@ public class IcebergPageSink private static final int MAX_PAGE_POSITIONS = 4096; @SuppressWarnings({"FieldCanBeLocal", "FieldMayBeStatic"}) - private final int maxOpenWriters = 100; // TODO: make this configurable + private final int maxOpenWriters; private final Schema outputSchema; private final PartitionSpec partitionSpec; private final LocationProvider locationProvider; @@ -107,7 +107,8 @@ public IcebergPageSink( List inputColumns, JsonCodec jsonCodec, ConnectorSession session, - FileFormat fileFormat) + FileFormat fileFormat, + int maxOpenWriters) { requireNonNull(inputColumns, "inputColumns is null"); this.outputSchema = requireNonNull(outputSchema, "outputSchema is null"); @@ -120,6 +121,7 @@ public IcebergPageSink( this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); this.session = requireNonNull(session, "session is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); + this.maxOpenWriters = maxOpenWriters; this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec)); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java index d6d52a478bb5b..a79312b452d53 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java @@ -43,18 +43,22 @@ public class IcebergPageSinkProvider private final JsonCodec jsonCodec; private final IcebergFileWriterFactory fileWriterFactory; private final PageIndexerFactory pageIndexerFactory; + private final int maxOpenPartitions; @Inject public IcebergPageSinkProvider( HdfsEnvironment hdfsEnvironment, JsonCodec jsonCodec, IcebergFileWriterFactory fileWriterFactory, - PageIndexerFactory pageIndexerFactory) + PageIndexerFactory pageIndexerFactory, + IcebergConfig icebergConfig) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null"); this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.maxOpenPartitions = icebergConfig.getMaxPartitionsPerWriter(); } @Override @@ -87,6 +91,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab tableHandle.getInputColumns(), jsonCodec, session, - tableHandle.getFileFormat()); + tableHandle.getFileFormat(), + maxOpenPartitions); } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index 43b4d08a3917f..0014fa1cd3009 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -39,7 +39,8 @@ public void testDefaults() .setCatalogType(HIVE) .setCatalogWarehouse(null) .setCatalogCacheSize(10) - .setHadoopConfigResources(null)); + .setHadoopConfigResources(null) + .setMaxPartitionsPerWriter(100)); } @Test @@ -52,6 +53,7 @@ public void testExplicitPropertyMappings() .put("iceberg.catalog.warehouse", "path") .put("iceberg.catalog.cached-catalog-num", "6") .put("iceberg.hadoop.config.resources", "/etc/hadoop/conf/core-site.xml") + .put("iceberg.max-partitions-per-writer", "222") .build(); IcebergConfig expected = new IcebergConfig() @@ -60,7 +62,8 @@ public void testExplicitPropertyMappings() .setCatalogType(HADOOP) .setCatalogWarehouse("path") .setCatalogCacheSize(6) - .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml"); + .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml") + .setMaxPartitionsPerWriter(222); assertFullMapping(properties, expected); }