Skip to content

Commit

Permalink
Support DELTA_BYTE_ARRAY encoding in native Parquet reader (#10589)
Browse files Browse the repository at this point in the history
Summary:
Support Parquet v2 DELTA_BYTE_ARRAY encoding, it is an encoding for string.

Pull Request resolved: #10589

Reviewed By: Yuhta

Differential Revision: D64920962

Pulled By: pedroerp

fbshipit-source-id: 2624f26c3ad8ddc4d58b3c6bb578c9f19cacc57a
  • Loading branch information
liujiayi771 authored and facebook-github-bot committed Oct 29, 2024
1 parent 2e0acb7 commit 6440a44
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 1 deletion.
8 changes: 8 additions & 0 deletions velox/dwio/parquet/reader/DeltaBpDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ class DeltaBpDecoder {
return static_cast<int64_t>(totalValuesRemaining_);
}

template <typename T>
void readValues(T* values, int32_t numValues) {
VELOX_DCHECK_LE(numValues, totalValuesRemaining_);
for (auto i = 0; i < numValues; i++) {
values[i] = T(readLong());
}
}

private:
bool getVlqInt(uint64_t& v) {
uint64_t tmp = 0;
Expand Down
183 changes: 183 additions & 0 deletions velox/dwio/parquet/reader/DeltaByteArrayDecoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/common/base/BitUtil.h"
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"

namespace facebook::velox::parquet {

// DeltaByteArrayDecoder is adapted from Apache Arrow:
// https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/parquet/encoding.cc#L2758-L2889
class DeltaLengthByteArrayDecoder {
public:
explicit DeltaLengthByteArrayDecoder(const char* start) {
lengthDecoder_ = std::make_unique<DeltaBpDecoder>(start);
decodeLengths();
bufferStart_ = lengthDecoder_->bufferStart();
}

std::string_view readString() {
int32_t dataSize = 0;
const int64_t length = bufferedLength_[lengthIdx_++];
VELOX_CHECK_GE(length, 0, "negative string delta length");
bufferStart_ += length;
return std::string_view(bufferStart_ - length, length);
}

private:
void decodeLengths() {
int64_t numLength = lengthDecoder_->validValuesCount();
bufferedLength_.resize(numLength);
lengthDecoder_->readValues<uint32_t>(bufferedLength_.data(), numLength);

lengthIdx_ = 0;
numValidValues_ = numLength;
}

const char* bufferStart_;
std::unique_ptr<DeltaBpDecoder> lengthDecoder_;
int32_t numValidValues_{0};
uint32_t lengthIdx_{0};
std::vector<uint32_t> bufferedLength_;
};

// DeltaByteArrayDecoder is adapted from Apache Arrow:
// https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/parquet/encoding.cc#L3301-L3545
class DeltaByteArrayDecoder {
public:
explicit DeltaByteArrayDecoder(const char* start) {
prefixLenDecoder_ = std::make_unique<DeltaBpDecoder>(start);
int64_t numPrefix = prefixLenDecoder_->validValuesCount();
bufferedPrefixLength_.resize(numPrefix);
prefixLenDecoder_->readValues<uint32_t>(
bufferedPrefixLength_.data(), numPrefix);
prefixLenOffset_ = 0;
numValidValues_ = numPrefix;

suffixDecoder_ = std::make_unique<DeltaLengthByteArrayDecoder>(
prefixLenDecoder_->bufferStart());
}

void skip(uint64_t numValues) {
skip<false>(numValues, 0, nullptr);
}

template <bool hasNulls>
inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) {
if (hasNulls) {
numValues = bits::countNonNulls(nulls, current, current + numValues);
}
for (int32_t i = 0; i < numValues; ++i) {
readString();
}
}

template <bool hasNulls, typename Visitor>
void readWithVisitor(const uint64_t* nulls, Visitor visitor) {
int32_t current = visitor.start();
skip<hasNulls>(current, 0, nulls);
int32_t toSkip;
bool atEnd = false;
const bool allowNulls = hasNulls && visitor.allowNulls();
for (;;) {
if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) {
toSkip = visitor.processNull(atEnd);
} else {
if (hasNulls && !allowNulls) {
toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd);
if (!Visitor::dense) {
skip<false>(toSkip, current, nullptr);
}
if (atEnd) {
return;
}
}

// We are at a non-null value on a row to visit.
toSkip = visitor.process(readString(), atEnd);
}
++current;
if (toSkip) {
skip<hasNulls>(toSkip, current, nulls);
current += toSkip;
}
if (atEnd) {
return;
}
}
}

std::string_view readString() {
auto suffix = suffixDecoder_->readString();
bool isFirstRun = (prefixLenOffset_ == 0);
const int64_t prefixLength = bufferedPrefixLength_[prefixLenOffset_++];

VELOX_CHECK_GE(
prefixLength, 0, "negative prefix length in DELTA_BYTE_ARRAY");

buildReadValue(isFirstRun, prefixLength, suffix);

numValidValues_--;
return {lastValue_};
}

private:
void buildReadValue(
bool isFirstRun,
const int64_t prefixLength,
std::string_view suffix) {
VELOX_CHECK_LE(
prefixLength,
lastValue_.size(),
"prefix length too large in DELTA_BYTE_ARRAY");

if (prefixLength == 0) {
// prefix is empty.
lastValue_ = std::string{suffix};
return;
}

if (!isFirstRun) {
if (suffix.empty()) {
// suffix is empty: read value can simply point to the prefix
// of the lastValue_. This is not possible for the first run since
// the prefix would point to the mutable `lastValue_`.
lastValue_ = lastValue_.substr(0, prefixLength);
return;
}
}

lastValue_.resize(prefixLength + suffix.size());

// Both prefix and suffix are non-empty, so we need to decode the string
// into read value.
// Just keep the prefix in lastValue_, and copy the suffix.
memcpy(lastValue_.data() + prefixLength, suffix.data(), suffix.size());
}

std::unique_ptr<DeltaBpDecoder> prefixLenDecoder_;
std::unique_ptr<DeltaBpDecoder> suffixLenDecoder_;
std::unique_ptr<DeltaLengthByteArrayDecoder> suffixDecoder_;

std::string lastValue_;
int32_t numValidValues_{0};
uint32_t prefixLenOffset_{0};
std::vector<uint32_t> bufferedPrefixLength_;
};

} // namespace facebook::velox::parquet
9 changes: 9 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,13 @@ void PageReader::makeDecoder() {
"DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
}
break;
case Encoding::DELTA_BYTE_ARRAY:
if (parquetType == thrift::Type::BYTE_ARRAY) {
deltaByteArrDecoder_ =
std::make_unique<DeltaByteArrayDecoder>(pageData_);
break;
}
FMT_FALLTHROUGH;
default:
VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_);
}
Expand Down Expand Up @@ -739,6 +746,8 @@ void PageReader::skip(int64_t numRows) {
booleanDecoder_->skip(toSkip);
} else if (deltaBpDecoder_) {
deltaBpDecoder_->skip(toSkip);
} else if (deltaByteArrDecoder_) {
deltaByteArrDecoder_->skip(toSkip);
} else {
VELOX_FAIL("No decoder to skip");
}
Expand Down
11 changes: 11 additions & 0 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/dwio/common/compression/Compression.h"
#include "velox/dwio/parquet/reader/BooleanDecoder.h"
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
#include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h"
#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
#include "velox/dwio/parquet/reader/StringDecoder.h"
Expand Down Expand Up @@ -131,6 +132,10 @@ class PageReader {
return encoding_ == thrift::Encoding::DELTA_BINARY_PACKED;
}

