Skip to content

Commit

Permalink
Add helper classes for deletion vector in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 13, 2023
1 parent b8b2b7c commit 37c1d70
Show file tree
Hide file tree
Showing 11 changed files with 480 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.delete;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.math.LongMath;
import com.google.common.primitives.SignedBytes;

import java.nio.ByteBuffer;
import java.util.Arrays;

import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;

// This implements Base85 using the 4 byte block aligned encoding and character set from Z85 https://rfc.zeromq.org/spec/32
// Delta Lake implementation is https://github.com/delta-io/delta/blob/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/deletionvectors/Base85Codec.java
public final class Base85Codec
{
@VisibleForTesting
static final long BASE = 85L;
@VisibleForTesting
static final long BASE_2ND_POWER = LongMath.pow(BASE, 2);
@VisibleForTesting
static final long BASE_3RD_POWER = LongMath.pow(BASE, 3);
@VisibleForTesting
static final long BASE_4TH_POWER = LongMath.pow(BASE, 4);

private static final int ASCII_BITMASK = 0x7F;

// UUIDs always encode into 20 characters
static final int ENCODED_UUID_LENGTH = 20;

private static final String BASE85_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#";

@VisibleForTesting
static final byte[] ENCODE_MAP = BASE85_CHARACTERS.getBytes(UTF_8);

// The bitmask is the same as largest possible value, so the length of the array must be one greater.
static final byte[] DECODE_MAP = new byte[ASCII_BITMASK + 1];

static {
// Following loop doesn't fill all values
Arrays.fill(DECODE_MAP, (byte) -1);
for (int i = 0; i < ENCODE_MAP.length; i++) {
DECODE_MAP[ENCODE_MAP[i]] = SignedBytes.checkedCast(i);
}
}

private Base85Codec() {}

public static ByteBuffer decodeBlocks(String encoded)
{
char[] input = encoded.toCharArray();
checkArgument(input.length % 5 == 0, "Input should be 5 character aligned");
ByteBuffer buffer = ByteBuffer.allocate(input.length / 5 * 4);

// A mechanism to detect invalid characters in the input while decoding, that only has a
// single conditional at the very end, instead of branching for every character.
class InputCharDecoder
{
int canary;

long decodeInputChar(int i)
{
char c = input[i];
canary |= c; // non-ascii char has bits outside of ASCII_BITMASK
byte b = DECODE_MAP[c & ASCII_BITMASK];
canary |= b; // invalid char maps to -1, which has bits outside ASCII_BITMASK
return b;
}
}

int inputIndex = 0;
InputCharDecoder inputCharDecoder = new InputCharDecoder();
while (buffer.hasRemaining()) {
int sum = 0;
sum += inputCharDecoder.decodeInputChar(inputIndex) * BASE_4TH_POWER;
sum += inputCharDecoder.decodeInputChar(inputIndex + 1) * BASE_3RD_POWER;
sum += inputCharDecoder.decodeInputChar(inputIndex + 2) * BASE_2ND_POWER;
sum += inputCharDecoder.decodeInputChar(inputIndex + 3) * BASE;
sum += inputCharDecoder.decodeInputChar(inputIndex + 4);
buffer.putInt(sum);
inputIndex += 5;
}
checkArgument((inputCharDecoder.canary & ~ASCII_BITMASK) == 0, "Input is not valid Z85: %s", encoded);
buffer.rewind();
return buffer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.delete;

import com.google.common.base.CharMatcher;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.spi.TrinoException;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.delete.Base85Codec.decodeBlocks;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Math.toIntExact;
import static java.nio.ByteOrder.LITTLE_ENDIAN;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-format
public final class DeletionVectors
{
private static final int PORTABLE_ROARING_BITMAP_MAGIC_NUMBER = 1681511377;

private static final String UUID_MARKER = "u"; // relative path with random prefix on disk
private static final String PATH_MARKER = "p"; // absolute path on disk
private static final String INLINE_MARKER = "i"; // inline

private static final CharMatcher ALPHANUMERIC = CharMatcher.inRange('A', 'Z').or(CharMatcher.inRange('a', 'z')).or(CharMatcher.inRange('0', '9')).precomputed();

private DeletionVectors() {}

public static Roaring64NavigableMap readDeletionVectors(TrinoFileSystem fileSystem, Location location, DeletionVectorEntry deletionVector)
throws IOException
{
if (deletionVector.storageType().equals(UUID_MARKER)) {
TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv())));
ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
Roaring64NavigableMap bitmaps = deserializeDeletionVectors(buffer);
if (bitmaps.getLongCardinality() != deletionVector.cardinality()) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "The number of deleted rows expects %s but got %s".formatted(deletionVector.cardinality(), bitmaps.getLongCardinality()));
}
return bitmaps;
}
if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) {
throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType());
}
throw new IllegalArgumentException("Unexpected storage type: " + deletionVector.storageType());
}

