PARQUET-1516: Store Bloom filters near to footer (#608)
* PARQUET-1516: Store Bloom filters near to footer
chenjunjiedada authored and gszadovszky committed Feb 12, 2019
1 parent d473d17 commit 96c2fef
Showing 6 changed files with 77 additions and 89 deletions.
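
The net effect of this change, visible in the diff below, is that Bloom filters are no longer written inline while each column chunk is flushed. Instead they are collected per row group and serialized in ParquetFileWriter.end() between the offset indexes and the footer, and each filter's start position is recorded on its ColumnChunkMetaData so the footer can carry bloom_filter_offset. A rough sketch of the resulting write order, paraphrasing the code shown further down (not the literal source):

    // Sketch of ParquetFileWriter.end() after this change
    serializeColumnIndexes(columnIndexes, blocks, out);  // column indexes
    serializeOffsetIndexes(offsetIndexes, blocks, out);  // offset indexes
    serializeBloomFilters(bloomFilters, blocks, out);    // Bloom filters, immediately before the footer
    footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
    serializeFooter(footer, out);                        // footer records bloom_filter_offset per column chunk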
ParquetMetadataConverter.java (org.apache.parquet.format.converter)
@@ -1192,12 +1192,12 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
messageType.getType(path.toArray()).asPrimitiveType()),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.bloom_filter_offset,
metaData.num_values,
metaData.total_compressed_size,
metaData.total_uncompressed_size);
column.setColumnIndexReference(toColumnIndexReference(columnChunk));
column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
column.setBloomFilterOffset(metaData.bloom_filter_offset);
// TODO
// index_page_offset
// key_value_metadata
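
The hunk above is the read path: when a footer is parsed, the Thrift-level bloom_filter_offset is now copied onto the ColumnChunkMetaData through a setter rather than being passed to its constructor. The matching write-side conversion is not part of this excerpt; a hypothetical sketch of what it would look like (the Thrift setter name and the negative-means-unset convention are assumptions, not taken from this diff):

    // Hypothetical write-side conversion when building the Thrift footer
    long bloomFilterOffset = columnMetaData.getBloomFilterOffset();
    if (bloomFilterOffset >= 0) {                        // assuming a negative value means "no filter written"
      thriftColumnMetaData.setBloom_filter_offset(bloomFilterOffset); // optional Thrift field
    }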
ColumnChunkPageWriteStore.java (org.apache.parquet.hadoop)
@@ -35,6 +35,8 @@
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
@@ -50,7 +52,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {

private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

private static final class ColumnChunkPageWriter implements PageWriter {
private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {

private final ColumnDescriptor path;
private final BytesCompressor compressor;
@@ -69,6 +71,7 @@ private static final class ColumnChunkPageWriter implements PageWriter {
private Set<Encoding> dlEncodings = new HashSet<Encoding>();
private List<Encoding> dataEncodings = new ArrayList<Encoding>();

private BloomFilter bloomFilter;
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;
private Statistics totalStatistics;
@@ -227,6 +230,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
totalStatistics,
columnIndexBuilder,
offsetIndexBuilder,
bloomFilter,
rlEncodings,
dlEncodings,
dataEncodings);
@@ -267,6 +271,10 @@ public String memUsageString(String prefix) {
return buf.memUsageString(prefix + " ColumnChunkPageWriter");
}

@Override
public void writeBloomFilter(BloomFilter bloomFilter) {
this.bloomFilter = bloomFilter;
}
}

private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
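
With this change ColumnChunkPageWriter doubles as the BloomFilterWriter for its column: the filter handed to writeBloomFilter is simply held until writeToFileWriter forwards it to ParquetFileWriter.writeColumnChunk together with the pages. A minimal sketch of how a column writer might use the interface (the wiring and the BlockSplitBloomFilter sizing below are illustrative assumptions, not part of this diff):

    // Illustrative only: a value writer that was handed its page writer as a BloomFilterWriter
    BloomFilterWriter bloomFilterWriter = columnChunkPageWriter; // ColumnChunkPageWriter implements BloomFilterWriter
    BloomFilter bloomFilter = new BlockSplitBloomFilter(1024);   // assumed constructor taking a byte size
    bloomFilter.insertHash(bloomFilter.hash(42L));               // populated while writing values
    bloomFilterWriter.writeBloomFilter(bloomFilter);             // held until the chunk is flushed to the file writer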
ParquetFileWriter.java (org.apache.parquet.hadoop)
@@ -124,13 +124,19 @@ public static enum Mode {
private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();

// The Bloom filters
private final List<List<BloomFilter>> bloomFilters = new ArrayList<>();

// row group data
private BlockMetaData currentBlock; // appended to by endColumn

// The column/offset indexes for the actual block
private List<ColumnIndex> currentColumnIndexes;
private List<OffsetIndex> currentOffsetIndexes;

// The Bloom filter for the actual block
private List<BloomFilter> currentBloomFilters;

// row group data set at the start of a row group
private long currentRecordCount; // set in startBlock

@@ -151,7 +157,6 @@ public static enum Mode {
private long currentChunkValueCount; // set in startColumn
private long currentChunkFirstDataPage; // set in startColumn (out.pos())
private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage
private long currentChunkBloomFilterDataOffset; // set in writeBloomData

// set when end is called
private ParquetMetadata footer = null;
@@ -354,6 +359,8 @@ public void startBlock(long recordCount) throws IOException {

currentColumnIndexes = new ArrayList<>();
currentOffsetIndexes = new ArrayList<>();

currentBloomFilters = new ArrayList<>();
}

/**
@@ -410,16 +417,6 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
currentEncodings.add(dictionaryPage.getEncoding());
}

/**
* Write a Bloom filter
* @param bloomFilter the bloom filter of column values
* @throws IOException if there is an error while writing
*/
public void writeBloomFilter(BloomFilter bloomFilter) throws IOException {
state = state.write();
currentChunkBloomFilterDataOffset = out.getPos();
bloomFilter.writeTo(out);
}

/**
* writes a single page
@@ -558,6 +555,14 @@ private void innerWriteDataPage(
currentEncodings.add(valuesEncoding);
}

/**
* Write a Bloom filter
* @param bloomFilter the bloom filter of column values
*/
void writeBloomFilter(BloomFilter bloomFilter) {
currentBloomFilters.add(bloomFilter);
}

/**
* Writes a column chunk at once
* @param descriptor the descriptor of the column
@@ -570,6 +575,7 @@ private void innerWriteDataPage(
* @param totalStats accumulated statistics for the column chunk
* @param columnIndexBuilder the builder object for the column index
* @param offsetIndexBuilder the builder object for the offset index
* @param bloomFilter the bloom filter for this column
* @param rlEncodings the RL encodings used in this column chunk
* @param dlEncodings the DL encodings used in this column chunk
* @param dataEncodings the data encodings used in this column chunk
@@ -585,14 +591,18 @@ void writeColumnChunk(ColumnDescriptor descriptor,
Statistics<?> totalStats,
ColumnIndexBuilder columnIndexBuilder,
OffsetIndexBuilder offsetIndexBuilder,
BloomFilter bloomFilter,
Set<Encoding> rlEncodings,
Set<Encoding> dlEncodings,
List<Encoding> dataEncodings) throws IOException {
startColumn(descriptor, valueCount, compressionCodecName);

state = state.write();

if (dictionaryPage != null) {
writeDictionaryPage(dictionaryPage);
} else if (bloomFilter != null) {
currentBloomFilters.add(bloomFilter);
}
LOG.debug("{}: write data pages", out.getPos());
long headersSize = bytes.size() - compressedTotalPageSize;
@@ -638,7 +648,6 @@ public void endColumn() throws IOException {
currentStatistics,
currentChunkFirstDataPage,
currentChunkDictionaryPageOffset,
currentChunkBloomFilterDataOffset,
currentChunkValueCount,
compressedLength,
uncompressedLength));
@@ -660,8 +669,10 @@ public void endBlock() throws IOException {
blocks.add(currentBlock);
columnIndexes.add(currentColumnIndexes);
offsetIndexes.add(currentOffsetIndexes);
bloomFilters.add(currentBloomFilters);
currentColumnIndexes = null;
currentOffsetIndexes = null;
currentBloomFilters = null;
currentBlock = null;
}

@@ -898,7 +909,6 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
chunk.getStatistics(),
newChunkStart,
newChunkStart,
chunk.getBloomFilterOffset(),
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
@@ -958,6 +968,7 @@ public void end(Map<String, String> extraMetaData) throws IOException {
state = state.end();
serializeColumnIndexes(columnIndexes, blocks, out);
serializeOffsetIndexes(offsetIndexes, blocks, out);
serializeBloomFilters(bloomFilters, blocks, out);
LOG.debug("{}: end", out.getPos());
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, out);
@@ -1007,6 +1018,28 @@ private static void serializeOffsetIndexes(
}
}

private static void serializeBloomFilters(
List<List<BloomFilter>> bloomFilters,
List<BlockMetaData> blocks,
PositionOutputStream out) throws IOException {
LOG.debug("{}: bloom filters", out.getPos());
for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
List<BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
if (blockBloomFilters.isEmpty()) continue;
for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
BloomFilter bloomFilter = blockBloomFilters.get(cIndex);
if (bloomFilter == null) {
continue;
}
ColumnChunkMetaData column = columns.get(cIndex);
long offset = out.getPos();
column.setBloomFilterOffset(offset);
bloomFilter.writeTo(out);
}
}
}

private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
long footerIndex = out.getPos();
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
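
For completeness, the consuming side is not part of this excerpt: a reader that wants to exploit a filter would take the offset recorded in the footer, seek to it, and deserialize the filter from the stream. A rough sketch, assuming a SeekableInputStream on the same file and a reader-side deserialization helper (readBloomFilter here is hypothetical):

    // Illustrative reader-side use of the offset written by serializeBloomFilters
    long offset = columnChunkMetaData.getBloomFilterOffset();
    if (offset >= 0) {                                   // assuming a negative value means no filter was written
      in.seek(offset);                                   // SeekableInputStream at the start of the serialized filter
      BloomFilter bloomFilter = readBloomFilter(in);     // hypothetical deserializer
      if (!bloomFilter.findHash(bloomFilter.hash(42L))) {
        // the value is definitely absent from this column chunk, so the row group can be skipped
      }
    }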
(The remaining changed files in this commit are not rendered in this excerpt.)
