PARQUET-41: Add bloom filter #757

Merged · 18 commits · Feb 26, 2020
5 changes: 5 additions & 0 deletions parquet-column/pom.xml
@@ -58,6 +58,11 @@
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>${net.openhft.version}</version>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
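
Aside: this new dependency supplies the xxHash64 implementation that the Bloom filter uses to hash column values (the Parquet format spec calls for xxHash with seed 0). A minimal sketch of the hash call, with an illustrative input value:

import java.nio.charset.StandardCharsets;
import net.openhft.hashing.LongHashFunction;

public class XxHashExample {
  public static void main(String[] args) {
    byte[] value = "an-example-uuid".getBytes(StandardCharsets.UTF_8);
    // xxHash64 with seed 0, computed without intermediate allocations;
    // avoiding per-value garbage is the point of this dependency on the write path.
    long hash = LongHashFunction.xx(0).hashBytes(value);
    System.out.printf("xx64 = %016x%n", hash);
  }
}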
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -18,20 +18,24 @@
*/
package org.apache.parquet.column;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.HeapByteBufferAllocator;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.util.Objects;

import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
@@ -56,6 +60,7 @@ public class ParquetProperties {
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;

Contributor: @chenjunjiedada I'm curious about the maximum default value. Could you please explain why you chose 1 MB?

Author: Assume we have a row group with only one column of UUIDs (36 bytes each); according to the sizing formula, with FPP = 0.01 we would need about 4 MB. I expect more columns in a real scenario.
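
A back-of-the-envelope check of that estimate, using the classic sizing formula m = -n * ln(p) / (ln 2)^2 and an assumed 128 MB row group (the row-group size and the arithmetic below are illustrative assumptions, not taken from the PR):

public class BloomFilterSizeCheck {
  public static void main(String[] args) {
    long rowGroupBytes = 128L * 1024 * 1024;  // assumed default row group size
    long ndv = rowGroupBytes / 36;            // ~3.7M distinct 36-byte UUID strings
    double fpp = 0.01;                        // target false positive probability
    // optimal bitset size in bits: m = -n * ln(p) / (ln 2)^2
    double bits = -ndv * Math.log(fpp) / (Math.log(2) * Math.log(2));
    System.out.printf("%.2f MB%n", bits / 8 / (1024 * 1024));  // prints ~4.26 MB
  }
}

So a full-precision filter for a worst-case column would blow well past 1 MB, and the default cap trades some false-positive rate on such columns for bounded memory.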

Contributor: @chenjunjiedada Thanks for the clarification!


public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

@@ -96,6 +101,11 @@ public static WriterVersion fromString(String name) {
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int statisticsTruncateLength;

// Maps a column name to its expected number of distinct values (NDV) in a row group.
private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
private final int maxBloomFilterBytes;
private final List<String> bloomFilterColumns;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final boolean enableByteStreamSplit;
@@ -115,6 +125,9 @@ private ParquetProperties(Builder builder) {
this.valuesWriterFactory = builder.valuesWriterFactory;
this.columnIndexTruncateLength = builder.columnIndexTruncateLength;
this.statisticsTruncateLength = builder.statisticsTruncateLength;
this.bloomFilterExpectedDistinctNumbers = builder.bloomFilterColumnExpectedNDVs;
this.bloomFilterColumns = builder.bloomFilterColumns;
this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.enableByteStreamSplit = builder.enableByteStreamSplit;
@@ -189,11 +202,24 @@ public ByteBufferAllocator getAllocator() {

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(schema, pageStore, this);
case PARQUET_2_0:
return new ColumnWriteStoreV2(schema, pageStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
}

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore,
BloomFilterWriteStore bloomFilterWriteStore) {
switch (writerVersion) {
case PARQUET_1_0:
-return new ColumnWriteStoreV1(schema, pageStore, this);
+return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
case PARQUET_2_0:
-return new ColumnWriteStoreV2(schema, pageStore, this);
+return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
@@ -231,6 +257,22 @@ public boolean getPageWriteChecksumEnabled() {
return pageWriteChecksumEnabled;
}

public Map<String, Long> getBloomFilterColumnExpectedNDVs() {
return bloomFilterExpectedDistinctNumbers;
}

public Set<String> getBloomFilterColumns() {
if (bloomFilterColumns != null && bloomFilterColumns.size() > 0){
return new HashSet<>(bloomFilterColumns);
}

return bloomFilterExpectedDistinctNumbers.keySet();
}

public int getMaxBloomFilterBytes() {
return maxBloomFilterBytes;
}

public static Builder builder() {
return new Builder();
}
@@ -250,6 +292,9 @@ public String toString() {
+ "Max row count for page size check is: " + getMaxRowCountForPageSizeCheck() + '\n'
+ "Truncate length for column indexes is: " + getColumnIndexTruncateLength() + '\n'
+ "Truncate length for statistics min/max is: " + getStatisticsTruncateLength() + '\n'
+ "Bloom filter enabled column names are: " + getBloomFilterColumns() + '\n'
+ "Max Bloom filter size for a column is " + getMaxBloomFilterBytes() + '\n'
+ "Bloom filter enabled column expected number of distinct values are: " + getBloomFilterColumnExpectedNDVs().values() + '\n'
+ "Page row count limit to " + getPageRowCountLimit() + '\n'
+ "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off");
}
@@ -266,6 +311,9 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
private List<String> bloomFilterColumns = new ArrayList<>();
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
@@ -286,6 +334,9 @@ private Builder(ParquetProperties toCopy) {
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
this.bloomFilterColumns = toCopy.bloomFilterColumns;
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
}

@@ -396,6 +447,41 @@ public Builder withStatisticsTruncateLength(int length) {
return this;
}

/**
* Set the maximum number of bytes a single column's Bloom filter bitset may occupy.
*
* @param maxBloomFilterBytes the maximum size, in bytes, of a Bloom filter bitset for a column.
* @return this builder for method chaining
*/
public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
this.maxBloomFilterBytes = maxBloomFilterBytes;
return this;
}

/**
* Set Bloom filter column names and expected NDVs.
*
* @param columnToNDVMap map from each Bloom-filter-enabled column name to its expected number of distinct values.
*
* @return this builder for method chaining
*/
public Builder withBloomFilterColumnToNDVMap(Map<String, Long> columnToNDVMap) {
this.bloomFilterColumnExpectedNDVs = columnToNDVMap;
return this;
}

/**
* Set Bloom filter column names.
*
* @param columns the columns for which the Bloom filter is enabled.
*
* @return this builder for method chaining
*/
public Builder withBloomFilterColumnNames(List<String> columns) {
this.bloomFilterColumns = columns;
return this;
}
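
A usage sketch of the three new builder options together (the column names and NDV figure are hypothetical, and the final call assumes the Builder's existing build() method):

import java.util.Arrays;
import java.util.Collections;
import org.apache.parquet.column.ParquetProperties;

// Enable Bloom filters for two columns; "id" also carries an explicit
// expected-NDV hint, which drives the sizing of its filter bitset.
ParquetProperties props = ParquetProperties.builder()
    .withBloomFilterColumnNames(Arrays.asList("id", "city"))
    .withBloomFilterColumnToNDVMap(Collections.singletonMap("id", 1_000_000L))
    .withMaxBloomFilterBytes(1024 * 1024)  // keep the 1 MB default cap
    .build();

Note the precedence encoded in getBloomFilterColumns() above: a non-empty explicit column list wins; otherwise the key set of the NDV map decides which columns get filters.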

public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
pageRowCountLimit = rowCount;
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -34,6 +34,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

/**
@@ -74,7 +76,7 @@ private interface ColumnWriterProvider {
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
-column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
+column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
columns.put(path, column);
}
return column;
@@ -91,7 +93,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-mcolumns.put(path, createColumnWriter(path, pageWriter, props));
+mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
this.columns = unmodifiableMap(mcolumns);

@@ -105,7 +107,38 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
};
}

-abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
// The Bloom filter is written to its own bitset rather than to data pages, so it needs a separate write-store abstraction.
ColumnWriteStoreBase(
MessageType schema,
PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
this.props = props;
this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
if (props.getBloomFilterColumns() != null && props.getBloomFilterColumns().size() > 0) {
BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
} else {
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
}
this.columns = unmodifiableMap(mcolumns);

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();

columnWriterProvider = new ColumnWriterProvider() {
@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columns.get(path);
}
};
}

+abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+BloomFilterWriter bloomFilterWriter, ParquetProperties props);

@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
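
For readers following the diff, the shape of the two bloomfilter interfaces can be inferred from their call sites above. A reconstruction (signatures inferred, not copied from the PR's sources):

// One BloomFilterWriter per column chunk: the column writer folds value
// hashes into the filter and hands the finished filter back on flush.
public interface BloomFilterWriteStore {
  BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
}

public interface BloomFilterWriter {
  void writeBloomFilter(BloomFilter bloomFilter);
}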
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -22,6 +22,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
@@ -36,8 +38,15 @@ public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
super(pageWriteStore, props);
}

public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
super(schema, pageWriteStore, bloomFilterWriteStore, props);
}

@Override
-ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-return new ColumnWriterV1(path, pageWriter, props);
+ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
}
}
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -22,6 +22,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
@@ -30,8 +32,15 @@ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, Par
super(schema, pageWriteStore, props);
}

public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
super(schema, pageWriteStore, bloomFilterWriteStore, props);
}

@Override
-ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-return new ColumnWriterV2(path, pageWriter, props);
+ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
}
}