PARQUET-41: Add bloom filter (apache#757)
* PARQUET-1328: Add Bloom filter reader and writer (apache#587)
* PARQUET-1516: Store Bloom filters near to footer (apache#608)
* PARQUET-1391: Integrate Bloom filter logic (apache#619)
* PARQUET-1660: align Bloom filter implementation with format (apache#686)
chenjunjiedada authored and shangxinli committed Mar 1, 2020
1 parent fdc1b7b commit 3c77510
Showing 36 changed files with 2,094 additions and 62 deletions.
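For orientation, the new ParquetProperties builder options introduced by this commit can be wired together as follows. This is a minimal sketch: the column names, NDV count, and size cap are illustrative, and Builder.build() is assumed from the pre-existing builder API.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.column.ParquetProperties;

public class BloomFilterPropertiesExample {
  public static void main(String[] args) {
    // Expected distinct values per row group, keyed by column name (illustrative numbers).
    Map<String, Long> ndvs = new HashMap<>();
    ndvs.put("user_id", 100_000L);

    ParquetProperties props = ParquetProperties.builder()
        .withBloomFilterColumnNames(Arrays.asList("user_id", "email"))
        .withBloomFilterColumnToNDVMap(ndvs)
        .withMaxBloomFilterBytes(512 * 1024) // cap each column's bitset at 512 KiB
        .build();

    // The explicit column list wins when non-empty; otherwise the NDV map's keys are used.
    System.out.println(props.getBloomFilterColumns());  // a Set, e.g. [user_id, email]
    System.out.println(props.getMaxBloomFilterBytes()); // 524288
  }
}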
5 changes: 5 additions & 0 deletions parquet-column/pom.xml
@@ -58,6 +58,11 @@
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>${net.openhft.version}</version>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -18,20 +18,24 @@
*/
package org.apache.parquet.column;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.HeapByteBufferAllocator;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;

-import java.util.Objects;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
@@ -56,6 +60,7 @@ public class ParquetProperties {
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;

public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

@@ -96,6 +101,11 @@ public static WriterVersion fromString(String name) {
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int statisticsTruncateLength;

// Maps a column name to its expected number of distinct values in a row group.
private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
private final int maxBloomFilterBytes;
private final List<String> bloomFilterColumns;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final boolean enableByteStreamSplit;
@@ -115,6 +125,9 @@ private ParquetProperties(Builder builder) {
this.valuesWriterFactory = builder.valuesWriterFactory;
this.columnIndexTruncateLength = builder.columnIndexTruncateLength;
this.statisticsTruncateLength = builder.statisticsTruncateLength;
this.bloomFilterExpectedDistinctNumbers = builder.bloomFilterColumnExpectedNDVs;
this.bloomFilterColumns = builder.bloomFilterColumns;
this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.enableByteStreamSplit = builder.enableByteStreamSplit;
@@ -189,11 +202,24 @@ public ByteBufferAllocator getAllocator() {

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(schema, pageStore, this);
case PARQUET_2_0:
return new ColumnWriteStoreV2(schema, pageStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
}

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore,
BloomFilterWriteStore bloomFilterWriteStore) {
switch (writerVersion) {
case PARQUET_1_0:
-      return new ColumnWriteStoreV1(schema, pageStore, this);
+      return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
    case PARQUET_2_0:
-      return new ColumnWriteStoreV2(schema, pageStore, this);
+      return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
@@ -231,6 +257,22 @@ public boolean getPageWriteChecksumEnabled() {
return pageWriteChecksumEnabled;
}

public Map<String, Long> getBloomFilterColumnExpectedNDVs() {
return bloomFilterExpectedDistinctNumbers;
}

public Set<String> getBloomFilterColumns() {
if (bloomFilterColumns != null && bloomFilterColumns.size() > 0) {
return new HashSet<>(bloomFilterColumns);
}

return bloomFilterExpectedDistinctNumbers.keySet();
}

public int getMaxBloomFilterBytes() {
return maxBloomFilterBytes;
}

public static Builder builder() {
return new Builder();
}
@@ -250,6 +292,9 @@ public String toString() {
+ "Max row count for page size check is: " + getMaxRowCountForPageSizeCheck() + '\n'
+ "Truncate length for column indexes is: " + getColumnIndexTruncateLength() + '\n'
+ "Truncate length for statistics min/max is: " + getStatisticsTruncateLength() + '\n'
+ "Bloom filter enabled column names are: " + getBloomFilterColumns() + '\n'
+ "Max Bloom filter size for a column is " + getMaxBloomFilterBytes() + '\n'
+ "Bloom filter enabled column expected number of distinct values are: " + getBloomFilterColumnExpectedNDVs().values() + '\n'
+ "Page row count limit to " + getPageRowCountLimit() + '\n'
+ "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off");
}
@@ -266,6 +311,9 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
private List<String> bloomFilterColumns = new ArrayList<>();
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
@@ -286,6 +334,9 @@ private Builder(ParquetProperties toCopy) {
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
this.bloomFilterColumns = toCopy.bloomFilterColumns;
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
}

@@ -396,6 +447,41 @@ public Builder withStatisticsTruncateLength(int length) {
return this;
}

/**
* Set the maximum Bloom filter size in bytes for a column.
*
* @param maxBloomFilterBytes the maximum number of bytes a column's Bloom filter bitset may occupy
* @return this builder for method chaining
*/
public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
this.maxBloomFilterBytes = maxBloomFilterBytes;
return this;
}

/**
* Set Bloom filter column names and expected NDVs.
*
* @param columnToNDVMap map from each Bloom-filter-enabled column name to its expected number of distinct values in a row group
*
* @return this builder for method chaining
*/
public Builder withBloomFilterColumnToNDVMap(Map<String, Long> columnToNDVMap) {
this.bloomFilterColumnExpectedNDVs = columnToNDVMap;
return this;
}

/**
* Set Bloom filter column names.
*
* @param columns the columns for which Bloom filters are enabled
*
* @return this builder for method chaining
*/
public Builder withBloomFilterColumnNames(List<String> columns) {
this.bloomFilterColumns = columns;
return this;
}

public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
pageRowCountLimit = rowCount;
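The expected-NDV map and the byte cap above interact through standard Bloom filter sizing. As a rough guide, n distinct values at target false-positive probability p need about m = -n * ln(p) / (ln 2)^2 bits; this is the textbook formula, not necessarily the exact rounding the commit's split-block filter performs, and the result is ultimately capped at maxBloomFilterBytes.

public class BloomFilterSizing {
  // Textbook optimal bitset size for ndv distinct values at false-positive
  // probability fpp; treat as an approximation of what the committed
  // implementation computes.
  static long optimalNumOfBits(long ndv, double fpp) {
    return (long) (-ndv * Math.log(fpp) / (Math.log(2) * Math.log(2)));
  }

  public static void main(String[] args) {
    long bits = optimalNumOfBits(100_000L, 0.01);
    // Prints roughly: NDV=100000, fpp=0.01 -> 958505 bits (~117 KiB)
    System.out.printf("NDV=100000, fpp=0.01 -> %d bits (~%d KiB)%n", bits, bits / (8 * 1024));
  }
}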
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -34,6 +34,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

/**
@@ -74,7 +76,7 @@ private interface ColumnWriterProvider {
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
columns.put(path, column);
}
return column;
@@ -91,7 +93,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, props));
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
this.columns = unmodifiableMap(mcolumns);

@@ -105,7 +107,38 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
};
}

-  abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);

  // The Bloom filter is written to its own bitset rather than to data pages, so it needs a separate write store abstraction.
ColumnWriteStoreBase(
MessageType schema,
PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
this.props = props;
this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
if (props.getBloomFilterColumns() != null && props.getBloomFilterColumns().size() > 0) {
BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
} else {
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
}
this.columns = unmodifiableMap(mcolumns);

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();

columnWriterProvider = new ColumnWriterProvider() {
@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columns.get(path);
}
};
}

abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
BloomFilterWriter bloomFilterWriter, ParquetProperties props);

@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
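ColumnWriteStoreBase only ever asks the given store for a per-column writer via getBloomFilterWriter(path), so BloomFilterWriteStore is easy to stub in tests. A throwaway in-memory sketch — it assumes BloomFilterWriter is a single-method interface writeBloomFilter(BloomFilter), whereas the real store on the parquet-hadoop side serializes filters near the footer (PARQUET-1516):

import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;

public class InMemoryBloomFilterWriteStore implements BloomFilterWriteStore {
  private final Map<ColumnDescriptor, BloomFilter> filters = new HashMap<>();

  @Override
  public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
    // Each column writer hands its finished filter back to us.
    return bloomFilter -> filters.put(path, bloomFilter);
  }

  public BloomFilter getBloomFilter(ColumnDescriptor path) {
    return filters.get(path);
  }
}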
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -22,6 +22,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
@@ -36,8 +38,15 @@ public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
super(pageWriteStore, props);
}

public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
super(schema, pageWriteStore, bloomFilterWriteStore, props);
}

@Override
-  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-    return new ColumnWriterV1(path, pageWriter, props);
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+      BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
}
}
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -22,6 +22,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
@@ -30,8 +32,15 @@ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, Par
super(schema, pageWriteStore, props);
}

public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
super(schema, pageWriteStore, bloomFilterWriteStore, props);
}

@Override
-  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-    return new ColumnWriterV2(path, pageWriter, props);
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+      BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
}
}
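Putting the pieces together: callers hand the Bloom filter store to the new ParquetProperties.newColumnWriteStore overload, which dispatches on writer version exactly as before. A hedged sketch using only the methods visible in this diff:

import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.schema.MessageType;

public class WriteStoreWiring {
  // Returns a ColumnWriteStoreV1 or ColumnWriteStoreV2 depending on the
  // writer version configured on props; both variants now thread the
  // BloomFilterWriteStore through to the per-column writers.
  static ColumnWriteStore create(ParquetProperties props, MessageType schema,
                                 PageWriteStore pages, BloomFilterWriteStore blooms) {
    return props.newColumnWriteStore(schema, pages, blooms);
  }
}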