PARQUET-1516: Store Bloom filters near to footer #608

Merged: 3 commits, Feb 12, 2019
Changes from 1 commit
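Summary of the change: Bloom filters are no longer written inline into the column chunk data (the old ParquetFileWriter.writeBloomFilter wrote the filter at the current output position and recorded it in currentChunkBloomFilterDataOffset). Instead, ColumnChunkPageWriter buffers each filter in memory, ParquetFileWriter collects them per row group, and all of them are serialized together just before the footer, with each chunk's offset recorded through ColumnChunkMetaData.setBloomFilterOffset(). As a rough outline taken from the end() hunk below, the trailing structures are now written in this order:

    serializeColumnIndexes(columnIndexes, blocks, out);
    serializeOffsetIndexes(offsetIndexes, blocks, out);
    serializeBloomFilters(bloomFilters, blocks, out);  // new: Bloom filters sit just before the footer
    serializeFooter(footer, out);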
@@ -1192,12 +1192,12 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
messageType.getType(path.toArray()).asPrimitiveType()),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.bloom_filter_offset,
metaData.num_values,
metaData.total_compressed_size,
metaData.total_uncompressed_size);
column.setColumnIndexReference(toColumnIndexReference(columnChunk));
column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
column.setBloomFilterOffset(metaData.bloom_filter_offset);
// TODO
// index_page_offset
// key_value_metadata
@@ -35,6 +35,8 @@
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
@@ -50,7 +52,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {

private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

private static final class ColumnChunkPageWriter implements PageWriter {
private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {

private final ColumnDescriptor path;
private final BytesCompressor compressor;
@@ -69,6 +71,7 @@ private static final class ColumnChunkPageWriter implements PageWriter {
private Set<Encoding> dlEncodings = new HashSet<Encoding>();
private List<Encoding> dataEncodings = new ArrayList<Encoding>();

private BloomFilter bloomFilter;
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;
private Statistics totalStatistics;
@@ -227,6 +230,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
totalStatistics,
columnIndexBuilder,
offsetIndexBuilder,
bloomFilter,
rlEncodings,
dlEncodings,
dataEncodings);
@@ -267,6 +271,10 @@ public String memUsageString(String prefix) {
return buf.memUsageString(prefix + " ColumnChunkPageWriter");
}

@Override
public void writeBloomFilter(BloomFilter bloomFilter) {
this.bloomFilter = bloomFilter;
}
}

private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
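With ColumnChunkPageWriter now implementing BloomFilterWriter, a column writer can hand over its finished filter without touching the output stream: writeBloomFilter() only stores the filter on the page writer, and writeToFileWriter() later forwards it to ParquetFileWriter.writeColumnChunk(). A minimal caller-side sketch, assuming a column writer that has already populated its filter (the method and variable names here are illustrative, not part of this diff):

    void handOffBloomFilter(BloomFilterWriter bloomFilterWriter, BloomFilter bloomFilter) {
      if (bloomFilter != null) {
        // Buffered on the ColumnChunkPageWriter; nothing is written to the file yet.
        bloomFilterWriter.writeBloomFilter(bloomFilter);
      }
    }

The actual bytes only reach the file when ParquetFileWriter serializes all buffered filters in end().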
@@ -124,13 +124,19 @@ public static enum Mode {
private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();

// The Bloom filters
private final List<List<BloomFilter>> bloomFilters = new ArrayList<>();

// row group data
private BlockMetaData currentBlock; // appended to by endColumn

// The column/offset indexes for the actual block
private List<ColumnIndex> currentColumnIndexes;
private List<OffsetIndex> currentOffsetIndexes;

// The Bloom filter for the actual block
private List<BloomFilter> currentBloomFilters;

// row group data set at the start of a row group
private long currentRecordCount; // set in startBlock

@@ -151,7 +157,6 @@ public static enum Mode {
private long currentChunkValueCount; // set in startColumn
private long currentChunkFirstDataPage; // set in startColumn (out.pos())
private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage
private long currentChunkBloomFilterDataOffset; // set in writeBloomData

// set when end is called
private ParquetMetadata footer = null;
@@ -354,6 +359,8 @@ public void startBlock(long recordCount) throws IOException {

currentColumnIndexes = new ArrayList<>();
currentOffsetIndexes = new ArrayList<>();

currentBloomFilters = new ArrayList<>();
}

/**
@@ -410,16 +417,6 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
currentEncodings.add(dictionaryPage.getEncoding());
}

/**
* Write a Bloom filter
* @param bloomFilter the bloom filter of column values
* @throws IOException if there is an error while writing
*/
public void writeBloomFilter(BloomFilter bloomFilter) throws IOException {
state = state.write();
currentChunkBloomFilterDataOffset = out.getPos();
bloomFilter.writeTo(out);
}

/**
* writes a single page
@@ -558,6 +555,14 @@ private void innerWriteDataPage(
currentEncodings.add(valuesEncoding);
}

/**
* Write a Bloom filter
* @param bloomFilter the bloom filter of column values
*/
public void writeBloomFilter(BloomFilter bloomFilter) {
currentBloomFilters.add(bloomFilter);
}

/**
* Writes a column chunk at once
* @param descriptor the descriptor of the column
@@ -570,6 +575,7 @@ private void innerWriteDataPage(
* @param totalStats accumulated statistics for the column chunk
* @param columnIndexBuilder the builder object for the column index
* @param offsetIndexBuilder the builder object for the offset index
* @param bloomFilter the bloom filter for this column
* @param rlEncodings the RL encodings used in this column chunk
* @param dlEncodings the DL encodings used in this column chunk
* @param dataEncodings the data encodings used in this column chunk
@@ -585,14 +591,18 @@ void writeColumnChunk(ColumnDescriptor descriptor,
Statistics<?> totalStats,
ColumnIndexBuilder columnIndexBuilder,
OffsetIndexBuilder offsetIndexBuilder,
BloomFilter bloomFilter,
Set<Encoding> rlEncodings,
Set<Encoding> dlEncodings,
List<Encoding> dataEncodings) throws IOException {
startColumn(descriptor, valueCount, compressionCodecName);

state = state.write();

if (dictionaryPage != null) {
writeDictionaryPage(dictionaryPage);
} else if (bloomFilter != null) {
currentBloomFilters.add(bloomFilter);
}
LOG.debug("{}: write data pages", out.getPos());
long headersSize = bytes.size() - compressedTotalPageSize;
@@ -638,7 +648,6 @@ public void endColumn() throws IOException {
currentStatistics,
currentChunkFirstDataPage,
currentChunkDictionaryPageOffset,
currentChunkBloomFilterDataOffset,
currentChunkValueCount,
compressedLength,
uncompressedLength));
@@ -660,8 +669,10 @@ public void endBlock() throws IOException {
blocks.add(currentBlock);
columnIndexes.add(currentColumnIndexes);
offsetIndexes.add(currentOffsetIndexes);
bloomFilters.add(currentBloomFilters);
currentColumnIndexes = null;
currentOffsetIndexes = null;
currentBloomFilters = null;
currentBlock = null;
}
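Taken together, startBlock(), writeBloomFilter(), endBlock() and end() give the writer-side lifecycle sketched below. This is a simplified, hypothetical caller: real callers also invoke startColumn() and write the data and dictionary pages, which are elided here.

    void writeRowGroupWithBloomFilter(ParquetFileWriter writer, long recordCount,
                                      BloomFilter bloomFilter,
                                      Map<String, String> extraMetaData) throws IOException {
      writer.startBlock(recordCount);        // resets currentBloomFilters to an empty list
      // ... startColumn(), data pages and dictionary page for the chunk ...
      writer.writeBloomFilter(bloomFilter);  // buffered in currentBloomFilters, not written to out
      writer.endColumn();
      writer.endBlock();                     // currentBloomFilters is appended to bloomFilters
      // ... further row groups ...
      writer.end(extraMetaData);             // serializeBloomFilters() writes every filter, then the footer
    }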

@@ -898,7 +909,6 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
chunk.getStatistics(),
newChunkStart,
newChunkStart,
chunk.getBloomFilterOffset(),
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
@@ -958,6 +968,7 @@ public void end(Map<String, String> extraMetaData) throws IOException {
state = state.end();
serializeColumnIndexes(columnIndexes, blocks, out);
serializeOffsetIndexes(offsetIndexes, blocks, out);
serializeBloomFilters(bloomFilters, blocks, out);
LOG.debug("{}: end", out.getPos());
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, out);
Expand Down Expand Up @@ -1007,6 +1018,28 @@ private static void serializeOffsetIndexes(
}
}

private static void serializeBloomFilters(
List<List<BloomFilter>> bloomFilters,
List<BlockMetaData> blocks,
PositionOutputStream out) throws IOException {
LOG.debug("{}: bloom filters", out.getPos());
for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
List<BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
if (blockBloomFilters.isEmpty()) continue;
for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
BloomFilter bloomFilter = blockBloomFilters.get(cIndex);
if (bloomFilter == null) {
continue;
}
ColumnChunkMetaData column = columns.get(cIndex);
long offset = out.getPos();
column.setBloomFilterOffset(offset);
bloomFilter.writeTo(out);
}
}
}

private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
long footerIndex = out.getPos();
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
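Because each chunk's offset is recorded in ColumnChunkMetaData by serializeBloomFilters() above, a reader can seek straight to the filter near the footer instead of scanning the chunk data. A rough reader-side sketch, assuming the getBloomFilterOffset() accessor referenced elsewhere in this file, a negative sentinel for "no filter", and a hypothetical deserialization helper (none of these reader pieces are part of this diff):

    BloomFilter readBloomFilter(SeekableInputStream in, ColumnChunkMetaData column) throws IOException {
      long offset = column.getBloomFilterOffset();  // assumed accessor, mirroring setBloomFilterOffset()
      if (offset < 0) {
        return null;                                // assumed sentinel for "no Bloom filter written"
      }
      in.seek(offset);
      return deserializeBloomFilter(in);            // hypothetical helper; decoding depends on the filter format
    }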