Make open writers limit configurable in iceberg connector
Cherry-pick of Trino trinodb/trino#6684

Co-authored-by: Akshay Rai <akrai@linkedin.com>
2 people authored and NikhilCollooru committed May 19, 2022
1 parent 8ee3bea commit 2c32a8c
Showing 4 changed files with 31 additions and 6 deletions.
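For context, not part of the diff: once this change is deployed, the limit becomes tunable per catalog through the Iceberg connector's configuration file. A minimal illustrative snippet; the file path and the value 500 are assumptions, not taken from the commit:

    # etc/catalog/iceberg.properties (path and value illustrative)
    connector.name=iceberg
    iceberg.max-partitions-per-writer=500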
File: IcebergConfig.java
@@ -36,6 +36,7 @@ public class IcebergConfig
     private CatalogType catalogType = HIVE;
     private String catalogWarehouse;
     private int catalogCacheSize = 10;
+    private int maxPartitionsPerWriter = 100;
     private List<String> hadoopConfigResources = ImmutableList.of();

     @NotNull
@@ -119,4 +120,18 @@ public IcebergConfig setHadoopConfigResources(String files)
         }
         return this;
     }
+
+    @Min(1)
+    public int getMaxPartitionsPerWriter()
+    {
+        return maxPartitionsPerWriter;
+    }
+
+    @Config("iceberg.max-partitions-per-writer")
+    @ConfigDescription("Maximum number of partitions per writer")
+    public IcebergConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
+    {
+        this.maxPartitionsPerWriter = maxPartitionsPerWriter;
+        return this;
+    }
 }
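For readers unfamiliar with the Airlift config idiom above: @Config-annotated setters are populated from catalog properties when the config class is bound in the connector's Guice module, and constraint annotations such as @Min(1) are checked at load time. A minimal sketch of that binding, assuming Presto's com.facebook.airlift fork of Airlift; the module class name is illustrative, and the real Iceberg connector module already contains an equivalent call:

    import com.google.inject.Binder;
    import com.google.inject.Module;

    import static com.facebook.airlift.configuration.ConfigBinder.configBinder;

    // Illustrative module, not from the commit.
    public class ExampleIcebergModule
            implements Module
    {
        @Override
        public void configure(Binder binder)
        {
            // Populates IcebergConfig from iceberg.* catalog properties and
            // rejects values that violate annotations such as @Min(1).
            configBinder(binder).bindConfig(IcebergConfig.class);
        }
    }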
File: IcebergPageSink.java
@@ -77,7 +77,7 @@ public class IcebergPageSink
     private static final int MAX_PAGE_POSITIONS = 4096;

     @SuppressWarnings({"FieldCanBeLocal", "FieldMayBeStatic"})
-    private final int maxOpenWriters = 100; // TODO: make this configurable
+    private final int maxOpenWriters;
     private final Schema outputSchema;
     private final PartitionSpec partitionSpec;
     private final LocationProvider locationProvider;
@@ -107,7 +107,8 @@ public IcebergPageSink(
         List<IcebergColumnHandle> inputColumns,
         JsonCodec<CommitTaskData> jsonCodec,
         ConnectorSession session,
-        FileFormat fileFormat)
+        FileFormat fileFormat,
+        int maxOpenWriters)
     {
         requireNonNull(inputColumns, "inputColumns is null");
         this.outputSchema = requireNonNull(outputSchema, "outputSchema is null");
@@ -120,6 +121,7 @@
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.session = requireNonNull(session, "session is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+        this.maxOpenWriters = maxOpenWriters;
         this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
     }
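The diff only stores the new limit; the enforcement site is outside the shown hunks. For illustration, such a limit is typically checked when a new partition writer is about to be opened. A self-contained sketch of that guard pattern; the class and method names are hypothetical, and IcebergPageSink's real bookkeeping (one writer per partition value, connector error codes, and so on) is more involved:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical standalone sketch, not the connector's actual code.
    public class OpenWritersGuard
    {
        private final int maxOpenWriters;
        private final List<Object> openWriters = new ArrayList<>();

        public OpenWritersGuard(int maxOpenWriters)
        {
            this.maxOpenWriters = maxOpenWriters;
        }

        public void register(Object writer)
        {
            if (openWriters.size() >= maxOpenWriters) {
                // In the connector this surfaces as a query error asking the
                // user to write fewer partitions per writer or raise the limit.
                throw new IllegalStateException(
                        "Exceeded limit of " + maxOpenWriters + " open writers for partitions");
            }
            openWriters.add(writer);
        }
    }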
File: IcebergPageSinkProvider.java
@@ -43,18 +43,22 @@ public class IcebergPageSinkProvider
     private final JsonCodec<CommitTaskData> jsonCodec;
     private final IcebergFileWriterFactory fileWriterFactory;
     private final PageIndexerFactory pageIndexerFactory;
+    private final int maxOpenPartitions;

     @Inject
     public IcebergPageSinkProvider(
             HdfsEnvironment hdfsEnvironment,
             JsonCodec<CommitTaskData> jsonCodec,
             IcebergFileWriterFactory fileWriterFactory,
-            PageIndexerFactory pageIndexerFactory)
+            PageIndexerFactory pageIndexerFactory,
+            IcebergConfig icebergConfig)
     {
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
         this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
+        requireNonNull(icebergConfig, "icebergConfig is null");
+        this.maxOpenPartitions = icebergConfig.getMaxPartitionsPerWriter();
     }

     @Override
@@ -87,6 +91,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle)
                 tableHandle.getInputColumns(),
                 jsonCodec,
                 session,
-                tableHandle.getFileFormat());
+                tableHandle.getFileFormat(),
+                maxOpenPartitions);
     }
 }
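Note that one value travels under three names here: iceberg.max-partitions-per-writer in configuration, maxOpenPartitions in this provider, and maxOpenWriters inside IcebergPageSink. All three refer to the same per-writer cap on concurrently open partition writers.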
File: TestIcebergConfig.java
@@ -39,7 +39,8 @@ public void testDefaults()
                 .setCatalogType(HIVE)
                 .setCatalogWarehouse(null)
                 .setCatalogCacheSize(10)
-                .setHadoopConfigResources(null));
+                .setHadoopConfigResources(null)
+                .setMaxPartitionsPerWriter(100));
     }

     @Test
@@ -52,6 +53,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.catalog.warehouse", "path")
.put("iceberg.catalog.cached-catalog-num", "6")
.put("iceberg.hadoop.config.resources", "/etc/hadoop/conf/core-site.xml")
.put("iceberg.max-partitions-per-writer", "222")
.build();

IcebergConfig expected = new IcebergConfig()
Expand All @@ -60,7 +62,8 @@ public void testExplicitPropertyMappings()
                 .setCatalogType(HADOOP)
                 .setCatalogWarehouse("path")
                 .setCatalogCacheSize(6)
-                .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml");
+                .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml")
+                .setMaxPartitionsPerWriter(222);

         assertFullMapping(properties, expected);
     }
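A note on why both tests change: Airlift's assertFullMapping fails unless every @Config-annotated property appears in the property map with a non-default value (hence 222 rather than the default 100), and the defaults assertion in testDefaults fails unless every getter's default is recorded. Forgetting either update breaks the build, which is how these tests keep config properties and their defaults in sync.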
