diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 3bd3a593eb..9f7479a6c1 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -58,6 +58,11 @@ fastutil ${fastutil.version} + + net.openhft + zero-allocation-hashing + ${net.openhft.version} + com.carrotsearch diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 4595723cf2..d9238de27f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -18,20 +18,24 @@ */ package org.apache.parquet.column; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; import org.apache.parquet.bytes.HeapByteBufferAllocator; - -import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; - -import java.util.Objects; - import org.apache.parquet.column.impl.ColumnWriteStoreV1; import org.apache.parquet.column.impl.ColumnWriteStoreV2; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory; import org.apache.parquet.column.values.factory.ValuesWriterFactory; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; @@ -56,6 +60,7 @@ public class ParquetProperties { public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64; public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE; public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000; + public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; @@ -96,6 +101,11 @@ public static WriterVersion fromString(String name) { private final ValuesWriterFactory valuesWriterFactory; private final int columnIndexTruncateLength; private final int statisticsTruncateLength; + + // The key-value pair represents the column name and its expected distinct number of values in a row group. 
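
For reference, the sketch below shows how these new properties might be populated through the builder methods added further down in this patch. It is an illustration only, not part of the change; the element types of the map and list (String column names mapped to Long expected NDVs) are inferred from how the values are consumed in ColumnWriterBase, and the column names "id" and "name" are placeholders.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.column.ParquetProperties;

public class BloomFilterPropertiesExample {
  public static void main(String[] args) {
    // Size the "id" filter from an expected number of distinct values.
    Map<String, Long> ndvs = new HashMap<>();
    ndvs.put("id", 1_000_000L);

    ParquetProperties props = ParquetProperties.builder()
        .withBloomFilterColumnToNDVMap(ndvs)      // filters sized from the expected NDV
        .withMaxBloomFilterBytes(1024 * 1024)     // cap a single filter at 1 MiB (the default)
        .build();

    // Alternatively, only name the columns and let each filter use the maximum size:
    ParquetProperties byName = ParquetProperties.builder()
        .withBloomFilterColumnNames(Arrays.asList("id", "name"))
        .build();
  }
}
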
+ private final Map bloomFilterExpectedDistinctNumbers; + private final int maxBloomFilterBytes; + private final List bloomFilterColumns; private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; private final boolean enableByteStreamSplit; @@ -115,6 +125,9 @@ private ParquetProperties(Builder builder) { this.valuesWriterFactory = builder.valuesWriterFactory; this.columnIndexTruncateLength = builder.columnIndexTruncateLength; this.statisticsTruncateLength = builder.statisticsTruncateLength; + this.bloomFilterExpectedDistinctNumbers = builder.bloomFilterColumnExpectedNDVs; + this.bloomFilterColumns = builder.bloomFilterColumns; + this.maxBloomFilterBytes = builder.maxBloomFilterBytes; this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; this.enableByteStreamSplit = builder.enableByteStreamSplit; @@ -189,11 +202,24 @@ public ByteBufferAllocator getAllocator() { public ColumnWriteStore newColumnWriteStore(MessageType schema, PageWriteStore pageStore) { + switch (writerVersion) { + case PARQUET_1_0: + return new ColumnWriteStoreV1(schema, pageStore, this); + case PARQUET_2_0: + return new ColumnWriteStoreV2(schema, pageStore, this); + default: + throw new IllegalArgumentException("unknown version " + writerVersion); + } + } + + public ColumnWriteStore newColumnWriteStore(MessageType schema, + PageWriteStore pageStore, + BloomFilterWriteStore bloomFilterWriteStore) { switch (writerVersion) { case PARQUET_1_0: - return new ColumnWriteStoreV1(schema, pageStore, this); + return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this); case PARQUET_2_0: - return new ColumnWriteStoreV2(schema, pageStore, this); + return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this); default: throw new IllegalArgumentException("unknown version " + writerVersion); } @@ -231,6 +257,22 @@ public boolean getPageWriteChecksumEnabled() { return pageWriteChecksumEnabled; } + public Map getBloomFilterColumnExpectedNDVs() { + return bloomFilterExpectedDistinctNumbers; + } + + public Set getBloomFilterColumns() { + if (bloomFilterColumns != null && bloomFilterColumns.size() > 0){ + return new HashSet<>(bloomFilterColumns); + } + + return bloomFilterExpectedDistinctNumbers.keySet(); + } + + public int getMaxBloomFilterBytes() { + return maxBloomFilterBytes; + } + public static Builder builder() { return new Builder(); } @@ -250,6 +292,9 @@ public String toString() { + "Max row count for page size check is: " + getMaxRowCountForPageSizeCheck() + '\n' + "Truncate length for column indexes is: " + getColumnIndexTruncateLength() + '\n' + "Truncate length for statistics min/max is: " + getStatisticsTruncateLength() + '\n' + + "Bloom filter enabled column names are: " + getBloomFilterColumns() + '\n' + + "Max Bloom filter size for a column is " + getMaxBloomFilterBytes() + '\n' + + "Bloom filter enabled column expected number of distinct values are: " + getBloomFilterColumnExpectedNDVs().values() + '\n' + "Page row count limit to " + getPageRowCountLimit() + '\n' + "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? 
"on" : "off"); } @@ -266,6 +311,9 @@ public static class Builder { private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH; + private Map bloomFilterColumnExpectedNDVs = new HashMap<>(); + private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES; + private List bloomFilterColumns = new ArrayList<>(); private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED; @@ -286,6 +334,9 @@ private Builder(ParquetProperties toCopy) { this.allocator = toCopy.allocator; this.pageRowCountLimit = toCopy.pageRowCountLimit; this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; + this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers; + this.bloomFilterColumns = toCopy.bloomFilterColumns; + this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes; this.enableByteStreamSplit = toCopy.enableByteStreamSplit; } @@ -396,6 +447,41 @@ public Builder withStatisticsTruncateLength(int length) { return this; } + /** + * Set max Bloom filter bytes for related columns. + * + * @param maxBloomFilterBytes the max bytes of a Bloom filter bitset for a column. + * @return this builder for method chaining + */ + public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) { + this.maxBloomFilterBytes = maxBloomFilterBytes; + return this; + } + + /** + * Set Bloom filter column names and expected NDVs. + * + * @param columnToNDVMap the columns which has bloom filter enabled. + * + * @return this builder for method chaining + */ + public Builder withBloomFilterColumnToNDVMap(Map columnToNDVMap) { + this.bloomFilterColumnExpectedNDVs = columnToNDVMap; + return this; + } + + /** + * Set Bloom filter column names. + * + * @param columns the columns which has bloom filter enabled. 
+ * + * @return this builder for method chaining + */ + public Builder withBloomFilterColumnNames(List columns) { + this.bloomFilterColumns = columns; + return this; + } + public Builder withPageRowCountLimit(int rowCount) { Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount); pageRowCountLimit = rowCount; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java index 2018c01f5e..8740099730 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java @@ -34,6 +34,8 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.schema.MessageType; /** @@ -74,7 +76,7 @@ private interface ColumnWriterProvider { public ColumnWriter getColumnWriter(ColumnDescriptor path) { ColumnWriterBase column = columns.get(path); if (column == null) { - column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props); + column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props); columns.put(path, column); } return column; @@ -91,7 +93,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) { Map mcolumns = new TreeMap<>(); for (ColumnDescriptor path : schema.getColumns()) { PageWriter pageWriter = pageWriteStore.getPageWriter(path); - mcolumns.put(path, createColumnWriter(path, pageWriter, props)); + mcolumns.put(path, createColumnWriter(path, pageWriter, null, props)); } this.columns = unmodifiableMap(mcolumns); @@ -105,7 +107,38 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) { }; } - abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props); + // The Bloom filter is written to a specified bitset instead of pages, so it needs a separate write store abstract. 
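
To make the role of that separate store concrete, here is a hypothetical in-memory BloomFilterWriteStore (not part of the patch) built on the BloomFilterWriter and BloomFilter interfaces introduced later in this diff; it simply captures each column's serialized bitset in a byte array, which is the kind of hook the constructor below expects from the file writer.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;

// Hypothetical in-memory store: each column's filter bitset is kept in a byte array
// instead of being written to a file, which is enough for tests or illustration.
public class InMemoryBloomFilterWriteStore implements BloomFilterWriteStore {
  private final Map<ColumnDescriptor, byte[]> bitsets = new HashMap<>();

  @Override
  public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
    return bloomFilter -> {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      try {
        // BlockSplitBloomFilter.writeTo emits the raw bitset bytes.
        bloomFilter.writeTo(out);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      bitsets.put(path, out.toByteArray());
    };
  }

  public byte[] bitsetFor(ColumnDescriptor path) {
    return bitsets.get(path);
  }
}
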
+ ColumnWriteStoreBase( + MessageType schema, + PageWriteStore pageWriteStore, + BloomFilterWriteStore bloomFilterWriteStore, + ParquetProperties props) { + this.props = props; + this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO); + Map mcolumns = new TreeMap<>(); + for (ColumnDescriptor path : schema.getColumns()) { + PageWriter pageWriter = pageWriteStore.getPageWriter(path); + if (props.getBloomFilterColumns() != null && props.getBloomFilterColumns().size() > 0) { + BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path); + mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props)); + } else { + mcolumns.put(path, createColumnWriter(path, pageWriter, null, props)); + } + } + this.columns = unmodifiableMap(mcolumns); + + this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck(); + + columnWriterProvider = new ColumnWriterProvider() { + @Override + public ColumnWriter getColumnWriter(ColumnDescriptor path) { + return columns.get(path); + } + }; + } + + abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, + BloomFilterWriter bloomFilterWriter, ParquetProperties props); @Override public ColumnWriter getColumnWriter(ColumnDescriptor path) { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java index 7258423fb4..c4760d04f2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java @@ -22,6 +22,8 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.schema.MessageType; public class ColumnWriteStoreV1 extends ColumnWriteStoreBase { @@ -36,8 +38,15 @@ public ColumnWriteStoreV1(final PageWriteStore pageWriteStore, super(pageWriteStore, props); } + public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, + BloomFilterWriteStore bloomFilterWriteStore, + ParquetProperties props) { + super(schema, pageWriteStore, bloomFilterWriteStore, props); + } + @Override - ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) { - return new ColumnWriterV1(path, pageWriter, props); + ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, + BloomFilterWriter bloomFilterWriter, ParquetProperties props) { + return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java index bf1090d0bc..590c3edcf2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java @@ -22,6 +22,8 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; +import 
org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.schema.MessageType; public class ColumnWriteStoreV2 extends ColumnWriteStoreBase { @@ -30,8 +32,15 @@ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, Par super(schema, pageWriteStore, props); } + public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, + BloomFilterWriteStore bloomFilterWriteStore, + ParquetProperties props) { + super(schema, pageWriteStore, bloomFilterWriteStore, props); + } + @Override - ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) { - return new ColumnWriterV2(path, pageWriter, props); + ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, + BloomFilterWriter bloomFilterWriter, ParquetProperties props) { + return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index 8fc7d31ba1..c46b26a283 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -19,6 +19,8 @@ package org.apache.parquet.column.impl; import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriter; @@ -27,6 +29,9 @@ import org.apache.parquet.column.page.PageWriter; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.api.Binary; import org.slf4j.Logger; @@ -53,10 +58,22 @@ abstract class ColumnWriterBase implements ColumnWriter { private long rowsWrittenSoFar = 0; private int pageRowCount; + private final BloomFilterWriter bloomFilterWriter; + private final BloomFilter bloomFilter; + ColumnWriterBase( ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) { + this(path, pageWriter, null, props); + } + + ColumnWriterBase( + ColumnDescriptor path, + PageWriter pageWriter, + BloomFilterWriter bloomFilterWriter, + ParquetProperties props + ) { this.path = path; this.pageWriter = pageWriter; resetStatistics(); @@ -64,6 +81,30 @@ abstract class ColumnWriterBase implements ColumnWriter { this.repetitionLevelColumn = createRLWriter(props, path); this.definitionLevelColumn = createDLWriter(props, path); this.dataColumn = props.newValuesWriter(path); + + this.bloomFilterWriter = bloomFilterWriter; + Set bloomFilterColumns = props.getBloomFilterColumns(); + String column = String.join(".", path.getPath()); + if (!bloomFilterColumns.contains(column)) { + this.bloomFilter = null; + return; + } + int maxBloomFilterSize = props.getMaxBloomFilterBytes(); + + Map bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs(); + if (bloomFilterColumnExpectedNDVs.size() > 0) { + // If user specify the column NDV, we construct Bloom filter from it. 
+ if (bloomFilterColumnExpectedNDVs.containsKey(column)) { + int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits( + bloomFilterColumnExpectedNDVs.get(column).intValue(), BlockSplitBloomFilter.DEFAULT_FPP); + + this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize); + } else { + this.bloomFilter = null; + } + } else { + this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize); + } } abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path); @@ -122,6 +163,36 @@ public long getBufferedSizeInMemory() { + pageWriter.getMemSize(); } + private void updateBloomFilter(int value) { + if (bloomFilter != null) { + bloomFilter.insertHash(bloomFilter.hash(value)); + } + } + + private void updateBloomFilter(long value) { + if (bloomFilter != null) { + bloomFilter.insertHash(bloomFilter.hash(value)); + } + } + + private void updateBloomFilter(double value) { + if (bloomFilter != null) { + bloomFilter.insertHash(bloomFilter.hash(value)); + } + } + + private void updateBloomFilter(float value) { + if (bloomFilter != null) { + bloomFilter.insertHash(bloomFilter.hash(value)); + } + } + + private void updateBloomFilter(Binary value) { + if (bloomFilter != null) { + bloomFilter.insertHash(bloomFilter.hash(value)); + } + } + /** * Writes the current value * @@ -137,6 +208,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeDouble(value); statistics.updateStats(value); + updateBloomFilter(value); ++valueCount; } @@ -155,6 +227,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeFloat(value); statistics.updateStats(value); + updateBloomFilter(value); ++valueCount; } @@ -173,6 +246,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeBytes(value); statistics.updateStats(value); + updateBloomFilter(value); ++valueCount; } @@ -209,6 +283,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeInteger(value); statistics.updateStats(value); + updateBloomFilter(value); ++valueCount; } @@ -227,6 +302,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeLong(value); statistics.updateStats(value); + updateBloomFilter(value); ++valueCount; } @@ -246,6 +322,10 @@ void finalizeColumnChunk() { } dataColumn.resetDictionary(); } + + if (bloomFilterWriter != null && bloomFilter != null) { + bloomFilterWriter.writeBloomFilter(bloomFilter); + } } /** diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java index 646e31aa7e..752042480b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java @@ -27,6 +27,7 @@ import org.apache.parquet.column.page.PageWriter; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; /** * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer. 
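
The column writer above sizes the filter with BlockSplitBloomFilter.optimalNumOfBits and then feeds every written value through hash()/insertHash(). The standalone sketch below (illustration only, using the BlockSplitBloomFilter added later in this patch) walks the same flow; for 100,000 distinct values at the default 1% false-positive rate the computed bitset comes out to roughly 118 KiB.

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.io.api.Binary;

public class BloomFilterUsageExample {
  public static void main(String[] args) {
    // Same sizing rule as ColumnWriterBase: bits for the expected NDV at DEFAULT_FPP, then to bytes.
    int numBytes = BlockSplitBloomFilter.optimalNumOfBits(100_000, BlockSplitBloomFilter.DEFAULT_FPP) / 8;
    BloomFilter filter = new BlockSplitBloomFilter(numBytes);

    // Values are always inserted via their 64-bit hash, mirroring updateBloomFilter(...).
    filter.insertHash(filter.hash(Binary.fromString("parquet")));
    filter.insertHash(filter.hash(42L));

    // A miss is definite; a hit only means "probably present".
    boolean probablyThere = filter.findHash(filter.hash(42L));   // true
    boolean unlikelyHit = filter.findHash(filter.hash(7L));      // almost certainly false
  }
}
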
@@ -37,6 +38,11 @@ final class ColumnWriterV1 extends ColumnWriterBase { super(path, pageWriter, props); } + public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter, + BloomFilterWriter bloomFilterWriter, ParquetProperties props) { + super(path, pageWriter, bloomFilterWriter, props); + } + @Override ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) { return props.newRepetitionLevelWriter(path); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java index e4e8563cb9..cc44e2d630 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,6 +28,7 @@ import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import org.apache.parquet.io.ParquetEncodingException; @@ -59,6 +60,11 @@ public BytesInput getBytes() { super(path, pageWriter, props); } + ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter, + ParquetProperties props) { + super(path, pageWriter, bloomFilterWriter, props); + } + @Override ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) { return path.getMaxRepetitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newRepetitionLevelEncoder(path)); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java new file mode 100644 index 0000000000..a74c4265e0 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.column.values.bloomfilter; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.io.api.Binary; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; + +/* + * This Bloom filter is implemented using block-based Bloom filter algorithm from Putze et al.'s + * "Cache-, Hash- and Space-Efficient Bloom filters". The basic idea is to hash the item to a tiny + * Bloom filter which size fit a single cache line or smaller. This implementation sets 8 bits in + * each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take advantage of 32-byte SIMD + * instruction. + */ +public class BlockSplitBloomFilter implements BloomFilter { + // Bytes in a tiny Bloom filter block. + private static final int BYTES_PER_BLOCK = 32; + + // Bits in a tiny Bloom filter block. + private static final int BITS_PER_BLOCK = 256; + + // The lower bound of bloom filter size, set to the size of a tiny Bloom filter block. + public static final int LOWER_BOUND_BYTES = 32; + + // The upper bound of bloom filter size, set to default row group size. + public static final int UPPER_BOUND_BYTES = 128 * 1024 * 1024; + + // The number of bits to set in a tiny Bloom filter + private static final int BITS_SET_PER_BLOCK = 8; + + // The metadata in the header of a serialized Bloom filter is four four-byte values: the number of bytes, + // the filter algorithm, the hash algorithm, and the compression. + public static final int HEADER_SIZE = 16; + + // The default false positive probability value + public static final double DEFAULT_FPP = 0.01; + + // The hash strategy used in this Bloom filter. + private final HashStrategy hashStrategy; + + // The underlying byte array for Bloom filter bitset. + private byte[] bitset; + + // A integer array buffer of underlying bitset to help setting bits. + private IntBuffer intBuffer; + + // Hash function use to compute hash for column value. + private HashFunction hashFunction; + + private int maximumBytes = UPPER_BOUND_BYTES; + private int minimumBytes = LOWER_BOUND_BYTES; + + // A cache used for hashing + private ByteBuffer cacheBuffer = ByteBuffer.allocate(Long.BYTES); + + private int[] mask = new int[BITS_SET_PER_BLOCK]; + + // The block-based algorithm needs 8 odd SALT values to calculate eight indexes + // of bits to set, one per 32-bit word. + private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, + 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; + + /** + * Constructor of block-based Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within + * [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down + * to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power + * of 2. It uses XXH64 as its default hash function. + */ + public BlockSplitBloomFilter(int numBytes) { + this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64); + } + + /** + * Constructor of block-based Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within + * [DEFAULT_MINIMUM_BYTES, maximumBytes], it will be rounded up/down + * to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power + * of 2. 
It uses XXH64 as its default hash function. + * @param maximumBytes The maximum bytes of the Bloom filter. + */ + public BlockSplitBloomFilter(int numBytes, int maximumBytes) { + this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64); + } + + /** + * Constructor of block-based Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset + * @param hashStrategy The hash strategy of Bloom filter. + */ + private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) { + this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, hashStrategy); + } + + /** + * Constructor of block-based Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within + * [minimumBytes, maximumBytes], it will be rounded up/down to lower/upper bound if + * num_bytes is out of range. It will also be rounded up to a power of 2. + * @param minimumBytes The minimum bytes of the Bloom filter. + * @param maximumBytes The maximum bytes of the Bloom filter. + * @param hashStrategy The adopted hash strategy of the Bloom filter. + */ + public BlockSplitBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy) { + if (minimumBytes > maximumBytes) { + throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes"); + } + + if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) { + this.minimumBytes = minimumBytes; + } + + if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) { + this.maximumBytes = maximumBytes; + } + + initBitset(numBytes); + + cacheBuffer.order(ByteOrder.LITTLE_ENDIAN); + + switch (hashStrategy) { + case XXH64: + this.hashStrategy = hashStrategy; + hashFunction = new XxHash(); + break; + default: + throw new RuntimeException("Unsupported hash strategy"); + } + } + + + /** + * Construct the Bloom filter with given bitset, it is used when reconstructing + * Bloom filter from parquet file. It use XXH64 as its default hash + * function. + * + * @param bitset The given bitset to construct Bloom filter. + */ + public BlockSplitBloomFilter(byte[] bitset) { + this(bitset, HashStrategy.XXH64); + } + + /** + * Construct the Bloom filter with given bitset, it is used when reconstructing + * Bloom filter from parquet file. + * + * @param bitset The given bitset to construct Bloom filter. + * @param hashStrategy The hash strategy Bloom filter apply. + */ + private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) { + if (bitset == null) { + throw new RuntimeException("Given bitset is null"); + } + + cacheBuffer.order(ByteOrder.LITTLE_ENDIAN); + this.bitset = bitset; + this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); + switch (hashStrategy) { + case XXH64: + this.hashStrategy = hashStrategy; + hashFunction = new XxHash(); + break; + default: + throw new RuntimeException("Unsupported hash strategy"); + } + } + + /** + * Create a new bitset for Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within + * [minimumBytes, maximumBytes], it will be rounded up/down + * to lower/upper bound if num_bytes is out of range and also will rounded up to a power + * of 2. It uses XXH64 as its default hash function and block-based algorithm + * as default algorithm. + */ + private void initBitset(int numBytes) { + if (numBytes < minimumBytes) { + numBytes = minimumBytes; + } + // Get next power of 2 if it is not power of 2. 
+ if ((numBytes & (numBytes - 1)) != 0) { + numBytes = Integer.highestOneBit(numBytes) << 1; + } + if (numBytes > maximumBytes || numBytes < 0) { + numBytes = maximumBytes; + } + this.bitset = new byte[numBytes]; + this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); + } + + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(bitset); + } + + private int[] setMask(int key) { + // The following three loops are written separately so that they could be + // optimized for vectorization. + for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { + mask[i] = key * SALT[i]; + } + + for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { + mask[i] = mask[i] >>> 27; + } + + for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { + mask[i] = 0x1 << mask[i]; + } + + return mask; + } + + @Override + public void insertHash(long hash) { + long numBlocks = bitset.length / BYTES_PER_BLOCK; + long lowHash = hash >>> 32; + int blockIndex = (int)((lowHash * numBlocks) >> 32); + int key = (int)hash; + + // Calculate mask for bucket. + int[] mask = setMask(key); + for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { + int value = intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i); + value |= mask[i]; + intBuffer.put(blockIndex * (BYTES_PER_BLOCK / 4) + i, value); + } + } + + @Override + public boolean findHash(long hash) { + long numBlocks = bitset.length / BYTES_PER_BLOCK; + long lowHash = hash >>> 32; + int blockIndex = (int)((lowHash * numBlocks) >> 32); + int key = (int)hash; + + // Calculate mask for the tiny Bloom filter. + int[] mask = setMask(key); + for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { + if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) { + return false; + } + } + + return true; + } + + /** + * Calculate optimal size according to the number of distinct values and false positive probability. + * + * @param n: The number of distinct values. + * @param p: The false positive probability. + * + * @return optimal number of bits of given n and p. + */ + public static int optimalNumOfBits(long n, double p) { + Preconditions.checkArgument((p > 0.0 && p < 1.0), + "FPP should be less than 1.0 and great than 0.0"); + final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8)); + int numBits = (int)m ; + + // Handle overflow. 
+ if (numBits > UPPER_BOUND_BYTES << 3 || m < 0) { + numBits = UPPER_BOUND_BYTES << 3; + } + + // Round numBits up to (k * BITS_PER_BLOCK) + numBits = (numBits + BITS_PER_BLOCK -1) & ~BITS_PER_BLOCK; + + if (numBits < (LOWER_BOUND_BYTES << 3)) { + numBits = LOWER_BOUND_BYTES << 3; + } + + return numBits; + } + + @Override + public int getBitsetSize() { + return this.bitset.length; + } + + @Override + public long hash(Object value) { + if (value instanceof Binary) { + return hashFunction.hashBytes(((Binary) value).getBytes()); + } + + if (value instanceof Integer) { + cacheBuffer.putInt((Integer)value); + } else if (value instanceof Long) { + cacheBuffer.putLong((Long)value); + } else if (value instanceof Float) { + cacheBuffer.putFloat((Float)value); + } else if (value instanceof Double) { + cacheBuffer.putDouble((Double) value); + } else { + throw new RuntimeException("Parquet Bloom filter: Not supported type"); + } + + return doHash(); + } + + @Override + public HashStrategy getHashStrategy() { + return HashStrategy.XXH64; + } + + @Override + public Algorithm getAlgorithm() { + return Algorithm.BLOCK; + } + + @Override + public Compression getCompression() { + return Compression.UNCOMPRESSED; + } + + private long doHash() { + cacheBuffer.flip(); + long hashResult = hashFunction.hashByteBuffer(cacheBuffer); + cacheBuffer.clear(); + + return hashResult; + } + + @Override + public long hash(int value) { + cacheBuffer.putInt(value); + return doHash(); + } + + @Override + public long hash(long value) { + cacheBuffer.putLong(value); + return doHash(); + } + + @Override + public long hash(double value) { + cacheBuffer.putDouble(value); + return doHash(); + } + + @Override + public long hash(float value) { + cacheBuffer.putFloat(value); + return doHash(); + } + + @Override + public long hash(Binary value) { + return hashFunction.hashBytes(value.getBytes()); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java new file mode 100644 index 0000000000..27926e0e2a --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.bloomfilter; + +import org.apache.parquet.io.api.Binary; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A Bloom filter is a compact structure to indicate whether an item is not in a set or probably + * in a set. The Bloom filter usually consists of a bit set that represents a elements set, + * a hash strategy and a Bloom filter algorithm. 
+ */ +public interface BloomFilter { + /* Bloom filter Hash strategy. + * + * xxHash is an extremely fast hash algorithm, running at RAM speed limits. It successfully + * completes the SMHasher test suite which evaluates collision, dispersion and randomness qualities + * of hash functions. It shows good performance advantage from its benchmark result. + * (see https://github.com/Cyan4973/xxHash). + */ + enum HashStrategy { + XXH64; + + @Override + public String toString() { + return "xxhash"; + } + } + + // Bloom filter algorithm. + enum Algorithm { + BLOCK; + + @Override + public String toString() { + return "block"; + } + } + + // Bloom filter compression. + enum Compression { + UNCOMPRESSED; + + @Override + public String toString() { + return "uncompressed"; + } + } + + /** + * Write the Bloom filter to an output stream. It writes the Bloom filter header including the + * bitset's length in bytes, the hash strategy, the algorithm, and the bitset. + * + * @param out the output stream to write + */ + void writeTo(OutputStream out) throws IOException; + + /** + * Insert an element to the Bloom filter, the element content is represented by + * the hash value of its plain encoding result. + * + * @param hash the hash result of element. + */ + void insertHash(long hash); + + /** + * Determine whether an element is in set or not. + * + * @param hash the hash value of element plain encoding result. + * @return false if element is must not in set, true if element probably in set. + */ + boolean findHash(long hash); + + /** + * Get the number of bytes for bitset in this Bloom filter. + * + * @return The number of bytes for bitset in this Bloom filter. + */ + int getBitsetSize(); + + /** + * Compute hash for int value by using its plain encoding result. + * + * @param value the value to hash + * @return hash result + */ + long hash(int value); + + /** + * Compute hash for long value by using its plain encoding result. + * + * @param value the value to hash + * @return hash result + */ + long hash(long value) ; + + /** + * Compute hash for double value by using its plain encoding result. + * + * @param value the value to hash + * @return hash result + */ + long hash(double value); + + /** + * Compute hash for float value by using its plain encoding result. + * + * @param value the value to hash + * @return hash result + */ + long hash(float value); + + /** + * Compute hash for Binary value by using its plain encoding result. + * + * @param value the value to hash + * @return hash result + */ + long hash(Binary value); + + /** + * Compute hash for Object value by using its plain encoding result. + * + * @param value the value to hash + * @return hash result + */ + long hash(Object value); + + // The boolean type is not supported because boolean type has only two values, while Bloom filter is + // suitable for high cardinality. + // long hash(Boolean value); + + /** + * Return the hash strategy that the bloom filter apply. + * + * @return hash strategy that the bloom filter apply + */ + HashStrategy getHashStrategy(); + + /** + * Return the algorithm that the bloom filter apply. + * + * @return algorithm that the bloom filter apply + */ + Algorithm getAlgorithm(); + + /** + * Return the compress algorithm that the bloom filter apply. 
+ * + * @return compress algorithm that the bloom filter apply + */ + Compression getCompression(); +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java new file mode 100644 index 0000000000..f7e28fdf2d --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.column.values.bloomfilter; + +import org.apache.parquet.column.ColumnDescriptor; + +/** + * Contains all writers for all columns of a row group + */ +public interface BloomFilterWriteStore { + /** + * Get bloom filter writer of a column + * + * @param path the descriptor for the column + * @return the corresponding Bloom filter writer + */ + BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path); +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java new file mode 100644 index 0000000000..e2504d8216 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.column.values.bloomfilter; + +public interface BloomFilterWriter { + /** + * Write a Bloom filter + * + * @param bloomFilter the Bloom filter to write + * + */ + void writeBloomFilter(BloomFilter bloomFilter); +} + diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java new file mode 100644 index 0000000000..2043934fb2 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.bloomfilter; + +import java.nio.ByteBuffer; + +/** + * A interface contains a set of hash functions used by Bloom filter. + */ +public interface HashFunction { + + /** + * compute the hash value for a byte array. + * @param input the input byte array + * @return a result of long value. + */ + long hashBytes(byte[] input); + + /** + * compute the hash value for a ByteBuffer. + * @param input the input ByteBuffer + * @return a result of long value. + */ + long hashByteBuffer(ByteBuffer input); +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java new file mode 100644 index 0000000000..6c52b3c987 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.column.values.bloomfilter; + +import net.openhft.hashing.LongHashFunction; + +import java.nio.ByteBuffer; + +/** + * The implementation of HashFunction interface. The XxHash uses XXH64 version xxHash + * with a seed of 0. 
+ */ +public class XxHash implements HashFunction { + @Override + public long hashBytes(byte[] input) { + return LongHashFunction.xx(0).hashBytes(input); + } + + @Override + public long hashByteBuffer(ByteBuffer input) { + return LongHashFunction.xx(0).hashBytes(input); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java new file mode 100644 index 0000000000..9d2aacc7f1 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.bloomfilter; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import net.openhft.hashing.LongHashFunction; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.parquet.io.api.Binary; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestBlockSplitBloomFilter { + + @Test + public void testConstructor () { + BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0); + assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.LOWER_BOUND_BYTES); + BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000); + assertEquals(bloomFilter3.getBitsetSize(), 1024); + } + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + /* + * This test is used to test basic operations including inserting, finding and + * serializing and de-serializing. 
+ */ + @Test + public void testBloomFilterForString() { + final int numValues = 1024 * 1024; + int numBytes = BlockSplitBloomFilter.optimalNumOfBits(numValues , 0.01) / 8; + BloomFilter bloomFilter = new BlockSplitBloomFilter(numBytes); + + Set testStrings = new HashSet<>(); + for (int i = 0; i < numValues; i ++) { + String str = RandomStringUtils.randomAlphabetic(1, 64); + bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(str))); + testStrings.add(str); + } + + for (String testString : testStrings) { + assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testString)))); + } + } + + @Test + public void testBloomFilterForPrimitives() { + for (int i = 0; i < 4; i++) { + long seed = System.nanoTime(); + testBloomFilterForPrimitives(seed); + } + } + + private void testBloomFilterForPrimitives(long seed) { + Random random = new Random(seed); + final int numValues = 1024 * 1024; + final int numBytes = BlockSplitBloomFilter.optimalNumOfBits(numValues, random.nextDouble() / 10) / 8; + BloomFilter bloomFilter = new BlockSplitBloomFilter(numBytes); + + Set values = new HashSet<>(); + + for (int i = 0; i < numValues; i++) { + int type = random.nextInt(4); + Object v; + switch (type) { + case 0: { + v = random.nextInt(); + break; + } + case 1: { + v = random.nextLong(); + break; + } + case 2: { + v = random.nextFloat(); + break; + } + default: { + v = random.nextDouble(); + } + } + values.add(v); + bloomFilter.insertHash(bloomFilter.hash(v)); + } + + for (Object v : values) { + assertTrue(String.format("the value %s should not be filtered, seed = %d", v, seed), + bloomFilter.findHash(bloomFilter.hash(v))); + } + } + + @Test + public void testBloomFilterFPPAccuracy() { + final int totalCount = 100000; + final double FPP = 0.01; + + BloomFilter bloomFilter = new BlockSplitBloomFilter(BlockSplitBloomFilter.optimalNumOfBits(totalCount, FPP) / 8); + + Set distinctStrings = new HashSet<>(); + while (distinctStrings.size() < totalCount) { + String str = RandomStringUtils.randomAlphabetic(12); + if (distinctStrings.add(str)) { + bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(str))); + } + } + + distinctStrings.clear(); + // The exist counts the number of times FindHash returns true. + int exist = 0; + while(distinctStrings.size() < totalCount) { + String str = RandomStringUtils.randomAlphabetic(10); + if (distinctStrings.add(str) && bloomFilter.findHash(bloomFilter.hash(Binary.fromString(str)))) { + exist ++; + } + } + + // The exist should be probably less than 1000 according FPP 0.01. Add 20% here for error space. + assertTrue(exist < totalCount * (FPP * 1.2)); + } + + @Test + public void testBloomFilterNDVs(){ + // a row group of 128M with one column of long type. 
+ int ndv = 128 * 1024 * 1024 / 8; + double fpp = 0.01; + + // the optimal value formula + double numBits = -8 * ndv / Math.log(1 - Math.pow(0.01, 1.0 / 8)); + int bytes = (int)numBits / 8; + assertTrue(bytes < 20 * 1024 * 1024); + + // a row group of 128MB with one column of UUID type + ndv = 128 * 1024 * 1024 / java.util.UUID.randomUUID().toString().length(); + numBits = -8 * ndv / Math.log(1 - Math.pow(fpp, 1.0 / 8)); + bytes = (int)numBits / 8; + assertTrue(bytes < 5 * 1024 * 1024); + } + + /** + * Test data is output of the following program with xxHash implementation + * from https://github.com/Cyan4973/xxHash with commit c8c4cc0f812719ce1f5b2c291159658980e7c255 + * + * #define XXH_INLINE_ALL + * #include "xxhash.h" + * #include + * #include + * int main() + * { + * char* src = (char*) malloc(32); + * const int N = 32; + * for (int i = 0; i < N; i++) { + * src[i] = (char) i; + * } + * + * printf("without seed\n"); + * for (int i = 0; i <= N; i++) { + * printf("%lldL,\n", (long long) XXH64(src, i, 0)); + * } + * + * printf("with seed 42\n"); + * for (int i = 0; i <= N; i++) { + * printf("%lldL,\n", (long long) XXH64(src, i, 42)); + * } + * } + */ + + + private static final long[] HASHES_OF_LOOPING_BYTES_WITH_SEED_0 = { + -1205034819632174695L, -1642502924627794072L, 5216751715308240086L, -1889335612763511331L, + -13835840860730338L, -2521325055659080948L, 4867868962443297827L, 1498682999415010002L, + -8626056615231480947L, 7482827008138251355L, -617731006306969209L, 7289733825183505098L, + 4776896707697368229L, 1428059224718910376L, 6690813482653982021L, -6248474067697161171L, + 4951407828574235127L, 6198050452789369270L, 5776283192552877204L, -626480755095427154L, + -6637184445929957204L, 8370873622748562952L, -1705978583731280501L, -7898818752540221055L, + -2516210193198301541L, 8356900479849653862L, -4413748141896466000L, -6040072975510680789L, + 1451490609699316991L, -7948005844616396060L, 8567048088357095527L, -4375578310507393311L + }; + private static final long[] HASHES_OF_LOOPING_BYTES_WITH_SEED_42 = { + -7444071767201028348L, -8959994473701255385L, 7116559933691734543L, 6019482000716350659L, + -6625277557348586272L, -5507563483608914162L, 1540412690865189709L, 4522324563441226749L, + -7143238906056518746L, -7989831429045113014L, -7103973673268129917L, -2319060423616348937L, + -7576144055863289344L, -8903544572546912743L, 6376815151655939880L, 5913754614426879871L, + 6466567997237536608L, -869838547529805462L, -2416009472486582019L, -3059673981515537339L, + 4211239092494362041L, 1414635639471257331L, 166863084165354636L, -3761330575439628223L, + 3524931906845391329L, 6070229753198168844L, -3740381894759773016L, -1268276809699008557L, + 1518581707938531581L, 7988048690914090770L, -4510281763783422346L, -8988936099728967847L + }; + + @Test + public void testXxHashCorrectness() { + byte[] data = new byte[32]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + + ByteBuffer input = ByteBuffer.wrap(data, 0, i).order(ByteOrder.nativeOrder()); + assertEquals(HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i], + LongHashFunction.xx(0).hashBytes(input)); + + assertEquals(HASHES_OF_LOOPING_BYTES_WITH_SEED_42[i], + LongHashFunction.xx(42).hashBytes(input)); + } + } + +} diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java index d09d007a20..5a9983dafc 100644 --- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java +++ 
b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java @@ -70,6 +70,14 @@ public static OffsetIndex readOffsetIndex(InputStream from) throws IOException { return read(from, new OffsetIndex()); } + public static BloomFilterHeader readBloomFilterHeader(InputStream from) throws IOException { + return read(from, new BloomFilterHeader()); + } + + public static void writeBloomFilterHeader(BloomFilterHeader header, OutputStream out) throws IOException { + write(header, out); + } + public static void writePageHeader(PageHeader pageHeader, OutputStream to) throws IOException { write(pageHeader, to); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index 13ab80b01e..b16a8c4ffa 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -30,6 +30,7 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; @@ -47,6 +48,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, boolean useRecordFilter, boolean useColumnIndexFilter, boolean usePageChecksumVerification, + boolean useBloomFilter, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -56,8 +58,8 @@ private HadoopReadOptions(boolean useSignedStringMinMax, Configuration conf) { super( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, - usePageChecksumVerification, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, - properties + usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator, + maxAllocationSize, properties ); this.conf = conf; } @@ -91,6 +93,7 @@ public Builder(Configuration conf) { useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true)); usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, usePageChecksumVerification)); + useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); withCodecFactory(HadoopCodecs.newFactory(conf, 0)); withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); @@ -104,7 +107,7 @@ public Builder(Configuration conf) { public ParquetReadOptions build() { return new HadoopReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter, + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, conf); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 5e4bd09ca7..2fdca3be28 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -42,6 +42,7 @@ public 
class ParquetReadOptions { private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true; private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false; + private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true; private final boolean useSignedStringMinMax; private final boolean useStatsFilter; @@ -49,6 +50,7 @@ public class ParquetReadOptions { private final boolean useRecordFilter; private final boolean useColumnIndexFilter; private final boolean usePageChecksumVerification; + private final boolean useBloomFilter; private final FilterCompat.Filter recordFilter; private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; @@ -62,6 +64,7 @@ public class ParquetReadOptions { boolean useRecordFilter, boolean useColumnIndexFilter, boolean usePageChecksumVerification, + boolean useBloomFilter, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -74,6 +77,7 @@ public class ParquetReadOptions { this.useRecordFilter = useRecordFilter; this.useColumnIndexFilter = useColumnIndexFilter; this.usePageChecksumVerification = usePageChecksumVerification; + this.useBloomFilter = useBloomFilter; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -102,6 +106,10 @@ public boolean useColumnIndexFilter() { return useColumnIndexFilter; } + public boolean useBloomFilter() { + return useBloomFilter; + } + public boolean usePageChecksumVerification() { return usePageChecksumVerification; } @@ -151,6 +159,7 @@ public static class Builder { protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT; protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT; + protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT; protected FilterCompat.Filter recordFilter = null; protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; // the page size parameter isn't used when only using the codec factory to get decompressors @@ -218,6 +227,16 @@ public Builder usePageChecksumVerification() { return usePageChecksumVerification(true); } + public Builder useBloomFilter() { + this.useBloomFilter = true; + return this; + } + + public Builder useBloomFilter(boolean useBloomFilter) { + this.useBloomFilter = useBloomFilter; + return this; + } + public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) { this.recordFilter = rowGroupFilter; return this; @@ -282,7 +301,7 @@ public Builder copy(ParquetReadOptions options) { public ParquetReadOptions build() { return new ParquetReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter, + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java new file mode 100644 index 0000000000..d98416445f --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java @@ -0,0 +1,150 @@ +/* + *
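For context, the new read-side switch can be flipped either through the Hadoop property or directly on the options builder. A hedged sketch, assuming the usual HadoopReadOptions.builder(Configuration) entry point; the class name below is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;

public class DisableBloomFilteringSketch {
  // Sketch: Bloom filter row-group filtering is on by default; either knob below turns it
  // off. The property key is the BLOOM_FILTERING_ENABLED constant from ParquetInputFormat.
  static ParquetReadOptions readOptionsWithoutBloomFiltering() {
    Configuration conf = new Configuration();
    conf.setBoolean("parquet.filter.bloom.enabled", false);
    return HadoopReadOptions.builder(conf)
        .useBloomFilter(false)   // equivalent programmatic switch
        .build();
  }
}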
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.filter2.bloomfilterlevel; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators; +import org.apache.parquet.filter2.predicate.UserDefinedPredicate; +import org.apache.parquet.hadoop.BloomFilterReader; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; + +import static org.apache.parquet.Preconditions.checkNotNull; + +public class BloomFilterImpl implements FilterPredicate.Visitor{ + private static final Logger LOG = LoggerFactory.getLogger(BloomFilterImpl.class); + private static final boolean BLOCK_MIGHT_MATCH = false; + private static final boolean BLOCK_CANNOT_MATCH = true; + + private final Map columns = new HashMap(); + + public static boolean canDrop(FilterPredicate pred, List columns, BloomFilterReader bloomFilterReader) { + checkNotNull(pred, "pred"); + checkNotNull(columns, "columns"); + return pred.accept(new BloomFilterImpl(columns, bloomFilterReader)); + } + + private BloomFilterImpl(List columnsList, BloomFilterReader bloomFilterReader) { + for (ColumnChunkMetaData chunk : columnsList) { + columns.put(chunk.getPath(), chunk); + } + + this.bloomFilterReader = bloomFilterReader; + } + + private BloomFilterReader bloomFilterReader; + + private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) { + return columns.get(columnPath); + } + + @Override + public > Boolean visit(Operators.Eq eq) { + T value = eq.getValue(); + + if (value == null) { + // the bloom filter bitset contains only non-null values so isn't helpful. this + // could check the column stats, but the StatisticsFilter is responsible + return BLOCK_MIGHT_MATCH; + } + + Operators.Column filterColumn = eq.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + if (meta == null) { + // the column isn't in this file so all values are null, but the value + // must be non-null because of the above check. 
+ return BLOCK_CANNOT_MATCH; + } + + try { + BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta); + if (bloomFilter != null && !bloomFilter.findHash(bloomFilter.hash(value))) { + return BLOCK_CANNOT_MATCH; + } + } catch (RuntimeException e) { + LOG.warn(e.getMessage()); + return BLOCK_MIGHT_MATCH; + } + + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(Operators.NotEq notEq) { + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(Operators.Lt lt) { + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(Operators.LtEq ltEq) { + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(Operators.Gt gt) { + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(Operators.GtEq gtEq) { + return BLOCK_MIGHT_MATCH; + } + + @Override + public Boolean visit(Operators.And and) { + return and.getLeft().accept(this) || and.getRight().accept(this); + } + + @Override + public Boolean visit(Operators.Or or) { + return or.getLeft().accept(this) && or.getRight().accept(this); + } + + @Override + public Boolean visit(Operators.Not not) { + throw new IllegalArgumentException( + "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not); + } + + private , U extends UserDefinedPredicate> Boolean visit(Operators.UserDefined ud, boolean inverted) { + return BLOCK_MIGHT_MATCH; + } + + @Override + public , U extends UserDefinedPredicate> Boolean visit(Operators.UserDefined udp) { + return visit(udp, false); + } + + @Override + public , U extends UserDefinedPredicate> Boolean visit(Operators.LogicalNotUserDefined udp) { + return visit(udp.getUserDefined(), true); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java index e3cd9cbf8c..73ff9aae67 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
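Because the visitor answers "can this row group be dropped?", the boolean algebra is inverted relative to the predicate itself: an AND may drop the group if either side proves absence, while an OR may only drop it if both sides do. A hedged usage sketch of the public entry point; the wrapper method is illustrative, and the reader and block metadata come from ParquetFileReader as wired up in RowGroupFilter below.

import org.apache.parquet.filter2.bloomfilterlevel.BloomFilterImpl;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

public class BloomFilterDropSketch {
  // Sketch: returns true when every Eq value in the predicate is provably absent from the
  // row group's Bloom filters, i.e. the whole block can be skipped without reading pages.
  static boolean canSkipRowGroup(ParquetFileReader reader, BlockMetaData block, FilterPredicate pred) {
    return BloomFilterImpl.canDrop(pred, block.getColumns(), reader.getBloomFilterDataReader(block));
  }
}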
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ import java.util.List; import java.util.Objects; +import org.apache.parquet.filter2.bloomfilterlevel.BloomFilterImpl; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter; import org.apache.parquet.filter2.compat.FilterCompat.Visitor; @@ -33,6 +34,8 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Given a {@link Filter} applies it to a list of BlockMetaData (row groups) @@ -47,7 +50,8 @@ public class RowGroupFilter implements Visitor> { public enum FilterLevel { STATISTICS, - DICTIONARY + DICTIONARY, + BLOOMFILTER } /** @@ -103,6 +107,10 @@ public List visit(FilterCompat.FilterPredicateCompat filterPredic drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block)); } + if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) { + drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block)); + } + if(!drop) { filteredBlocks.add(block); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index bfb4aa34d2..390846307d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -20,7 +20,6 @@ import static java.util.Optional.empty; -import static java.util.Optional.empty; import static java.util.Optional.of; import static org.apache.parquet.format.Util.readFileMetaData; import static org.apache.parquet.format.Util.writePageHeader; @@ -46,6 +45,11 @@ import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.format.BloomFilterAlgorithm; +import org.apache.parquet.format.BloomFilterCompression; +import org.apache.parquet.format.BloomFilterHash; +import org.apache.parquet.format.BloomFilterHeader; import org.apache.parquet.format.BsonType; import org.apache.parquet.format.CompressionCodec; import org.apache.parquet.format.DateType; @@ -61,10 +65,13 @@ import org.apache.parquet.format.NanoSeconds; import org.apache.parquet.format.NullType; import org.apache.parquet.format.PageEncodingStats; +import org.apache.parquet.format.SplitBlockAlgorithm; import org.apache.parquet.format.StringType; import org.apache.parquet.format.TimeType; import org.apache.parquet.format.TimeUnit; import org.apache.parquet.format.TimestampType; +import org.apache.parquet.format.Uncompressed; +import org.apache.parquet.format.XxHash; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.format.BoundaryOrder; import org.apache.parquet.format.ColumnChunk; @@ -473,6 +480,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List rowGrou columnMetaData.getTotalSize(), 
columnMetaData.getFirstDataPageOffset()); columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset(); + columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset()); if (!columnMetaData.getStatistics().isEmpty()) { columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength)); } @@ -1240,6 +1248,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws metaData.total_uncompressed_size); column.setColumnIndexReference(toColumnIndexReference(columnChunk)); column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); + column.setBloomFilterOffset(metaData.bloom_filter_offset); // TODO // index_page_offset // key_value_metadata @@ -1611,4 +1620,32 @@ public static org.apache.parquet.internal.column.columnindex.OffsetIndex fromPar } return builder.build(); } + + public static BloomFilterHeader toBloomFilterHeader( + org.apache.parquet.column.values.bloomfilter.BloomFilter bloomFilter) { + + BloomFilterAlgorithm algorithm = null; + BloomFilterHash hashStrategy = null; + BloomFilterCompression compression = null; + + if (bloomFilter.getAlgorithm() == BloomFilter.Algorithm.BLOCK) { + algorithm = BloomFilterAlgorithm.BLOCK(new SplitBlockAlgorithm()); + } + + if (bloomFilter.getHashStrategy() == BloomFilter.HashStrategy.XXH64) { + hashStrategy = BloomFilterHash.XXHASH(new XxHash()); + } + + if (bloomFilter.getCompression() == BloomFilter.Compression.UNCOMPRESSED) { + compression = BloomFilterCompression.UNCOMPRESSED(new Uncompressed()); + } + + if (algorithm != null && hashStrategy != null && compression != null) { + return new BloomFilterHeader(bloomFilter.getBitsetSize(), algorithm, hashStrategy, compression); + } else { + throw new IllegalArgumentException(String.format("Failed to build thrift structure for BloomFilterHeader," + + "algorithm=%s, hash=%s, compression=%s", + bloomFilter.getAlgorithm(), bloomFilter.getHashStrategy(), bloomFilter.getCompression())); + } + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java new file mode 100644 index 0000000000..b9d580ffa8 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
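Each serialized filter is preceded by a small thrift BloomFilterHeader, which is what toBloomFilterHeader above produces and what the Util helpers earlier in this patch read and write; the raw bitset bytes follow the header on disk. A hedged sketch of that framing, using only types that already appear in the patch (the wrapper class is illustrative).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.format.BloomFilterAlgorithm;
import org.apache.parquet.format.BloomFilterCompression;
import org.apache.parquet.format.BloomFilterHash;
import org.apache.parquet.format.BloomFilterHeader;
import org.apache.parquet.format.SplitBlockAlgorithm;
import org.apache.parquet.format.Uncompressed;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.XxHash;

public class BloomFilterHeaderSketch {
  // Sketch: build the header for a 1KB uncompressed, block-split, xxHash-based bitset and
  // round-trip it through the new Util helpers.
  static BloomFilterHeader roundTrip() throws IOException {
    BloomFilterHeader header = new BloomFilterHeader(
        1024,
        BloomFilterAlgorithm.BLOCK(new SplitBlockAlgorithm()),
        BloomFilterHash.XXHASH(new XxHash()),
        BloomFilterCompression.UNCOMPRESSED(new Uncompressed()));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Util.writeBloomFilterHeader(header, out);
    return Util.readBloomFilterHeader(new ByteArrayInputStream(out.toByteArray()));
  }
}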
+ */ +package org.apache.parquet.hadoop; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.filter2.compat.RowGroupFilter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Bloom filter reader that reads Bloom filter data from an open {@link ParquetFileReader}. + */ +public class BloomFilterReader { + private final ParquetFileReader reader; + private final Map columns; + private final Map cache = new HashMap<>(); + private Logger logger = LoggerFactory.getLogger(BloomFilterReader.class); + + public BloomFilterReader(ParquetFileReader fileReader, BlockMetaData block) { + this.reader = fileReader; + this.columns = new HashMap<>(); + for (ColumnChunkMetaData column : block.getColumns()) { + columns.put(column.getPath(), column); + } + } + + public BloomFilter readBloomFilter(ColumnChunkMetaData meta) { + if (cache.containsKey(meta.getPath())) { + return cache.get(meta.getPath()); + } + try { + if (!cache.containsKey(meta.getPath())) { + BloomFilter bloomFilter = reader.readBloomFilter(meta); + if (bloomFilter == null) { + return null; + } + + cache.put(meta.getPath(), bloomFilter); + } + return cache.get(meta.getPath()); + } catch (IOException e) { + logger.error("Failed to read Bloom filter data", e); + } + + return null; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index 72f26fc115..d2e4c966ac 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -37,6 +37,9 @@ import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.page.PageWriter; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; @@ -47,12 +50,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class ColumnChunkPageWriteStore implements PageWriteStore { +class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore { private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class); private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - private static final class ColumnChunkPageWriter implements PageWriter { + private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter { private final ColumnDescriptor path; private final BytesCompressor compressor; @@ -71,6 +74,7 @@ private static final class ColumnChunkPageWriter implements PageWriter { private Set dlEncodings = new HashSet(); private List dataEncodings = new ArrayList(); + private BloomFilter bloomFilter; private ColumnIndexBuilder columnIndexBuilder; private OffsetIndexBuilder offsetIndexBuilder; private 
Statistics totalStatistics; @@ -249,6 +253,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { totalStatistics, columnIndexBuilder, offsetIndexBuilder, + bloomFilter, rlEncodings, dlEncodings, dataEncodings); @@ -289,6 +294,10 @@ public String memUsageString(String prefix) { return buf.memUsageString(prefix + " ColumnChunkPageWriter"); } + @Override + public void writeBloomFilter(BloomFilter bloomFilter) { + this.bloomFilter = bloomFilter; + } } private final Map writers = new HashMap(); @@ -313,6 +322,11 @@ public PageWriter getPageWriter(ColumnDescriptor path) { return writers.get(path); } + @Override + public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) { + return writers.get(path); + } + public void flushToFileWriter(ParquetFileWriter writer) throws IOException { for (ColumnDescriptor path : schema.getColumns()) { ColumnChunkPageWriter pageWriter = writers.get(path); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index bd5602558a..205509c616 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -28,6 +28,7 @@ import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext; @@ -64,6 +65,7 @@ class InternalParquetRecordWriter { private ColumnWriteStore columnStore; private ColumnChunkPageWriteStore pageStore; + private BloomFilterWriteStore bloomFilterWriteStore; private RecordConsumer recordConsumer; /** @@ -101,9 +103,12 @@ public ParquetMetadata getFooter() { } private void initStore() { - pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(), - props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled()); - columnStore = props.newColumnWriteStore(schema, pageStore); + ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor, + schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled()); + pageStore = columnChunkPageWriteStore; + bloomFilterWriteStore = columnChunkPageWriteStore; + + columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); writeSupport.prepareForWrite(recordConsumer); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 0b1d7405f2..18fbf6d8c5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -19,6 +19,7 @@ package org.apache.parquet.hadoop; import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian; +import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.BLOOMFILTER; import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY; import static 
org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; @@ -69,9 +70,12 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.RowGroupFilter; +import org.apache.parquet.format.BloomFilterHeader; import org.apache.parquet.format.DataPageHeader; import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.DictionaryPageHeader; @@ -793,6 +797,10 @@ private List filterRowGroups(List blocks) throws I levels.add(DICTIONARY); } + if (options.useBloomFilter()) { + levels.add(BLOOMFILTER); + } + FilterCompat.Filter recordFilter = options.getRecordFilter(); if (recordFilter != null) { return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); @@ -1050,6 +1058,48 @@ private DictionaryPage readCompressedDictionary( converter.getEncoding(dictHeader.getEncoding())); } + public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) { + return new BloomFilterReader(this, block); + } + + /** + * Reads Bloom filter data for the given column chunk. + * + * @param meta a column's ColumnChunkMetaData to read the dictionary from + * @return an BloomFilter object. + * @throws IOException if there is an error while reading the Bloom filter. + */ + public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException { + long bloomFilterOffset = meta.getBloomFilterOffset(); + f.seek(bloomFilterOffset); + BloomFilterHeader bloomFilterHeader; + + // Read Bloom filter data header. + try { + bloomFilterHeader = Util.readBloomFilterHeader(f); + } catch (IOException e) { + LOG.warn("read no bloom filter"); + return null; + } + + int numBytes = bloomFilterHeader.getNumBytes(); + if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.UPPER_BOUND_BYTES) { + LOG.warn("the read bloom filter size is wrong, size is {}", bloomFilterHeader.getNumBytes()); + return null; + } + + if (!bloomFilterHeader.getHash().isSetXXHASH() || !bloomFilterHeader.getAlgorithm().isSetBLOCK() + || !bloomFilterHeader.getCompression().isSetUNCOMPRESSED()) { + LOG.warn("the read bloom filter is not supported yet, algorithm = {}, hash = {}, compression = {}", + bloomFilterHeader.getAlgorithm(), bloomFilterHeader.getHash(), bloomFilterHeader.getCompression()); + return null; + } + + byte[] bitset = new byte[numBytes]; + f.readFully(bitset); + return new BlockSplitBloomFilter(bitset); + } + /** * @param column * the column chunk which the column index is to be returned for diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index d1cdee7ffe..c6ee1e9967 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
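On the read path, the per-row-group entry point is getBloomFilterDataReader, and probing a value goes through the filter's own hash so that reader and writer agree on the hash function and seed. A hedged usage sketch; the helper method and its "first column" choice are illustrative only.

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.BloomFilterReader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.api.Binary;

public class BloomFilterProbeSketch {
  // Sketch: probe the first column chunk of a row group for a string value. A null filter
  // (none was written for the column) and a positive probe both mean "may be present".
  static boolean mightContain(ParquetFileReader reader, BlockMetaData block, String value) {
    BloomFilterReader bloomFilters = reader.getBloomFilterDataReader(block);
    ColumnChunkMetaData column = block.getColumns().get(0);
    BloomFilter filter = bloomFilters.readBloomFilter(column);
    return filter == null || filter.findHash(filter.hash(Binary.fromString(value)));
  }
}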
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,6 +50,7 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.format.Util; @@ -110,6 +111,9 @@ public static enum Mode { private final List> columnIndexes = new ArrayList<>(); private final List> offsetIndexes = new ArrayList<>(); + // The Bloom filters + private final List> bloomFilters = new ArrayList<>(); + // row group data private BlockMetaData currentBlock; // appended to by endColumn @@ -117,6 +121,9 @@ public static enum Mode { private List currentColumnIndexes; private List currentOffsetIndexes; + // The Bloom filter for the actual block + private Map currentBloomFilters; + // row group data set at the start of a row group private long currentRecordCount; // set in startBlock @@ -355,6 +362,8 @@ public void startBlock(long recordCount) throws IOException { currentColumnIndexes = new ArrayList<>(); currentOffsetIndexes = new ArrayList<>(); + + currentBloomFilters = new HashMap<>(); } /** @@ -574,6 +583,16 @@ private void innerWriteDataPage( currentEncodings.add(valuesEncoding); } + /** + * Add a Bloom filter that will be written out. This is only used in unit test. + * + * @param column the column name + * @param bloomFilter the bloom filter of column values + */ + void addBloomFilter(String column, BloomFilter bloomFilter) { + currentBloomFilters.put(column , bloomFilter); + } + /** * Writes a single v2 data page * @param rowCount count of rows @@ -651,6 +670,7 @@ public void writeDataPageV2(int rowCount, int nullCount, int valueCount, * @param totalStats accumulated statistics for the column chunk * @param columnIndexBuilder the builder object for the column index * @param offsetIndexBuilder the builder object for the offset index + * @param bloomFilter the bloom filter for this column * @param rlEncodings the RL encodings used in this column chunk * @param dlEncodings the DL encodings used in this column chunk * @param dataEncodings the data encodings used in this column chunk @@ -666,6 +686,7 @@ void writeColumnChunk(ColumnDescriptor descriptor, Statistics totalStats, ColumnIndexBuilder columnIndexBuilder, OffsetIndexBuilder offsetIndexBuilder, + BloomFilter bloomFilter, Set rlEncodings, Set dlEncodings, List dataEncodings) throws IOException { @@ -675,6 +696,20 @@ void writeColumnChunk(ColumnDescriptor descriptor, if (dictionaryPage != null) { writeDictionaryPage(dictionaryPage); } + + if (bloomFilter != null) { + // write bloom filter if one of data pages is not dictionary encoded + boolean isWriteBloomFilter = false; + for (Encoding encoding : dataEncodings) { + if (encoding != Encoding.RLE_DICTIONARY) { + isWriteBloomFilter = true; + break; + } + } + if (isWriteBloomFilter) { + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + } + } LOG.debug("{}: write data pages", out.getPos()); long headersSize = bytes.size() - compressedTotalPageSize; this.uncompressedLength += uncompressedTotalPageSize + headersSize; @@ -740,8 +775,10 @@ public void 
endBlock() throws IOException { blocks.add(currentBlock); columnIndexes.add(currentColumnIndexes); offsetIndexes.add(currentOffsetIndexes); + bloomFilters.add(currentBloomFilters); currentColumnIndexes = null; currentOffsetIndexes = null; + currentBloomFilters = null; currentBlock = null; } @@ -923,6 +960,7 @@ public void end(Map extraMetaData) throws IOException { state = state.end(); serializeColumnIndexes(columnIndexes, blocks, out); serializeOffsetIndexes(offsetIndexes, blocks, out); + serializeBloomFilters(bloomFilters, blocks, out); LOG.debug("{}: end", out.getPos()); this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); serializeFooter(footer, out); @@ -979,6 +1017,30 @@ private static void serializeOffsetIndexes( } } + private static void serializeBloomFilters( + List> bloomFilters, + List blocks, + PositionOutputStream out) throws IOException { + LOG.debug("{}: bloom filters", out.getPos()); + for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) { + List columns = blocks.get(bIndex).getColumns(); + Map blockBloomFilters = bloomFilters.get(bIndex); + if (blockBloomFilters.isEmpty()) continue; + for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) { + ColumnChunkMetaData column = columns.get(cIndex); + BloomFilter bloomFilter = blockBloomFilters.get(column.getPath().toDotString()); + if (bloomFilter == null) { + continue; + } + + long offset = out.getPos(); + column.setBloomFilterOffset(offset); + Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out); + bloomFilter.writeTo(out); + } + } + } + private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException { long footerIndex = out.getPos(); ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index c3c52e3bb9..f46f18211a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -139,6 +139,11 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String PAGE_VERIFY_CHECKSUM_ENABLED = "parquet.page.verify-checksum.enabled"; + /** + * key to configure whether row group bloom filtering is enabled + */ + public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled"; + /** * key to turn on or off task side metadata loading (default true) * if true then metadata is read on the task side and some tasks may finish immediately. 
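The end-to-end write sequence is therefore: populate a BlockSplitBloomFilter while the column chunk is written, register it for the current block, and let end() serialize one header plus bitset per column at the recorded bloom_filter_offset. A hedged sketch of the per-column serialization step that serializeBloomFilters performs, assuming writeTo accepts any OutputStream as the PositionOutputStream usage above suggests; the wrapper class is illustrative.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.io.api.Binary;

public class BloomFilterSerializationSketch {
  // Sketch of what serializeBloomFilters does per column chunk: thrift header first,
  // then the raw bitset, at an offset that is recorded in the column's metadata.
  static byte[] serialize(Iterable<String> values, int numBytes) throws IOException {
    BloomFilter filter = new BlockSplitBloomFilter(numBytes);
    for (String v : values) {
      filter.insertHash(filter.hash(Binary.fromString(v)));
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(filter), out);
    filter.writeTo(out);
    return out.toByteArray();
  }
}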
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 1528ffb09e..d925e1dc2e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,7 +22,12 @@ import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -145,6 +150,9 @@ public static enum JobSummaryLevel { public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate"; public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length"; public static final String STATISTICS_TRUNCATE_LENGTH = "parquet.statistics.truncate.length"; + public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names"; + public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv"; + public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; @@ -212,6 +220,43 @@ public static boolean getEnableDictionary(JobContext jobContext) { return getEnableDictionary(getConfiguration(jobContext)); } + public static int getBloomFilterMaxBytes(Configuration conf) { + return conf.getInt(BLOOM_FILTER_MAX_BYTES, + ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES); + } + + public static Set getBloomFilterColumns(Configuration conf) { + String columnNames = conf.get(BLOOM_FILTER_COLUMN_NAMES); + if (columnNames != null) { + return new HashSet<>(Arrays.asList(columnNames.split(","))); + } else { + return new HashSet<>(); + } + } + + public static Map getBloomFilterColumnExpectedNDVs(Configuration conf) { + Map kv = new HashMap<>(); + String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES); + String expectedNDVsConf = conf.get(BLOOM_FILTER_EXPECTED_NDV); + + if (columnNamesConf == null || expectedNDVsConf == null) { + return kv; + } + + String[] columnNames = columnNamesConf.split(","); + String[] expectedNDVs = expectedNDVsConf.split(","); + + if (columnNames.length == expectedNDVs.length) { + for (int i = 0; i < columnNames.length; i++) { + kv.put(columnNames[i], Long.parseLong(expectedNDVs[i])); + } + } else { + LOG.warn("Bloom filter column names are not match expected NDVs"); + } + + return kv; + } + public static int getBlockSize(JobContext jobContext) { return getBlockSize(getConfiguration(jobContext)); } @@ -435,6 
+480,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)) .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf)) .withStatisticsTruncateLength(getStatisticsTruncateLength(conf)) + .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf)) + .withBloomFilterColumnToNDVMap(getBloomFilterColumnExpectedNDVs(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)); new ColumnConfigParser() diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index dbee2109b6..71fde69064 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -284,6 +284,16 @@ public Builder usePageChecksumVerification(boolean usePageChecksumVerificatio return this; } + public Builder useBloomFilter(boolean useBloomFilter) { + optionsBuilder.useBloomFilter(useBloomFilter); + return this; + } + + public Builder useBloomFilter() { + optionsBuilder.useBloomFilter(); + return this; + } + public Builder usePageChecksumVerification() { optionsBuilder.usePageChecksumVerification(); return this; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8e698f535b..4637069c75 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -555,6 +557,34 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { return self(); } + /** + * Specified the column and the NDV map for the bloom filter. + * + * @param columnNDVMap the column to NDV map + * + * @return this builder for method chaining. 
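On the Hadoop job side, the same settings travel through the three parquet.bloom.filter.* properties read above, with column names and expected NDVs matched positionally. A hedged sketch of a job configuration; the column names and counts are made up for illustration.

import org.apache.hadoop.conf.Configuration;

public class BloomFilterJobConfSketch {
  // Sketch: configure Bloom filters for two columns via ParquetOutputFormat properties.
  // The i-th expected NDV applies to the i-th column name; the two lists must be the same length.
  static Configuration bloomFilterConf() {
    Configuration conf = new Configuration();
    conf.set("parquet.bloom.filter.column.names", "id,name");
    conf.set("parquet.bloom.filter.expected.ndv", "10000,5000");
    conf.setInt("parquet.bloom.filter.max.bytes", 1024 * 1024);  // cap each bitset at 1MB
    return conf;
  }
}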
+ */ + public SELF withBloomFilterColumnToNDVMap(Map columnNDVMap) { + if (columnNDVMap != null) { + encodingPropsBuilder.withBloomFilterColumnToNDVMap(columnNDVMap); + } + + return self(); + } + + /** + * Specified the column names for the bloom filter. + * + * @return this builder for method chaining. + */ + public SELF withBloomFilterColumnNames(String...columns) { + if (columns != null) { + encodingPropsBuilder.withBloomFilterColumnNames(Arrays.asList(columns)); + } + + return self(); + } + /** * Set a property that will be available to the read path. For writers that use a Hadoop * configuration, this is the recommended way to add configuration values. diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index e6aa1043b4..2c24356cb9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -173,6 +173,8 @@ protected static boolean positiveLongFitsInAnInt(long value) { private IndexReference columnIndexReference; private IndexReference offsetIndexReference; + private long bloomFilterOffset; + protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) { this(null, columnChunkProperties); } @@ -274,6 +276,23 @@ public void setOffsetIndexReference(IndexReference offsetIndexReference) { this.offsetIndexReference = offsetIndexReference; } + /** + * @param bloomFilterOffset + * the reference to the Bloom filter + */ + @Private + public void setBloomFilterOffset(long bloomFilterOffset) { + this.bloomFilterOffset = bloomFilterOffset; + } + + /** + * @return the offset to the Bloom filter + */ + @Private + public long getBloomFilterOffset() { + return bloomFilterOffset; + } + /** * @return all the encodings used in this column */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java new file mode 100644 index 0000000000..7246d3698f --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
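For programmatic writers, the two builder methods added above cover both styles: an explicit per-column NDV map, or just the column names when the distinct counts are unknown. A hedged sketch modelled on the tests later in this patch; schema and Configuration setup are omitted, and the wrapper class is illustrative.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;

public class BloomFilterWriterSketch {
  // Sketch: enable Bloom filters for two columns with explicit expected NDVs.
  // Schema / conf wiring is omitted for brevity (see TestBloomFiltering and TestParquetWriter below).
  static ParquetWriter<Group> writerWithBloomFilters(Path path) throws IOException {
    Map<String, Long> columnToNDV = new HashMap<>();
    columnToNDV.put("name", 10_000L);
    columnToNDV.put("id", 10_000L);
    return ExampleParquetWriter.builder(path)
        .withBloomFilterColumnToNDVMap(columnToNDV)
        // .withBloomFilterColumnNames("name", "id")  // alternative when NDVs are unknown
        .build();
  }
}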
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.recordlevel.PhoneBookWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.io.api.Binary; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.parquet.filter2.predicate.FilterApi.*; +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class TestBloomFiltering { + private static final Path FILE_V1 = createTempFile(); + private static final Path FILE_V2 = createTempFile(); + private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomFiltering.class); + private static final Random RANDOM = new Random(42); + private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" }; + private static final List DATA = Collections.unmodifiableList(generateData(10000)); + + private final Path file; + public TestBloomFiltering(Path file) { + this.file = file; + } + + private static Path createTempFile() { + try { + return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString()); + } catch (IOException e) { + throw new AssertionError("Unable to create temporary file", e); + } + } + + @Parameterized.Parameters + public static Collection params() { + return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 }); + } + + private static List generateData(int rowCount) { + List users = new ArrayList<>(); + List names = generateNames(rowCount); + for (int i = 0; i < rowCount; ++i) { + users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount))); + } + return users; + } + + private static List generateNames(int rowCount) { + List list = new ArrayList<>(); + + // Adding fix values for filtering + list.add("anderson"); + list.add("anderson"); + list.add("miller"); + list.add("miller"); + list.add("miller"); + list.add("thomas"); + list.add("thomas"); + list.add("williams"); + + int nullCount = rowCount / 100; + + String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz"; + int maxLength = 8; + for (int i = rowCount - list.size() - nullCount; i >= 0; --i) { + int l = 
RANDOM.nextInt(maxLength); + StringBuilder builder = new StringBuilder(l); + for (int j = 0; j < l; ++j) { + builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length()))); + } + list.add(builder.toString()); + } + list.sort((str1, str2) -> -str1.compareTo(str2)); + + // Adding nulls to random places + for (int i = 0; i < nullCount; ++i) { + list.add(RANDOM.nextInt(list.size()), null); + } + + return list; + } + + private static List generatePhoneNumbers() { + int length = RANDOM.nextInt(5) - 1; + if (length < 0) { + return null; + } + List phoneNumbers = new ArrayList<>(length); + for (int i = 0; i < length; ++i) { + // 6 digits numbers + long number = Math.abs(RANDOM.nextLong() % 900000) + 100000; + phoneNumbers.add(new PhoneBookWriter.PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)])); + } + return phoneNumbers; + } + + private static PhoneBookWriter.Location generateLocation(int id, int rowCount) { + if (RANDOM.nextDouble() < 0.01) { + return null; + } + + if (RANDOM.nextDouble() < 0.001) { + return new PhoneBookWriter.Location(99.9, 99.9); + } + + double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0); + double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0); + + return new PhoneBookWriter.Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon); + } + + private List readUsers(FilterPredicate filter, boolean useOtherFiltering, + boolean useBloomFilter) throws IOException { + return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file) + .withFilter(FilterCompat.get(filter)) + .useDictionaryFilter(useOtherFiltering) + .useStatsFilter(useOtherFiltering) + .useRecordFilter(useOtherFiltering) + .useBloomFilter(useBloomFilter) + .useColumnIndexFilter(useOtherFiltering)); + } + + // Assumes that both lists are in the same order + private static void assertContains(Stream expected, List actual) { + Iterator expIt = expected.iterator(); + if (!expIt.hasNext()) { + return; + } + PhoneBookWriter.User exp = expIt.next(); + for (PhoneBookWriter.User act : actual) { + if (act.equals(exp)) { + if (!expIt.hasNext()) { + break; + } + exp = expIt.next(); + } + } + assertFalse("Not all expected elements are in the actual list. 
E.g.: " + exp, expIt.hasNext()); + } + + private void assertCorrectFiltering(Predicate expectedFilter, FilterPredicate actualFilter) + throws IOException { + // Check with only bloom filter based filtering + List result = readUsers(actualFilter, false, true); + + assertTrue("Bloom filtering should drop some row groups", result.size() < DATA.size()); + LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(), + 100 * result.size() / DATA.size()); + // Asserts that all the required records are in the result + assertContains(DATA.stream().filter(expectedFilter), result); + // Asserts that all the retrieved records are in the file (validating non-matching records) + assertContains(result.stream(), DATA); + + // Check with all the filtering filtering to ensure the result contains exactly the required values + result = readUsers(actualFilter, true, false); + assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result); + } + + + @BeforeClass + public static void createFile() throws IOException { + int pageSize = DATA.size() / 100; // Ensure that several pages will be created + int rowGroupSize = pageSize * 4; // Ensure that there are more row-groups created + Map column2NDVMap = new HashMap<>(); + column2NDVMap.put("location.lat", 10000L); + column2NDVMap.put("name", 10000L); + column2NDVMap.put("id", 10000L); + PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1) + .withWriteMode(OVERWRITE) + .withRowGroupSize(rowGroupSize) + .withPageSize(pageSize) + .withBloomFilterColumnToNDVMap(column2NDVMap) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0), + DATA); + PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2) + .withWriteMode(OVERWRITE) + .withRowGroupSize(rowGroupSize) + .withPageSize(pageSize) + .withBloomFilterColumnToNDVMap(column2NDVMap) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0), + DATA); + } + + @AfterClass + public static void deleteFile() throws IOException { + FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false); + FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false); + } + + + @Test + public void testSimpleFiltering() throws IOException { + assertCorrectFiltering( + record -> record.getId() == 1234L, + eq(longColumn("id"), 1234L)); + + assertCorrectFiltering( + record -> "miller".equals(record.getName()), + eq(binaryColumn("name"), Binary.fromString("miller"))); + } + + @Test + public void testNestedFiltering() throws IOException { + assertCorrectFiltering( + record -> { + PhoneBookWriter.Location location = record.getLocation(); + return location != null && location.getLat() != null && location.getLat() == 99.9; + }, + eq(doubleColumn("location.lat"), 99.9)); + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index 88c8d83ea1..0569c42459 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -260,6 +260,7 @@ public void testColumnOrderV1() throws IOException { same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index any(), any(), + any(), any()); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 
65397199a1..9a8e6b4875 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,7 +18,6 @@ */ package org.apache.parquet.hadoop; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -28,6 +27,8 @@ import org.apache.parquet.Version; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.junit.Assume; import org.junit.Rule; @@ -220,6 +221,39 @@ public void testWriteRead() throws Exception { PrintFooter.main(new String[] {path.toString()}); } + @Test + public void testBloomFilterWriteRead() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }"); + File testFile = temp.newFile(); + testFile.delete(); + Path path = new Path(testFile.toURI()); + Configuration configuration = new Configuration(); + configuration.set("parquet.bloom.filter.column.names", "foo"); + String[] colPath = {"foo"}; + ColumnDescriptor col = schema.getColumnDescription(colPath); + BinaryStatistics stats1 = new BinaryStatistics(); + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(col, 5, CODEC); + w.writeDataPage(2, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + BloomFilter blockSplitBloomFilter = new BlockSplitBloomFilter(0); + blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))); + blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("world"))); + w.addBloomFilter("foo", blockSplitBloomFilter); + w.endBlock(); + w.end(new HashMap<>()); + ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); + ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath))); + BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0)); + BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0)); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello")))); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world")))); + } + @Test public void testWriteReadDataPageV2() throws Exception { File testFile = temp.newFile(); diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index c837d9a036..fde3fa064f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,9 +19,10 @@ package org.apache.parquet.hadoop; import static java.util.Arrays.asList; +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY; import static org.apache.parquet.column.Encoding.PLAIN; @@ -33,22 +34,21 @@ import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; -import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import net.openhft.hashing.LongHashFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.parquet.hadoop.example.ExampleInputFormat; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.example.data.GroupFactory; import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.Types; @@ -59,7 +59,6 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.GroupFactory; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; @@ -204,4 +203,39 @@ public void testNullValuesWithPageRowLimit() throws IOException { assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount); } } + + @Test + public void testParquetFileWithBloomFilter() throws IOException { + MessageType schema = Types.buildMessage(). 
+ required(BINARY).as(stringType()).named("name").named("msg"); + + String[] testNames = {"hello", "parquet", "bloom", "filter"}; + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + GroupFactory factory = new SimpleGroupFactory(schema); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(10) + .withConf(conf) + .withDictionaryEncoding(false) + .withBloomFilterColumnNames("name") + .build()) { + for (String testName : testNames) { + writer.write(factory.newGroup().append("name", testName)); + } + } + + ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration())); + BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); + BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData) + .readBloomFilter(blockMetaData.getColumns().get(0)); + + for (String name: testNames) { + assertTrue(bloomFilter.findHash( + LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer()))); + } + } } diff --git a/pom.xml b/pom.xml index c98378184e..0e443b7fb4 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ 27.0.1-jre 0.1.1 1.10.19 + 0.9 3.1.0 @@ -269,6 +270,7 @@ ${jackson.groupId}:* it.unimi.dsi:fastutil + net.openhft:* @@ -289,6 +291,10 @@ it.unimi.dsi ${shade.prefix}.it.unimi.dsi + + net.openhft + ${shade.prefix}.net.openhft +
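One detail the last test relies on: a value probes the filter correctly only if the reader hashes it exactly as BlockSplitBloomFilter did at write time, that is xxHash64 with seed 0 over the value's plain-encoded bytes. A hedged sketch of that equivalence, inferred from the tests above rather than stated by the patch, using the shaded net.openhft hasher added to the poms; the wrapper class is illustrative.

import net.openhft.hashing.LongHashFunction;

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.io.api.Binary;

public class XxHashCompatibilitySketch {
  // Sketch: the filter's own hash(Binary) and a direct seed-0 xxHash64 over the same bytes
  // should produce the same probe value, which is what testParquetFileWithBloomFilter asserts.
  static boolean hashesAgree(String value) {
    BloomFilter filter = new BlockSplitBloomFilter(1024);
    Binary binary = Binary.fromString(value);
    long viaFilter = filter.hash(binary);
    long direct = LongHashFunction.xx(0).hashBytes(binary.toByteBuffer());
    return viaFilter == direct;
  }
}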