diff --git a/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java index 213b222dc507..efc05f179f82 100644 --- a/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java +++ b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class ByteBuffers { @@ -46,6 +47,15 @@ public static byte[] toByteArray(ByteBuffer buffer) { } } + public static ByteBuffer reuse(ByteBuffer reuse, int length) { + Preconditions.checkArgument(reuse.hasArray() && reuse.arrayOffset() == 0 && reuse.capacity() == length, + "Cannot reuse buffer: Should be an array %s, should have an offset of 0 %s, should be of size %s was %s", + reuse.hasArray(), reuse.arrayOffset(), length, reuse.capacity()); + reuse.position(0); + reuse.limit(length); + return reuse; + } + public static ByteBuffer copy(ByteBuffer buffer) { if (buffer == null) { return null; diff --git a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java new file mode 100644 index 000000000000..b008461ea8ca --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Within Z-Ordering the byte representations of objects being compared must be ordered, + * this requires several types to be transformed when converted to bytes. The goal is to + * map object's whose byte representation are not lexicographically ordered into representations + * that are lexicographically ordered. Bytes produced should be compared lexicographically as + * unsigned bytes, big-endian. + *
+ * Most of these techniques are derived from + * https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/ + *
+ * Some implementation is taken from + * https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/OrderedBytes.java + */ +public class ZOrderByteUtils { + + private ZOrderByteUtils() { + + } + + /** + * Signed ints do not have their bytes in magnitude order because of the sign bit. + * To fix this, flip the sign bit so that all negatives are ordered before positives. This essentially + * shifts the 0 value so that we don't break our ordering when we cross the new 0 value. + */ + public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Integer.BYTES); + bytes.putInt(val ^ 0x80000000); + return bytes; + } + + /** + * Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Long.BYTES); + bytes.putLong(val ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Short.BYTES); + bytes.putShort((short) (val ^ (0x8000))); + return bytes; + } + + /** + * Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Byte.BYTES); + bytes.put((byte) (val ^ (0x80))); + return bytes; + } + + /** + * IEEE 754 : + * “If two floating-point numbers in the same format are ordered (say, x {@literal <} y), + * they are ordered the same way when their bits are reinterpreted as sign-magnitude integers.” + * + * Which means floats can be treated as sign magnitude integers which can then be converted into lexicographically + * comparable bytes + */ + public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Float.BYTES); + int ival = Float.floatToIntBits(val); + ival ^= ((ival >> (Integer.SIZE - 1)) | Integer.MIN_VALUE); + bytes.putInt(ival); + return bytes; + } + + /** + * Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)} + */ + public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Double.BYTES); + long lng = Double.doubleToLongBits(val); + lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE); + bytes.putLong(lng); + return bytes; + } + + /** + * Strings are lexicographically sortable BUT if different byte array lengths will + * ruin the Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every time). + * This implementation just uses a set size to for all output byte representations. Truncating longer strings + * and right padding 0 for shorter strings. + */ + public static ByteBuffer stringToOrderedBytes(String val, int length, ByteBuffer reuse, CharsetEncoder encoder) { + Preconditions.checkArgument(encoder.charset().equals(StandardCharsets.UTF_8), + "Cannot use an encoder not using UTF_8 as it's Charset"); + + ByteBuffer bytes = ByteBuffers.reuse(reuse, length); + Arrays.fill(bytes.array(), 0, length, (byte) 0x00); + if (val != null) { + CharBuffer inputBuffer = CharBuffer.wrap(val); + encoder.encode(inputBuffer, bytes, true); + } + return bytes; + } + + /** + * For Testing interleave all available bytes + */ + static byte[] interleaveBits(byte[][] columnsBinary) { + return interleaveBits(columnsBinary, + Arrays.stream(columnsBinary).mapToInt(column -> column.length).sum()); + } + + /** + * Interleave bits using a naive loop. Variable length inputs are allowed but to get a consistent ordering it is + * required that every column contribute the same number of bytes in each invocation. Bits are interleaved from all + * columns that have a bit available at that position. Once a Column has no more bits to produce it is skipped in the + * interleaving. + * @param columnsBinary an array of ordered byte representations of the columns being ZOrdered + * @param interleavedSize the number of bytes to use in the output + * @return the columnbytes interleaved + */ + public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize) { + byte[] interleavedBytes = new byte[interleavedSize]; + int sourceColumn = 0; + int sourceByte = 0; + int sourceBit = 7; + int interleaveByte = 0; + int interleaveBit = 7; + + while (interleaveByte < interleavedSize) { + // Take the source bit from source byte and move it to the output bit position + interleavedBytes[interleaveByte] |= + (columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >>> sourceBit << interleaveBit; + --interleaveBit; + + // Check if an output byte has been completed + if (interleaveBit == -1) { + // Move to the next output byte + interleaveByte++; + // Move to the highest order bit of the new output byte + interleaveBit = 7; + } + + // Check if the last output byte has been completed + if (interleaveByte == interleavedSize) { + break; + } + + // Find the next source bit to interleave + do { + // Move to next column + ++sourceColumn; + if (sourceColumn == columnsBinary.length) { + // If the last source column was used, reset to next bit of first column + sourceColumn = 0; + --sourceBit; + if (sourceBit == -1) { + // If the last bit of the source byte was used, reset to the highest bit of the next byte + sourceByte++; + sourceBit = 7; + } + } + } while (columnsBinary[sourceColumn].length <= sourceByte); + } + return interleavedBytes; + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java new file mode 100644 index 000000000000..bf84319d0d45 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.util; + +import java.nio.ByteBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; +import org.apache.iceberg.relocated.com.google.common.primitives.UnsignedBytes; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestZOrderByteUtil { + private static final byte IIIIIIII = (byte) 255; + private static final byte IOIOIOIO = (byte) 170; + private static final byte OIOIOIOI = (byte) 85; + private static final byte OOOOIIII = (byte) 15; + private static final byte OOOOOOOI = (byte) 1; + private static final byte OOOOOOOO = (byte) 0; + + private static final int NUM_TESTS = 100000; + private static final int NUM_INTERLEAVE_TESTS = 1000; + + private final Random random = new Random(42); + + private String bytesToString(byte[] bytes) { + StringBuilder result = new StringBuilder(); + for (byte b : bytes) { + result.append(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0')); + } + return result.toString(); + } + + /** + * Returns a non-0 length byte array + */ + private byte[] generateRandomBytes() { + int length = Math.abs(random.nextInt(100) + 1); + byte[] result = new byte[length]; + random.nextBytes(result); + return result; + } + + /** + * Test method to ensure correctness of byte interleaving code + */ + private String interleaveStrings(String[] strings) { + StringBuilder result = new StringBuilder(); + int totalLength = Arrays.stream(strings).mapToInt(String::length).sum(); + int substringIndex = 0; + int characterIndex = 0; + while (characterIndex < totalLength) { + for (String str : strings) { + if (substringIndex < str.length()) { + result.append(str.charAt(substringIndex)); + characterIndex++; + } + } + substringIndex++; + } + return result.toString(); + } + + /** + * Compares the result of a string based interleaving algorithm implemented above + * versus the binary bit-shifting algorithm used in ZOrderByteUtils. Either both + * algorithms are identically wrong or are both identically correct. + */ + @Test + public void testInterleaveRandomExamples() { + for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) { + int numByteArrays = Math.abs(random.nextInt(6)) + 1; + byte[][] testBytes = new byte[numByteArrays][]; + String[] testStrings = new String[numByteArrays]; + for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) { + testBytes[byteIndex] = generateRandomBytes(); + testStrings[byteIndex] = bytesToString(testBytes[byteIndex]); + } + byte[] byteResult = ZOrderByteUtils.interleaveBits(testBytes); + String byteResultAsString = bytesToString(byteResult); + + String stringResult = interleaveStrings(testStrings); + + Assert.assertEquals("String interleave didn't match byte interleave", stringResult, byteResultAsString); + } + } + + @Test + public void testInterleaveEmptyBits() { + byte[][] test = new byte[4][10]; + byte[] expected = new byte[40]; + + Assert.assertArrayEquals("Should combine empty arrays", + expected, ZOrderByteUtils.interleaveBits(test)); + } + + @Test + public void testInterleaveFullBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[]{IIIIIIII, IIIIIIII}; + test[1] = new byte[]{IIIIIIII}; + test[2] = new byte[0]; + test[3] = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII}; + byte[] expected = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII}; + + Assert.assertArrayEquals("Should combine full arrays", + expected, ZOrderByteUtils.interleaveBits(test)); + } + + @Test + public void testInterleaveMixedBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[]{OOOOOOOI, IIIIIIII, OOOOOOOO, OOOOIIII}; + test[1] = new byte[]{OOOOOOOI, OOOOOOOO, IIIIIIII}; + test[2] = new byte[]{OOOOOOOI}; + test[3] = new byte[]{OOOOOOOI}; + byte[] expected = new byte[]{ + OOOOOOOO, OOOOOOOO, OOOOOOOO, OOOOIIII, + IOIOIOIO, IOIOIOIO, + OIOIOIOI, OIOIOIOI, + OOOOIIII}; + Assert.assertArrayEquals("Should combine mixed byte arrays", + expected, ZOrderByteUtils.interleaveBits(test)); + } + + @Test + public void testIntOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Integer.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Integer.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + int aInt = random.nextInt(); + int bInt = random.nextInt(); + int intCompare = Integer.signum(Integer.compare(aInt, bInt)); + byte[] aBytes = ZOrderByteUtils.intToOrderedBytes(aInt, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.intToOrderedBytes(bInt, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aInt, bInt, intCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + intCompare, byteCompare); + } + } + + @Test + public void testLongOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Long.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Long.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + long aLong = random.nextInt(); + long bLong = random.nextInt(); + int longCompare = Integer.signum(Long.compare(aLong, bLong)); + byte[] aBytes = ZOrderByteUtils.longToOrderedBytes(aLong, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.longToOrderedBytes(bLong, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aLong, bLong, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testShortOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Short.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Short.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + short aShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + short bShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aShort, bShort)); + byte[] aBytes = ZOrderByteUtils.shortToOrderedBytes(aShort, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.shortToOrderedBytes(bShort, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aShort, bShort, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testTinyOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Byte.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Byte.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + byte aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + byte bByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aByte, bByte)); + byte[] aBytes = ZOrderByteUtils.tinyintToOrderedBytes(aByte, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.tinyintToOrderedBytes(bByte, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aByte, bByte, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testFloatOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Float.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Float.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + float aFloat = random.nextFloat(); + float bFloat = random.nextFloat(); + int floatCompare = Integer.signum(Float.compare(aFloat, bFloat)); + byte[] aBytes = ZOrderByteUtils.floatToOrderedBytes(aFloat, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.floatToOrderedBytes(bFloat, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of floats should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aFloat, bFloat, floatCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + floatCompare, byteCompare); + } + } + + @Test + public void testDoubleOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Double.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Double.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + double aDouble = random.nextDouble(); + double bDouble = random.nextDouble(); + int doubleCompare = Integer.signum(Double.compare(aDouble, bDouble)); + byte[] aBytes = ZOrderByteUtils.doubleToOrderedBytes(aDouble, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.doubleToOrderedBytes(bDouble, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of doubles should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aDouble, bDouble, doubleCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + doubleCompare, byteCompare); + } + } + + @Test + public void testStringOrdering() { + CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder(); + ByteBuffer aBuffer = ByteBuffer.allocate(128); + ByteBuffer bBuffer = ByteBuffer.allocate(128); + for (int i = 0; i < NUM_TESTS; i++) { + String aString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random); + String bString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random); + int stringCompare = Integer.signum(aString.compareTo(bString)); + byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128, aBuffer, encoder).array(); + byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128, bBuffer, encoder).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aString, bString, stringCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + stringCompare, byteCompare); + } + } +}