Skip to content

Commit

Permalink
PARQUET-719: Fix WriterBatch API to handle NULL values
Browse files Browse the repository at this point in the history
Fixed bug and added a test case

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes apache#160 from majetideepak/PARQUET-719 and squashes the following commits:

ff56788 [Deepak Majeti] use using
c34ce78 [Deepak Majeti] added comments
f38d6ed [Deepak Majeti] Fixed bug and added test case

Change-Id: Id62e493565e8c6c0f7aa9fe94c846e796a4ff546
  • Loading branch information
Deepak Majeti authored and wesm committed Sep 15, 2016
1 parent 6847cdc commit bcaafd5
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
23 changes: 23 additions & 0 deletions cpp/src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;

TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
this->TestRequiredWithEncoding(Encoding::PLAIN);
}
Expand Down Expand Up @@ -303,5 +305,26 @@ TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
}
}

// PARQUET-719
// Regression test: WriteBatch must accept a null values pointer when every
// entry in the batch is NULL (all definition levels below the max).
TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
  this->SetUpSchemaOptional();

  this->GenerateData(LARGE_SIZE);

  // Definition level 0 on an optional column marks the entry as NULL; the
  // column is non-repeated, so every repetition level is 0 as well.
  std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
  std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);

  auto writer = this->BuildWriter(LARGE_SIZE);
  // All values being written are NULL, so no values buffer is supplied.
  writer->WriteBatch(
      this->values_.size(), definition_levels.data(), repetition_levels.data(), nullptr);
  writer->Close();

  // Reading the column back should materialize zero values, since every
  // written entry was NULL.
  this->ReadColumn();
  ASSERT_EQ(0, this->values_read_);
}

} // namespace test
} // namespace parquet
12 changes: 5 additions & 7 deletions cpp/src/parquet/column/scanner-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,15 @@ class TestFlatScanner : public ::testing::Test {
vector<uint8_t> data_buffer_; // For BA and FLBA
};

typedef TestFlatScanner<FLBAType> TestFlatFLBAScanner;

static int num_levels_per_page = 100;
static int num_pages = 20;
static int batch_size = 32;

typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
ByteArrayType> TestTypes;

typedef TestFlatScanner<BooleanType> TestBooleanFlatScanner;
typedef TestFlatScanner<FLBAType> TestFLBAFlatScanner;
using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;

TYPED_TEST_CASE(TestFlatScanner, TestTypes);

Expand Down Expand Up @@ -183,7 +181,7 @@ TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
}

// PARQUET 502
TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 0, 0);
Expand All @@ -194,7 +192,7 @@ TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
CheckResults(1, &d);
}

TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) {
TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 4, 0);
Expand All @@ -209,7 +207,7 @@ TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) {
ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length());
}

TEST_F(TestFlatFLBAScanner, TestFLBAPrinterNext) {
TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {
NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 4, 0);
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/parquet/column/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
void CheckDictionarySizeLimit() override;

private:
void WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values);

typedef Encoder<DType> EncoderType;
Expand All @@ -167,7 +167,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
};

template <typename DType>
inline void TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
int64_t values_to_write = 0;
// If the field is required and non-repeated, there are no definition levels
Expand Down Expand Up @@ -209,6 +209,8 @@ inline void TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
AddDataPage();
}
if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }

return values_to_write;
}

template <typename DType>
Expand All @@ -222,15 +224,17 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
int64_t write_batch_size = properties_->write_batch_size();
int num_batches = num_values / write_batch_size;
int64_t num_remaining = num_values % write_batch_size;
int64_t value_offset = 0;
for (int round = 0; round < num_batches; round++) {
int64_t offset = round * write_batch_size;
WriteMiniBatch(
write_batch_size, &def_levels[offset], &rep_levels[offset], &values[offset]);
int64_t num_values = WriteMiniBatch(write_batch_size, &def_levels[offset],
&rep_levels[offset], &values[value_offset]);
value_offset += num_values;
}
// Write the remaining values
int64_t offset = num_batches * write_batch_size;
WriteMiniBatch(
num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
}

template <typename DType>
Expand Down

0 comments on commit bcaafd5

Please sign in to comment.