Skip to content

Commit

Permalink
Simplify BaseSystemTable.addRow in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Dec 2, 2024
1 parent 878f2bc commit 7b7de3d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.iceberg.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorPageSource;
Expand Down Expand Up @@ -98,12 +99,26 @@ private List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSes
private void addRows(DataTask dataTask, PageListBuilder pagesBuilder, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema)
{
try (CloseableIterable<StructLike> dataRows = dataTask.rows()) {
dataRows.forEach(dataTaskRow -> addRow(pagesBuilder, dataTaskRow, timeZoneKey, columnNameToPositionInSchema));
dataRows.forEach(dataTaskRow -> addRow(pagesBuilder, new Row(dataTaskRow, columnNameToPositionInSchema), timeZoneKey));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

protected abstract void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema);
protected abstract void addRow(PageListBuilder pagesBuilder, Row row, TimeZoneKey timeZoneKey);

public record Row(StructLike structLike, Map<String, Integer> columnNameToPositionInSchema)
{
public Row
{
requireNonNull(structLike, "structLike is null");
columnNameToPositionInSchema = ImmutableMap.copyOf(columnNameToPositionInSchema);
}

public <T> T get(String columnName, Class<T> javaClass)
{
return structLike.get(columnNameToPositionInSchema.get(columnName), javaClass);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TimeZoneKey;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

import java.util.Map;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
Expand Down Expand Up @@ -63,17 +60,14 @@ private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableNa
}

@Override
protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema)
protected void addRow(PageListBuilder pagesBuilder, Row row, TimeZoneKey timeZoneKey)
{
pagesBuilder.beginRow();

pagesBuilder.appendTimestampTzMillis(
structLike.get(columnNameToPositionInSchema.get(TIMESTAMP_COLUMN_NAME), Long.class) / MICROSECONDS_PER_MILLISECOND,
timeZoneKey);
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(FILE_COLUMN_NAME), String.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SNAPSHOT_ID_COLUMN_NAME), Long.class));
pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get(LATEST_SCHEMA_ID_COLUMN_NAME), Integer.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SEQUENCE_NUMBER_COLUMN_NAME), Long.class));
pagesBuilder.appendTimestampTzMillis(row.get(TIMESTAMP_COLUMN_NAME, Long.class) / MICROSECONDS_PER_MILLISECOND, timeZoneKey);
pagesBuilder.appendVarchar(row.get(FILE_COLUMN_NAME, String.class));
pagesBuilder.appendBigint(row.get(LATEST_SNAPSHOT_ID_COLUMN_NAME, Long.class));
pagesBuilder.appendInteger(row.get(LATEST_SCHEMA_ID_COLUMN_NAME, Integer.class));
pagesBuilder.appendBigint(row.get(LATEST_SEQUENCE_NUMBER_COLUMN_NAME, Long.class));
pagesBuilder.endRow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TimeZoneKey;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
Expand Down Expand Up @@ -52,15 +50,15 @@ public RefsTable(SchemaTableName tableName, Table icebergTable)
}

@Override
protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema)
protected void addRow(PageListBuilder pagesBuilder, Row row, TimeZoneKey timeZoneKey)
{
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get("name"), String.class));
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get("type"), String.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("snapshot_id"), Long.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("max_reference_age_in_ms"), Long.class));
pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get("min_snapshots_to_keep"), Integer.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("max_snapshot_age_in_ms"), Long.class));
pagesBuilder.appendVarchar(row.get("name", String.class));
pagesBuilder.appendVarchar(row.get("type", String.class));
pagesBuilder.appendBigint(row.get("snapshot_id", Long.class));
pagesBuilder.appendBigint(row.get("max_reference_age_in_ms", Long.class));
pagesBuilder.appendInteger(row.get("min_snapshots_to_keep", Integer.class));
pagesBuilder.appendBigint(row.get("max_snapshot_age_in_ms", Long.class));
pagesBuilder.endRow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

import java.util.Map;
Expand Down Expand Up @@ -68,21 +67,16 @@ private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableNa
}

@Override
protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema)
protected void addRow(PageListBuilder pagesBuilder, Row row, TimeZoneKey timeZoneKey)
{
pagesBuilder.beginRow();

pagesBuilder.appendTimestampTzMillis(
structLike.get(columnNameToPositionInSchema.get(COMMITTED_AT_COLUMN_NAME), Long.class) / MICROSECONDS_PER_MILLISECOND,
timeZoneKey);
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(SNAPSHOT_ID_COLUMN_NAME), Long.class));

Long parentId = structLike.get(columnNameToPositionInSchema.get(PARENT_ID_COLUMN_NAME), Long.class);
pagesBuilder.appendBigint(parentId);

pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(OPERATION_COLUMN_NAME), String.class));
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(MANIFEST_LIST_COLUMN_NAME), String.class));
pagesBuilder.appendVarcharVarcharMap(structLike.get(columnNameToPositionInSchema.get(SUMMARY_COLUMN_NAME), Map.class));
pagesBuilder.appendTimestampTzMillis(row.get(COMMITTED_AT_COLUMN_NAME, Long.class) / MICROSECONDS_PER_MILLISECOND, timeZoneKey);
pagesBuilder.appendBigint(row.get(SNAPSHOT_ID_COLUMN_NAME, Long.class));
pagesBuilder.appendBigint(row.get(PARENT_ID_COLUMN_NAME, Long.class));
pagesBuilder.appendVarchar(row.get(OPERATION_COLUMN_NAME, String.class));
pagesBuilder.appendVarchar(row.get(MANIFEST_LIST_COLUMN_NAME, String.class));
//noinspection unchecked
pagesBuilder.appendVarcharVarcharMap(row.get(SUMMARY_COLUMN_NAME, Map.class));
pagesBuilder.endRow();
}
}

0 comments on commit 7b7de3d

Please sign in to comment.