Skip to content

Commit

Permalink
rdblues Table impl for Delta tables
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Oct 11, 2024
1 parent 2fd4f6d commit dcc95ff
Show file tree
Hide file tree
Showing 18 changed files with 2,855 additions and 161 deletions.
11 changes: 11 additions & 0 deletions connectors/kafka/src/main/java/io/delta/catalog/UnityCatalog.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package io.delta.catalog;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class UnityCatalog implements Catalog {

// temporary hard coded map until we get to talk to the actual catalog
private Map<TableIdentifier, String> tableIdToPathMap;

@Override
public String name() {
return "unity-catalog";
Expand All @@ -27,6 +33,11 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public Table loadTable(TableIdentifier identifier) {
throw new UnsupportedOperationException("Not implemented");
Expand Down
142 changes: 142 additions & 0 deletions connectors/kafka/src/main/java/io/delta/table/CreateNameMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.delta.table;

import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.FieldMetadata;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.shaded.com.google.common.collect.Lists;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.MappedFields;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

class CreateNameMapping extends DeltaTypeVisitor<MappedFields> {
private static final String FIELD_ID_KEY = "delta.columnMapping.id";
private static final String PHYSICAL_NAME_KEY = "delta.columnMapping.physicalName";
private static final Joiner DOT = Joiner.on(".");

private final Deque<String> fieldNames =
org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList();
private final NameMapping existingMapping;
private final AtomicInteger nextId;

int lastAssignedId() {
return nextId.get();
}

private int findOrAssignId(String name, FieldMetadata metadata) {
Object deltaId = metadata.get(FIELD_ID_KEY);
if (deltaId instanceof Number) {
return ((Number) deltaId).intValue();
} else {
return findOrAssignId(name);
}
}

private int findOrAssignId(String name) {
if (existingMapping != null) {
MappedField mapping = existingMapping.find(fieldNames, name);
if (mapping != null) {
return mapping.id();
}
}

return nextId.incrementAndGet();
}

CreateNameMapping(NameMapping existingMapping, int lastAssignedId) {
this.existingMapping = existingMapping;
this.nextId = new AtomicInteger(lastAssignedId);
}

CreateNameMapping(int lastAssignedId) {
this.existingMapping = null;
this.nextId = new AtomicInteger(lastAssignedId);
}

@Override
public MappedFields struct(StructType struct, List<MappedFields> fieldResults) {
List<StructField> fields = struct.fields();
List<MappedField> mappings = Lists.newArrayList();
for (int i = 0; i < fieldResults.size(); i += 1) {
StructField field = fields.get(i);
String name = field.getName();
FieldMetadata metadata = field.getMetadata();

int id = findOrAssignId(name, metadata);
Object physicalName = metadata.get(PHYSICAL_NAME_KEY);
if (physicalName != null) {
mappings.add(
MappedField.of(
id, ImmutableList.of(name, physicalName.toString()), fieldResults.get(i)));
} else {
mappings.add(MappedField.of(id, name, fieldResults.get(i)));
}
}

return MappedFields.of(mappings);
}

@Override
public MappedFields field(StructField field, MappedFields fieldResult) {
return fieldResult;
}

@Override
public MappedFields array(ArrayType array, MappedFields elementResult) {
int elementId = findOrAssignId("element");
return MappedFields.of(MappedField.of(elementId, "element", elementResult));
}

@Override
public MappedFields map(MapType map, MappedFields keyResult, MappedFields valueResult) {
int keyId = findOrAssignId("key");
int valueId = findOrAssignId("value");
return MappedFields.of(
MappedField.of(keyId, "key", keyResult), MappedField.of(valueId, "value", valueResult));
}

@Override
public MappedFields atomic(DataType atomic) {
return null; // primitives have no nested fields
}

private String fieldName(String name) {
return DOT.join(Iterables.concat(fieldNames, ImmutableList.of(name)));
}

@Override
public void beforeField(StructField field) {
fieldNames.addLast(field.getName());
}

@Override
public void afterField(StructField field) {
fieldNames.removeLast();
}
}
151 changes: 151 additions & 0 deletions connectors/kafka/src/main/java/io/delta/table/DeltaAppend.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.delta.table;

import static io.delta.kernel.internal.util.Utils.toCloseableIterator;

import io.delta.kernel.Operation;
import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.utils.CloseableIterable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.commons.compress.utils.Lists;
import org.apache.hadoop.shaded.com.google.common.collect.Iterables;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class DeltaAppend implements AppendFiles {
private final DeltaTable table;
private final Table deltaTable;
private final Engine deltaEngine;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> appendedFiles = Lists.newArrayList();
private final Map<String, String> parameters = Maps.newHashMap();

DeltaAppend(DeltaTable table, Table deltaTable, Engine deltaEngine) {
this.table = table;
this.deltaTable = deltaTable;
this.deltaEngine = deltaEngine;
parameters.put("iceberg-operation", DataOperations.APPEND);
}

@Override
public AppendFiles appendFile(DataFile file) {
appendedFiles.add(file);
summaryBuilder.addedFile(table.spec(), file);
return this;
}

@Override
public AppendFiles appendManifest(ManifestFile manifest) {
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, table.io(), table.specs())) {
for (DataFile append : reader) {
appendedFiles.add(append);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close manifest: " + manifest.path(), e);
}

return this;
}

@Override
public AppendFiles set(String property, String value) {
parameters.put(property, value);
return this;
}

@Override
public AppendFiles deleteWith(Consumer<String> deleteFunc) {
return this;
}

@Override
public AppendFiles stageOnly() {
throw new UnsupportedOperationException("Cannot stage commits in Delta tables");
}

@Override
public AppendFiles scanManifestsWith(ExecutorService executorService) {
return this;
}

@Override
public Snapshot apply() {
return null; // TODO
}

@Override
public void commit() {
table.refresh();
// Map<String, String> summary = summaryBuilder.build();
long now = System.currentTimeMillis();

Transaction transaction =
deltaTable
.createTransactionBuilder(deltaEngine, engineInfo(), Operation.WRITE)
.build(deltaEngine);

// pass the table location as a URI to construct relative paths
URI root = URI.create(table.location());
Iterable<Row> adds =
Iterables.transform(
appendedFiles,
file -> DeltaFileUtil.addFile(root, table.schema(), table.spec(), now, file));

transaction.commit(
deltaEngine, CloseableIterable.inMemoryIterable(toCloseableIterator(adds.iterator())));
}

// private static Map<String, String> toOperationMetrics(Map<String, String> summary) {
// return ImmutableMap.of(
// "numFiles",
// summary.get(SnapshotSummary.ADDED_FILES_PROP),
// "numOutputRows",
// summary.get(SnapshotSummary.ADDED_RECORDS_PROP),
// "numOutputBytes",
// summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP));
// }

private static String engineInfo() {
Map<String, String> env = EnvironmentContext.get();
String engine = env.get(EnvironmentContext.ENGINE_NAME);
String version = env.get(EnvironmentContext.ENGINE_VERSION);
String icebergVersion = IcebergBuild.version();
return String.format("%s/%s Iceberg/%s", engine, version, icebergVersion);
}
}
Loading

0 comments on commit dcc95ff

Please sign in to comment.