public static String toFileName(String pathOrInlineDv)
{
int randomPrefixLength = pathOrInlineDv.length() - Base85Codec.ENCODED_UUID_LENGTH;
String randomPrefix = pathOrInlineDv.substring(0, randomPrefixLength);
checkArgument(ALPHANUMERIC.matchesAllOf(randomPrefix), "Random prefix must be alphanumeric: %s", randomPrefix);
String prefix = randomPrefix.isEmpty() ? "" : randomPrefix + "/";
String encodedUuid = pathOrInlineDv.substring(randomPrefixLength);
UUID uuid = decodeUuid(encodedUuid);
return "%sdeletion_vector_%s.bin".formatted(prefix, uuid);
}

public static ByteBuffer readDeletionVector(TrinoInputFile inputFile, int offset, int expectedSize)
throws IOException
{
byte[] bytes = new byte[expectedSize];
try (DataInputStream inputStream = new DataInputStream(inputFile.newStream())) {
checkState(inputStream.skip(offset) == offset);
int actualSize = inputStream.readInt();
if (actualSize != expectedSize) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "The size of deletion vector %s expects %s but got %s".formatted(inputFile.location(), expectedSize, actualSize));
}
inputStream.readFully(bytes);
}
return ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN);
}

private static Roaring64NavigableMap deserializeDeletionVectors(ByteBuffer buffer)
throws IOException
{
checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order());
int magicNumber = buffer.getInt();
if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) {
int size = toIntExact(buffer.getLong());
Roaring64NavigableMap bitmaps = new Roaring64NavigableMap();
bitmaps.add();

for (int i = 0; i < size; i++) {
int key = buffer.getInt();
checkArgument(key >= 0);

RoaringBitmap bitmap = new RoaringBitmap();
bitmap.deserialize(buffer);
bitmap.stream().forEach(bitmaps::add);

// there seems to be no better way to ask how many bytes bitmap.deserialize has read
int consumedBytes = bitmap.serializedSizeInBytes();
buffer.position(buffer.position() + consumedBytes);
}
return bitmaps;
}
throw new IllegalArgumentException("Unsupported magic number: " + magicNumber);
}

public static UUID decodeUuid(String encoded)
{
ByteBuffer buffer = decodeBlocks(encoded);
return uuidFromByteBuffer(buffer);
}

