GH-15216: [C++][Parquet] Parquet writer accepts RecordBatch
 - Parquet arrow::FileWriter supports buffered row group mode.
 - Parquet arrow::FileWriter accepts arrow::RecordBatch.
wgtmac committed Jan 7, 2023
1 parent 21d6374 commit 0d9f0e2
Showing 5 changed files with 195 additions and 10 deletions.
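
Taken together, the two changes let a caller stream record batches into buffered row groups without assembling a Table first. A minimal end-to-end sketch (the helper name and the "out.parquet" path are illustrative, not from this patch; the API calls mirror those exercised by the new tests below):

#include <memory>
#include <vector>

#include "arrow/io/file.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_writer.h"

::arrow::Status WriteBatchesBuffered(
    const std::shared_ptr<::arrow::Schema>& schema,
    const std::vector<std::shared_ptr<::arrow::RecordBatch>>& batches) {
  ARROW_ASSIGN_OR_RAISE(auto sink,
                        ::arrow::io::FileOutputStream::Open("out.parquet"));

  // Derive the Parquet schema from the Arrow schema.
  std::shared_ptr<parquet::SchemaDescriptor> parquet_schema;
  RETURN_NOT_OK(parquet::arrow::ToParquetSchema(
      schema.get(), *parquet::default_writer_properties(),
      *parquet::default_arrow_writer_properties(), &parquet_schema));
  auto schema_node = std::static_pointer_cast<parquet::schema::GroupNode>(
      parquet_schema->schema_root());

  // Wrap a low-level ParquetFileWriter in the Arrow-level FileWriter.
  auto file_writer = parquet::ParquetFileWriter::Open(
      sink, schema_node, parquet::default_writer_properties());
  std::unique_ptr<parquet::arrow::FileWriter> writer;
  RETURN_NOT_OK(parquet::arrow::FileWriter::Make(
      ::arrow::default_memory_pool(), std::move(file_writer), schema,
      parquet::default_arrow_writer_properties(), &writer));

  // All batches land in one buffered row group; call NewBufferedRowGroup()
  // again whenever a row-group boundary is wanted.
  RETURN_NOT_OK(writer->NewBufferedRowGroup());
  for (const auto& batch : batches) {
    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  }
  return writer->Close();
}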
106 changes: 106 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4727,5 +4727,111 @@ std::vector<NestedFilterTestCase> GenerateMapFilteredTestCases() {
INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader,
::testing::ValuesIn(GenerateMapFilteredTestCases()));

template <typename TestType>
class TestBufferedParquetIO : public TestParquetIO<TestType> {
 public:
  void WriteBufferedFile(const std::shared_ptr<Array>& values, int64_t batch_size) {
    std::shared_ptr<GroupNode> schema =
        MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
    SchemaDescriptor descriptor;
    ASSERT_NO_THROW(descriptor.Init(schema));
    std::shared_ptr<::arrow::Schema> arrow_schema;
    ArrowReaderProperties props;
    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));

    std::unique_ptr<FileWriter> writer;
    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
                                        this->MakeWriter(schema), arrow_schema,
                                        default_arrow_writer_properties(), &writer));
    // Write four batches, starting a new buffered row group before every
    // second one, so each of the two row groups holds two batches.
    for (int i = 0; i < 4; i++) {
      if (i % 2 == 0) {
        ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
      }
      std::shared_ptr<Array> sliced_array = values->Slice(i * batch_size, batch_size);
      std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
      auto batch = ::arrow::RecordBatch::Make(arrow_schema, batch_size, arrays);
      ASSERT_OK_NO_THROW(writer->WriteRecordBatch(*batch));
    }
    ASSERT_OK_NO_THROW(writer->Close());
  }
};

TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);

TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
  constexpr int64_t batch_size = SMALL_SIZE / 4;
  std::shared_ptr<Array> values;
  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
  this->WriteBufferedFile(values, batch_size);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
}

TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
  constexpr int64_t batch_size = LARGE_SIZE / 4;
  std::shared_ptr<Array> values;
  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
  this->WriteBufferedFile(values, batch_size);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values));
}

TEST(TestReadWriteArrow, WriteAndReadRecordBatch) {
  constexpr int64_t num_row_groups = 3;
  auto pool = ::arrow::default_memory_pool();
  auto sink = CreateOutputStream();
  auto writer_properties = default_writer_properties();
  auto arrow_writer_properties = default_arrow_writer_properties();

  // Prepare schema
  auto schema = ::arrow::schema(
      {::arrow::field("a", ::arrow::int64()), ::arrow::field("b", ::arrow::utf8())});
  std::shared_ptr<SchemaDescriptor> parquet_schema;
  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
                                     *arrow_writer_properties, &parquet_schema));
  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

  // Prepare data
  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
      [1, "alfa"],
      [null, "alfa"],
      [3, "beta"],
      [null, "gama"],
      [5, null],
      [6, "alfa"],
      [7, "beta"],
      [8, "beta"],
      [9, null],
      [null, "gama"]
    ])");

  // Create a writer and write several row groups, each holding the same batch
  // data, via WriteRecordBatch.
  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
  std::unique_ptr<FileWriter> arrow_writer;
  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
                             arrow_writer_properties, &arrow_writer));
  for (int64_t i = 0; i < num_row_groups; ++i) {
    ASSERT_OK_NO_THROW(arrow_writer->NewBufferedRowGroup());
    ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
  }
  ASSERT_OK_NO_THROW(arrow_writer->Close());
  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

  // Create a reader whose batch size covers one full row group.
  auto read_properties = default_arrow_reader_properties();
  read_properties.set_batch_size(record_batch->num_rows());
  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
  std::unique_ptr<FileReader> arrow_reader;
  ASSERT_OK(FileReader::Make(pool, std::move(reader), read_properties, &arrow_reader));

  // Verify the batch data read back through the RecordBatchReader.
  std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
  ASSERT_OK_NO_THROW(arrow_reader->GetRecordBatchReader(
      Iota(arrow_reader->parquet_reader()->metadata()->num_row_groups()),
      &batch_reader));
  std::shared_ptr<::arrow::RecordBatch> read_record_batch;
  for (int64_t i = 0; i < num_row_groups; ++i) {
    ASSERT_OK(batch_reader->ReadNext(&read_record_batch));
    EXPECT_TRUE(record_batch->Equals(*read_record_batch));
  }
}

} // namespace arrow
} // namespace parquet
80 changes: 70 additions & 10 deletions cpp/src/parquet/arrow/writer.cc
@@ -28,6 +28,7 @@
#include "arrow/array.h"
#include "arrow/extension_type.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/base64.h"
@@ -58,6 +59,7 @@ using arrow::ListArray;
 using arrow::MemoryPool;
 using arrow::NumericArray;
 using arrow::PrimitiveArray;
+using arrow::RecordBatch;
 using arrow::ResizableBuffer;
 using arrow::Result;
 using arrow::Status;
@@ -114,8 +116,10 @@ class ArrowColumnWriterV2 {
   // level_builders should contain one MultipathLevelBuilder per chunk of the
   // Arrow-column to write.
   ArrowColumnWriterV2(std::vector<std::unique_ptr<MultipathLevelBuilder>> level_builders,
-                      int leaf_count, RowGroupWriter* row_group_writer)
+                      int start_leaf_column_index, int leaf_count,
+                      RowGroupWriter* row_group_writer)
       : level_builders_(std::move(level_builders)),
+        start_leaf_column_index_(start_leaf_column_index),
         leaf_count_(leaf_count),
         row_group_writer_(row_group_writer) {}

@@ -127,7 +131,12 @@
   Status Write(ArrowWriteContext* ctx) {
     for (int leaf_idx = 0; leaf_idx < leaf_count_; leaf_idx++) {
       ColumnWriter* column_writer;
-      PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+      if (row_group_writer_->buffered()) {
+        const int column_index = start_leaf_column_index_ + leaf_idx;
+        PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->column(column_index));
+      } else {
+        PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+      }
       for (auto& level_builder : level_builders_) {
         RETURN_NOT_OK(level_builder->Write(
             leaf_idx, ctx, [&](const MultipathLevelBuilderResult& result) {
@@ -147,7 +156,9 @@
           }));
       }
 
-      PARQUET_CATCH_NOT_OK(column_writer->Close());
+      if (!row_group_writer_->buffered()) {
+        PARQUET_CATCH_NOT_OK(column_writer->Close());
+      }
     }
     return Status::OK();
   }
@@ -162,13 +173,14 @@
   // RowGroupWriters (we could construct each builder on demand in that case).
   static ::arrow::Result<std::unique_ptr<ArrowColumnWriterV2>> Make(
       const ChunkedArray& data, int64_t offset, const int64_t size,
-      const SchemaManifest& schema_manifest, RowGroupWriter* row_group_writer) {
+      const SchemaManifest& schema_manifest, RowGroupWriter* row_group_writer,
+      int start_leaf_column_index = -1) {
     int64_t absolute_position = 0;
     int chunk_index = 0;
     int64_t chunk_offset = 0;
     if (data.length() == 0) {
       return std::make_unique<ArrowColumnWriterV2>(
-          std::vector<std::unique_ptr<MultipathLevelBuilder>>{},
+          std::vector<std::unique_ptr<MultipathLevelBuilder>>{}, start_leaf_column_index,
           CalculateLeafCount(data.type().get()), row_group_writer);
     }
     while (chunk_index < data.num_chunks() && absolute_position < offset) {
Expand All @@ -192,9 +204,16 @@ class ArrowColumnWriterV2 {
     std::vector<std::unique_ptr<MultipathLevelBuilder>> builders;
     const int leaf_count = CalculateLeafCount(data.type().get());
     bool is_nullable = false;
-    // The row_group_writer hasn't been advanced yet so add 1 to the current
-    // which is the one this instance will start writing for.
-    int column_index = row_group_writer->current_column() + 1;
+
+    int column_index = 0;
+    if (row_group_writer->buffered()) {
+      column_index = start_leaf_column_index;
+    } else {
+      // The row_group_writer hasn't been advanced yet, so add 1 to the current
+      // column; that is the column this instance will start writing to.
+      column_index = row_group_writer->current_column() + 1;
+    }
+
     for (int leaf_offset = 0; leaf_offset < leaf_count; ++leaf_offset) {
       const SchemaField* schema_field = nullptr;
       RETURN_NOT_OK(
@@ -240,13 +259,16 @@
       }
       values_written += chunk_write_size;
     }
-    return std::make_unique<ArrowColumnWriterV2>(std::move(builders), leaf_count,
-                                                 row_group_writer);
+    return std::make_unique<ArrowColumnWriterV2>(std::move(builders), column_index,
+                                                 leaf_count, row_group_writer);
   }
 
+  int leaf_count() const { return leaf_count_; }
+
  private:
   // One builder per column-chunk.
   std::vector<std::unique_ptr<MultipathLevelBuilder>> level_builders_;
+  int start_leaf_column_index_;
   int leaf_count_;
   RowGroupWriter* row_group_writer_;
 };
@@ -304,12 +326,16 @@ class FileWriterImpl : public FileWriter {
                            int64_t size) override {
     if (arrow_properties_->engine_version() == ArrowWriterProperties::V2 ||
         arrow_properties_->engine_version() == ArrowWriterProperties::V1) {
+      if (row_group_writer_->buffered()) {
+        return Status::Invalid("Cannot write column chunk into the buffered row group.");
+      }
       ARROW_ASSIGN_OR_RAISE(
           std::unique_ptr<ArrowColumnWriterV2> writer,
           ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_,
                                     row_group_writer_));
       return writer->Write(&column_write_context_);
     }
+
     return Status::NotImplemented("Unknown engine version.");
   }
 
@@ -355,6 +381,40 @@
     return Status::OK();
   }
 
+  Status NewBufferedRowGroup() override {
+    if (row_group_writer_ != nullptr) {
+      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+    }
+    PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendBufferedRowGroup());
+    return Status::OK();
+  }
+
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    if (batch.num_rows() == 0) {
+      return Status::OK();
+    }
+
+    if (row_group_writer_ == nullptr || !row_group_writer_->buffered()) {
+      RETURN_NOT_OK(NewBufferedRowGroup());
+    }
+
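+    // A top-level Arrow column can map to several Parquet leaf columns (e.g.
+    // structs), so track each column's starting leaf index and advance it by
+    // the leaf count the column writer reports.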
+    auto WriteBatch = [&](int64_t offset, int64_t size) {
+      int column_index_start = 0;
+      for (int i = 0; i < batch.num_columns(); i++) {
+        ChunkedArray chunked_array(batch.column(i));
+        ARROW_ASSIGN_OR_RAISE(
+            std::unique_ptr<ArrowColumnWriterV2> writer,
+            ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_,
+                                      row_group_writer_, column_index_start));
+        RETURN_NOT_OK(writer->Write(&column_write_context_));
+        column_index_start += writer->leaf_count();
+      }
+      return Status::OK();
+    };
+
+    return WriteBatch(0, batch.num_rows());
+  }
+
   const WriterProperties& properties() const { return *writer_->properties(); }
 
   ::arrow::MemoryPool* memory_pool() const override {
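
The start-leaf bookkeeping above is what makes buffered mode work for nested types: a buffered row group is addressed via column(index) instead of being consumed sequentially with NextColumn(), so each top-level Arrow column must know its absolute first leaf. An illustrative schema (hypothetical, not part of this patch) showing the leaf numbering WriteRecordBatch relies on:

#include <memory>

#include "arrow/type.h"

// "s" expands to two Parquet leaf columns (s.a -> leaf 0, s.b -> leaf 1),
// so WriteRecordBatch passes column_index_start = 2 when it reaches "c".
std::shared_ptr<::arrow::Schema> ExampleNestedSchema() {
  return ::arrow::schema(
      {::arrow::field("s", ::arrow::struct_({::arrow::field("a", ::arrow::int32()),
                                             ::arrow::field("b", ::arrow::float64())})),
       ::arrow::field("c", ::arrow::utf8())});
}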
9 changes: 9 additions & 0 deletions cpp/src/parquet/arrow/writer.h
@@ -27,6 +27,7 @@ namespace arrow {

 class Array;
 class ChunkedArray;
+class RecordBatch;
 class Schema;
 class Table;

@@ -114,6 +115,14 @@
   virtual ::arrow::Status WriteColumnChunk(
       const std::shared_ptr<::arrow::ChunkedArray>& data) = 0;
 
+  /// \brief Start a new buffered row group.
+  ///
+  /// Returns an error if not all columns have been written.
+  virtual ::arrow::Status NewBufferedRowGroup() = 0;
+
+  /// \brief Write a RecordBatch into the buffered row group.
+  ///
+  /// Multiple batches may be written to the same buffered row group; if no
+  /// buffered row group is open, one is started automatically.
+  virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0;
+
   /// \brief Write the footer and close the file.
   virtual ::arrow::Status Close() = 0;
   virtual ~FileWriter();
4 changes: 4 additions & 0 deletions cpp/src/parquet/file_writer.cc
@@ -62,6 +62,8 @@ int64_t RowGroupWriter::total_bytes_written() const {
   return contents_->total_bytes_written();
 }
 
+bool RowGroupWriter::buffered() const { return contents_->buffered(); }
+
 int RowGroupWriter::current_column() { return contents_->current_column(); }
 
 int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
@@ -177,6 +179,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
     return total_bytes_written;
   }
 
+  bool buffered() const override { return buffered_row_group_; }
+
   void Close() override {
     if (!closed_) {
       closed_ = true;
6 changes: 6 additions & 0 deletions cpp/src/parquet/file_writer.h
@@ -56,6 +56,8 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int64_t total_bytes_written() const = 0;
     // total bytes still compressed but not written
     virtual int64_t total_compressed_bytes() const = 0;
+
+    virtual bool buffered() const = 0;
   };
 
   explicit RowGroupWriter(std::unique_ptr<Contents> contents);
@@ -91,6 +93,10 @@
   int64_t total_bytes_written() const;
   int64_t total_compressed_bytes() const;
 
+  /// Returns whether this RowGroupWriter is in buffered mode, i.e. it was
+  /// created by ParquetFileWriter::AppendBufferedRowGroup.
+  bool buffered() const;
+
  private:
   // Holds a pointer to an instance of Contents implementation
   std::unique_ptr<Contents> contents_;
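
The new accessor lets column-writing code stay agnostic about which mode created the row group. A minimal sketch of the dispatch it enables (hypothetical helper; the same pattern appears in ArrowColumnWriterV2::Write in writer.cc above):

#include "parquet/column_writer.h"
#include "parquet/file_writer.h"

// Hypothetical helper: pick the column-access pattern based on the mode.
parquet::ColumnWriter* OpenColumnChunk(parquet::RowGroupWriter* rg, int column_index) {
  if (rg->buffered()) {
    // Buffered mode (AppendBufferedRowGroup): writers are addressable by
    // index, in any order, and are flushed when the row group is closed.
    return rg->column(column_index);
  }
  // Serialized mode (AppendRowGroup): columns are written strictly in order;
  // each writer is closed before advancing to the next.
  return rg->NextColumn();
}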
