From 37c1d7047aa7f0da9cb9fbe0e8a780860215032f Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 12 Jul 2023 19:18:14 +0900 Subject: [PATCH] Add helper classes for deletion vector in Delta Lake --- .../plugin/deltalake/delete/Base85Codec.java | 100 +++++++++++++ .../deltalake/delete/DeletionVectors.java | 135 ++++++++++++++++++ .../transactionlog/DeletionVectorEntry.java | 29 ++++ .../deltalake/delete/TestBase85Codec.java | 119 +++++++++++++++ .../deltalake/delete/TestDeletionVectors.java | 76 ++++++++++ .../databricks/deletion_vectors/README.md | 13 ++ .../_delta_log/00000000000000000000.json | 3 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 3 + ...r_a52eda8c-0a57-4636-814b-9c165388f7ca.bin | Bin 0 -> 43 bytes ...4e53-94c8-2e20a0796fee-c000.snappy.parquet | Bin 0 -> 796 bytes 11 files changed, 480 insertions(+) create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/Base85Codec.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestBase85Codec.java create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/Base85Codec.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/Base85Codec.java new file mode 100644 index 000000000000..d9d5d3c69778 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/Base85Codec.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.math.LongMath; +import com.google.common.primitives.SignedBytes; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.nio.charset.StandardCharsets.UTF_8; + +// This implements Base85 using the 4 byte block aligned encoding and character set from Z85 https://rfc.zeromq.org/spec/32 +// Delta Lake implementation is https://github.com/delta-io/delta/blob/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/deletionvectors/Base85Codec.java +public final class Base85Codec +{ + @VisibleForTesting + static final long BASE = 85L; + @VisibleForTesting + static final long BASE_2ND_POWER = LongMath.pow(BASE, 2); + @VisibleForTesting + static final long BASE_3RD_POWER = LongMath.pow(BASE, 3); + @VisibleForTesting + static final long BASE_4TH_POWER = LongMath.pow(BASE, 4); + + private static final int ASCII_BITMASK = 0x7F; + + // UUIDs always encode into 20 characters + static final int ENCODED_UUID_LENGTH = 20; + + private static final String BASE85_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#"; + + @VisibleForTesting + static final byte[] ENCODE_MAP = BASE85_CHARACTERS.getBytes(UTF_8); + + // The bitmask is the same as largest possible value, so the length of the array must be one greater. + static final byte[] DECODE_MAP = new byte[ASCII_BITMASK + 1]; + + static { + // Following loop doesn't fill all values + Arrays.fill(DECODE_MAP, (byte) -1); + for (int i = 0; i < ENCODE_MAP.length; i++) { + DECODE_MAP[ENCODE_MAP[i]] = SignedBytes.checkedCast(i); + } + } + + private Base85Codec() {} + + public static ByteBuffer decodeBlocks(String encoded) + { + char[] input = encoded.toCharArray(); + checkArgument(input.length % 5 == 0, "Input should be 5 character aligned"); + ByteBuffer buffer = ByteBuffer.allocate(input.length / 5 * 4); + + // A mechanism to detect invalid characters in the input while decoding, that only has a + // single conditional at the very end, instead of branching for every character. + class InputCharDecoder + { + int canary; + + long decodeInputChar(int i) + { + char c = input[i]; + canary |= c; // non-ascii char has bits outside of ASCII_BITMASK + byte b = DECODE_MAP[c & ASCII_BITMASK]; + canary |= b; // invalid char maps to -1, which has bits outside ASCII_BITMASK + return b; + } + } + + int inputIndex = 0; + InputCharDecoder inputCharDecoder = new InputCharDecoder(); + while (buffer.hasRemaining()) { + int sum = 0; + sum += inputCharDecoder.decodeInputChar(inputIndex) * BASE_4TH_POWER; + sum += inputCharDecoder.decodeInputChar(inputIndex + 1) * BASE_3RD_POWER; + sum += inputCharDecoder.decodeInputChar(inputIndex + 2) * BASE_2ND_POWER; + sum += inputCharDecoder.decodeInputChar(inputIndex + 3) * BASE; + sum += inputCharDecoder.decodeInputChar(inputIndex + 4); + buffer.putInt(sum); + inputIndex += 5; + } + checkArgument((inputCharDecoder.canary & ~ASCII_BITMASK) == 0, "Input is not valid Z85: %s", encoded); + buffer.rewind(); + return buffer; + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java new file mode 100644 index 000000000000..78f109145245 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java @@ -0,0 +1,135 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import com.google.common.base.CharMatcher; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; +import io.trino.spi.TrinoException; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.UUID; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; +import static io.trino.plugin.deltalake.delete.Base85Codec.decodeBlocks; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.Math.toIntExact; +import static java.nio.ByteOrder.LITTLE_ENDIAN; + +// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-format +public final class DeletionVectors +{ + private static final int PORTABLE_ROARING_BITMAP_MAGIC_NUMBER = 1681511377; + + private static final String UUID_MARKER = "u"; // relative path with random prefix on disk + private static final String PATH_MARKER = "p"; // absolute path on disk + private static final String INLINE_MARKER = "i"; // inline + + private static final CharMatcher ALPHANUMERIC = CharMatcher.inRange('A', 'Z').or(CharMatcher.inRange('a', 'z')).or(CharMatcher.inRange('0', '9')).precomputed(); + + private DeletionVectors() {} + + public static Roaring64NavigableMap readDeletionVectors(TrinoFileSystem fileSystem, Location location, DeletionVectorEntry deletionVector) + throws IOException + { + if (deletionVector.storageType().equals(UUID_MARKER)) { + TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv()))); + ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes()); + Roaring64NavigableMap bitmaps = deserializeDeletionVectors(buffer); + if (bitmaps.getLongCardinality() != deletionVector.cardinality()) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "The number of deleted rows expects %s but got %s".formatted(deletionVector.cardinality(), bitmaps.getLongCardinality())); + } + return bitmaps; + } + if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType()); + } + throw new IllegalArgumentException("Unexpected storage type: " + deletionVector.storageType()); + } + + public static String toFileName(String pathOrInlineDv) + { + int randomPrefixLength = pathOrInlineDv.length() - Base85Codec.ENCODED_UUID_LENGTH; + String randomPrefix = pathOrInlineDv.substring(0, randomPrefixLength); + checkArgument(ALPHANUMERIC.matchesAllOf(randomPrefix), "Random prefix must be alphanumeric: %s", randomPrefix); + String prefix = randomPrefix.isEmpty() ? "" : randomPrefix + "/"; + String encodedUuid = pathOrInlineDv.substring(randomPrefixLength); + UUID uuid = decodeUuid(encodedUuid); + return "%sdeletion_vector_%s.bin".formatted(prefix, uuid); + } + + public static ByteBuffer readDeletionVector(TrinoInputFile inputFile, int offset, int expectedSize) + throws IOException + { + byte[] bytes = new byte[expectedSize]; + try (DataInputStream inputStream = new DataInputStream(inputFile.newStream())) { + checkState(inputStream.skip(offset) == offset); + int actualSize = inputStream.readInt(); + if (actualSize != expectedSize) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "The size of deletion vector %s expects %s but got %s".formatted(inputFile.location(), expectedSize, actualSize)); + } + inputStream.readFully(bytes); + } + return ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN); + } + + private static Roaring64NavigableMap deserializeDeletionVectors(ByteBuffer buffer) + throws IOException + { + checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order()); + int magicNumber = buffer.getInt(); + if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) { + int size = toIntExact(buffer.getLong()); + Roaring64NavigableMap bitmaps = new Roaring64NavigableMap(); + bitmaps.add(); + + for (int i = 0; i < size; i++) { + int key = buffer.getInt(); + checkArgument(key >= 0); + + RoaringBitmap bitmap = new RoaringBitmap(); + bitmap.deserialize(buffer); + bitmap.stream().forEach(bitmaps::add); + + // there seems to be no better way to ask how many bytes bitmap.deserialize has read + int consumedBytes = bitmap.serializedSizeInBytes(); + buffer.position(buffer.position() + consumedBytes); + } + return bitmaps; + } + throw new IllegalArgumentException("Unsupported magic number: " + magicNumber); + } + + public static UUID decodeUuid(String encoded) + { + ByteBuffer buffer = decodeBlocks(encoded); + return uuidFromByteBuffer(buffer); + } + + private static UUID uuidFromByteBuffer(ByteBuffer buffer) + { + checkArgument(buffer.remaining() == 16); + long highBits = buffer.getLong(); + long lowBits = buffer.getLong(); + return new UUID(highBits, lowBits); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java new file mode 100644 index 000000000000..94719ae02c71 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog; + +import java.util.OptionalInt; + +import static java.util.Objects.requireNonNull; + +// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-descriptor-schema +public record DeletionVectorEntry(String storageType, String pathOrInlineDv, OptionalInt offset, int sizeInBytes, long cardinality) +{ + public DeletionVectorEntry + { + requireNonNull(storageType, "storageType is null"); + requireNonNull(pathOrInlineDv, "pathOrInlineDv is null"); + requireNonNull(offset, "offset is null"); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestBase85Codec.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestBase85Codec.java new file mode 100644 index 000000000000..b4b40608ae39 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestBase85Codec.java @@ -0,0 +1,119 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import org.testng.annotations.Test; + +import java.nio.ByteBuffer; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.deltalake.delete.Base85Codec.BASE; +import static io.trino.plugin.deltalake.delete.Base85Codec.BASE_2ND_POWER; +import static io.trino.plugin.deltalake.delete.Base85Codec.BASE_3RD_POWER; +import static io.trino.plugin.deltalake.delete.Base85Codec.BASE_4TH_POWER; +import static io.trino.plugin.deltalake.delete.Base85Codec.ENCODE_MAP; +import static io.trino.plugin.deltalake.delete.Base85Codec.decodeBlocks; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestBase85Codec +{ + @Test + public void testDecodeBlocksIllegalCharacter() + { + assertThatThrownBy(() -> decodeBlocks("ab" + 0x7F + "de")).hasMessageContaining("Input should be 5 character aligned"); + + assertThatThrownBy(() -> decodeBlocks("abîde")).hasMessageContaining("Input is not valid Z85: abîde"); + assertThatThrownBy(() -> decodeBlocks("abπde")).hasMessageContaining("Input is not valid Z85: abπde"); + assertThatThrownBy(() -> decodeBlocks("ab\"de")).hasMessageContaining("Input is not valid Z85: ab\"de"); + } + + @Test + public void testEncodeBytes() + { + // The test case comes from https://rfc.zeromq.org/spec/32 + byte[] inputBytes = new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B}; + String encoded = encodeBytes(inputBytes); + assertThat(encoded).isEqualTo("HelloWorld"); + } + + @Test + public void testCodecRoundTrip() + { + assertThat(encodeBytes(decodeBytes("HelloWorld", 8))) + .isEqualTo("HelloWorld"); + assertThat(encodeBytes(decodeBytes("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", 40))) + .isEqualTo("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L"); + } + + @Test + public void testDecodeBytes() + { + String data = "HelloWorld"; + byte[] bytes = decodeBytes(data, 8); + assertThat(bytes).isEqualTo(new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B}); + } + + private static byte[] decodeBytes(String encoded, int outputLength) + { + ByteBuffer result = decodeBlocks(encoded); + if (result.remaining() > outputLength) { + // Only read the expected number of bytes + byte[] output = new byte[outputLength]; + result.get(output); + return output; + } + return result.array(); + } + + private static String encodeBytes(byte[] input) + { + if (input.length % 4 == 0) { + return encodeBlocks(ByteBuffer.wrap(input)); + } + int alignedLength = ((input.length + 4) / 4) * 4; + ByteBuffer buffer = ByteBuffer.allocate(alignedLength); + buffer.put(input); + while (buffer.hasRemaining()) { + buffer.put((byte) 0); + } + buffer.rewind(); + return encodeBlocks(buffer); + } + + private static String encodeBlocks(ByteBuffer buffer) + { + checkArgument(buffer.remaining() % 4 == 0); + int numBlocks = buffer.remaining() / 4; + // Every 4 byte block gets encoded into 5 bytes/chars + int outputLength = numBlocks * 5; + byte[] output = new byte[outputLength]; + int outputIndex = 0; + + while (buffer.hasRemaining()) { + long word = Integer.toUnsignedLong(buffer.getInt()) & 0x00000000ffffffffL; + output[outputIndex] = ENCODE_MAP[(int) (word / BASE_4TH_POWER)]; + word %= BASE_4TH_POWER; + output[outputIndex + 1] = ENCODE_MAP[(int) (word / BASE_3RD_POWER)]; + word %= BASE_3RD_POWER; + output[outputIndex + 2] = ENCODE_MAP[(int) (word / BASE_2ND_POWER)]; + word %= BASE_2ND_POWER; + output[outputIndex + 3] = ENCODE_MAP[(int) (word / BASE)]; + output[outputIndex + 4] = ENCODE_MAP[(int) (word % BASE)]; + outputIndex += 5; + } + return new String(output, UTF_8); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java new file mode 100644 index 000000000000..f54ab55f0ac2 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import com.google.common.io.Resources; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.testng.annotations.Test; + +import java.io.File; +import java.nio.file.Path; +import java.util.OptionalInt; + +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; +import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors; +import static io.trino.plugin.deltalake.delete.DeletionVectors.toFileName; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestDeletionVectors +{ + @Test + public void testUuidStorageType() + throws Exception + { + // The deletion vector has a deleted row at position 1 + Path path = new File(Resources.getResource("databricks/deletion_vectors").toURI()).toPath(); + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + DeletionVectorEntry deletionVector = new DeletionVectorEntry("u", "R7QFX3rGXPFLhHGq&7g<", OptionalInt.of(1), 34, 1); + + Roaring64NavigableMap bitmaps = readDeletionVectors(fileSystem, Location.of(path.toString()), deletionVector); + assertThat(bitmaps.getLongCardinality()).isEqualTo(1); + assertThat(bitmaps.contains(0)).isFalse(); + assertThat(bitmaps.contains(1)).isTrue(); + assertThat(bitmaps.contains(2)).isFalse(); + } + + @Test + public void testUnsupportedPathStorageType() + { + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + DeletionVectorEntry deletionVector = new DeletionVectorEntry("p", "s3://bucket/table/deletion_vector.bin", OptionalInt.empty(), 40, 1); + assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), deletionVector)) + .hasMessageContaining("Unsupported storage type for deletion vector: p"); + } + + @Test + public void testUnsupportedInlineStorageType() + { + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + DeletionVectorEntry deletionVector = new DeletionVectorEntry("i", "wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", OptionalInt.empty(), 40, 1); + assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), deletionVector)) + .hasMessageContaining("Unsupported storage type for deletion vector: i"); + } + + @Test + public void testToFileName() + { + assertThat(toFileName("R7QFX3rGXPFLhHGq&7g<")).isEqualTo("deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin"); + assertThat(toFileName("ab^-aqEH.-t@S}K{vb[*k^")).isEqualTo("ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin"); + } +} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md new file mode 100644 index 000000000000..f30f3b1279ae --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md @@ -0,0 +1,13 @@ +Data generated using Databricks 12.2: + +```sql +CREATE TABLE default.test_deletion_vectors ( + a INT, + b INT) +USING delta +LOCATION 's3://trino-ci-test/test_deletion_vectors' +TBLPROPERTIES ('delta.enableDeletionVectors' = true); + +INSERT INTO default.test_deletion_vectors VALUES (1, 11), (2, 22); +DELETE FROM default.test_deletion_vectors WHERE a = 2; +``` diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..4a5d53407173 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1682326581374,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.enableDeletionVectors\":\"true\"}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"2cbfa481-d2b0-4f59-83f9-1261492dfd46"}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}} +{"metaData":{"id":"32f26f4b-95ba-4980-b209-0132e949b3e4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true"},"createdTime":1682326580906}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json new file mode 100644 index 000000000000..7a5e8e6418b8 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1682326587253,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"2","numOutputBytes":"796"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"99cd5421-a1b9-40c6-8063-7298ec935fd6"}} +{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json new file mode 100644 index 000000000000..00f135f1c8d2 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1682326592314,"operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.default.test_deletion_vectors_vsipbnhjjg.a = 2)\"]"},"readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"0","numRemovedBytes":"0","numCopiedRows":"0","numDeletionVectorsAdded":"1","numDeletionVectorsRemoved":"0","numAddedChangeFiles":"0","executionTimeMs":"2046","numDeletedRows":"1","scanTimeMs":"1335","numAddedFiles":"0","numAddedBytes":"0","rewriteTimeMs":"709"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"219ffc4f-ff84-49d6-98a3-b0b105ce2a1e"}} +{"remove":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","deletionTimestamp":1682326592313,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":796,"tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"R7QFX3rGXPFLhHGq&7g<","offset":1,"sizeInBytes":34,"cardinality":1}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin new file mode 100644 index 0000000000000000000000000000000000000000..66b4b7369d9f146ca1841aa0fa2c4be0e6866f20 GIT binary patch literal 43 jcmZQ%U|>+Xc-bDV@IyDfVW71Tpnr1r2XGLuZYVUx6*Ojnjt z*po+3z3I{a!9T&P_<#66_|kNjLcIt{=FRuM_s#nrljEmvS{Pv)JNWC5U!M&d8?fEQ zCPMe0=m?>m9Sy!^bWQvD<oe9ZWigAcZ{#8jNhlSwA&?H4L|{C4;ZQG1J&$T1HtUk!%64u#|C7U4qX zMLfyqEuZ$y=Zldy36 z@8)jC(=x>YU9;j$$+64<$Yx2xMFM#!l%9L7>GtP~g?Cdc{=4?W0?~(C7OE*c=SmJ? zXyqVI){Yu!?yN`Pv{NdpZ+SLC@SrQW8prTo`~}`{vc&)Z literal 0 HcmV?d00001