Skip to content

Commit

Permalink
Fix page reader and support case-sensitivity in struct reader (facebo…
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Jul 21, 2023
1 parent 78a3e2a commit 33f7ee6
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 23 deletions.
6 changes: 5 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) {
pageData_,
pageData_ + defineLength,
arrow::bit_util::NumRequiredBits(maxDefine_));
wideDefineDecoder_ = std::make_unique<arrow::util::RleDecoder>(
reinterpret_cast<const uint8_t*>(pageData_),
defineLength,
arrow::bit_util::NumRequiredBits(maxDefine_));
} else {
wideDefineDecoder_ = std::make_unique<arrow::util::RleDecoder>(
reinterpret_cast<const uint8_t*>(pageData_),
Expand Down Expand Up @@ -613,7 +617,7 @@ void PageReader::preloadRepDefs() {
}

void PageReader::decodeRepDefs(int32_t numTopLevelRows) {
if (definitionLevels_.empty()) {
if (definitionLevels_.empty() && maxDefine_ > 0) {
preloadRepDefs();
}
repDefBegin_ = repDefEnd_;
Expand Down
12 changes: 8 additions & 4 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ namespace facebook::velox::parquet {
std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec) {
common::ScanSpec& scanSpec,
bool caseSensitive) {
auto colName = scanSpec.fieldName();

switch (dataType->type->kind()) {
Expand All @@ -57,17 +58,20 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
dataType, dataType->type, params, scanSpec);

case TypeKind::ROW:
return std::make_unique<StructColumnReader>(dataType, params, scanSpec);
return std::make_unique<StructColumnReader>(
dataType, params, scanSpec, caseSensitive);

case TypeKind::VARBINARY:
case TypeKind::VARCHAR:
return std::make_unique<StringColumnReader>(dataType, params, scanSpec);

case TypeKind::ARRAY:
return std::make_unique<ListColumnReader>(dataType, params, scanSpec);
return std::make_unique<ListColumnReader>(
dataType, params, scanSpec, caseSensitive);

case TypeKind::MAP:
return std::make_unique<MapColumnReader>(dataType, params, scanSpec);
return std::make_unique<MapColumnReader>(
dataType, params, scanSpec, caseSensitive);

case TypeKind::BOOLEAN:
return std::make_unique<BooleanColumnReader>(dataType, params, scanSpec);
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/ParquetColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ParquetColumnReader {
static std::unique_ptr<dwio::common::SelectiveColumnReader> build(
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
bool caseSensitive);
};
} // namespace facebook::velox::parquet
9 changes: 6 additions & 3 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ int64_t ReaderBase::rowGroupUncompressedSize(

ParquetRowReader::ParquetRowReader(
const std::shared_ptr<ReaderBase>& readerBase,
const dwio::common::RowReaderOptions& options)
const dwio::common::RowReaderOptions& options,
bool caseSensitive)
: pool_(readerBase->getMemoryPool()),
readerBase_(readerBase),
options_(options),
Expand Down Expand Up @@ -537,7 +538,8 @@ ParquetRowReader::ParquetRowReader(
columnReader_ = ParquetColumnReader::build(
readerBase_->schemaWithId(), // Id is schema id
params,
*options_.getScanSpec());
*options_.getScanSpec(),
caseSensitive);

filterRowGroups();
if (!rowGroupIds_.empty()) {
Expand Down Expand Up @@ -665,6 +667,7 @@ ParquetReader::ParquetReader(

std::unique_ptr<dwio::common::RowReader> ParquetReader::createRowReader(
const dwio::common::RowReaderOptions& options) const {
return std::make_unique<ParquetRowReader>(readerBase_, options);
return std::make_unique<ParquetRowReader>(
readerBase_, options, readerBase_->isCaseSensitive());
}
} // namespace facebook::velox::parquet
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class ParquetRowReader : public dwio::common::RowReader {
public:
ParquetRowReader(
const std::shared_ptr<ReaderBase>& readerBase,
const dwio::common::RowReaderOptions& options);
const dwio::common::RowReaderOptions& options,
bool caseSensitive);
~ParquetRowReader() override = default;

int64_t nextRowNumber() override;
Expand Down
16 changes: 9 additions & 7 deletions velox/dwio/parquet/reader/RepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,19 @@ void ensureRepDefs(
MapColumnReader::MapColumnReader(
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
bool caseSensitive)
: dwio::common::SelectiveMapColumnReader(
requestedType,
requestedType,
params,
scanSpec) {
auto& keyChildType = requestedType->childAt(0);
auto& elementChildType = requestedType->childAt(1);
keyReader_ =
ParquetColumnReader::build(keyChildType, params, *scanSpec.children()[0]);
keyReader_ = ParquetColumnReader::build(
keyChildType, params, *scanSpec.children()[0], caseSensitive);
elementReader_ = ParquetColumnReader::build(
elementChildType, params, *scanSpec.children()[1]);
elementChildType, params, *scanSpec.children()[1], caseSensitive);
reinterpret_cast<const ParquetTypeWithId*>(requestedType.get())
->makeLevelInfo(levelInfo_);
children_ = {keyReader_.get(), elementReader_.get()};
Expand Down Expand Up @@ -219,15 +220,16 @@ void MapColumnReader::filterRowGroups(
ListColumnReader::ListColumnReader(
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
bool caseSensitive)
: dwio::common::SelectiveListColumnReader(
requestedType,
requestedType,
params,
scanSpec) {
auto& childType = requestedType->childAt(0);
child_ =
ParquetColumnReader::build(childType, params, *scanSpec.children()[0]);
child_ = ParquetColumnReader::build(
childType, params, *scanSpec.children()[0], caseSensitive);
reinterpret_cast<const ParquetTypeWithId*>(requestedType.get())
->makeLevelInfo(levelInfo_);
children_ = {child_.get()};
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/parquet/reader/RepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
MapColumnReader(
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
bool caseSensitive);

void prepareRead(
vector_size_t offset,
Expand Down Expand Up @@ -113,7 +114,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
ListColumnReader(
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
bool caseSensitive);

void prepareRead(
vector_size_t offset,
Expand Down
12 changes: 9 additions & 3 deletions velox/dwio/parquet/reader/StructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ namespace facebook::velox::parquet {
StructColumnReader::StructColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
bool caseSensitive)
: SelectiveStructColumnReader(dataType, dataType, params, scanSpec) {
auto& childSpecs = scanSpec_->stableChildren();
for (auto i = 0; i < childSpecs.size(); ++i) {
if (childSpecs[i]->isConstant()) {
continue;
}
auto childDataType = nodeType_->childByName(childSpecs[i]->fieldName());
std::string fieldName = childSpecs[i]->fieldName();
if (!caseSensitive) {
folly::toLowerAscii(fieldName);
}
auto childDataType = nodeType_->childByName(fieldName);

addChild(ParquetColumnReader::build(childDataType, params, *childSpecs[i]));
addChild(ParquetColumnReader::build(
childDataType, params, *childSpecs[i], caseSensitive));
childSpecs[i]->setSubscript(children_.size() - 1);
}
auto type = reinterpret_cast<const ParquetTypeWithId*>(nodeType_.get());
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/StructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
StructColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
bool caseSensitive);

void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
override;
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added velox/dwio/parquet/tests/examples/type1.parquet
Binary file not shown.
64 changes: 64 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,70 @@ TEST_F(ParquetTableScanTest, timestampFilter) {
"Unsupported expression for range filter: lt(ROW[\"t\"],cast \"2000-09-12 22:36:29\" as TIMESTAMP)");
}

// Regression test for a previously fixed core dump: loads type1.parquet as a
// single MAP(VARCHAR, VARCHAR) column named "map" and selects that column.
TEST_F(ParquetTableScanTest, map) {
  // Reference data: one row whose map contains the single entry
  // "name" -> "gluten".
  auto expectedMap =
      makeMapVector<StringView, StringView>({{{"name", "gluten"}}});
  auto expectedRows = makeRowVector({"map"}, {expectedMap});
  auto fileType = ROW({"map"}, {MAP(VARCHAR(), VARCHAR())});

  loadData(getExampleFilePath("type1.parquet"), fileType, expectedRows);

  // No column filters and no remaining filter; the scan output is compared
  // with the result of the reference query (presumably evaluated over the
  // rows handed to loadData - confirm against the fixture).
  assertSelectWithFilter({"map"}, {}, "", "SELECT map FROM tmp");
}

// Disabled: the array reader returns an incomplete result.
// TEST_F(ParquetTableScanTest, array) {
// auto vector = makeArrayVector<int32_t>({{1, 2, 3}});

// loadData(
// getExampleFilePath("old-repeated-int.parquet"),
// ROW({"repeatedInt"}, {ARRAY(INTEGER())}),
// makeRowVector(
// {"repeatedInt"},
// {
// vector,
// }));

// assertSelectWithFilter({"repeatedInt"}, {}, "", "SELECT repeatedInt FROM
// tmp");
// }

// Disabled: this test fails on the Velox map reader.
// TEST_F(ParquetTableScanTest, nestedMapWithStruct) {
// auto vector = makeArrayVector<int32_t>({{1, 2, 3}});

// loadData(
// getExampleFilePath("nested-map-with-struct.parquet"),
// ROW({"_1"}, {MAP(ROW({"_1", "_2"}, {INTEGER(), VARCHAR()}),
// VARCHAR())}), makeRowVector(
// {"_1"},
// {
// vector,
// }));

// assertSelectWithFilter({"_1"}, {}, "", "SELECT _1");
// }

// Regression test for a previously fixed core dump: loads
// single-row-struct.parquet as one struct column "s" with BIGINT fields
// "a" and "b".
TEST_F(ParquetTableScanTest, singleRowStruct) {
  // NOTE(review): this is an int32 array vector, not a struct matching the
  // declared file type. It looks like a placeholder, since the reference
  // query below is a constant and does not read from it - confirm.
  auto placeholder = makeArrayVector<int32_t>({{1, 2, 3}});
  auto fileType = ROW({"s"}, {ROW({"a", "b"}, {BIGINT(), BIGINT()})});

  loadData(
      getExampleFilePath("single-row-struct.parquet"),
      fileType,
      makeRowVector({"s"}, {placeholder}));

  // The expected output is spelled out as the constant tuple (0, 1).
  assertSelectWithFilter({"s"}, {}, "", "SELECT (0, 1)");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
Expand Down

0 comments on commit 33f7ee6

Please sign in to comment.