PARQUET-533: Add a Buffer abstraction, refactor input/output classes to be simpler using Buffers

I have also removed all RowGroupReader and ColumnReader caching until we have a clearer idea of how caching can help users while keeping memory use under control.

The goal with this patch is to encapsulate the "data pointer and size" concept and lightly abstract away buffer ownership. The particular motivation is being able to deal with both normal files (bytes arriving via `fread`) and memory-mapped files (where copying bytes into fresh memory is not required). It also does away with a number of functions that wrote their output into a `std::vector<uint8_t>`, in favor of `OwnedMutableBuffer`. Feedback welcome; there is plenty more work that can be done here.
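
For context while reading the diff below, this is roughly the shape of the abstraction: a `Buffer` wraps a data pointer and a size without necessarily owning the memory, and an owning variant (`OwnedMutableBuffer`) replaces the caller-supplied `std::vector<uint8_t>` out-parameters. The following is a minimal sketch inferred from how the classes are used in this patch (constructing a `Buffer` from a pointer and length, calling `data()` and `size()`); it is not the actual contents of `parquet/util/buffer.h`, and the `OwnedMutableBuffer` interface in particular is hypothetical.

```cpp
// Minimal sketch of the Buffer concept, inferred from usage in this patch.
// Not the literal parquet/util/buffer.h.
#include <cstdint>
#include <vector>

namespace parquet_cpp {

// Non-owning view of "data pointer + size". A memory-mapped file can hand
// out Buffers that point straight into the mapping (no copy), while an
// fread-based file backs them with heap memory owned elsewhere.
class Buffer {
 public:
  Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
  virtual ~Buffer() {}

  const uint8_t* data() const { return data_; }
  int64_t size() const { return size_; }

 protected:
  const uint8_t* data_;
  int64_t size_;
};

// Owning, mutable buffer standing in for the std::vector<uint8_t>
// out-parameters removed by this patch. Hypothetical layout for illustration.
class OwnedMutableBuffer : public Buffer {
 public:
  explicit OwnedMutableBuffer(int64_t size)
      : Buffer(nullptr, size), owned_data_(size) {
    data_ = owned_data_.data();
  }

  uint8_t* mutable_data() { return owned_data_.data(); }

 private:
  std::vector<uint8_t> owned_data_;
};

}  // namespace parquet_cpp
```

With this in place, `InMemoryOutputStream::GetBuffer()` can return a `std::shared_ptr<Buffer>` that `InMemoryInputStream`, the page classes, and `RandomAccessSource::ReadAt()` callers pass around directly, as the hunks below show.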

Requires PARQUET-457. Will rebase once that's merged.

Author: Wes McKinney <wesm@apache.org>

Closes apache#59 from wesm/PARQUET-533 and squashes the following commits:

a39a4bc [Wes McKinney] Add Buffer abstraction, refactor input/output interface classes to use it where relevant. Output Buffer from InMemoryOutputStream. Stop caching ColumnReader objects

Change-Id: Ia183f168634020910201801bc4d0d3978df5d42d
wesm authored and julienledem committed Feb 21, 2016
1 parent 5d05c2e commit 70665ce
Showing 19 changed files with 378 additions and 164 deletions.
12 changes: 4 additions & 8 deletions cpp/src/parquet/column/column-reader-test.cc
@@ -63,9 +63,8 @@ class TestPrimitiveReader : public ::testing::Test {
TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
{}, 0, &buffer);
{}, 0);
pages_.push_back(page);

NodePtr type = schema::Int32("a", Repetition::REQUIRED);
@@ -90,9 +89,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
vector<int32_t> values = {1, 2, 3, 4, 5};
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};

std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
{}, 0, &buffer);
{}, 0);

pages_.push_back(page);

@@ -137,9 +135,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};

std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
def_levels, 2, rep_levels, 1, &buffer);
def_levels, 2, rep_levels, 1);

pages_.push_back(page);

@@ -190,12 +187,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) {
vector<int16_t> rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1},
{0, 0, 1, 0, 1, 1, 0, 1, 0, 1}};

std::vector<uint8_t> buffer[4];
std::shared_ptr<DataPage> page;

for (int i = 0; i < 4; i++) {
page = MakeDataPage<Type::INT32>(values[i % 2],
def_levels[i % 2], 2, rep_levels[i % 2], 1, &buffer[i]);
def_levels[i % 2], 2, rep_levels[i % 2], 1);
pages_.push_back(page);
}

24 changes: 11 additions & 13 deletions cpp/src/parquet/column/page.h
@@ -27,6 +27,7 @@
#include <string>

#include "parquet/types.h"
#include "parquet/util/buffer.h"

namespace parquet_cpp {

@@ -39,9 +40,8 @@ namespace parquet_cpp {
// here, both on the read and write path
class Page {
public:
Page(const uint8_t* buffer, int32_t buffer_size, PageType::type type) :
Page(const std::shared_ptr<Buffer>& buffer, PageType::type type) :
buffer_(buffer),
buffer_size_(buffer_size),
type_(type) {}

PageType::type type() const {
@@ -50,29 +50,27 @@ class Page {

// @returns: a pointer to the page's data
const uint8_t* data() const {
return buffer_;
return buffer_->data();
}

// @returns: the total size in bytes of the page's data buffer
int32_t size() const {
return buffer_size_;
return buffer_->size();
}

private:
const uint8_t* buffer_;
int32_t buffer_size_;

std::shared_ptr<Buffer> buffer_;
PageType::type type_;
};


class DataPage : public Page {
public:
DataPage(const uint8_t* buffer, int32_t buffer_size,
DataPage(const std::shared_ptr<Buffer>& buffer,
int32_t num_values, Encoding::type encoding,
Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding) :
Page(buffer, buffer_size, PageType::DATA_PAGE),
Page(buffer, PageType::DATA_PAGE),
num_values_(num_values),
encoding_(encoding),
definition_level_encoding_(definition_level_encoding),
@@ -119,12 +117,12 @@ class DataPage : public Page {

class DataPageV2 : public Page {
public:
DataPageV2(const uint8_t* buffer, int32_t buffer_size,
DataPageV2(const std::shared_ptr<Buffer>& buffer,
int32_t num_values, int32_t num_nulls, int32_t num_rows,
Encoding::type encoding,
int32_t definition_levels_byte_length,
int32_t repetition_levels_byte_length, bool is_compressed = false) :
Page(buffer, buffer_size, PageType::DATA_PAGE_V2),
Page(buffer, PageType::DATA_PAGE_V2),
num_values_(num_values),
num_nulls_(num_nulls),
num_rows_(num_rows),
@@ -176,9 +174,9 @@

class DictionaryPage : public Page {
public:
DictionaryPage(const uint8_t* buffer, int32_t buffer_size,
DictionaryPage(const std::shared_ptr<Buffer>& buffer,
int32_t num_values, Encoding::type encoding, bool is_sorted = false) :
Page(buffer, buffer_size, PageType::DICTIONARY_PAGE),
Page(buffer, PageType::DICTIONARY_PAGE),
num_values_(num_values),
encoding_(encoding),
is_sorted_(is_sorted) {}
9 changes: 4 additions & 5 deletions cpp/src/parquet/column/test-util.h
@@ -167,8 +167,7 @@ class DataPageBuilder {
template <int TYPE, typename T>
static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
const std::vector<int16_t>& def_levels, int16_t max_def_level,
const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
std::vector<uint8_t>* out_buffer) {
const std::vector<int16_t>& rep_levels, int16_t max_rep_level) {
size_t num_values = values.size();

InMemoryOutputStream page_stream;
@@ -183,10 +182,10 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
}

page_builder.AppendValues(values);
page_stream.Transfer(out_buffer);

return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(),
page_builder.num_values(),
auto buffer = page_stream.GetBuffer();

return std::make_shared<DataPage>(buffer, page_builder.num_values(),
page_builder.encoding(),
page_builder.def_level_encoding(),
page_builder.rep_level_encoding());
17 changes: 9 additions & 8 deletions cpp/src/parquet/encodings/plain-encoding-test.cc
@@ -28,6 +28,7 @@
#include "parquet/types.h"
#include "parquet/schema/types.h"
#include "parquet/util/bit-util.h"
#include "parquet/util/buffer.h"
#include "parquet/util/output.h"
#include "parquet/util/test-common.h"

@@ -52,15 +53,13 @@ TEST(VectorBooleanTest, TestEncodeDecode) {
InMemoryOutputStream dst;
encoder.Encode(draws, nvalues, &dst);

vector<uint8_t> encode_buffer;
dst.Transfer(&encode_buffer);

ASSERT_EQ(nbytes, encode_buffer.size());
std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer();
ASSERT_EQ(nbytes, encode_buffer->size());

vector<uint8_t> decode_buffer(nbytes);
const uint8_t* decode_data = &decode_buffer[0];

decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size());
decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size());
size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
ASSERT_EQ(nvalues, values_decoded);

@@ -92,9 +91,10 @@ class EncodeDecode{
InMemoryOutputStream dst;
encoder.Encode(draws_, num_values_, &dst);

dst.Transfer(&encode_buffer_);
encode_buffer_ = dst.GetBuffer();

decoder.SetData(num_values_, &encode_buffer_[0], encode_buffer_.size());
decoder.SetData(num_values_, encode_buffer_->data(),
encode_buffer_->size());
size_t values_decoded = decoder.Decode(decode_buf_, num_values_);
ASSERT_EQ(num_values_, values_decoded);
}
@@ -119,7 +119,8 @@ class EncodeDecode{
vector<uint8_t> input_bytes_;
vector<uint8_t> output_bytes_;
vector<uint8_t> data_buffer_;
vector<uint8_t> encode_buffer_;

std::shared_ptr<Buffer> encode_buffer_;
};

template<>
10 changes: 3 additions & 7 deletions cpp/src/parquet/file/file-deserialize-test.cc
@@ -67,8 +67,7 @@ class TestPageSerde : public ::testing::Test {
Compression::UNCOMPRESSED) {
EndStream();
std::unique_ptr<InputStream> stream;
stream.reset(new InMemoryInputStream(out_buffer_.data(),
out_buffer_.size()));
stream.reset(new InMemoryInputStream(out_buffer_));
page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
}

@@ -89,19 +88,16 @@ class TestPageSerde : public ::testing::Test {
}

void ResetStream() {
out_buffer_.resize(0);
out_stream_.reset(new InMemoryOutputStream());
}

void EndStream() {
out_stream_->Transfer(&out_buffer_);
out_buffer_ = out_stream_->GetBuffer();
}

protected:
std::unique_ptr<InMemoryOutputStream> out_stream_;

// TODO(wesm): Owns the results of the output stream. To be refactored
std::vector<uint8_t> out_buffer_;
std::shared_ptr<Buffer> out_buffer_;

std::unique_ptr<SerializedPageReader> page_reader_;
parquet::PageHeader page_header_;
32 changes: 13 additions & 19 deletions cpp/src/parquet/file/reader-internal.cc
@@ -32,6 +32,7 @@
#include "parquet/schema/types.h"
#include "parquet/thrift/util.h"
#include "parquet/types.h"
#include "parquet/util/buffer.h"
#include "parquet/util/input.h"

namespace parquet_cpp {
@@ -83,7 +84,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
}
}
// Advance the stream offset
stream_->Read(header_size, &bytes_read);
stream_->Advance(header_size);

int compressed_len = current_page_header_.compressed_page_size;
int uncompressed_len = current_page_header_.uncompressed_page_size;
@@ -103,19 +104,21 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
buffer = &decompression_buffer_[0];
}

auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);

if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
const parquet::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;

bool is_sorted = dict_header.__isset.is_sorted? dict_header.is_sorted : false;

return std::make_shared<DictionaryPage>(buffer, uncompressed_len,
return std::make_shared<DictionaryPage>(page_buffer,
dict_header.num_values, FromThrift(dict_header.encoding),
is_sorted);
} else if (current_page_header_.type == parquet::PageType::DATA_PAGE) {
const parquet::DataPageHeader& header = current_page_header_.data_page_header;

auto page = std::make_shared<DataPage>(buffer, uncompressed_len,
auto page = std::make_shared<DataPage>(page_buffer,
header.num_values,
FromThrift(header.encoding),
FromThrift(header.definition_level_encoding),
@@ -134,7 +137,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
} else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) {
const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
bool is_compressed = header.__isset.is_compressed? header.is_compressed : false;
return std::make_shared<DataPageV2>(buffer, uncompressed_len,
return std::make_shared<DataPageV2>(page_buffer,
header.num_values, header.num_nulls, header.num_rows,
FromThrift(header.encoding),
header.definition_levels_byte_length,
@@ -165,24 +168,15 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
col_start = col.meta_data.dictionary_page_offset;
}

// TODO(wesm): some input streams (e.g. memory maps) may not require
// copying data. This should be added to the input stream API to support
// zero-copy streaming
std::unique_ptr<InputStream> input(
new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));

source_->Seek(col_start);
ScopedInMemoryInputStream* scoped_input =
static_cast<ScopedInMemoryInputStream*>(input.get());
size_t bytes_read = source_->Read(scoped_input->size(), scoped_input->data());
int64_t bytes_to_read = col.meta_data.total_compressed_size;
std::shared_ptr<Buffer> buffer = source_->ReadAt(col_start, bytes_to_read);

if (bytes_read != scoped_input->size()) {
if (buffer->size() < bytes_to_read) {
throw ParquetException("Unable to read column chunk data");
}

const ColumnDescriptor* descr = schema_->Column(i);

return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(input),
std::unique_ptr<InputStream> stream(new InMemoryInputStream(buffer));
return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(stream),
FromThrift(col.meta_data.codec)));
}

@@ -223,7 +217,7 @@ void SerializedFile::Close() {

std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
std::unique_ptr<SerializedRowGroup> contents(new SerializedRowGroup(source_.get(),
&schema_, &metadata_.row_groups[i]));
&metadata_.row_groups[i]));

return std::make_shared<RowGroupReader>(&schema_, std::move(contents));
}
4 changes: 1 addition & 3 deletions cpp/src/parquet/file/reader-internal.h
@@ -72,10 +72,9 @@ class SerializedPageReader : public PageReader {
// RowGroupReader::Contents implementation for the Parquet file specification
class SerializedRowGroup : public RowGroupReader::Contents {
public:
SerializedRowGroup(RandomAccessSource* source, const SchemaDescriptor* schema,
SerializedRowGroup(RandomAccessSource* source,
const parquet::RowGroup* metadata) :
source_(source),
schema_(schema),
metadata_(metadata) {}

virtual int num_columns() const;
@@ -84,7 +83,6 @@ class SerializedRowGroup : public RowGroupReader::Contents {

private:
RandomAccessSource* source_;
const SchemaDescriptor* schema_;
const parquet::RowGroup* metadata_;
};
