Skip to content

Commit

Permalink
[Kernel] Cleanup the APIs used for checking read/write supported prot…
Browse files Browse the repository at this point in the history
…ocol
  • Loading branch information
vkorukanti committed Feb 12, 2025
1 parent b02edc8 commit b6a4ac7
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StructType;
import java.util.*;
Expand Down Expand Up @@ -73,13 +72,11 @@ public class TableFeatures {
// Helper Methods //
////////////////////

public static void validateReadSupportedTable(
Protocol protocol, String tablePath, Optional<Metadata> metadata) {
public static void validateReadSupportedTable(Protocol protocol, String tablePath) {
switch (protocol.getMinReaderVersion()) {
case 1:
break;
case 2:
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
break;
case 3:
List<String> readerFeatures = protocol.getReaderFeatures();
Expand All @@ -88,9 +85,6 @@ public static void validateReadSupportedTable(
unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES);
throw DeltaErrors.unsupportedReaderFeature(tablePath, unsupportedFeatures);
}
if (readerFeatures.contains("columnMapping")) {
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
}
break;
default:
throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion());
Expand All @@ -111,18 +105,17 @@ public static void validateReadSupportedTable(
*
* @param protocol Table protocol
* @param metadata Table metadata
* @param tableSchema Table schema
*/
public static void validateWriteSupportedTable(
Protocol protocol, Metadata metadata, StructType tableSchema, String tablePath) {
Protocol protocol, Metadata metadata, String tablePath) {
int minWriterVersion = protocol.getMinWriterVersion();
switch (minWriterVersion) {
case 1:
break;
case 2:
// Append-only and column invariants are the writer features added in version 2
// Append-only is supported, but not the invariants
validateNoInvariants(tableSchema);
validateNoInvariants(metadata.getSchema());
break;
case 3:
// Check constraints are added in version 3
Expand All @@ -141,7 +134,7 @@ public static void validateWriteSupportedTable(
if (writerFeature.equals(INVARIANTS_FEATURE_NAME)) {
// For version 7, we allow 'invariants' to be present in the protocol's writerFeatures
// to unblock certain use cases, provided that no invariants are defined in the schema.
validateNoInvariants(tableSchema);
validateNoInvariants(metadata.getSchema());
} else if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) {
throw unsupportedWriterFeature(tablePath, writerFeature);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -197,8 +196,7 @@ public CloseableIterator<ColumnarBatch> getChanges(
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
TableFeatures.validateReadSupportedTable(
protocol, getDataPath().toString(), Optional.empty());
TableFeatures.validateReadSupportedTable(protocol, getDataPath().toString());
}
}
if (shouldDropProtocolColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ public Transaction build(Engine engine) {
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
List<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(
protocol, metadata, metadata.getSchema(), table.getPath(engine));
TableFeatures.validateWriteSupportedTable(protocol, metadata, table.getPath(engine));
}
}

Expand All @@ -184,10 +183,7 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
String tablePath = table.getPath(engine);
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getMetadata().getSchema(),
tablePath);
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);

if (!isNewTable) {
if (schema.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public static void checkpoint(Engine engine, Clock clock, SnapshotImpl snapshot)
validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getSchema(),
snapshot.getDataPath().toString());

final Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(
protocol, dataPath.toString(), Optional.of(metadata));
TableFeatures.validateReadSupportedTable(protocol, dataPath.toString());
return new Tuple2<>(protocol, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ public static ColumnMappingMode getColumnMappingMode(Map<String, String> configu
.orElse(ColumnMappingMode.NONE);
}

/**
* Checks if the given column mapping mode in the given table metadata is supported. Throws on
* unsupported modes.
*
* @param metadata Metadata of the table
*/
public static void throwOnUnsupportedColumnMappingMode(Metadata metadata) {
getColumnMappingMode(metadata.getConfiguration());
}

/**
* Helper method that converts the logical schema (requested by the connector) to physical schema
* of the data stored in data files based on the table's column mapping mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ class TableFeaturesSuite extends AnyFunSuite {
test("validateWriteSupported: protocol 2 with invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 2),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true))
metadata = createTestMetadata(includeVariant = true))
}

test("validateWriteSupported: protocol 2, with appendOnly and invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 2),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true))
metadata = createTestMetadata(includeVariant = true))
}

Seq(3, 4, 5, 6).foreach { minWriterVersion =>
Expand Down Expand Up @@ -90,33 +88,28 @@ class TableFeaturesSuite extends AnyFunSuite {

test("validateWriteSupported: protocol 7 with invariants, schema doesn't contain invariants") {
checkSupported(
createTestProtocol(minWriterVersion = 7, "invariants"),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = false)
createTestProtocol(minWriterVersion = 7, "invariants")
)
}

test("validateWriteSupported: protocol 7 with invariants, schema contains invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 7, "invariants"),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true)
metadata = createTestMetadata(includeVariant = true)
)
}

def checkSupported(
protocol: Protocol,
metadata: Metadata = null,
schema: StructType = createTestSchema()): Unit = {
validateWriteSupportedTable(protocol, metadata, schema, "/test/table")
metadata: Metadata = createTestMetadata()): Unit = {
validateWriteSupportedTable(protocol, metadata, "/test/table")
}

def checkUnsupported(
protocol: Protocol,
metadata: Metadata = null,
schema: StructType = createTestSchema()): Unit = {
metadata: Metadata = createTestMetadata()): Unit = {
intercept[KernelException] {
validateWriteSupportedTable(protocol, metadata, schema, "/test/table")
validateWriteSupportedTable(protocol, metadata, "/test/table")
}
}

Expand All @@ -131,18 +124,20 @@ class TableFeaturesSuite extends AnyFunSuite {
)
}

def createTestMetadata(withAppendOnly: Boolean = false): Metadata = {
def createTestMetadata(
withAppendOnly: Boolean = false, includeVariant: Boolean = false): Metadata = {
var config: Map[String, String] = Map()
if (withAppendOnly) {
config = Map("delta.appendOnly" -> "true");
}
val testSchema = createTestSchema(includeVariant);
new Metadata(
"id",
Optional.of("name"),
Optional.of("description"),
new Format("parquet", Collections.emptyMap()),
"sss",
new StructType(),
testSchema.toJson,
testSchema,
new ArrayValue() { // partitionColumns
override def getSize = 1

Expand Down

0 comments on commit b6a4ac7

Please sign in to comment.