Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Adds Utility Class for Implementing ZOrdering #3966

Closed
wants to merge 12 commits into from
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/ByteBuffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class ByteBuffers {

Expand All @@ -46,6 +47,15 @@ public static byte[] toByteArray(ByteBuffer buffer) {
}
}

public static ByteBuffer reuse(ByteBuffer reuse, int length) {
Preconditions.checkArgument(reuse.hasArray() && reuse.arrayOffset() == 0 && reuse.capacity() == length,
"Cannot reuse buffer: Should be an array %s, should have an offset of 0 %s, should be of size %s was %s",
reuse.hasArray(), reuse.arrayOffset(), length, reuse.capacity());
reuse.position(0);
reuse.limit(length);
return reuse;
}

public static ByteBuffer copy(ByteBuffer buffer) {
if (buffer == null) {
return null;
Expand Down
187 changes: 187 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
* Within Z-Ordering the byte representations of objects being compared must be ordered,
* this requires several types to be transformed when converted to bytes. The goal is to
* map object's whose byte representation are not lexicographically ordered into representations
* that are lexicographically ordered.
* Most of these techniques are derived from
* https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/
*
* Some implementation is taken from
* https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/OrderedBytes.java
*/
public class ZOrderByteUtils {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be documented somewhere the produced byte[] should be compared lexicographically, with bytes as unsigned.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I will add this


private ZOrderByteUtils() {

}

/**
* Signed ints do not have their bytes in magnitude order because of the sign bit.
* To fix this, flip the sign bit so that all negatives are ordered before positives. This essentially
* shifts the 0 value so that we don't break our ordering when we cross the new 0 value.
*/
public static byte[] intToOrderedBytes(int val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Integer.BYTES);
bytes.putInt(val ^ 0x80000000);
return bytes.array();
}

/**
* Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
*/
public static byte[] longToOrderedBytes(long val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Long.BYTES);
bytes.putLong(val ^ 0x8000000000000000L);
return bytes.array();
}

/**
* Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
*/
public static byte[] shortToOrderedBytes(short val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Short.BYTES);
bytes.putShort((short) (val ^ (0x8000)));
return bytes.array();
}

/**
* Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
*/
public static byte[] tinyintToOrderedBytes(byte val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Byte.BYTES);
bytes.put((byte) (val ^ (0x80)));
return bytes.array();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return the ByteBuffer instead since we're passing in a ByteBuffer?

}

/**
* IEEE 754 :
* “If two floating-point numbers in the same format are ordered (say, x {@literal <} y),
* they are ordered the same way when their bits are reinterpreted as sign-magnitude integers.”
*
* Which means floats can be treated as sign magnitude integers which can then be converted into lexicographically
* comparable bytes
*/
public static byte[] floatToOrderedBytes(float val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Float.BYTES);
int ival = Float.floatToIntBits(val);
ival ^= ((ival >> (Integer.SIZE - 1)) | Integer.MIN_VALUE);
bytes.putInt(ival);
return bytes.array();
}

/**
* Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)}
*/
public static byte[] doubleToOrderedBytes(double val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Double.BYTES);
long lng = Double.doubleToLongBits(val);
lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE);
bytes.putLong(lng);
return bytes.array();
}

/**
* Strings are lexicographically sortable BUT if different byte array lengths will
* ruin the Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every time).
* This implementation just uses a set size to for all output byte representations. Truncating longer strings
* and right padding 0 for shorter strings.
*/
public static byte[] stringToOrderedBytes(String val, int length, ByteBuffer reuse) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the length going to be calculated in practice?
Is it going to be a fixed prefix, like 128 or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The debate we were having before was wether to limit it to the max common length of columns, or whether to let it go beyond that.

Like with (A, B, CC, DDD)

Do you return

A. : All bytes that are available
ABCDCDD

B. : All bytes that can be interleaved with at least one other column
ABCDCD

Or

C. : All bytes that can be interleaved with all other columns
ABCD

My current implementation in Spark just does A which is the sum of all column lengths, but we could do B and save some space at the cost of losing a bit of single column ordering. I don't think C actually makes a lot of sense unless we do some hind of bin hashing and actually generate bytes all of the same size.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the our Spark implementation (see the follow up to this pr) we just set a length for the moment. Mostly to save us from unbounded strings increasing the size of our z-order algorithm and making sure the column always contributes the same number of bytes.

In the future I hope we can use statistics to do something smarter, or maybe not even use truncation and use some binning function or something like that based on the range of values in the column.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A. : All bytes that are available

i think this is what we should do (Maybe with some max length for safety) because

  1. z-order on 1 column should be consistent with sort on that column
  2. adding a low-cardinality, short column (like boolean, or tinyint, or varchar(1) if we had it), should not reduce z-order sorting strength; so one short columns should not stop us from ordering longer columns

