PARQUET-41: Add bloom filter #757

Merged
merged 18 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
34 changes: 32 additions & 2 deletions parquet-column/pom.xml
@@ -58,7 +58,11 @@
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>

<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
@@ -87,7 +91,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -109,6 +112,33 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
<artifactSet>
<includes>
<include>net.openhft</include>
<include>it.unimi.dsi:fastutil</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>it.unimi.dsi</pattern>
<shadedPattern>${shade.prefix}.it.unimi.dsi</shadedPattern>
</relocation>
<relocation>
<pattern>net.openhft</pattern>
<shadedPattern>${shade.prefix}.net.openhft</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
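As context for the dependency change above: net.openhft:zero-allocation-hashing supplies the xxHash64 implementation that Parquet's Bloom filters use to hash values (the parquet-format spec calls for XXH64 with seed 0). A minimal sketch of hashing a value with that library, not code from this PR:

```java
import java.nio.charset.StandardCharsets;

import net.openhft.hashing.LongHashFunction;

public class XxHashSketch {
  public static void main(String[] args) {
    // Parquet's Bloom filter spec hashes each value with xxHash64, seed 0.
    byte[] value = "some-column-value".getBytes(StandardCharsets.UTF_8);
    long hash = LongHashFunction.xx(0).hashBytes(value);
    System.out.println(Long.toHexString(hash));
  }
}
```

Shading and relocating the library (as the maven-shade-plugin configuration above does) keeps it off the public classpath, so downstream users cannot conflict with it.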
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,14 +1,14 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
* http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,20 +18,25 @@
*/
package org.apache.parquet.column;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.HeapByteBufferAllocator;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.util.Objects;

import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
@@ -56,6 +61,7 @@ public class ParquetProperties {
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
Review discussion on this default:

Contributor: @chenjunjiedada I'm curious about the maximum default value. Could you please explain why you chose 1 MB?

Contributor Author (@chenjunjiedada): Assume we have a row group with only one column of UUIDs (36 bytes each); according to the formula, with FPP = 0.01 we would need about 4 MB. I expect real scenarios to have more columns.

Contributor: @chenjunjiedada Thanks for the clarification!
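As a sanity check on that estimate, the classic Bloom filter sizing formula m = -n·ln(p)/(ln 2)² gives roughly the same figure. A sketch only; the 128 MB row-group size below is an assumed figure, not taken from the PR:

```java
public class BloomFilterSizing {
  // Classic Bloom filter sizing: m bits for n distinct values at false-positive rate p.
  static long optimalNumOfBits(long ndv, double fpp) {
    return (long) (-ndv * Math.log(fpp) / (Math.log(2) * Math.log(2)));
  }

  public static void main(String[] args) {
    // Assumption: a 128 MB row group holding a single column of 36-byte UUIDs.
    long ndv = (128L * 1024 * 1024) / 36;     // ~3.7 million distinct values
    long bits = optimalNumOfBits(ndv, 0.01);  // FPP = 0.01
    System.out.printf("~%.1f MiB%n", bits / 8.0 / (1024 * 1024));
    // Prints roughly 4 MiB, consistent with the author's estimate above.
  }
}
```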


public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

@@ -96,14 +102,21 @@ public static WriterVersion fromString(String name) {
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int statisticsTruncateLength;

// The key-value pair represents the column name and its expected distinct number of values in a row group.
private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
private final int maxBloomFilterBytes;
private final List<String> bloomFilterColumns;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final boolean enableByteStreamSplit;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
boolean pageWriteChecksumEnabled, int statisticsTruncateLength, boolean enableByteStreamSplit) {
boolean pageWriteChecksumEnabled, int statisticsTruncateLength, boolean enableByteStreamSplit,
Map<String, Long> bloomFilterExpectedDistinctNumber, List<String> bloomFilterColumns,
int maxBloomFilterBytes) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -114,10 +127,12 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = estimateNextSizeCheck;
this.allocator = allocator;

this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.statisticsTruncateLength = statisticsTruncateLength;
this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber;
this.bloomFilterColumns = bloomFilterColumns;
this.maxBloomFilterBytes = maxBloomFilterBytes;
this.pageRowCountLimit = pageRowCountLimit;
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
this.enableByteStreamSplit = enableByteStreamSplit;
@@ -187,11 +202,24 @@ public ByteBufferAllocator getAllocator() {

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(schema, pageStore, this);
case PARQUET_2_0:
return new ColumnWriteStoreV2(schema, pageStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
}

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore,
BloomFilterWriteStore bloomFilterWriteStore) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(schema, pageStore, this);
return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
case PARQUET_2_0:
return new ColumnWriteStoreV2(schema, pageStore, this);
return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
@@ -229,6 +257,22 @@ public boolean getPageWriteChecksumEnabled() {
return pageWriteChecksumEnabled;
}

public Map<String, Long> getBloomFilterColumnExpectedNDVs() {
return bloomFilterExpectedDistinctNumbers;
}

public Set<String> getBloomFilterColumns() {
if (bloomFilterColumns != null && bloomFilterColumns.size() > 0) {
return new HashSet<>(bloomFilterColumns);
}

return bloomFilterExpectedDistinctNumbers.keySet();
}

public int getMaxBloomFilterBytes() {
return maxBloomFilterBytes;
}

public static Builder builder() {
return new Builder();
}
@@ -249,6 +293,9 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
private List<String> bloomFilterColumns = new ArrayList<>();
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
@@ -268,6 +315,9 @@ private Builder(ParquetProperties toCopy) {
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
this.bloomFilterColumns = toCopy.bloomFilterColumns;
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
}

@@ -366,6 +416,41 @@ public Builder withStatisticsTruncateLength(int length) {
return this;
}

/**
* Set the maximum size in bytes of a column's Bloom filter bitset.
*
* @param maxBloomFilterBytes the max bytes of a Bloom filter bitset for a column.
* @return this builder for method chaining
*/
public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
this.maxBloomFilterBytes = maxBloomFilterBytes;
return this;
}

/**
* Set Bloom filter column names and expected NDVs.
*
* @param columnToNDVMap a map from each column with a Bloom filter enabled to its expected number of distinct values.
*
* @return this builder for method chaining
*/
public Builder withBloomFilterColumnToNDVMap(Map<String, Long> columnToNDVMap) {
this.bloomFilterColumnExpectedNDVs = columnToNDVMap;
return this;
}

/**
* Set Bloom filter column names.
*
* @param columns the columns for which Bloom filters are enabled.
*
* @return this builder for method chaining
*/
public Builder withBloomFilterColumnNames(List<String> columns) {
this.bloomFilterColumns = columns;
return this;
}
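
Taken together, a usage sketch of the new builder options (the column names and NDV figure below are illustrative only, not from the PR):

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.parquet.column.ParquetProperties;

public class BloomFilterPropertiesExample {
  public static void main(String[] args) {
    ParquetProperties props = ParquetProperties.builder()
        .withMaxBloomFilterBytes(1024 * 1024)             // cap each filter at 1 MB
        .withBloomFilterColumnNames(Arrays.asList("id"))  // enable by name...
        .withBloomFilterColumnToNDVMap(
            Collections.singletonMap("uuid", 1_000_000L)) // ...or by expected NDV
        .build();
    // getBloomFilterColumns() prefers the explicit name list when it is non-empty,
    // otherwise it falls back to the NDV map's key set.
    System.out.println(props.getBloomFilterColumns());
  }
}
```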

public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
pageRowCountLimit = rowCount;
@@ -382,7 +467,8 @@ public ParquetProperties build() {
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength, enableByteStreamSplit);
pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength, enableByteStreamSplit,
bloomFilterColumnExpectedNDVs, bloomFilterColumns, maxBloomFilterBytes);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -34,6 +34,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

/**
@@ -74,7 +76,7 @@ private interface ColumnWriterProvider {
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
columns.put(path, column);
}
return column;
@@ -91,7 +93,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, props));
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
this.columns = unmodifiableMap(mcolumns);

@@ -105,7 +107,38 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
};
}

abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
// The Bloom filter is written to a dedicated bitset rather than to pages, so it needs a separate write-store abstraction.
ColumnWriteStoreBase(
MessageType schema,
PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
this.props = props;
this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
if (props.getBloomFilterColumns() != null && props.getBloomFilterColumns().size() > 0) {
BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
} else {
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
}
this.columns = unmodifiableMap(mcolumns);

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();

columnWriterProvider = new ColumnWriterProvider() {
@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columns.get(path);
}
};
}

abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
BloomFilterWriter bloomFilterWriter, ParquetProperties props);
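
For reference, the shape of the new write-store abstraction as it can be inferred from its use in the constructor above; the PR's actual interface may carry more methods or documentation:

```java
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;

// Sketch inferred from bloomFilterWriteStore.getBloomFilterWriter(path) above.
public interface BloomFilterWriteStore {
  BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
}
```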

@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -22,6 +22,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
@@ -36,8 +38,15 @@ public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
super(pageWriteStore, props);
}

public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
super(schema, pageWriteStore, bloomFilterWriteStore, props);
}

@Override
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
return new ColumnWriterV1(path, pageWriter, props);
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
}
}
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -22,6 +22,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
@@ -30,8 +32,15 @@ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, Par
super(schema, pageWriteStore, props);
}

public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
super(schema, pageWriteStore, bloomFilterWriteStore, props);
}

@Override
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
return new ColumnWriterV2(path, pageWriter, props);
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
}
}