bool isDeltaByteArray() const {
return encoding_ == thrift::Encoding::DELTA_BYTE_ARRAY;
}

/// Returns the range of repdefs for the top level rows covered by the last
/// decoderepDefs().
std::pair<int32_t, int32_t> repDefRange() const {
Expand Down Expand Up @@ -305,6 +310,9 @@ class PageReader {
nullsFromFastPath = dwio::common::useFastPath<Visitor, true>(visitor);
auto dictVisitor = visitor.toStringDictionaryColumnVisitor();
dictionaryIdDecoder_->readWithVisitor<true>(nulls, dictVisitor);
} else if (encoding_ == thrift::Encoding::DELTA_BYTE_ARRAY) {
nullsFromFastPath = false;
deltaByteArrDecoder_->readWithVisitor<true>(nulls, visitor);
} else {
nullsFromFastPath = false;
stringDecoder_->readWithVisitor<true>(nulls, visitor);
Expand All @@ -313,6 +321,8 @@ class PageReader {
if (isDictionary()) {
auto dictVisitor = visitor.toStringDictionaryColumnVisitor();
dictionaryIdDecoder_->readWithVisitor<false>(nullptr, dictVisitor);
} else if (encoding_ == thrift::Encoding::DELTA_BYTE_ARRAY) {
deltaByteArrDecoder_->readWithVisitor<false>(nulls, visitor);
} else {
stringDecoder_->readWithVisitor<false>(nulls, visitor);
}
Expand Down Expand Up @@ -489,6 +499,7 @@ class PageReader {
std::unique_ptr<StringDecoder> stringDecoder_;
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<DeltaByteArrayDecoder> deltaByteArrDecoder_;
// Add decoders for other encodings here.
};

Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/reader/ParquetData.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class ParquetData : public dwio::common::FormatData {
return reader_->isDeltaBinaryPacked();
}

bool isDeltaByteArray() const {
return reader_->isDeltaByteArray();
}

bool parentNullsInLeaves() const override {
return true;
}
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/StringColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class StringColumnReader : public dwio::common::SelectiveColumnReader {

bool hasBulkPath() const override {
// Non-dictionary encodings do not have fast path.
return scanState_.dictionary.values != nullptr;
return !formatData_->as<ParquetData>().isDeltaByteArray() &&
scanState_.dictionary.values != nullptr;
}

void seekToRowGroup(int64_t index) override {
Expand Down
Binary file not shown.
17 changes: 17 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,23 @@ TEST_F(E2EFilterTest, stringDictionary) {
20);
}

TEST_F(E2EFilterTest, stringDeltaByteArray) {
options_.enableDictionary = false;
options_.encoding =
facebook::velox::parquet::arrow::Encoding::DELTA_BYTE_ARRAY;

testWithTypes(
"string_val:string,"
"string_val_2:string",
[&]() {
makeStringUnique("string_val");
makeStringUnique("string_val_2");
},
true,
{"string_val", "string_val_2"},
20);
}

TEST_F(E2EFilterTest, dedictionarize) {
rowsInRowGroup_ = 10'000;
options_.dictionaryPageSizeLimit = 20'000;
Expand Down
13 changes: 13 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,19 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndex) {
VeloxRuntimeError);
}

TEST_F(ParquetTableScanTest, deltaByteArray) {
auto a = makeFlatVector<StringView>({"axis", "axle", "babble", "babyhood"});
auto expected = makeRowVector({"a"}, {a});
createDuckDbTable("expected", {expected});

auto vector = makeFlatVector<StringView>({{}});
loadData(
getExampleFilePath("delta_byte_array.parquet"),
ROW({"a"}, {VARCHAR()}),
makeRowVector({"a"}, {vector}));
assertSelect({"a"}, "SELECT a from expected");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down

0 comments on commit 6440a44

Please sign in to comment.