ByteBuffer bytes = ByteBuffers.reuse(reuse, length);
Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
if (val != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you update this to accept a reused buffer, then we need to remember to zero out the bytes.

int maxLength = Math.min(length, val.length());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val.length() is not "encoding-aware". Maybe add a comment that it doesn't matter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, don't add a comment. We must take byte-length into account, otherwise we would be truncating short non-all-ASCII string values.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... an example would be String "ą", single character, so length=1.
We would copy only the first byte of the two-byte sequence (0xC4 0x85) for the letter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH sorry I thought you were talking about something else, yeah let me switch this to the byte length of the encoded string

// We may truncate mid-character
bytes.put(val.getBytes(StandardCharsets.UTF_8), 0, maxLength);
Copy link
Contributor

@rdblue rdblue Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @findepi has a point about length. In addition, this call to getBytes is going to allocate a byte array. It doesn't help to reuse a buffer if we are going to do an allocation anyway.

One thing you can do is to use CharsetEncoder.encode(CharBuffer, ByteBuffer, boolean). That way, you'd get a CharBuffer wrapping the original string without reallocating and encode into the buffer that's passed in. You'd also get information about the result, but you probably don't really care what happens. Underflow means the string is short and overflow means that you ran out of space. Neither one really affects zorder much.

}
return bytes.array();
}

/**
* For Testing interleave all available bytes
*/
static byte[] interleaveBits(byte[][] columnsBinary) {
return interleaveBits(columnsBinary,
Arrays.stream(columnsBinary).mapToInt(column -> column.length).sum());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only allow this when all the columns have the same length?

Copy link
Member Author

@RussellSpitzer RussellSpitzer Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so? Shouldn't we be allowing columns to contribute different amounts of bytes?

This is one of the difficulties for working with strings and other types since not only are we going to need more bytes for strings, but their most significant bytes may be rather late in the string.

For example

"www.iceberg.org/index"
"www.iceberg.org/spark"

Only have a differing byte at position 17.

This is one of my reluctances to require similar contributions of byte sizes until we have a better "toOrderedBytes" function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only allow this when all the columns have the same length?

z_order(an_integer, a_bigint, 50_bytes_of_string) -- each column has different byte length.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi, while those do have different lengths, is it valuable to interleave them directly? We had a discussion above about interleaving short and int. I think that the behavior most people would expect is to interleave by numeric magnitude, not the actual bytes. Basically, if two columns have the same numeric distribution, they should interleave well regardless of the type widths.

If you agree there, then what you'd do is cast the integer in your example to a bigint. Then you have 2 8-byte buffers and 1 50-byte buffer. The remainder of the 50 bytes after the first 8 doesn't matter.

There are cases where you may want to use different lengths, but to me you should be calling the version with a specified length for those.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cast the integer in your example to a bigint

that works, for numbers at leasts

The remainder of the 50 bytes after the first 8 doesn't matter.

It does:

z_order(a_short, z_order_interleaving) -- it's unlikely that 2 first bytes of a string suffice for anything useful (ht for urls)

unless it's covered somewhere else:
if you order by z_order_interleaving, c1, c2, c3, then it z_order_interleaving can be shorter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi, I see your point. If you still want the string ordering then you have to preserve the string. You're right about that. I was focusing on interleaving the ordering across the columns, but it's of course necessary to still sort by the remaining bytes in the longest column!

}

/**
* Interleave bits using a naive loop. Variable length inputs are allowed but to get a consistent ordering it is
* required that every column contribute the same number of bytes in each invocation. Bits are interleaved from all
* columns that have a bit available at that position. Once a Column has no more bits to produce it is skipped in the
* interleaving.
* @param columnsBinary an array of ordered byte representations of the columns being ZOrdered
* @param interleavedSize the number of bytes to use in the output
* @return the columnbytes interleaved
*/
public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize) {
byte[] interleavedBytes = new byte[interleavedSize];
int sourceColumn = 0;
int sourceByte = 0;
int sourceBit = 7;
int interleaveByte = 0;
int interleaveBit = 7;

while (interleaveByte < interleavedSize) {
// Take the source bit from source byte and move it to the output bit position
interleavedBytes[interleaveByte] |=
(columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >>> sourceBit << interleaveBit;
--interleaveBit;

// Check if an output byte has been completed
if (interleaveBit == -1) {
// Move to the next output byte
interleaveByte++;
// Move to the highest order bit of the new output byte
interleaveBit = 7;
}

// Check if the last output byte has been completed
if (interleaveByte == interleavedSize) {
break;
}

// Find the next source bit to interleave
do {
// Move to next column
++sourceColumn;
if (sourceColumn == columnsBinary.length) {
// If the last source column was used, reset to next bit of first column
sourceColumn = 0;
--sourceBit;
if (sourceBit == -1) {
// If the last bit of the source byte was used, reset to the highest bit of the next byte
sourceByte++;
sourceBit = 7;
}
}
} while (columnsBinary[sourceColumn].length <= sourceByte);
}
return interleavedBytes;
}
}
Loading