Skip to content

Commit

Permalink
PARQUET-610: Print additional ColumnMetaData for each RowGroup
Browse files Browse the repository at this point in the history
Also fixes few warnings in release mode build

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes apache#102 from majetideepak/ReleaseBuildErrors and squashes the following commits:

b54f126 [Deepak Majeti] added api to read additional column metadata

Change-Id: Iddf193def450c092090a95758f27c1dcad87185a
  • Loading branch information
Deepak Majeti authored and wesm committed May 12, 2016
1 parent 6d4af3b commit 456dd7f
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 60 deletions.
4 changes: 2 additions & 2 deletions cpp/src/parquet/column/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ class TypedScanner : public Scanner {
}

// Out of values
int16_t def_level;
int16_t rep_level;
int16_t def_level = -1;
int16_t rep_level = -1;
NextLevels(&def_level, &rep_level);
*is_null = def_level < descr()->max_definition_level();

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/encodings/delta-byte-array-encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class DeltaByteArrayDecoder : public Decoder<ByteArrayType> {
for (int i = 0; i < max_values; ++i) {
int prefix_len = 0;
prefix_len_decoder_.Decode(&prefix_len, 1);
ByteArray suffix;
ByteArray suffix = {0, NULL};
suffix_decoder_.Decode(&suffix, 1);
buffer[i].len = prefix_len + suffix.len;

Expand Down
33 changes: 31 additions & 2 deletions cpp/src/parquet/file/reader-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
std::move(stream), FromThrift(col.meta_data.codec), properties_.allocator()));
}

RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) const {
const format::ColumnMetaData& meta_data = metadata_->columns[i].meta_data;

RowGroupStatistics result;
Expand All @@ -180,10 +180,39 @@ RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
result.distinct_count = meta_data.statistics.distinct_count;
result.max = &meta_data.statistics.max;
result.min = &meta_data.statistics.min;

return result;
}

bool SerializedRowGroup::IsColumnStatsSet(int i) const {
const format::ColumnMetaData& meta_data = metadata_->columns[i].meta_data;
return meta_data.__isset.statistics;
}

Compression::type SerializedRowGroup::GetColumnCompression(int i) const {
const format::ColumnMetaData& meta_data = metadata_->columns[i].meta_data;
return FromThrift(meta_data.codec);
}

std::vector<Encoding::type> SerializedRowGroup::GetColumnEncodings(int i) const {
const format::ColumnMetaData& meta_data = metadata_->columns[i].meta_data;

std::vector<Encoding::type> encodings;
for (auto encoding : meta_data.encodings) {
encodings.push_back(FromThrift(encoding));
}
return encodings;
}

int64_t SerializedRowGroup::GetColumnUnCompressedSize(int i) const {
const format::ColumnMetaData& meta_data = metadata_->columns[i].meta_data;
return meta_data.total_uncompressed_size;
}

int64_t SerializedRowGroup::GetColumnCompressedSize(int i) const {
const format::ColumnMetaData& meta_data = metadata_->columns[i].meta_data;
return meta_data.total_compressed_size;
}

// ----------------------------------------------------------------------
// SerializedFile: Parquet on-disk layout

Expand Down
7 changes: 6 additions & 1 deletion cpp/src/parquet/file/reader-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ class SerializedRowGroup : public RowGroupReader::Contents {
virtual int num_columns() const;
virtual int64_t num_rows() const;
virtual std::unique_ptr<PageReader> GetColumnPageReader(int i);
virtual RowGroupStatistics GetColumnStats(int i);
virtual RowGroupStatistics GetColumnStats(int i) const;
virtual bool IsColumnStatsSet(int i) const;
virtual Compression::type GetColumnCompression(int i) const;
virtual std::vector<Encoding::type> GetColumnEncodings(int i) const;
virtual int64_t GetColumnCompressedSize(int i) const;
virtual int64_t GetColumnUnCompressedSize(int i) const;

private:
RandomAccessSource* source_;
Expand Down
50 changes: 42 additions & 8 deletions cpp/src/parquet/file/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
return contents_->GetColumnStats(i);
}

bool RowGroupReader::IsColumnStatsSet(int i) const {
return contents_->IsColumnStatsSet(i);
}

Compression::type RowGroupReader::GetColumnCompression(int i) const {
return contents_->GetColumnCompression(i);
}

std::vector<Encoding::type> RowGroupReader::GetColumnEncodings(int i) const {
return contents_->GetColumnEncodings(i);
}

