diff --git a/velox/dwio/dwrf/reader/ColumnReader.cpp b/velox/dwio/dwrf/reader/ColumnReader.cpp index b9951c3cd466..1ebc3203ca9e 100644 --- a/velox/dwio/dwrf/reader/ColumnReader.cpp +++ b/velox/dwio/dwrf/reader/ColumnReader.cpp @@ -1024,41 +1024,58 @@ class StringDictionaryColumnReader : public ColumnReader { void ensureInitialized(); - void initOrc(StripeStreams& stripe) { + void init(StripeStreams& stripe) { + auto format = stripe.format(); EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - RleVersion rleVersion = - convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); - dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize(); - - const auto dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); - bool dictVInts = stripe.getUseVInts(dataId); - dictIndex = createRleDecoder( - stripe.getStream(dataId, true), - rleVersion, - memoryPool_, - dictVInts, - dwio::common::INT_BYTE_SIZE); - const auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); - bool lenVInts = stripe.getUseVInts(lenId); - lengthDecoder = createRleDecoder( - stripe.getStream(lenId, false), - rleVersion, - memoryPool_, - lenVInts, - dwio::common::INT_BYTE_SIZE); - - blobStream = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA), false); - } - - void initDwrf(StripeStreams& stripe) { - EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - RleVersion rleVersion = - convertRleVersion(stripe.getEncoding(encodingKey).kind()); - dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); + RleVersion rleVersion; + DwrfStreamIdentifier dataId; + DwrfStreamIdentifier lenId; + DwrfStreamIdentifier dictionaryId; + if (format == DwrfFormat::kDwrf) { + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + dictionaryId = encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA); + + // handle in dictionary stream + std::unique_ptr inDictStream = + stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false); + if (inDictStream) { + inDictionaryReader = + createBooleanRleDecoder(std::move(inDictStream), encodingKey); + + // stride dictionary only exists if in dictionary exists + strideDictStream = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true); + DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); + + indexStream_ = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true); + DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); + + const auto strideDictLenId = + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); + bool strideLenVInt = stripe.getUseVInts(strideDictLenId); + strideDictLengthDecoder = createRleDecoder( + stripe.getStream(strideDictLenId, true), + rleVersion, + memoryPool_, + strideLenVInt, + dwio::common::INT_BYTE_SIZE); + } + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + dictionaryId = + encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA); + } - const auto dataId = encodingKey.forKind(proto::Stream_Kind_DATA); bool dictVInts = stripe.getUseVInts(dataId); dictIndex = createRleDecoder( stripe.getStream(dataId, true), @@ -1067,7 +1084,6 @@ class StringDictionaryColumnReader : public ColumnReader { dictVInts, dwio::common::INT_BYTE_SIZE); - const auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); bool lenVInts = stripe.getUseVInts(lenId); lengthDecoder = createRleDecoder( stripe.getStream(lenId, false), @@ -1076,46 +1092,7 @@ class StringDictionaryColumnReader : public ColumnReader { lenVInts, dwio::common::INT_BYTE_SIZE); - blobStream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA), false); - - // handle in dictionary stream - std::unique_ptr inDictStream = - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false); - if (inDictStream) { - inDictionaryReader = - createBooleanRleDecoder(std::move(inDictStream), encodingKey); - - // stride dictionary only exists if in dictionary exists - strideDictStream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true); - DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); - - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true); - DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); - - const auto strideDictLenId = - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); - bool strideLenVInt = stripe.getUseVInts(strideDictLenId); - strideDictLengthDecoder = createRleDecoder( - stripe.getStream(strideDictLenId, true), - rleVersion, - memoryPool_, - strideLenVInt, - dwio::common::INT_BYTE_SIZE); - } - } - - void init(StripeStreams& stripe) { - auto format = stripe.format(); - if (format == DwrfFormat::kDwrf) { - initDwrf(stripe); - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(stripe); - } + blobStream = stripe.getStream(dictionaryId, false); } public: diff --git a/velox/dwio/dwrf/reader/DwrfData.cpp b/velox/dwio/dwrf/reader/DwrfData.cpp index 5af50699bbb1..37541ba8f551 100644 --- a/velox/dwio/dwrf/reader/DwrfData.cpp +++ b/velox/dwio/dwrf/reader/DwrfData.cpp @@ -22,34 +22,18 @@ namespace facebook::velox::dwrf { void DwrfData::init(StripeStreams& stripe) { auto format = stripe.format(); + EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; + + DwrfStreamIdentifier presentStream; + DwrfStreamIdentifier rowIndexStream; if (format == DwrfFormat::kDwrf) { - initDwrf(stripe); + presentStream = encodingKey.forKind(proto::Stream_Kind_PRESENT); + rowIndexStream = encodingKey.forKind(proto::Stream_Kind_ROW_INDEX); } else { VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(stripe); - } -} - -void DwrfData::initDwrf(StripeStreams& stripe) { - EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - - std::unique_ptr stream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_PRESENT), - streamLabels.label(), - false); - if (stream) { - notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey); + presentStream = encodingKey.forKind(proto::orc::Stream_Kind_PRESENT); + rowIndexStream = encodingKey.forKind(proto::orc::Stream_Kind_ROW_INDEX); } - - // We always initialize indexStream_ because indices are needed as - // soon as there is a single filter that can trigger row group skips - // anywhere in the reader tree. This is not known at construct time - // because the first filter can come from a hash join or other run - // time pushdown. - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), - streamLabels.label(), - false); } void DwrfData::initOrc(StripeStreams& stripe) { @@ -57,6 +41,8 @@ void DwrfData::initOrc(StripeStreams& stripe) { std::unique_ptr stream = stripe.getStream( encodingKey.forKind(proto::orc::Stream_Kind_PRESENT), false); + std::unique_ptr stream = + stripe.getStream(presentStream, streamLabels.label(), false); if (stream) { notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey); } @@ -66,8 +52,7 @@ void DwrfData::initOrc(StripeStreams& stripe) { // anywhere in the reader tree. This is not known at construct time // because the first filter can come from a hash join or other run // time pushdown. - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_ROW_INDEX), false); + indexStream_ = stripe.getStream(rowIndexStream, false); } DwrfData::DwrfData( diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index 6d56ea3e746c..4d7c153cf207 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -97,8 +97,6 @@ class DwrfData : public dwio::common::FormatData { } void init(StripeStreams& stripe); - void initDwrf(StripeStreams& stripe); - void initOrc(StripeStreams& stripe); memory::MemoryPool& memoryPool_; const std::shared_ptr nodeType_; diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index c645bddabad7..c9c764f92565 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -81,12 +81,12 @@ class ReaderBase { memory::MemoryPool& pool, std::unique_ptr input, std::unique_ptr ps, - const proto::Footer* footer, + std::unique_ptr footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) : pool_{pool}, postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, + footer_{std::move(footer)}, cache_{std::move(cache)}, handler_{std::move(handler)}, input_{std::move(input)}, @@ -94,31 +94,6 @@ class ReaderBase { std::dynamic_pointer_cast(convertType(*footer_))}, fileLength_{0}, psLength_{0} { - DWIO_ENSURE(footer_->getDwrfPtr()->GetArena()); - DWIO_ENSURE_NOT_NULL(schema_, "invalid schema"); - if (!handler_) { - handler_ = encryption::DecryptionHandler::create(*footer); - } - } - - ReaderBase( - memory::MemoryPool& pool, - std::unique_ptr input, - std::unique_ptr ps, - const proto::orc::Footer* footer, - std::unique_ptr cache, - std::unique_ptr handler = nullptr) - : pool_{pool}, - postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, - cache_{std::move(cache)}, - handler_{std::move(handler)}, - input_{std::move(input)}, - schema_{ - std::dynamic_pointer_cast(convertType(*footer_))}, - fileLength_{0}, - psLength_{0} { - DWIO_ENSURE(footer_->getOrcPtr()->GetArena()); DWIO_ENSURE_NOT_NULL(schema_, "invalid schema"); if (!handler_) { handler_ = encryption::DecryptionHandler::create(*footer_); diff --git a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h index e16c3ca5e123..ff67ab8f75ae 100644 --- a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h @@ -45,39 +45,22 @@ class SelectiveByteRleColumnReader std::move(dataType)) { EncodingKey encodingKey{fileType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - if (isBool) { - boolRle_ = createBooleanRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_DATA), - params.streamLabels().label(), - true), - encodingKey); + + DwrfStreamIdentifier dataId; + if (format == DwrfFormat::kDwrf) { + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); } else { - byteRle_ = createByteRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_DATA), - params.streamLabels().label(), - true), - encodingKey); + VELOX_CHECK(format == DwrfFormat::kOrc); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); } - } - void initOrc(DwrfParams& params, bool isBool) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); if (isBool) { boolRle_ = createBooleanRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DATA), - params.streamLabels().label(), - true), + stripe.getStream(dataId, params.streamLabels().label(), true), encodingKey); } else { byteRle_ = createByteRleDecoder( - stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DATA), - params.streamLabels().label(), - true), + stripe.getStream(dataId, params.streamLabels().label(), true), encodingKey); } } diff --git a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp index f5d364a40799..a3db62c57984 100644 --- a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp @@ -26,29 +26,25 @@ std::unique_ptr> makeLengthDecoder( EncodingKey encodingKey{nodeType.id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); auto format = stripe.format(); + + RleVersion rleVersion; + DwrfStreamIdentifier lenId; if (format == DwrfFormat::kDwrf) { - auto rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool lenVints = stripe.getUseVInts(lenId); - return createRleDecoder( - stripe.getStream(lenId, params.streamLabels().label(), true), - rleVersion, - pool, - lenVints, - dwio::common::INT_BYTE_SIZE); + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); } else { VELOX_CHECK(format == DwrfFormat::kOrc); - auto rleVersion = - convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); - bool lenVints = stripe.getUseVInts(lenId); - return createRleDecoder( - stripe.getStream(lenId, params.streamLabels().label(), true), - rleVersion, - pool, - lenVints, - dwio::common::INT_BYTE_SIZE); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); } + + bool lenVints = stripe.getUseVInts(lenId); + return createRleDecoder( + stripe.getStream(lenId, params.streamLabels().label(), true), + rleVersion, + pool, + lenVints, + dwio::common::INT_BYTE_SIZE); } } // namespace diff --git a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp index d280270b6096..377c5e40a3e6 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp @@ -36,7 +36,6 @@ SelectiveStringDictionaryColumnReader::SelectiveStringDictionaryColumnReader( scanState_.dictionary.numValues = stripe.getEncoding(encodingKey).dictionarysize(); - const auto dataId = encodingKey.forKind(proto::Stream_Kind_DATA); bool dictVInts = stripe.getUseVInts(dataId); dictIndex_ = createRleDecoder( stripe.getStream(dataId, params.streamLabels().label(), true), @@ -45,7 +44,6 @@ SelectiveStringDictionaryColumnReader::SelectiveStringDictionaryColumnReader( dictVInts, dwio::common::INT_BYTE_SIZE); - const auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); bool lenVInts = stripe.getUseVInts(lenId); lengthDecoder_ = createRleDecoder( stripe.getStream(lenId, params.streamLabels().label(), false), diff --git a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h index ea2d747237f1..608b4cd864c7 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h @@ -72,8 +72,6 @@ class SelectiveStringDictionaryColumnReader void makeDictionaryBaseVector(); void init(DwrfParams& params); - void initDwrf(DwrfParams& params); - void initOrc(DwrfParams& params); template void readWithVisitor(RowSet rows, TVisitor visitor); diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp index 8de13b9b1ec2..9a058e4ed87e 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp @@ -27,30 +27,25 @@ SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( : SelectiveColumnReader(nodeType->type, params, scanSpec, nodeType) { EncodingKey encodingKey{nodeType->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - - RleVersion rleVersion = - convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool lenVInts = stripe.getUseVInts(lenId); - lengthDecoder_ = createRleDecoder( - stripe.getStream(lenId, params.streamLabels().label(), true), - rleVersion, - memoryPool_, - lenVInts, - dwio::common::INT_BYTE_SIZE); - blobStream_ = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DATA), - params.streamLabels().label(), - true); } void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) { EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); + DwrfStreamIdentifier lenId; + DwrfStreamIdentifier dataId; + RleVersion rleVersion; + if (format == DwrfFormat::kDwrf) { + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + } - RleVersion rleVersion = - convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); bool lenVInts = stripe.getUseVInts(lenId); lengthDecoder_ = createRleDecoder( stripe.getStream(lenId, true), @@ -58,8 +53,7 @@ void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) { memoryPool_, lenVInts, dwio::common::INT_BYTE_SIZE); - blobStream_ = - stripe.getStream(encodingKey.forKind(proto::orc::Stream_Kind_DATA), true); + blobStream_ = stripe.getStream(dataId, true); } SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h index 5bc1b9f4f16e..9e4be8aad1b3 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h @@ -23,20 +23,6 @@ namespace facebook::velox::dwrf { class SelectiveStringDirectColumnReader : public dwio::common::SelectiveColumnReader { - void init(DwrfParams& params) { - auto format = params.stripeStreams().format(); - if (format == DwrfFormat::kDwrf) { - initDwrf(params); - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(params); - } - } - - void initDwrf(DwrfParams& params); - - void initOrc(DwrfParams& params); - public: using ValueType = StringView; SelectiveStringDirectColumnReader( @@ -56,6 +42,7 @@ class SelectiveStringDirectColumnReader bufferStart_ = bufferEnd_; } + void init(DwrfParams& params); uint64_t skip(uint64_t numValues) override; void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h index 4640baa23bdb..be1c5b22ff95 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h @@ -89,36 +89,26 @@ struct SelectiveStructColumnReader : SelectiveStructColumnReaderBase { private: void init(DwrfParams& params) { auto format = params.stripeStreams().format(); + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); if (format == DwrfFormat::kDwrf) { - initDwrf(params); + auto encoding = + static_cast(stripe.getEncoding(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::ColumnEncoding_Kind_DIRECT, + "Unknown dwrf encoding for StructColumnReader"); } else { VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(params); + auto encoding = + static_cast(stripe.getEncodingOrc(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::orc::ColumnEncoding_Kind_DIRECT, + "Unknown orc encoding for StructColumnReader"); } } - void initDwrf(DwrfParams& params) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); - auto encoding = - static_cast(stripe.getEncoding(encodingKey).kind()); - DWIO_ENSURE_EQ( - encoding, - proto::ColumnEncoding_Kind_DIRECT, - "Unknown dwrf encoding for StructColumnReader"); - } - - void initOrc(DwrfParams& params) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); - auto encoding = - static_cast(stripe.getEncodingOrc(encodingKey).kind()); - DWIO_ENSURE_EQ( - encoding, - proto::orc::ColumnEncoding_Kind_DIRECT, - "Unknown orc encoding for StructColumnReader"); - } - void addChild(std::unique_ptr child) { children_.push_back(child.get()); childrenOwned_.push_back(std::move(child)); diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp index e24a03ae2c28..b21ec588f6a5 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp @@ -52,22 +52,31 @@ SelectiveTimestampColumnReader::SelectiveTimestampColumnReader( void SelectiveTimestampColumnReader::initOrc(DwrfParams& params) { EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - version = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + DwrfStreamIdentifier dataId; + DwrfStreamIdentifier nanoDataId; + if (format == DwrfFormat::kDwrf) { + version = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + nanoDataId = encodingKey.forKind(proto::Stream_Kind_NANO_DATA); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + version = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + nanoDataId = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); + } - auto data = encodingKey.forKind(proto::orc::Stream_Kind_DATA); - bool vints = stripe.getUseVInts(data); + bool vints = stripe.getUseVInts(dataId); seconds_ = createRleDecoder( - stripe.getStream(data, true), + stripe.getStream(dataId, params.streamLabels().label(), true), version, memoryPool_, vints, LONG_BYTE_SIZE); - auto nanoData = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); - bool nanoVInts = stripe.getUseVInts(nanoData); + bool nanoVInts = stripe.getUseVInts(nanoDataId); nano_ = createRleDecoder( - stripe.getStream(nanoData, true), + stripe.getStream(nanoDataId, params.streamLabels().label(), true), version, memoryPool_, nanoVInts, diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h index c6a882116d22..914e441af02a 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h @@ -23,20 +23,6 @@ namespace facebook::velox::dwrf { class SelectiveTimestampColumnReader : public dwio::common::SelectiveColumnReader { - void init(DwrfParams& params) { - auto format = params.stripeStreams().format(); - if (format == DwrfFormat::kDwrf) { - initDwrf(params); - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - initOrc(params); - } - } - - void initDwrf(DwrfParams& params); - - void initOrc(DwrfParams& params); - public: // The readers produce int64_t, the vector is Timestamps. using ValueType = int64_t; @@ -46,6 +32,7 @@ class SelectiveTimestampColumnReader DwrfParams& params, common::ScanSpec& scanSpec); + void init(DwrfParams& params); void seekToRowGroup(uint32_t index) override; uint64_t skip(uint64_t numValues) override; diff --git a/velox/dwio/dwrf/test/ReaderBaseTests.cpp b/velox/dwio/dwrf/test/ReaderBaseTests.cpp index b6343373ddb9..f7aa5380c0b1 100644 --- a/velox/dwio/dwrf/test/ReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/ReaderBaseTests.cpp @@ -102,7 +102,7 @@ class EncryptedStatsTest : public Test { *readerPool_, std::make_unique(readFile, *readerPool_), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr, std::move(handler)); } diff --git a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp index d27a81139edd..71ded6207551 100644 --- a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp @@ -70,7 +70,7 @@ class StripeLoadKeysTest : public Test { std::make_unique( std::make_shared(std::string()), *pool_), nullptr, - footer, + std::make_unique(footer), nullptr, std::move(handler)); stripeReader_ = diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index 6fe58bbd7ea0..0a8d55135bf9 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -118,7 +118,7 @@ TEST(StripeStream, planReads) { BufferedInput::kMaxMergeDistance, true), std::make_unique(proto::PostScript{}), - footer, + std::make_unique(footer), nullptr); ColumnSelector cs{readerBase->getSchema(), std::vector{2}, true}; auto stripeFooter = @@ -163,7 +163,7 @@ TEST(StripeStream, filterSequences) { *pool, std::make_unique(std::move(is), *pool), std::make_unique(proto::PostScript{}), - footer, + std::make_unique(footer), nullptr); // mock a filter that we only need one node and one sequence @@ -222,7 +222,7 @@ TEST(StripeStream, zeroLength) { *pool, std::make_unique(std::move(is), *pool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr); auto stripeFooter = @@ -297,7 +297,7 @@ TEST(StripeStream, planReadsIndex) { *pool, std::make_unique(std::move(is), *pool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), std::move(cache)); auto stripeFooter = @@ -421,7 +421,7 @@ TEST(StripeStream, readEncryptedStreams) { std::make_shared(std::string()), *readerPool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr, std::move(handler)); auto stripeReader = @@ -491,7 +491,7 @@ TEST(StripeStream, schemaMismatch) { std::make_shared(std::string()), *pool), std::make_unique(std::move(ps)), - footer, + std::make_unique(footer), nullptr, std::move(handler)); auto stripeReader =