diff --git a/velox/dwio/dwrf/reader/ColumnReader.cpp b/velox/dwio/dwrf/reader/ColumnReader.cpp index b9951c3cd466..1ebc3203ca9e 100644 --- a/velox/dwio/dwrf/reader/ColumnReader.cpp +++ b/velox/dwio/dwrf/reader/ColumnReader.cpp @@ -1024,41 +1024,58 @@ class StringDictionaryColumnReader : public ColumnReader { void ensureInitialized(); - void initOrc(StripeStreams& stripe) { + void init(StripeStreams& stripe) { + auto format = stripe.format(); EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - RleVersion rleVersion = - convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); - dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize(); - - const auto dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); - bool dictVInts = stripe.getUseVInts(dataId); - dictIndex = createRleDecoder( - stripe.getStream(dataId, true), - rleVersion, - memoryPool_, - dictVInts, - dwio::common::INT_BYTE_SIZE); - const auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); - bool lenVInts = stripe.getUseVInts(lenId); - lengthDecoder = createRleDecoder( - stripe.getStream(lenId, false), - rleVersion, - memoryPool_, - lenVInts, - dwio::common::INT_BYTE_SIZE); - - blobStream = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA), false); - } - - void initDwrf(StripeStreams& stripe) { - EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - RleVersion rleVersion = - convertRleVersion(stripe.getEncoding(encodingKey).kind()); - dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); + RleVersion rleVersion; + DwrfStreamIdentifier dataId; + DwrfStreamIdentifier lenId; + DwrfStreamIdentifier dictionaryId; + if (format == DwrfFormat::kDwrf) { + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + dictionaryId = encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA); + + // handle in dictionary stream + std::unique_ptr inDictStream = + stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false); + if (inDictStream) { + inDictionaryReader = + createBooleanRleDecoder(std::move(inDictStream), encodingKey); + + // stride dictionary only exists if in dictionary exists + strideDictStream = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true); + DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); + + indexStream_ = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true); + DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); + + const auto strideDictLenId = + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); + bool strideLenVInt = stripe.getUseVInts(strideDictLenId); + strideDictLengthDecoder = createRleDecoder( + stripe.getStream(strideDictLenId, true), + rleVersion, + memoryPool_, + strideLenVInt, + dwio::common::INT_BYTE_SIZE); + } + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + dictionaryId = + encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA); + } - const auto dataId = encodingKey.forKind(proto::Stream_Kind_DATA); bool dictVInts = stripe.getUseVInts(dataId); dictIndex = createRleDecoder( stripe.getStream(dataId, true), @@ -1067,7 +1084,6 @@ class StringDictionaryColumnReader : public ColumnReader { dictVInts, dwio::common::INT_BYTE_SIZE); - const auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); bool lenVInts = stripe.getUseVInts(lenId); lengthDecoder = createRleDecoder( stripe.getStream(lenId, false), @@ -1076,46 +1092,7 @@ class StringDictionaryColumnReader : public ColumnReader { lenVInts, dwio::common::INT_BYTE_SIZE); - blobStream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA), false); - - // handle in dictionary stream - std::unique_ptr inDictStream = - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false); - if (inDictStream) { - inDictionaryReader = - createBooleanRleDecoder(std::move(inDictStream), encodingKey); - - // stride dictionary only exists if in dictionary exists - strideDictStream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true); - DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); - - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true); - DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); - - const auto strideDictLenId = - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); - bool strideLenVInt = stripe.getUseVInts(strideDictLenId); - strideDictLengthDecoder = createRleDecoder( - stripe.getStream(strideDictLenId, true), - rleVersion, - memoryPool_, - strideLenVInt, - dwio::common::INT_BYTE_SIZE); - } - } - - void init(StripeStreams& stripe) { - auto format = stripe.format(); - if (format == DwrfFormat::kDwrf) { - initDwrf(stripe); - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(stripe); - } + blobStream = stripe.getStream(dictionaryId, false); } public: diff --git a/velox/dwio/dwrf/reader/DwrfData.cpp b/velox/dwio/dwrf/reader/DwrfData.cpp index 5af50699bbb1..37541ba8f551 100644 --- a/velox/dwio/dwrf/reader/DwrfData.cpp +++ b/velox/dwio/dwrf/reader/DwrfData.cpp @@ -22,34 +22,18 @@ namespace facebook::velox::dwrf { void DwrfData::init(StripeStreams& stripe) { auto format = stripe.format(); + EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; + + DwrfStreamIdentifier presentStream; + DwrfStreamIdentifier rowIndexStream; if (format == DwrfFormat::kDwrf) { - initDwrf(stripe); + presentStream = encodingKey.forKind(proto::Stream_Kind_PRESENT); + rowIndexStream = encodingKey.forKind(proto::Stream_Kind_ROW_INDEX); } else { VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(stripe); - } -} - -void DwrfData::initDwrf(StripeStreams& stripe) { - EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - - std::unique_ptr stream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_PRESENT), - streamLabels.label(), - false); - if (stream) { - notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey); + presentStream = encodingKey.forKind(proto::orc::Stream_Kind_PRESENT); + rowIndexStream = encodingKey.forKind(proto::orc::Stream_Kind_ROW_INDEX); } - - // We always initialize indexStream_ because indices are needed as - // soon as there is a single filter that can trigger row group skips - // anywhere in the reader tree. This is not known at construct time - // because the first filter can come from a hash join or other run - // time pushdown. - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), - streamLabels.label(), - false); } void DwrfData::initOrc(StripeStreams& stripe) { @@ -57,6 +41,8 @@ void DwrfData::initOrc(StripeStreams& stripe) { std::unique_ptr stream = stripe.getStream( encodingKey.forKind(proto::orc::Stream_Kind_PRESENT), false); + std::unique_ptr stream = + stripe.getStream(presentStream, streamLabels.label(), false); if (stream) { notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey); } @@ -66,8 +52,7 @@ void DwrfData::initOrc(StripeStreams& stripe) { // anywhere in the reader tree. This is not known at construct time // because the first filter can come from a hash join or other run // time pushdown. - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_ROW_INDEX), false); + indexStream_ = stripe.getStream(rowIndexStream, false); } DwrfData::DwrfData( diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index 6d56ea3e746c..4d7c153cf207 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -97,8 +97,6 @@ class DwrfData : public dwio::common::FormatData { } void init(StripeStreams& stripe); - void initDwrf(StripeStreams& stripe); - void initOrc(StripeStreams& stripe); memory::MemoryPool& memoryPool_; const std::shared_ptr nodeType_; diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 7097546ab982..a7ae0cda2e7f 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -81,12 +81,12 @@ class ReaderBase { memory::MemoryPool& pool, std::unique_ptr input, std::unique_ptr ps, - const proto::Footer* footer, + std::unique_ptr footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) : pool_{pool}, postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, + footer_{std::move(footer)}, cache_{std::move(cache)}, handler_{std::move(handler)}, input_{std::move(input)}, @@ -94,31 +94,6 @@ class ReaderBase { std::dynamic_pointer_cast(convertType(*footer_))}, fileLength_{0}, psLength_{0} { - DWIO_ENSURE(footer_->getDwrfPtr()->GetArena()); - DWIO_ENSURE_NOT_NULL(schema_, "invalid schema"); - if (!handler_) { - handler_ = encryption::DecryptionHandler::create(*footer); - } - } - - ReaderBase( - memory::MemoryPool& pool, - std::unique_ptr input, - std::unique_ptr ps, - const proto::orc::Footer* footer, - std::unique_ptr cache, - std::unique_ptr handler = nullptr) - : pool_{pool}, - postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, - cache_{std::move(cache)}, - handler_{std::move(handler)}, - input_{std::move(input)}, - schema_{ - std::dynamic_pointer_cast(convertType(*footer_))}, - fileLength_{0}, - psLength_{0} { - DWIO_ENSURE(footer_->getOrcPtr()->GetArena()); DWIO_ENSURE_NOT_NULL(schema_, "invalid schema"); if (!handler_) { handler_ = encryption::DecryptionHandler::create(*footer_); diff --git a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h index 2528fcf02f20..d4eb4bdc72b8 100644 --- a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h @@ -24,50 +24,24 @@ class SelectiveByteRleColumnReader : public dwio::common::SelectiveByteRleColumnReader { void init(DwrfParams& params, bool isBool) { auto format = params.stripeStreams().format(); - if (format == DwrfFormat::kDwrf) { - initDwrf(params, isBool); - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(params, isBool); - } - } - - void initDwrf(DwrfParams& params, bool isBool) { EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - if (isBool) { - boolRle_ = createBooleanRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_DATA), - params.streamLabels().label(), - true), - encodingKey); + + DwrfStreamIdentifier dataId; + if (format == DwrfFormat::kDwrf) { + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); } else { - byteRle_ = createByteRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_DATA), - params.streamLabels().label(), - true), - encodingKey); + VELOX_CHECK(format == DwrfFormat::kOrc); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); } - } - void initOrc(DwrfParams& params, bool isBool) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); if (isBool) { boolRle_ = createBooleanRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DATA), - params.streamLabels().label(), - true), + stripe.getStream(dataId, params.streamLabels().label(), true), encodingKey); } else { byteRle_ = createByteRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DATA), - params.streamLabels().label(), - true), + stripe.getStream(dataId, params.streamLabels().label(), true), encodingKey); } } diff --git a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp index 235dcc7ac7e1..14ee7fec5493 100644 --- a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp @@ -26,29 +26,25 @@ std::unique_ptr> makeLengthDecoder( EncodingKey encodingKey{nodeType.id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); auto format = stripe.format(); + + RleVersion rleVersion; + DwrfStreamIdentifier lenId; if (format == DwrfFormat::kDwrf) { - auto rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool lenVints = stripe.getUseVInts(lenId); - return createRleDecoder( - stripe.getStream(lenId, params.streamLabels().label(), true), - rleVersion, - pool, - lenVints, - dwio::common::INT_BYTE_SIZE); + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); } else { VELOX_CHECK(format == DwrfFormat::kOrc); - auto rleVersion = - convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); - bool lenVints = stripe.getUseVInts(lenId); - return createRleDecoder( - stripe.getStream(lenId, params.streamLabels().label(), true), - rleVersion, - pool, - lenVints, - dwio::common::INT_BYTE_SIZE); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); } + + bool lenVints = stripe.getUseVInts(lenId); + return createRleDecoder( + stripe.getStream(lenId, params.streamLabels().label(), true), + rleVersion, + pool, + lenVints, + dwio::common::INT_BYTE_SIZE); } } // namespace diff --git a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp index 55cc41769792..667518aed56e 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp @@ -24,11 +24,57 @@ using namespace dwio::common; void SelectiveStringDictionaryColumnReader::init(DwrfParams& params) { format_ = params.stripeStreams().format(); + auto& stripe = params.stripeStreams(); + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + + DwrfStreamIdentifier dataId; + DwrfStreamIdentifier lenId; + DwrfStreamIdentifier dictId; if (format_ == DwrfFormat::kDwrf) { - initDwrf(params); + rleVersion_ = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + scanState_.dictionary.numValues = + stripe.getEncoding(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + dictId = encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA); + + // handle in dictionary stream + std::unique_ptr inDictStream = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), + params.streamLabels().label(), + false); + if (inDictStream) { + formatData_->as().ensureRowGroupIndex(); + + inDictionaryReader_ = + createBooleanRleDecoder(std::move(inDictStream), encodingKey); + + // stride dictionary only exists if in dictionary exists + strideDictStream_ = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), + params.streamLabels().label(), + true); + DWIO_ENSURE_NOT_NULL(strideDictStream_, "Stride dictionary is missing"); + + const auto strideDictLenId = + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); + bool strideLenVInt = stripe.getUseVInts(strideDictLenId); + strideDictLengthDecoder_ = createRleDecoder( + stripe.getStream( + strideDictLenId, params.streamLabels().label(), true), + rleVersion_, + memoryPool_, + strideLenVInt, + dwio::common::INT_BYTE_SIZE); + } } else { VELOX_CHECK(format_ == DwrfFormat::kOrc); - initOrc(params); + rleVersion_ = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + scanState_.dictionary.numValues = + stripe.getEncodingOrc(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + dictId = encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA); } } @@ -40,7 +86,6 @@ void SelectiveStringDictionaryColumnReader::initDwrf(DwrfParams& params) { scanState_.dictionary.numValues = stripe.getEncoding(encodingKey).dictionarysize(); - const auto dataId = encodingKey.forKind(proto::Stream_Kind_DATA); bool dictVInts = stripe.getUseVInts(dataId); dictIndex_ = createRleDecoder( stripe.getStream(dataId, params.streamLabels().label(), true), @@ -49,7 +94,6 @@ void SelectiveStringDictionaryColumnReader::initDwrf(DwrfParams& params) { dictVInts, dwio::common::INT_BYTE_SIZE); - const auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); bool lenVInts = stripe.getUseVInts(lenId); lengthDecoder_ = createRleDecoder( stripe.getStream(lenId, params.streamLabels().label(), false), diff --git a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h index ea2d747237f1..608b4cd864c7 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h @@ -72,8 +72,6 @@ class SelectiveStringDictionaryColumnReader void makeDictionaryBaseVector(); void init(DwrfParams& params); - void initDwrf(DwrfParams& params); - void initOrc(DwrfParams& params); template void readWithVisitor(RowSet rows, TVisitor visitor); diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp index c2f8600b1814..8ceb07106cbd 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp @@ -20,33 +20,29 @@ namespace facebook::velox::dwrf { -void SelectiveStringDirectColumnReader::initDwrf(DwrfParams& params) { +void SelectiveStringDirectColumnReader::init(DwrfParams& params) { + auto format = params.stripeStreams().format(); EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - - RleVersion rleVersion = - convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool lenVInts = stripe.getUseVInts(lenId); - lengthDecoder_ = createRleDecoder( - stripe.getStream(lenId, params.streamLabels().label(), true), - rleVersion, - memoryPool_, - lenVInts, - dwio::common::INT_BYTE_SIZE); - blobStream_ = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DATA), - params.streamLabels().label(), - true); } void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) { EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); + DwrfStreamIdentifier lenId; + DwrfStreamIdentifier dataId; + RleVersion rleVersion; + if (format == DwrfFormat::kDwrf) { + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + } - RleVersion rleVersion = - convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); bool lenVInts = stripe.getUseVInts(lenId); lengthDecoder_ = createRleDecoder( stripe.getStream(lenId, true), @@ -54,8 +50,7 @@ void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) { memoryPool_, lenVInts, dwio::common::INT_BYTE_SIZE); - blobStream_ = - stripe.getStream(encodingKey.forKind(proto::orc::Stream_Kind_DATA), true); + blobStream_ = stripe.getStream(dataId, true); } SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h index 89eb1c671d90..0157612fb83e 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h @@ -23,20 +23,6 @@ namespace facebook::velox::dwrf { class SelectiveStringDirectColumnReader : public dwio::common::SelectiveColumnReader { - void init(DwrfParams& params) { - auto format = params.stripeStreams().format(); - if (format == DwrfFormat::kDwrf) { - initDwrf(params); - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(params); - } - } - - void initDwrf(DwrfParams& params); - - void initOrc(DwrfParams& params); - public: using ValueType = StringView; SelectiveStringDirectColumnReader( @@ -56,6 +42,7 @@ class SelectiveStringDirectColumnReader bufferStart_ = bufferEnd_; } + void init(DwrfParams& params); uint64_t skip(uint64_t numValues) override; void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h index ca7f5dc4ce40..44bd6589460b 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h @@ -89,36 +89,26 @@ struct SelectiveStructColumnReader : SelectiveStructColumnReaderBase { private: void init(DwrfParams& params) { auto format = params.stripeStreams().format(); + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); if (format == DwrfFormat::kDwrf) { - initDwrf(params); + auto encoding = + static_cast(stripe.getEncoding(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::ColumnEncoding_Kind_DIRECT, + "Unknown dwrf encoding for StructColumnReader"); } else { VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(params); + auto encoding = + static_cast(stripe.getEncodingOrc(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::orc::ColumnEncoding_Kind_DIRECT, + "Unknown orc encoding for StructColumnReader"); } } - void initDwrf(DwrfParams& params) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); - auto encoding = - static_cast(stripe.getEncoding(encodingKey).kind()); - DWIO_ENSURE_EQ( - encoding, - proto::ColumnEncoding_Kind_DIRECT, - "Unknown dwrf encoding for StructColumnReader"); - } - - void initOrc(DwrfParams& params) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); - auto encoding = - static_cast(stripe.getEncodingOrc(encodingKey).kind()); - DWIO_ENSURE_EQ( - encoding, - proto::orc::ColumnEncoding_Kind_DIRECT, - "Unknown orc encoding for StructColumnReader"); - } - void addChild(std::unique_ptr child) { children_.push_back(child.get()); childrenOwned_.push_back(std::move(child)); diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp index 35472662fb52..b58003e8cbf3 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp @@ -22,7 +22,8 @@ namespace facebook::velox::dwrf { using namespace dwio::common; -void SelectiveTimestampColumnReader::initDwrf(DwrfParams& params) { +void SelectiveTimestampColumnReader::init(DwrfParams& params) { + auto format = params.stripeStreams().format(); EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); version_ = convertRleVersion(stripe.getEncoding(encodingKey).kind()); @@ -48,22 +49,31 @@ void SelectiveTimestampColumnReader::initDwrf(DwrfParams& params) { void SelectiveTimestampColumnReader::initOrc(DwrfParams& params) { EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - version = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + DwrfStreamIdentifier dataId; + DwrfStreamIdentifier nanoDataId; + if (format == DwrfFormat::kDwrf) { + version = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + nanoDataId = encodingKey.forKind(proto::Stream_Kind_NANO_DATA); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + version = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + nanoDataId = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); + } - auto data = encodingKey.forKind(proto::orc::Stream_Kind_DATA); - bool vints = stripe.getUseVInts(data); + bool vints = stripe.getUseVInts(dataId); seconds_ = createRleDecoder( - stripe.getStream(data, true), + stripe.getStream(dataId, params.streamLabels().label(), true), version, memoryPool_, vints, LONG_BYTE_SIZE); - auto nanoData = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); - bool nanoVInts = stripe.getUseVInts(nanoData); + bool nanoVInts = stripe.getUseVInts(nanoDataId); nano_ = createRleDecoder( - stripe.getStream(nanoData, true), + stripe.getStream(nanoDataId, params.streamLabels().label(), true), version, memoryPool_, nanoVInts, diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h index c6a882116d22..914e441af02a 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h @@ -23,20 +23,6 @@ namespace facebook::velox::dwrf { class SelectiveTimestampColumnReader : public dwio::common::SelectiveColumnReader { - void init(DwrfParams& params) { - auto format = params.stripeStreams().format(); - if (format == DwrfFormat::kDwrf) { - initDwrf(params); - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(params); - } - } - - void initDwrf(DwrfParams& params); - - void initOrc(DwrfParams& params); - public: // The readers produce int64_t, the vector is Timestamps. using ValueType = int64_t; @@ -46,6 +32,7 @@ class SelectiveTimestampColumnReader DwrfParams& params, common::ScanSpec& scanSpec); + void init(DwrfParams& params); void seekToRowGroup(uint32_t index) override; uint64_t skip(uint64_t numValues) override; diff --git a/velox/dwio/dwrf/test/ReaderBaseTests.cpp b/velox/dwio/dwrf/test/ReaderBaseTests.cpp index 5f7002b67e33..c3f743d2959b 100644 --- a/velox/dwio/dwrf/test/ReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/ReaderBaseTests.cpp @@ -101,7 +101,7 @@ class EncryptedStatsTest : public Test { *readerPool_, std::make_unique(readFile, *readerPool_), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr, std::move(handler)); } diff --git a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp index d27a81139edd..71ded6207551 100644 --- a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp @@ -70,7 +70,7 @@ class StripeLoadKeysTest : public Test { std::make_unique( std::make_shared(std::string()), *pool_), nullptr, - footer, + std::make_unique(footer), nullptr, std::move(handler)); stripeReader_ = diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index 6fe58bbd7ea0..0a8d55135bf9 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -118,7 +118,7 @@ TEST(StripeStream, planReads) { BufferedInput::kMaxMergeDistance, true), std::make_unique(proto::PostScript{}), - footer, + std::make_unique(footer), nullptr); ColumnSelector cs{readerBase->getSchema(), std::vector{2}, true}; auto stripeFooter = @@ -163,7 +163,7 @@ TEST(StripeStream, filterSequences) { *pool, std::make_unique(std::move(is), *pool), std::make_unique(proto::PostScript{}), - footer, + std::make_unique(footer), nullptr); // mock a filter that we only need one node and one sequence @@ -222,7 +222,7 @@ TEST(StripeStream, zeroLength) { *pool, std::make_unique(std::move(is), *pool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr); auto stripeFooter = @@ -297,7 +297,7 @@ TEST(StripeStream, planReadsIndex) { *pool, std::make_unique(std::move(is), *pool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), std::move(cache)); auto stripeFooter = @@ -421,7 +421,7 @@ TEST(StripeStream, readEncryptedStreams) { std::make_shared(std::string()), *readerPool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr, std::move(handler)); auto stripeReader = @@ -491,7 +491,7 @@ TEST(StripeStream, schemaMismatch) { std::make_shared(std::string()), *pool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr, std::move(handler)); auto stripeReader =