int64_t RowGroupReader::GetColumnUnCompressedSize(int i) const {
return contents_->GetColumnUnCompressedSize(i);
}

int64_t RowGroupReader::GetColumnCompressedSize(int i) const {
return contents_->GetColumnCompressedSize(i);
}

// ----------------------------------------------------------------------
// ParquetFileReader public API

Expand Down Expand Up @@ -166,14 +186,28 @@ void ParquetFileReader::DebugPrint(
RowGroupStatistics stats = group_reader->GetColumnStats(i);

const ColumnDescriptor* descr = schema_->Column(i);
stream << "Column " << i << ": " << group_reader->num_rows() << " rows, "
<< stats.num_values << " values, " << stats.null_count << " null values, "
<< stats.distinct_count << " distinct values, "
<< FormatValue(
descr->physical_type(), stats.max->c_str(), descr->type_length())
<< " max, " << FormatValue(descr->physical_type(), stats.min->c_str(),
descr->type_length())
<< " min, " << std::endl;
stream << "Column " << i << std::endl
<< " rows: " << group_reader->num_rows() << ", values: " << stats.num_values
<< ", null values: " << stats.null_count
<< ", distinct values: " << stats.distinct_count << std::endl;
if (group_reader->IsColumnStatsSet(i)) {
stream << " max: " << FormatStatValue(descr->physical_type(), stats.max->c_str())
<< ", min: "
<< FormatStatValue(descr->physical_type(), stats.min->c_str());
} else {
stream << " Statistics Not Set";
}
stream << std::endl
<< " compression: "
<< compression_to_string(group_reader->GetColumnCompression(i))
<< ", encodings: ";
for (auto encoding : group_reader->GetColumnEncodings(i)) {
stream << encoding_to_string(encoding) << " ";
}
stream << std::endl
<< " uncompressed size: " << group_reader->GetColumnUnCompressedSize(i)
<< ", compressed size: " << group_reader->GetColumnCompressedSize(i)
<< std::endl;
}

if (!print_values) { continue; }
Expand Down
13 changes: 12 additions & 1 deletion cpp/src/parquet/file/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <list>
#include <string>
#include <vector>

