Skip to content

Commit

Permalink
Fix Error Prone warnings in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Jul 21, 2022
1 parent 6ba5035 commit bf3d164
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void deleteRows(Block rowIds)
positionDeleteSink = positionDeleteSinkSupplier.get();
verify(positionDeleteSink != null);
}
positionDeleteSink.appendPage(new Page(rowIds));
positionDeleteSink.appendPage(new Page(rowIds)).join();
}

@Override
Expand All @@ -255,7 +255,7 @@ public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
}

ColumnarRow rowIdColumns = ColumnarRow.toColumnarRow(page.getBlock(rowIdChannel));
positionDeleteSink.appendPage(new Page(rowIdColumns.getField(0)));
positionDeleteSink.appendPage(new Page(rowIdColumns.getField(0))).join();

List<Types.NestedField> columns = schema.columns();
Block[] fullPage = new Block[columns.size()];
Expand All @@ -272,7 +272,7 @@ public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
}
}

updatedRowPageSink.appendPage(new Page(page.getPositionCount(), fullPage));
updatedRowPageSink.appendPage(new Page(page.getPositionCount(), fullPage)).join();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -559,12 +558,15 @@ else if (orcColumn != null) {
reader.getCompressionKind()),
columnProjections);
}
catch (Exception e) {
catch (IOException | RuntimeException e) {
if (orcDataSource != null) {
try {
orcDataSource.close();
}
catch (IOException ignored) {
catch (IOException ex) {
if (!e.equals(ex)) {
e.addSuppressed(ex);
}
}
}
if (e instanceof TrinoException) {
Expand Down Expand Up @@ -607,7 +609,7 @@ private static OrcColumn setMissingFieldIds(OrcColumn column, NameMapping nameMa
.map(nestedColumn -> {
ImmutableList.Builder<String> nextQualifiedPath = ImmutableList.<String>builder()
.addAll(qualifiedPath);
if (column.getColumnType().equals(OrcType.OrcTypeKind.LIST)) {
if (column.getColumnType() == OrcType.OrcTypeKind.LIST) {
// The Trino ORC reader uses "item" for list element names, but the NameMapper expects "element"
nextQualifiedPath.add("element");
}
Expand Down Expand Up @@ -646,7 +648,7 @@ private static Map<Integer, OrcColumn> mapIdsToOrcFileColumns(List<OrcColumn> co
Traverser.forTree(OrcColumn::getNestedColumns)
.depthFirstPreOrder(columns)
.forEach(column -> {
String fieldId = (column.getAttributes().get(ORC_ICEBERG_ID_KEY));
String fieldId = column.getAttributes().get(ORC_ICEBERG_ID_KEY);
if (fieldId != null) {
columnsById.put(Integer.parseInt(fieldId), column);
}
Expand Down Expand Up @@ -901,7 +903,10 @@ else if (column.isRowPositionColumn()) {
dataSource.close();
}
}
catch (IOException ignored) {
catch (IOException ex) {
if (!e.equals(ex)) {
e.addSuppressed(ex);
}
}
if (e instanceof TrinoException) {
throw (TrinoException) e;
Expand All @@ -921,6 +926,7 @@ else if (column.isRowPositionColumn()) {

/**
* Create a new NameMapping with the same names but converted to lowercase.
*
* @param nameMapping The original NameMapping, potentially containing non-lowercase characters
*/
private static NameMapping convertToLowercase(NameMapping nameMapping)
Expand Down Expand Up @@ -962,8 +968,8 @@ public static ProjectedLayout createProjectedLayout(OrcColumn root, List<List<In
return fullyProjectedLayout();
}

Map<Integer, List<List<Integer>>> dereferencesByField = fieldIdDereferences.stream().collect(
Collectors.groupingBy(
Map<Integer, List<List<Integer>>> dereferencesByField = fieldIdDereferences.stream()
.collect(groupingBy(
sequence -> sequence.get(0),
mapping(sequence -> sequence.subList(1, sequence.size()), toUnmodifiableList())));

Expand All @@ -987,7 +993,7 @@ public ProjectedLayout getFieldLayout(OrcColumn orcColumn)
}

/**
* Creates a mapping between the input {@param columns} and base columns if required.
* Creates a mapping between the input {@code columns} and base columns if required.
*/
public static Optional<ReaderColumns> projectColumns(List<IcebergColumnHandle> columns)
{
Expand Down Expand Up @@ -1028,7 +1034,7 @@ private static TupleDomain<ColumnDescriptor> getParquetTupleDomain(Map<List<Stri
}

ImmutableMap.Builder<ColumnDescriptor, Domain> predicate = ImmutableMap.builder();
effectivePredicate.getDomains().get().forEach((columnHandle, domain) -> {
effectivePredicate.getDomains().orElseThrow().forEach((columnHandle, domain) -> {
String baseType = columnHandle.getType().getTypeSignature().getBase();
// skip looking up predicates for complex types as Parquet only stores stats for primitives
if (!baseType.equals(StandardTypes.MAP) && !baseType.equals(StandardTypes.ARRAY) && !baseType.equals(StandardTypes.ROW)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public ColumnStatistics(MethodHandle comparisonHandle, Object initialMin, Object

/**
* Gets the minimum value accumulated during stats collection.
*
* @return Empty if the statistics contained values which were not comparable, otherwise returns the min value.
*/
public Optional<Object> getMin()
Expand All @@ -297,6 +298,7 @@ public Optional<Object> getMin()

/**
* Gets the maximum value accumulated during stats collection.
*
* @return Empty if the statistics contained values which were not comparable, otherwise returns the max value.
*/
public Optional<Object> getMax()
Expand All @@ -305,12 +307,13 @@ public Optional<Object> getMax()
}

/**
* Update the stats, as long as they haven't already been invalidated
*
* @param lowerBound Trino encoded lower bound value from a file
* @param upperBound Trino encoded upper bound value from a file
*/
public void updateMinMax(Object lowerBound, Object upperBound)
{
// Update the stats, as long as they haven't already been invalidated
if (min.isPresent()) {
if (lowerBound == null) {
min = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ private static Block transformBlock(Type sourceType, FixedWidthType resultType,
@VisibleForTesting
static long epochYear(long epochMilli)
{
return YEAR_FIELD.get(epochMilli) - 1970;
return YEAR_FIELD.get(epochMilli) - 1970L;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public TestIcebergOrcConnectorTest()
@Override
protected boolean supportsIcebergFileStatistics(String typeName)
{
return !(typeName.equalsIgnoreCase("varbinary")) &&
!(typeName.equalsIgnoreCase("uuid"));
return !typeName.equalsIgnoreCase("varbinary") &&
!typeName.equalsIgnoreCase("uuid");
}

@Override
Expand Down

0 comments on commit bf3d164

Please sign in to comment.