Skip to content

Commit

Permalink
Add a builder for IcebergTableHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
assaf2 authored and raunaqmorarka committed Dec 4, 2022
1 parent 3350aa8 commit 34efd53
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,24 +369,18 @@ public IcebergTableHandle getTableHandle(

Map<String, String> tableProperties = table.properties();
String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING);
return new IcebergTableHandle(
tableName.getSchemaName(),
name.getTableName(),
name.getTableType(),
tableSnapshotId,
SchemaParser.toJson(tableSchema),
partitionSpec.map(PartitionSpecParser::toJson),
table.operations().current().formatVersion(),
TupleDomain.all(),
TupleDomain.all(),
ImmutableSet.of(),
Optional.ofNullable(nameMappingJson),
table.location(),
table.properties(),
NO_RETRIES,
ImmutableList.of(),
false,
Optional.empty());
return IcebergTableHandle.builder()
.withSchemaName(tableName.getSchemaName())
.withTableName(name.getTableName())
.withTableType(name.getTableType())
.withSnapshotId(tableSnapshotId)
.withTableSchemaJson(SchemaParser.toJson(tableSchema))
.withPartitionSpecJson(partitionSpec.map(PartitionSpecParser::toJson))
.withFormatVersion(table.operations().current().formatVersion())
.withNameMappingJson(Optional.ofNullable(nameMappingJson))
.withTableLocation(table.location())
.withStorageProperties(table.properties())
.build();
}

private static long getSnapshotIdFromVersion(Table table, ConnectorTableVersion version)
Expand Down Expand Up @@ -1082,7 +1076,10 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl

return new BeginTableExecuteResult<>(
executeHandle,
table.forOptimize(true, optimizeHandle.getMaxScannedFileSize()));
IcebergTableHandle.buildFrom(table)
.withRecordScannedFiles(true)
.withMaxScannedFileSize(Optional.of(optimizeHandle.getMaxScannedFileSize()))
.build());
}

@Override
Expand Down Expand Up @@ -1693,7 +1690,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);
return table.withRetryMode(retryMode);
return IcebergTableHandle.buildFrom(table)
.withRetryMode(retryMode)
.build();
}

@Override
Expand All @@ -1719,10 +1718,12 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);
return table.withRetryMode(retryMode)
return IcebergTableHandle.buildFrom(table)
.withRetryMode(retryMode)
.withUpdatedColumns(updatedColumns.stream()
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList()));
.collect(toImmutableList()))
.build();
}

@Override
Expand Down Expand Up @@ -1792,7 +1793,9 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT

beginTransaction(icebergTable);

IcebergTableHandle newTableHandle = table.withRetryMode(retryMode);
IcebergTableHandle newTableHandle = IcebergTableHandle.buildFrom(table)
.withRetryMode(retryMode)
.build();
IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);

return new IcebergMergeTableHandle(newTableHandle, insertHandle);
Expand Down Expand Up @@ -2114,26 +2117,11 @@ else if (isMetadataColumnId(columnHandle.getId())) {
&& newUnenforcedConstraint.equals(table.getUnenforcedPredicate())) {
return Optional.empty();
}

return Optional.of(new ConstraintApplicationResult<>(
new IcebergTableHandle(
table.getSchemaName(),
table.getTableName(),
table.getTableType(),
table.getSnapshotId(),
table.getTableSchemaJson(),
table.getPartitionSpecJson(),
table.getFormatVersion(),
newUnenforcedConstraint,
newEnforcedConstraint,
table.getProjectedColumns(),
table.getNameMappingJson(),
table.getTableLocation(),
table.getStorageProperties(),
table.getRetryMode(),
table.getUpdatedColumns(),
table.isRecordScannedFiles(),
table.getMaxScannedFileSize()),
IcebergTableHandle.buildFrom(table)
.withUnenforcedPredicate(newUnenforcedConstraint)
.withEnforcedPredicate(newEnforcedConstraint)
.build(),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
extractionResult.remainingExpression(),
false));
Expand Down Expand Up @@ -2187,7 +2175,9 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti
.collect(toImmutableList());

return Optional.of(new ProjectionApplicationResult<>(
icebergTableHandle.withProjectedColumns(projectedColumns),
IcebergTableHandle.buildFrom(icebergTableHandle)
.withProjectedColumns(projectedColumns)
.build(),
projections,
assignmentsList,
false));
Expand Down Expand Up @@ -2221,7 +2211,9 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti

List<Assignment> outputAssignments = ImmutableList.copyOf(newAssignments.values());
return Optional.of(new ProjectionApplicationResult<>(
icebergTableHandle.withProjectedColumns(projectedColumnsBuilder.build()),
IcebergTableHandle.buildFrom(icebergTableHandle)
.withProjectedColumns(projectedColumnsBuilder.build())
.build(),
newProjections,
outputAssignments,
false));
Expand Down Expand Up @@ -2265,24 +2257,10 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
checkArgument(originalHandle.getMaxScannedFileSize().isEmpty(), "Unexpected max scanned file size set");

return tableStatisticsCache.computeIfAbsent(
new IcebergTableHandle(
originalHandle.getSchemaName(),
originalHandle.getTableName(),
originalHandle.getTableType(),
originalHandle.getSnapshotId(),
originalHandle.getTableSchemaJson(),
originalHandle.getPartitionSpecJson(),
originalHandle.getFormatVersion(),
originalHandle.getUnenforcedPredicate(),
originalHandle.getEnforcedPredicate(),
ImmutableSet.of(), // projectedColumns don't affect stats
originalHandle.getNameMappingJson(),
originalHandle.getTableLocation(),
originalHandle.getStorageProperties(),
NO_RETRIES, // retry mode doesn't affect stats
originalHandle.getUpdatedColumns(),
originalHandle.isRecordScannedFiles(),
originalHandle.getMaxScannedFileSize()),
IcebergTableHandle.buildFrom(originalHandle)
.withProjectedColumns(ImmutableSet.of()) // projectedColumns don't affect stats
.withRetryMode(NO_RETRIES) // retry mode doesn't affect stats
.build(),
handle -> {
Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(typeManager, session, handle, icebergTable);
Expand Down
Loading

0 comments on commit 34efd53

Please sign in to comment.