#include "parquet/column/page.h"
#include "parquet/column/properties.h"
Expand All @@ -47,8 +48,13 @@ class RowGroupReader {
struct Contents {
virtual int num_columns() const = 0;
virtual int64_t num_rows() const = 0;
virtual RowGroupStatistics GetColumnStats(int i) = 0;
virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
virtual RowGroupStatistics GetColumnStats(int i) const = 0;
virtual bool IsColumnStatsSet(int i) const = 0;
virtual Compression::type GetColumnCompression(int i) const = 0;
virtual std::vector<Encoding::type> GetColumnEncodings(int i) const = 0;
virtual int64_t GetColumnCompressedSize(int i) const = 0;
virtual int64_t GetColumnUnCompressedSize(int i) const = 0;
};

RowGroupReader(const SchemaDescriptor* schema, std::unique_ptr<Contents> contents,
Expand All @@ -61,6 +67,11 @@ class RowGroupReader {
int64_t num_rows() const;

RowGroupStatistics GetColumnStats(int i) const;
bool IsColumnStatsSet(int i) const;
Compression::type GetColumnCompression(int i) const;
std::vector<Encoding::type> GetColumnEncodings(int i) const;
int64_t GetColumnCompressedSize(int i) const;
int64_t GetColumnUnCompressedSize(int i) const;

private:
// Owned by the parent ParquetFileReader
Expand Down
54 changes: 21 additions & 33 deletions cpp/src/parquet/types-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,67 +60,55 @@ TEST(TestLogicalTypeToString, LogicalTypes) {
ASSERT_STREQ("INTERVAL", logical_type_to_string(LogicalType::INTERVAL).c_str());
}

TEST(TypePrinter, PhysicalTypes) {
TEST(TypePrinter, StatisticsTypes) {
std::string smin;
std::string smax;
int32_t int_min = 1024;
int32_t int_max = 2048;
smin = std::string(reinterpret_cast<char*>(&int_min), sizeof(int32_t));
smax = std::string(reinterpret_cast<char*>(&int_max), sizeof(int32_t));
ASSERT_STREQ("1024", FormatValue(Type::INT32, smin.c_str(), 0).c_str());
ASSERT_STREQ("2048", FormatValue(Type::INT32, smax.c_str(), 0).c_str());
ASSERT_STREQ("1024", FormatStatValue(Type::INT32, smin.c_str()).c_str());
ASSERT_STREQ("2048", FormatStatValue(Type::INT32, smax.c_str()).c_str());

int64_t int64_min = 10240000000000;
int64_t int64_max = 20480000000000;
smin = std::string(reinterpret_cast<char*>(&int64_min), sizeof(int64_t));
smax = std::string(reinterpret_cast<char*>(&int64_max), sizeof(int64_t));
ASSERT_STREQ("10240000000000", FormatValue(Type::INT64, smin.c_str(), 0).c_str());
ASSERT_STREQ("20480000000000", FormatValue(Type::INT64, smax.c_str(), 0).c_str());
ASSERT_STREQ("10240000000000", FormatStatValue(Type::INT64, smin.c_str()).c_str());
ASSERT_STREQ("20480000000000", FormatStatValue(Type::INT64, smax.c_str()).c_str());

float float_min = 1.024;
float float_max = 2.048;
smin = std::string(reinterpret_cast<char*>(&float_min), sizeof(float));
smax = std::string(reinterpret_cast<char*>(&float_max), sizeof(float));
ASSERT_STREQ("1.024", FormatValue(Type::FLOAT, smin.c_str(), 0).c_str());
ASSERT_STREQ("2.048", FormatValue(Type::FLOAT, smax.c_str(), 0).c_str());
ASSERT_STREQ("1.024", FormatStatValue(Type::FLOAT, smin.c_str()).c_str());
ASSERT_STREQ("2.048", FormatStatValue(Type::FLOAT, smax.c_str()).c_str());

double double_min = 1.0245;
double double_max = 2.0489;
smin = std::string(reinterpret_cast<char*>(&double_min), sizeof(double));
smax = std::string(reinterpret_cast<char*>(&double_max), sizeof(double));
ASSERT_STREQ("1.0245", FormatValue(Type::DOUBLE, smin.c_str(), 0).c_str());
ASSERT_STREQ("2.0489", FormatValue(Type::DOUBLE, smax.c_str(), 0).c_str());
ASSERT_STREQ("1.0245", FormatStatValue(Type::DOUBLE, smin.c_str()).c_str());
ASSERT_STREQ("2.0489", FormatStatValue(Type::DOUBLE, smax.c_str()).c_str());

Int96 Int96_min = {{1024, 2048, 4096}};
Int96 Int96_max = {{2048, 4096, 8192}};
smin = std::string(reinterpret_cast<char*>(&Int96_min), sizeof(Int96));
smax = std::string(reinterpret_cast<char*>(&Int96_max), sizeof(Int96));
ASSERT_STREQ("1024 2048 4096 ", FormatValue(Type::INT96, smin.c_str(), 0).c_str());
ASSERT_STREQ("2048 4096 8192 ", FormatValue(Type::INT96, smax.c_str(), 0).c_str());
ASSERT_STREQ("1024 2048 4096 ", FormatStatValue(Type::INT96, smin.c_str()).c_str());
ASSERT_STREQ("2048 4096 8192 ", FormatStatValue(Type::INT96, smax.c_str()).c_str());

ByteArray BA_min;
ByteArray BA_max;
BA_min.ptr = reinterpret_cast<const uint8_t*>("abcdef");
BA_min.len = 6;
BA_max.ptr = reinterpret_cast<const uint8_t*>("ijklmnop");
BA_max.len = 8;
smin = std::string(reinterpret_cast<char*>(&BA_min), sizeof(ByteArray));
smax = std::string(reinterpret_cast<char*>(&BA_max), sizeof(ByteArray));
ASSERT_STREQ("a b c d e f ", FormatValue(Type::BYTE_ARRAY, smin.c_str(), 0).c_str());
ASSERT_STREQ(
"i j k l m n o p ", FormatValue(Type::BYTE_ARRAY, smax.c_str(), 0).c_str());
smin = std::string("abcdef");
smax = std::string("ijklmnop");
ASSERT_STREQ("abcdef ", FormatStatValue(Type::BYTE_ARRAY, smin.c_str()).c_str());
ASSERT_STREQ("ijklmnop ", FormatStatValue(Type::BYTE_ARRAY, smax.c_str()).c_str());

FLBA FLBA_min;
FLBA FLBA_max;
FLBA_min.ptr = reinterpret_cast<const uint8_t*>("abcdefgh");
FLBA_max.ptr = reinterpret_cast<const uint8_t*>("ijklmnop");
int len = 8;
smin = std::string(reinterpret_cast<char*>(&FLBA_min), sizeof(FLBA));
smax = std::string(reinterpret_cast<char*>(&FLBA_max), sizeof(FLBA));
ASSERT_STREQ("a b c d e f g h ",
FormatValue(Type::FIXED_LEN_BYTE_ARRAY, smin.c_str(), len).c_str());
ASSERT_STREQ("i j k l m n o p ",
FormatValue(Type::FIXED_LEN_BYTE_ARRAY, smax.c_str(), len).c_str());
smin = std::string("abcdefgh");
smax = std::string("ijklmnop");
ASSERT_STREQ(
"abcdefgh ", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smin.c_str()).c_str());
ASSERT_STREQ(
"ijklmnop ", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smax.c_str()).c_str());
}

} // namespace parquet
64 changes: 55 additions & 9 deletions cpp/src/parquet/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace parquet {

std::string FormatValue(Type::type parquet_type, const char* val, int length) {
std::string FormatStatValue(Type::type parquet_type, const char* val) {
std::stringstream result;
switch (parquet_type) {
case Type::BOOLEAN:
Expand All @@ -49,17 +49,11 @@ std::string FormatValue(Type::type parquet_type, const char* val, int length) {
break;
}
case Type::BYTE_ARRAY: {
const ByteArray* a = reinterpret_cast<const ByteArray*>(val);
for (int i = 0; i < static_cast<int>(a->len); i++) {
result << a[0].ptr[i] << " ";
}
result << val << " ";
break;
}
case Type::FIXED_LEN_BYTE_ARRAY: {
const FLBA* a = reinterpret_cast<const FLBA*>(val);
for (int i = 0; i < length; i++) {
result << a[0].ptr[i] << " ";
}
result << val << " ";
break;
}
default:
Expand All @@ -68,6 +62,58 @@ std::string FormatValue(Type::type parquet_type, const char* val, int length) {
return result.str();
}

std::string encoding_to_string(Encoding::type t) {
switch (t) {
case Encoding::PLAIN:
return "PLAIN";
break;
case Encoding::PLAIN_DICTIONARY:
return "PLAIN_DICTIONARY";
break;
case Encoding::RLE:
return "RLE";
break;
case Encoding::BIT_PACKED:
return "BIT_PACKED";
break;
case Encoding::DELTA_BINARY_PACKED:
return "DELTA_BINARY_PACKED";
break;
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
return "DELTA_LENGTH_BYTE_ARRAY";
break;
case Encoding::DELTA_BYTE_ARRAY:
return "DELTA_BYTE_ARRAY";
break;
case Encoding::RLE_DICTIONARY:
return "RLE_DICTIONARY";
break;
default:
return "UNKNOWN";
break;
}
}

std::string compression_to_string(Compression::type t) {
switch (t) {
case Compression::UNCOMPRESSED:
return "UNCOMPRESSED";
break;
case Compression::SNAPPY:
return "SNAPPY";
break;
case Compression::GZIP:
return "GZIP";
break;
case Compression::LZO:
return "LZO";
break;
default:
return "UNKNOWN";
break;
}
}

std::string type_to_string(Type::type t) {
switch (t) {
case Type::BOOLEAN:
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,15 @@ inline std::string format_fwf(int width) {
return ss.str();
}

std::string compression_to_string(Compression::type t);

std::string encoding_to_string(Encoding::type t);

std::string logical_type_to_string(LogicalType::type t);

std::string type_to_string(Type::type t);

std::string FormatValue(Type::type parquet_type, const char* val, int length);
std::string FormatStatValue(Type::type parquet_type, const char* val);
} // namespace parquet

#endif // PARQUET_TYPES_H
4 changes: 2 additions & 2 deletions cpp/src/parquet/util/rle-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void TestBitArrayValues(int bit_width, int num_vals) {

BitReader reader(buffer, len);
for (int i = 0; i < num_vals; ++i) {
int64_t val;
int64_t val = 0;
bool result = reader.GetValue(bit_width, &val);
EXPECT_TRUE(result);
EXPECT_EQ(val, i % mod);
Expand Down Expand Up @@ -215,7 +215,7 @@ bool CheckRoundTrip(const vector<int>& values, int bit_width) {
if (!result) { return false; }
}
int encoded_len = encoder.Flush();
int out;
int out = 0;

RleDecoder decoder(buffer, encoded_len, bit_width);
for (size_t i = 0; i < values.size(); ++i) {
Expand Down

0 comments on commit 456dd7f

Please sign in to comment.