
Commit

Add xxhash correctness unit test, update shading, upper bound and some minor updates
chenjunjiedada committed Feb 22, 2020
1 parent 1519967 commit b912587
Showing 10 changed files with 134 additions and 94 deletions.
31 changes: 3 additions & 28 deletions parquet-column/pom.xml
@@ -61,8 +61,9 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.9</version>
<version>${net.openhft.version}</version>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
@@ -91,6 +92,7 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -112,33 +114,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
<artifactSet>
<includes>
<include>net.openhft</include>
<include>it.unimi.dsi:fastutil</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>it.unimi.dsi</pattern>
<shadedPattern>${shade.prefix}.it.unimi.dsi</shadedPattern>
</relocation>
<relocation>
<pattern>net.openhft</pattern>
<shadedPattern>${shade.prefix}.net.openhft</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -127,6 +126,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = estimateNextSizeCheck;
this.allocator = allocator;

this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.statisticsTruncateLength = statisticsTruncateLength;
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -42,12 +42,11 @@ public class BlockSplitBloomFilter implements BloomFilter {
// Bits in a tiny Bloom filter block.
private static final int BITS_PER_BLOCK = 256;

// the lower bound of Bloom filter size, set to the size of a tiny Bloom filter block
// The lower bound of bloom filter size, set to the size of a tiny Bloom filter block.
public static final int LOWER_BOUND_BYTES = 32;

// the upper bound of Bloom filter size, it sets to 20MB which is enough for a row group of 128MB
// with only one column of long type which should covers most real cases.
public static final int UPPER_BOUND_BYTES = 1024 * 1024 * 20;
// The upper bound of bloom filter size, set to default row group size.
public static final int UPPER_BOUND_BYTES = 128 * 1024 * 1024;

// The number of bits to set in a tiny Bloom filter
private static final int BITS_SET_PER_BLOCK = 8;
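For context, the hunk above raises UPPER_BOUND_BYTES from 20MB to 128MB, the default row group size. Below is a minimal sketch of how the split-block sizing formula interacts with that bound; the class name and rounding details are illustrative, not the committed code:

public class BloomSizingSketch {
  static final int UPPER_BOUND_BYTES = 128 * 1024 * 1024; // the new upper bound
  static final int BITS_PER_BLOCK = 256;                  // tiny-block size in bits

  // With 8 bits set per 256-bit block, the optimal bit count for n distinct
  // values at false-positive probability p is m = -8n / ln(1 - p^(1/8)).
  static long optimalNumOfBits(long n, double p) {
    double m = -8.0 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
    long maxBits = (long) UPPER_BOUND_BYTES << 3;
    long bits = (m > maxBits || m < 0) ? maxBits : (long) m; // handle overflow
    // Round up to a whole number of 256-bit blocks.
    return (bits + BITS_PER_BLOCK - 1) / BITS_PER_BLOCK * BITS_PER_BLOCK;
  }

  public static void main(String[] args) {
    // A 128MB row group with one column of longs holds at most
    // 128 * 1024 * 1024 / 8 = 16,777,216 distinct values; at 1% FPP this
    // needs roughly 20.3MB, which is why the old 20MB cap was tight.
    System.out.println(optimalNumOfBits(16_777_216, 0.01) / 8 + " bytes");
  }
}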
@@ -74,6 +73,9 @@ public class BlockSplitBloomFilter implements BloomFilter {
private int maximumBytes = UPPER_BOUND_BYTES;
private int minimumBytes = LOWER_BOUND_BYTES;

// A cache used for hashing
private ByteBuffer cacheBuffer = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);

// The block-based algorithm needs 8 odd SALT values to calculate eight indexes
// of bits to set, one per 32-bit word.
private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
@@ -125,6 +127,10 @@ private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
* @param hashStrategy The adopted hash strategy of the Bloom filter.
*/
public BlockSplitBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy) {
if (minimumBytes > maximumBytes) {
throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
}

if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
this.minimumBytes = minimumBytes;
}
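As shown in testConstructor later in this diff, the constructor clamps and rounds its size argument; a brief usage sketch:

// Sizes below LOWER_BOUND_BYTES are raised to 32, sizes above the maximum
// are capped, and in-range sizes are rounded up to the next power of two.
new BlockSplitBloomFilter(0).getBitsetSize();    // 32 (LOWER_BOUND_BYTES)
new BlockSplitBloomFilter(1000).getBitsetSize(); // 1024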
@@ -135,6 +141,8 @@ public BlockSplitBloomFilter(int numBytes, int minimumBytes, int maximumBytes, H

initBitset(numBytes);

cacheBuffer.order(ByteOrder.LITTLE_ENDIAN);

switch (hashStrategy) {
case XXH64:
this.hashStrategy = hashStrategy;
@@ -169,6 +177,7 @@ private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) {
throw new RuntimeException("Given bitset is null");
}

cacheBuffer.order(ByteOrder.LITTLE_ENDIAN);
this.bitset = bitset;
this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
switch (hashStrategy) {
@@ -272,12 +281,11 @@ public static int optimalNumOfBits(long n, double p) {
Preconditions.checkArgument((p > 0.0 && p < 1.0),
"FPP should be less than 1.0 and great than 0.0");
final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
final double MAX = UPPER_BOUND_BYTES << 3;
int numBits = (int)m;
int numBits = (int)m ;

// Handle overflow.
if (m > MAX || m < 0) {
numBits = (int)MAX;
if (m > UPPER_BOUND_BYTES << 3 || m < 0) {
numBits = UPPER_BOUND_BYTES;
}

// Round to BITS_PER_BLOCK
@@ -297,29 +305,24 @@ public int getBitsetSize() {

@Override
public long hash(Object value) {
ByteBuffer plain;

if (value instanceof Binary) {
return hashFunction.hashBytes(((Binary) value).getBytes());
}

cacheBuffer.clear();
if (value instanceof Integer) {
plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value));
cacheBuffer.putInt((Integer)value);
} else if (value instanceof Long) {
plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value));
cacheBuffer.putLong((Long)value);
} else if (value instanceof Float) {
plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value));
cacheBuffer.putFloat((Float)value);
} else if (value instanceof Double) {
plain = ByteBuffer.allocate(Double.SIZE/ Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value));
cacheBuffer.putDouble((Double) value);
} else {
throw new RuntimeException("Parquet Bloom filter: Not supported type");
}

return hashFunction.hashByteBuffer(plain);
return doHash();
}

@Override
@@ -337,32 +340,36 @@ public Compression getCompression() {
return Compression.UNCOMPRESSED;
}

private long doHash() {
cacheBuffer.flip();
long hashResult = hashFunction.hashByteBuffer(cacheBuffer);
cacheBuffer.clear();

return hashResult;
}

@Override
public long hash(int value) {
ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
return hashFunction.hashByteBuffer(plain);
cacheBuffer.putInt(value);
return doHash();
}

@Override
public long hash(long value) {
ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
return hashFunction.hashByteBuffer(plain);
cacheBuffer.putLong(value);
return doHash();
}

@Override
public long hash(double value) {
ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
return hashFunction.hashByteBuffer(plain);
cacheBuffer.putDouble(value);
return doHash();
}

@Override
public long hash(float value) {
ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
return hashFunction.hashByteBuffer(plain);
cacheBuffer.putFloat(value);
return doHash();
}

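The hunks above replace a fresh ByteBuffer allocation in every hash(...) overload with one reused cacheBuffer plus the doHash() helper. A self-contained sketch of that put/flip/hash/clear pattern (illustrative class name, assuming the zero-allocation-hashing dependency from the pom):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import net.openhft.hashing.LongHashFunction;

public class ReusedBufferHashSketch {
  // One 8-byte little-endian buffer is large enough for any primitive value.
  private final ByteBuffer cacheBuffer =
      ByteBuffer.allocate(Long.SIZE / Byte.SIZE).order(ByteOrder.LITTLE_ENDIAN);

  long hash(long value) {
    cacheBuffer.putLong(value);  // write into [0, 8)
    cacheBuffer.flip();          // position = 0, limit = bytes written
    long h = LongHashFunction.xx(0).hashBytes(cacheBuffer); // hashes position..limit
    cacheBuffer.clear();         // leave the buffer writable for the next call
    return h;
  }
}

Because hashBytes reads only the bytes between position and limit, the flip before hashing and the clear afterwards are what keep one shared buffer correct across calls; a side effect is that a single filter instance should not hash from multiple threads concurrently.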
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -42,6 +42,11 @@ enum HashStrategy {
this.value = value;
}
int value;

@Override
public String toString() {
return "xxhash";
}
}

// Bloom filter algorithm.
@@ -51,6 +56,11 @@ enum Algorithm {
this.value = value;
}
int value;

@Override
public String toString() {
return "block";
}
}

// Bloom filter compression.
@@ -60,6 +70,11 @@ enum Compression {
this.value = value;
}
int value;

@Override
public String toString() {
return "uncompressed";
}
}
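The toString overrides above give the enum constants human-readable names, e.g. for logging a filter's configuration. A short usage sketch (the constants BLOCK and UNCOMPRESSED are assumed from context; only XXH64 and UNCOMPRESSED appear elsewhere in this diff):

System.out.println(BloomFilter.HashStrategy.XXH64 + "/"
    + BloomFilter.Algorithm.BLOCK + "/"
    + BloomFilter.Compression.UNCOMPRESSED);
// prints: xxhash/block/uncompressed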

parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
@@ -35,7 +35,6 @@ public long hashBytes(byte[] input) {

@Override
public long hashByteBuffer(ByteBuffer input) {
input.flip();
return LongHashFunction.xx(0).hashBytes(input);
}
}
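With input.flip() removed, hashByteBuffer hashes whatever lies between the buffer's current position and limit, so the caller flips exactly once after writing, as doHash() above now does; keeping the flip inside hashByteBuffer would have flipped the reused cacheBuffer a second time. A sketch of the caller-side contract, assuming XxHash is accessible to the caller:

ByteBuffer buf = ByteBuffer.allocate(Long.SIZE / Byte.SIZE)
    .order(ByteOrder.LITTLE_ENDIAN);
buf.putLong(42L);
buf.flip(); // the caller, not hashByteBuffer, makes the written bytes readable
long h = new XxHash().hashByteBuffer(buf); // hashes position..limit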
parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -22,9 +22,12 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashSet;
import java.util.Set;

import net.openhft.hashing.LongHashFunction;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.parquet.io.api.Binary;
import org.junit.Rule;
@@ -34,12 +37,11 @@
import static org.junit.Assert.assertTrue;

public class TestBlockSplitBloomFilter {

@Test
public void testConstructor () {
BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0);
assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.LOWER_BOUND_BYTES);
BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.UPPER_BOUND_BYTES + 1);
assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.UPPER_BOUND_BYTES);
BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000);
assertEquals(bloomFilter3.getBitsetSize(), 1024);
}
@@ -113,7 +115,7 @@ public void testBloomFilterNDVs(){
// the optimal value formula
double numBits = -8 * ndv / Math.log(1 - Math.pow(0.01, 1.0 / 8));
int bytes = (int)numBits / 8;
assertTrue(bytes < BlockSplitBloomFilter.UPPER_BOUND_BYTES);
assertTrue(bytes < 20 * 1024 * 1024);

// a row group of 128MB with one column of UUID type
ndv = 128 * 1024 * 1024 / java.util.UUID.randomUUID().toString().length();
@@ -122,4 +124,69 @@ public void testBloomFilterNDVs(){
assertTrue(bytes < 5 * 1024 * 1024);
}
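Worked numbers for the assertions above: a 128MB row group of longs gives ndv = 134,217,728 / 8 = 16,777,216, and -8 * ndv / ln(1 - 0.01^(1/8)) ≈ 1.62e8 bits ≈ 20.3e6 bytes, just under 20 * 1024 * 1024 = 20,971,520. A UUID string is 36 characters, so ndv ≈ 134,217,728 / 36 ≈ 3,728,270, which needs about 4.5e6 bytes, under the asserted 5MB.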

/**
* Test data is the output of the following program, using the xxHash implementation
* from https://github.com/Cyan4973/xxHash at commit c8c4cc0f812719ce1f5b2c291159658980e7c255
*
* #define XXH_INLINE_ALL
* #include "xxhash.h"
* #include <stdlib.h>
* #include <stdio.h>
* int main()
* {
* char* src = (char*) malloc(32);
* const int N = 32;
* for (int i = 0; i < N; i++) {
* src[i] = (char) i;
* }
*
* printf("without seed\n");
* for (int i = 0; i <= N; i++) {
* printf("%lldL,\n", (long long) XXH64(src, i, 0));
* }
*
* printf("with seed 42\n");
* for (int i = 0; i <= N; i++) {
* printf("%lldL,\n", (long long) XXH64(src, i, 42));
* }
* }
*/


private static final long[] HASHES_OF_LOOPING_BYTES_WITH_SEED_0 = {
-1205034819632174695L, -1642502924627794072L, 5216751715308240086L, -1889335612763511331L,
-13835840860730338L, -2521325055659080948L, 4867868962443297827L, 1498682999415010002L,
-8626056615231480947L, 7482827008138251355L, -617731006306969209L, 7289733825183505098L,
4776896707697368229L, 1428059224718910376L, 6690813482653982021L, -6248474067697161171L,
4951407828574235127L, 6198050452789369270L, 5776283192552877204L, -626480755095427154L,
-6637184445929957204L, 8370873622748562952L, -1705978583731280501L, -7898818752540221055L,
-2516210193198301541L, 8356900479849653862L, -4413748141896466000L, -6040072975510680789L,
1451490609699316991L, -7948005844616396060L, 8567048088357095527L, -4375578310507393311L
};
private static final long[] HASHES_OF_LOOPING_BYTES_WITH_SEED_42 = {
-7444071767201028348L, -8959994473701255385L, 7116559933691734543L, 6019482000716350659L,
-6625277557348586272L, -5507563483608914162L, 1540412690865189709L, 4522324563441226749L,
-7143238906056518746L, -7989831429045113014L, -7103973673268129917L, -2319060423616348937L,
-7576144055863289344L, -8903544572546912743L, 6376815151655939880L, 5913754614426879871L,
6466567997237536608L, -869838547529805462L, -2416009472486582019L, -3059673981515537339L,
4211239092494362041L, 1414635639471257331L, 166863084165354636L, -3761330575439628223L,
3524931906845391329L, 6070229753198168844L, -3740381894759773016L, -1268276809699008557L,
1518581707938531581L, 7988048690914090770L, -4510281763783422346L, -8988936099728967847L
};

@Test
public void testXxHashCorrectness() {
byte[] data = new byte[32];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;

ByteBuffer input = ByteBuffer.wrap(data, 0, i).order(ByteOrder.nativeOrder());
assertEquals(HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i],
LongHashFunction.xx(0).hashBytes(input));

assertEquals(HASHES_OF_LOOPING_BYTES_WITH_SEED_42[i],
LongHashFunction.xx(42).hashBytes(input));
}
}

}