private static UUID uuidFromByteBuffer(ByteBuffer buffer)
{
checkArgument(buffer.remaining() == 16);
long highBits = buffer.getLong();
long lowBits = buffer.getLong();
return new UUID(highBits, lowBits);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog;

import java.util.OptionalInt;

import static java.util.Objects.requireNonNull;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-descriptor-schema
public record DeletionVectorEntry(String storageType, String pathOrInlineDv, OptionalInt offset, int sizeInBytes, long cardinality)
{
public DeletionVectorEntry
{
requireNonNull(storageType, "storageType is null");
requireNonNull(pathOrInlineDv, "pathOrInlineDv is null");
requireNonNull(offset, "offset is null");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.delete;

import org.testng.annotations.Test;

import java.nio.ByteBuffer;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.deltalake.delete.Base85Codec.BASE;
import static io.trino.plugin.deltalake.delete.Base85Codec.BASE_2ND_POWER;
import static io.trino.plugin.deltalake.delete.Base85Codec.BASE_3RD_POWER;
import static io.trino.plugin.deltalake.delete.Base85Codec.BASE_4TH_POWER;
import static io.trino.plugin.deltalake.delete.Base85Codec.ENCODE_MAP;
import static io.trino.plugin.deltalake.delete.Base85Codec.decodeBlocks;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestBase85Codec
{
@Test
public void testDecodeBlocksIllegalCharacter()
{
assertThatThrownBy(() -> decodeBlocks("ab" + 0x7F + "de")).hasMessageContaining("Input should be 5 character aligned");

assertThatThrownBy(() -> decodeBlocks("abîde")).hasMessageContaining("Input is not valid Z85: abîde");
assertThatThrownBy(() -> decodeBlocks("abπde")).hasMessageContaining("Input is not valid Z85: abπde");
assertThatThrownBy(() -> decodeBlocks("ab\"de")).hasMessageContaining("Input is not valid Z85: ab\"de");
}

@Test
public void testEncodeBytes()
{
// The test case comes from https://rfc.zeromq.org/spec/32
byte[] inputBytes = new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B};
String encoded = encodeBytes(inputBytes);
assertThat(encoded).isEqualTo("HelloWorld");
}

@Test
public void testCodecRoundTrip()
{
assertThat(encodeBytes(decodeBytes("HelloWorld", 8)))
.isEqualTo("HelloWorld");
assertThat(encodeBytes(decodeBytes("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", 40)))
.isEqualTo("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L");
}

@Test
public void testDecodeBytes()
{
String data = "HelloWorld";
byte[] bytes = decodeBytes(data, 8);
assertThat(bytes).isEqualTo(new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B});
}

private static byte[] decodeBytes(String encoded, int outputLength)
{
ByteBuffer result = decodeBlocks(encoded);
if (result.remaining() > outputLength) {
// Only read the expected number of bytes
byte[] output = new byte[outputLength];
result.get(output);
return output;
}
return result.array();
}

private static String encodeBytes(byte[] input)
{
if (input.length % 4 == 0) {
return encodeBlocks(ByteBuffer.wrap(input));
}
int alignedLength = ((input.length + 4) / 4) * 4;
ByteBuffer buffer = ByteBuffer.allocate(alignedLength);
buffer.put(input);
while (buffer.hasRemaining()) {
buffer.put((byte) 0);
}
buffer.rewind();
return encodeBlocks(buffer);
}

private static String encodeBlocks(ByteBuffer buffer)
{
checkArgument(buffer.remaining() % 4 == 0);
int numBlocks = buffer.remaining() / 4;
// Every 4 byte block gets encoded into 5 bytes/chars
int outputLength = numBlocks * 5;
byte[] output = new byte[outputLength];
int outputIndex = 0;

while (buffer.hasRemaining()) {
long word = Integer.toUnsignedLong(buffer.getInt()) & 0x00000000ffffffffL;
output[outputIndex] = ENCODE_MAP[(int) (word / BASE_4TH_POWER)];
word %= BASE_4TH_POWER;
output[outputIndex + 1] = ENCODE_MAP[(int) (word / BASE_3RD_POWER)];
word %= BASE_3RD_POWER;
output[outputIndex + 2] = ENCODE_MAP[(int) (word / BASE_2ND_POWER)];
word %= BASE_2ND_POWER;
output[outputIndex + 3] = ENCODE_MAP[(int) (word / BASE)];
output[outputIndex + 4] = ENCODE_MAP[(int) (word % BASE)];
outputIndex += 5;
}
return new String(output, UTF_8);
}
}
Loading

0 comments on commit 37c1d70

Please sign in to comment.