From 1149caadf4d3ab9a814b6f3fe54a768f15450bf9 Mon Sep 17 00:00:00 2001 From: Deepak Majeti Date: Thu, 15 Sep 2016 12:30:45 -0400 Subject: [PATCH] PARQUET-719: Fix WriterBatch API to handle NULL values Fixed bug and added a test case Author: Deepak Majeti Closes #160 from majetideepak/PARQUET-719 and squashes the following commits: ff56788 [Deepak Majeti] use using c34ce78 [Deepak Majeti] added comments f38d6ed [Deepak Majeti] Fixed bug and added test case Change-Id: Id62e493565e8c6c0f7aa9fe94c846e796a4ff546 --- cpp/src/parquet/column/column-writer-test.cc | 23 ++++++++++++++++++++ cpp/src/parquet/column/scanner-test.cc | 12 +++++----- cpp/src/parquet/column/writer.h | 14 +++++++----- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/cpp/src/parquet/column/column-writer-test.cc b/cpp/src/parquet/column/column-writer-test.cc index 230a843376cf3..57f1d1be41c88 100644 --- a/cpp/src/parquet/column/column-writer-test.cc +++ b/cpp/src/parquet/column/column-writer-test.cc @@ -150,6 +150,8 @@ typedef ::testing::Types; + TYPED_TEST(TestPrimitiveWriter, RequiredPlain) { this->TestRequiredWithEncoding(Encoding::PLAIN); } @@ -303,5 +305,26 @@ TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) { } } +// PARQUET-719 +// Test case for NULL values +TEST_F(TestNullValuesWriter, OptionalNullValueChunk) { + this->SetUpSchemaOptional(); + + this->GenerateData(LARGE_SIZE); + + std::vector definition_levels(LARGE_SIZE, 0); + std::vector repetition_levels(LARGE_SIZE, 0); + + auto writer = this->BuildWriter(LARGE_SIZE); + // All values being written are NULL + writer->WriteBatch( + this->values_.size(), definition_levels.data(), repetition_levels.data(), NULL); + writer->Close(); + + // Just read the first SMALL_SIZE rows to ensure we could read it back in + this->ReadColumn(); + ASSERT_EQ(0, this->values_read_); +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/column/scanner-test.cc b/cpp/src/parquet/column/scanner-test.cc index 78cd16c5bfc91..fb5178ad18c02 100644 --- a/cpp/src/parquet/column/scanner-test.cc +++ b/cpp/src/parquet/column/scanner-test.cc @@ -141,8 +141,6 @@ class TestFlatScanner : public ::testing::Test { vector data_buffer_; // For BA and FLBA }; -typedef TestFlatScanner TestFlatFLBAScanner; - static int num_levels_per_page = 100; static int num_pages = 20; static int batch_size = 32; @@ -150,8 +148,8 @@ static int batch_size = 32; typedef ::testing::Types TestTypes; -typedef TestFlatScanner TestBooleanFlatScanner; -typedef TestFlatScanner TestFLBAFlatScanner; +using TestBooleanFlatScanner = TestFlatScanner; +using TestFLBAFlatScanner = TestFlatScanner; TYPED_TEST_CASE(TestFlatScanner, TestTypes); @@ -183,7 +181,7 @@ TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) { } // PARQUET 502 -TEST_F(TestFlatFLBAScanner, TestSmallBatch) { +TEST_F(TestFLBAFlatScanner, TestSmallBatch) { NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); const ColumnDescriptor d(type, 0, 0); @@ -194,7 +192,7 @@ TEST_F(TestFlatFLBAScanner, TestSmallBatch) { CheckResults(1, &d); } -TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) { +TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) { NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); const ColumnDescriptor d(type, 4, 0); @@ -209,7 +207,7 @@ TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) { ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length()); } -TEST_F(TestFlatFLBAScanner, TestFLBAPrinterNext) { +TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) { NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); const ColumnDescriptor d(type, 4, 0); diff --git a/cpp/src/parquet/column/writer.h b/cpp/src/parquet/column/writer.h index 6a6ee5fc28314..4b2a02192ee0c 100644 --- a/cpp/src/parquet/column/writer.h +++ b/cpp/src/parquet/column/writer.h @@ -156,7 +156,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter { void CheckDictionarySizeLimit() override; private: - void WriteMiniBatch(int64_t num_values, const int16_t* def_levels, + int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels, const T* values); typedef Encoder EncoderType; @@ -167,7 +167,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter { }; template -inline void TypedColumnWriter::WriteMiniBatch(int64_t num_values, +inline int64_t TypedColumnWriter::WriteMiniBatch(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels, const T* values) { int64_t values_to_write = 0; // If the field is required and non-repeated, there are no definition levels @@ -209,6 +209,8 @@ inline void TypedColumnWriter::WriteMiniBatch(int64_t num_values, AddDataPage(); } if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); } + + return values_to_write; } template @@ -222,15 +224,17 @@ inline void TypedColumnWriter::WriteBatch(int64_t num_values, int64_t write_batch_size = properties_->write_batch_size(); int num_batches = num_values / write_batch_size; int64_t num_remaining = num_values % write_batch_size; + int64_t value_offset = 0; for (int round = 0; round < num_batches; round++) { int64_t offset = round * write_batch_size; - WriteMiniBatch( - write_batch_size, &def_levels[offset], &rep_levels[offset], &values[offset]); + int64_t num_values = WriteMiniBatch(write_batch_size, &def_levels[offset], + &rep_levels[offset], &values[value_offset]); + value_offset += num_values; } // Write the remaining values int64_t offset = num_batches * write_batch_size; WriteMiniBatch( - num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]); + num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]); } template