From 728185119b1d21860d0803752f92d17a6b92b443 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Thu, 2 Nov 2023 10:07:06 +0100
Subject: [PATCH] GH-38542: [C++][Parquet] Faster scalar BYTE_STREAM_SPLIT (#38529)

### Rationale for this change

BYTE_STREAM_SPLIT encoding and decoding benefit from SIMD accelerations on x86, but scalar implementations are used otherwise.

### What changes are included in this PR?

Improve the speed of the scalar implementations of BYTE_STREAM_SPLIT by using a blocked algorithm (a short illustrative sketch follows this description).

Benchmark numbers on the author's machine (AMD Ryzen 9 3900X):

* before:
```
-------------------------------------------------------------------------------------------------------
Benchmark                                         Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------
BM_ByteStreamSplitDecode_Float_Scalar/1024     1374 ns         1374 ns       510396 bytes_per_second=2.77574G/s
BM_ByteStreamSplitDecode_Float_Scalar/4096     5483 ns         5483 ns       127498 bytes_per_second=2.78303G/s
BM_ByteStreamSplitDecode_Float_Scalar/32768   44042 ns        44035 ns        15905 bytes_per_second=2.77212G/s
BM_ByteStreamSplitDecode_Float_Scalar/65536   87966 ns        87952 ns         7962 bytes_per_second=2.77583G/s
BM_ByteStreamSplitDecode_Double_Scalar/1024    2583 ns         2583 ns       271436 bytes_per_second=2.95408G/s
BM_ByteStreamSplitDecode_Double_Scalar/4096   10533 ns        10532 ns        65695 bytes_per_second=2.89761G/s
BM_ByteStreamSplitDecode_Double_Scalar/32768  84067 ns        84053 ns         8275 bytes_per_second=2.90459G/s
BM_ByteStreamSplitDecode_Double_Scalar/65536 168332 ns       168309 ns         4155 bytes_per_second=2.9011G/s
BM_ByteStreamSplitEncode_Float_Scalar/1024     1435 ns         1435 ns       484278 bytes_per_second=2.65802G/s
BM_ByteStreamSplitEncode_Float_Scalar/4096     5725 ns         5725 ns       121877 bytes_per_second=2.66545G/s
BM_ByteStreamSplitEncode_Float_Scalar/32768   46291 ns        46283 ns        15134 bytes_per_second=2.63745G/s
BM_ByteStreamSplitEncode_Float_Scalar/65536   91139 ns        91128 ns         7707 bytes_per_second=2.6791G/s
BM_ByteStreamSplitEncode_Double_Scalar/1024    3093 ns         3093 ns       226198 bytes_per_second=2.46663G/s
BM_ByteStreamSplitEncode_Double_Scalar/4096   12724 ns        12722 ns        54522 bytes_per_second=2.39873G/s
BM_ByteStreamSplitEncode_Double_Scalar/32768 100488 ns       100475 ns         6957 bytes_per_second=2.42987G/s
BM_ByteStreamSplitEncode_Double_Scalar/65536 200885 ns       200852 ns         3486 bytes_per_second=2.43105G/s
```

* after:
```
-------------------------------------------------------------------------------------------------------
Benchmark                                         Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------
BM_ByteStreamSplitDecode_Float_Scalar/1024      932 ns          932 ns       753352 bytes_per_second=4.09273G/s
BM_ByteStreamSplitDecode_Float_Scalar/4096     3715 ns         3715 ns       188394 bytes_per_second=4.10783G/s
BM_ByteStreamSplitDecode_Float_Scalar/32768   30167 ns        30162 ns        23441 bytes_per_second=4.04716G/s
BM_ByteStreamSplitDecode_Float_Scalar/65536   59483 ns        59475 ns        11744 bytes_per_second=4.10496G/s
BM_ByteStreamSplitDecode_Double_Scalar/1024    1862 ns         1862 ns       374715 bytes_per_second=4.09823G/s
BM_ByteStreamSplitDecode_Double_Scalar/4096    7554 ns         7553 ns        91975 bytes_per_second=4.04038G/s
BM_ByteStreamSplitDecode_Double_Scalar/32768  60429 ns        60421 ns        11499 bytes_per_second=4.04067G/s
BM_ByteStreamSplitDecode_Double_Scalar/65536 120992 ns       120972 ns         5756 bytes_per_second=4.03631G/s
BM_ByteStreamSplitEncode_Float_Scalar/1024      737 ns          737 ns       947423 bytes_per_second=5.17843G/s
BM_ByteStreamSplitEncode_Float_Scalar/4096     2934 ns         2933 ns       239459 bytes_per_second=5.20175G/s
BM_ByteStreamSplitEncode_Float_Scalar/32768   23730 ns        23727 ns        29243 bytes_per_second=5.14485G/s
BM_ByteStreamSplitEncode_Float_Scalar/65536   47671 ns        47664 ns        14682 bytes_per_second=5.12209G/s
BM_ByteStreamSplitEncode_Double_Scalar/1024    1517 ns         1517 ns       458928 bytes_per_second=5.02827G/s
BM_ByteStreamSplitEncode_Double_Scalar/4096    6224 ns         6223 ns       111361 bytes_per_second=4.90407G/s
BM_ByteStreamSplitEncode_Double_Scalar/32768  49719 ns        49713 ns        14059 bytes_per_second=4.91099G/s
BM_ByteStreamSplitEncode_Double_Scalar/65536  99445 ns        99432 ns         7027 bytes_per_second=4.91072G/s
```

### Are these changes tested?

Yes, though the scalar implementations are unfortunately only exercised on non-x86 platforms by default (see the added comment in the PR).

### Are there any user-facing changes?

No.
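
For background, here is a minimal, unoptimized sketch of the transposition that BYTE_STREAM_SPLIT performs; the helper names (`NaiveSplit`, `NaiveMerge`) are illustrative only and are not part of this PR. The blocked scalar routines added in the diff below (`DoSplitStreams` / `DoMergeStreams`) perform the same transposition, but process values in fixed-size blocks and move 8 gathered bytes per 64-bit load or store instead of one byte at a time.

```
// Hypothetical, simplified sketch (not the PR's code): BYTE_STREAM_SPLIT
// scatters byte j of every value into stream j, so that bytes with similar
// statistics (e.g. float exponent bytes) become adjacent and compress better.
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

// "Split": gather byte `stream` of each value into a contiguous stream.
void NaiveSplit(const uint8_t* src, int width, int64_t num_values, uint8_t* dest) {
  for (int64_t i = 0; i < num_values; ++i) {
    for (int stream = 0; stream < width; ++stream) {
      dest[stream * num_values + i] = src[i * width + stream];
    }
  }
}

// "Merge": the inverse transform, interleaving the streams back into values.
void NaiveMerge(const uint8_t* src, int width, int64_t num_values, uint8_t* dest) {
  for (int64_t i = 0; i < num_values; ++i) {
    for (int stream = 0; stream < width; ++stream) {
      dest[i * width + stream] = src[stream * num_values + i];
    }
  }
}

int main() {
  const std::vector<float> values = {1.5f, -2.25f, 3.0f, 65536.0f};
  const int64_t n = static_cast<int64_t>(values.size());
  std::vector<uint8_t> encoded(values.size() * sizeof(float));
  std::vector<float> decoded(values.size());

  NaiveSplit(reinterpret_cast<const uint8_t*>(values.data()),
             static_cast<int>(sizeof(float)), n, encoded.data());
  NaiveMerge(encoded.data(), static_cast<int>(sizeof(float)), n,
             reinterpret_cast<uint8_t*>(decoded.data()));

  std::printf("roundtrip ok: %d\n",
              std::memcmp(values.data(), decoded.data(), encoded.size()) == 0);
  return 0;
}
```

The blocked variants replace the per-byte inner loop with a handful of wide loads and stores per block, which is presumably where the speedup in the numbers above comes from.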
* Closes: #38542

Authored-by: Antoine Pitrou
Signed-off-by: Antoine Pitrou
---
 cpp/src/arrow/util/CMakeLists.txt              |   1 +
 ...m_split.h => byte_stream_split_internal.h}  | 148 ++++++++++++---
 cpp/src/arrow/util/byte_stream_split_test.cc   | 172 ++++++++++++++++++
 cpp/src/parquet/encoding.cc                    |   6 +-
 cpp/src/parquet/encoding_benchmark.cc          |   2 +-
 5 files changed, 298 insertions(+), 31 deletions(-)
 rename cpp/src/arrow/util/{byte_stream_split.h => byte_stream_split_internal.h} (84%)
 create mode 100644 cpp/src/arrow/util/byte_stream_split_test.cc

diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index 3cecab3a633cc..3dc8eac1abf64 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -43,6 +43,7 @@ add_arrow_test(utility-test
                align_util_test.cc
                atfork_test.cc
                byte_size_test.cc
+               byte_stream_split_test.cc
                cache_test.cc
                checked_cast_test.cc
                compression_test.cc
diff --git a/cpp/src/arrow/util/byte_stream_split.h b/cpp/src/arrow/util/byte_stream_split_internal.h
similarity index 84%
rename from cpp/src/arrow/util/byte_stream_split.h
rename to cpp/src/arrow/util/byte_stream_split_internal.h
index d428df0659b28..ae85e2cfa81a3 100644
--- a/cpp/src/arrow/util/byte_stream_split.h
+++ b/cpp/src/arrow/util/byte_stream_split_internal.h
@@ -17,20 +17,24 @@
 
 #pragma once
 
+#include "arrow/util/endian.h"
 #include "arrow/util/simd.h"
 #include "arrow/util/ubsan.h"
 
-#include <stdint.h>
 #include <string.h>
+#include <array>
+#include <cstdint>
 
 #ifdef ARROW_HAVE_SSE4_2
 // Enable the SIMD for ByteStreamSplit Encoder/Decoder
 #define ARROW_HAVE_SIMD_SPLIT
 #endif  // ARROW_HAVE_SSE4_2
 
-namespace arrow {
-namespace util {
-namespace internal {
+namespace arrow::util::internal {
+
+//
+// SIMD implementations
+//
 
 #if defined(ARROW_HAVE_SSE4_2)
 template <typename T>
@@ -565,48 +569,140 @@ void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int64_t num_values,
 }
 
 template <typename T>
-void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const size_t num_values,
+void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const int64_t num_values,
                                       uint8_t* output_buffer_raw) {
 #if defined(ARROW_HAVE_AVX512)
-  return ByteStreamSplitEncodeAvx512<T>(raw_values, num_values, output_buffer_raw);
+  return ByteStreamSplitEncodeAvx512<T>(raw_values, static_cast<size_t>(num_values),
+                                        output_buffer_raw);
 #elif defined(ARROW_HAVE_AVX2)
-  return ByteStreamSplitEncodeAvx2<T>(raw_values, num_values, output_buffer_raw);
+  return ByteStreamSplitEncodeAvx2<T>(raw_values, static_cast<size_t>(num_values),
+                                      output_buffer_raw);
 #elif defined(ARROW_HAVE_SSE4_2)
-  return ByteStreamSplitEncodeSse2<T>(raw_values, num_values, output_buffer_raw);
+  return ByteStreamSplitEncodeSse2<T>(raw_values, static_cast<size_t>(num_values),
+                                      output_buffer_raw);
 #else
 #error "ByteStreamSplitEncodeSimd not implemented"
 #endif
 }
 #endif
 
+//
+// Scalar implementations
+//
+
+inline void DoSplitStreams(const uint8_t* src, int width, int64_t nvalues,
+                           uint8_t** dest_streams) {
+  // Value empirically chosen to provide the best performance on the author's machine
+  constexpr int kBlockSize = 32;
+
+  while (nvalues >= kBlockSize) {
+    for (int stream = 0; stream < width; ++stream) {
+      uint8_t* dest = dest_streams[stream];
+      for (int i = 0; i < kBlockSize; i += 8) {
+        uint64_t a = src[stream + i * width];
+        uint64_t b = src[stream + (i + 1) * width];
+        uint64_t c = src[stream + (i + 2) * width];
+        uint64_t d = src[stream + (i + 3) * width];
+        uint64_t e = src[stream + (i + 4) * width];
+        uint64_t f = src[stream + (i + 5) * width];
+        uint64_t g = src[stream + (i + 6) * width];
+        uint64_t h = src[stream + (i + 7) * width];
+#if ARROW_LITTLE_ENDIAN
+        uint64_t r = a | (b << 8) | (c << 16) | (d << 24) | (e << 32) | (f << 40) |
+                     (g << 48) | (h << 56);
+#else
+        uint64_t r = (a << 56) | (b << 48) | (c << 40) | (d << 32) | (e << 24) |
+                     (f << 16) | (g << 8) | h;
+#endif
+        arrow::util::SafeStore(&dest[i], r);
+      }
+      dest_streams[stream] += kBlockSize;
+    }
+    src += width * kBlockSize;
+    nvalues -= kBlockSize;
+  }
+
+  // Epilog
+  for (int stream = 0; stream < width; ++stream) {
+    uint8_t* dest = dest_streams[stream];
+    for (int64_t i = 0; i < nvalues; ++i) {
+      dest[i] = src[stream + i * width];
+    }
+  }
+}
+
+inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalues,
+                           uint8_t* dest) {
+  // Value empirically chosen to provide the best performance on the author's machine
+  constexpr int kBlockSize = 128;
+
+  while (nvalues >= kBlockSize) {
+    for (int stream = 0; stream < width; ++stream) {
+      // Take kBlockSize bytes from the given stream and spread them
+      // to their logical places in destination.
+      const uint8_t* src = src_streams[stream];
+      for (int i = 0; i < kBlockSize; i += 8) {
+        uint64_t v = arrow::util::SafeLoadAs<uint64_t>(&src[i]);
+#if ARROW_LITTLE_ENDIAN
+        dest[stream + i * width] = static_cast<uint8_t>(v);
+        dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 8);
+        dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 16);
+        dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 24);
+        dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 32);
+        dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 40);
+        dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 48);
+        dest[stream + (i + 7) * width] = static_cast<uint8_t>(v >> 56);
+#else
+        dest[stream + i * width] = static_cast<uint8_t>(v >> 56);
+        dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 48);
+        dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 40);
+        dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 32);
+        dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 24);
+        dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 16);
+        dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 8);
+        dest[stream + (i + 7) * width] = static_cast<uint8_t>(v);
+#endif
+      }
+      src_streams[stream] += kBlockSize;
+    }
+    dest += width * kBlockSize;
+    nvalues -= kBlockSize;
+  }
+
+  // Epilog
+  for (int stream = 0; stream < width; ++stream) {
+    const uint8_t* src = src_streams[stream];
+    for (int64_t i = 0; i < nvalues; ++i) {
+      dest[stream + i * width] = src[i];
+    }
+  }
+}
+
 template <typename T>
-void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const size_t num_values,
+void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values,
                                  uint8_t* output_buffer_raw) {
-  constexpr size_t kNumStreams = sizeof(T);
-  for (size_t i = 0U; i < num_values; ++i) {
-    for (size_t j = 0U; j < kNumStreams; ++j) {
-      const uint8_t byte_in_value = raw_values[i * kNumStreams + j];
-      output_buffer_raw[j * num_values + i] = byte_in_value;
-    }
+  constexpr int kNumStreams = static_cast<int>(sizeof(T));
+  std::array<uint8_t*, kNumStreams> dest_streams;
+  for (int stream = 0; stream < kNumStreams; ++stream) {
+    dest_streams[stream] = &output_buffer_raw[stream * num_values];
   }
+  DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
 }
 
 template <typename T>
 void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
                                  T* out) {
-  constexpr size_t kNumStreams = sizeof(T);
-  auto output_buffer_raw = reinterpret_cast<uint8_t*>(out);
-
-  for (int64_t i = 0; i < num_values; ++i) {
-    for (size_t b = 0; b < kNumStreams; ++b) {
-      const size_t byte_index = b * stride + i;
-      output_buffer_raw[i * kNumStreams + b] = data[byte_index];
-    }
+  constexpr int kNumStreams = static_cast<int>(sizeof(T));
+  std::array<const uint8_t*, kNumStreams> src_streams;
+  for (int stream = 0; stream < kNumStreams; ++stream) {
+    src_streams[stream] = &data[stream * stride];
   }
+  DoMergeStreams(src_streams.data(), kNumStreams, num_values,
+                 reinterpret_cast<uint8_t*>(out));
 }
 
 template <typename T>
-void inline ByteStreamSplitEncode(const uint8_t* raw_values, const size_t num_values,
+void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values,
                                   uint8_t* output_buffer_raw) {
 #if defined(ARROW_HAVE_SIMD_SPLIT)
   return ByteStreamSplitEncodeSimd<T>(raw_values, num_values, output_buffer_raw);
@@ -625,6 +721,4 @@ void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values, int64
 #endif
 }
 
-}  // namespace internal
-}  // namespace util
-}  // namespace arrow
+}  // namespace arrow::util::internal
diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc b/cpp/src/arrow/util/byte_stream_split_test.cc
new file mode 100644
index 0000000000000..3ea27f57da881
--- /dev/null
+++ b/cpp/src/arrow/util/byte_stream_split_test.cc
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cmath>
+#include <cstdint>
+#include <functional>
+#include <limits>
+#include <memory>
+#include <numeric>
+#include <ostream>
+#include <random>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/byte_stream_split_internal.h"
+
+namespace arrow::util::internal {
+
+using ByteStreamSplitTypes = ::testing::Types<float, double>;
+
+template <typename Func>
+struct NamedFunc {
+  std::string name;
+  Func func;
+
+  friend std::ostream& operator<<(std::ostream& os, const NamedFunc& func) {
+    os << func.name;
+    return os;
+  }
+};
+
+// A simplistic reference implementation for validation
+void RefererenceByteStreamSplitEncode(const uint8_t* src, int width,
+                                      const int64_t num_values, uint8_t* dest) {
+  for (int64_t i = 0; i < num_values; ++i) {
+    for (int stream = 0; stream < width; ++stream) {
+      dest[stream * num_values + i] = *src++;
+    }
+  }
+}
+
+template <typename T>
+class TestByteStreamSplitSpecialized : public ::testing::Test {
+ public:
+  using EncodeFunc = NamedFunc<std::function<void(const uint8_t*, int64_t, uint8_t*)>>;
+  using DecodeFunc = NamedFunc<std::function<void(const uint8_t*, int64_t, int64_t, T*)>>;
+
+  static constexpr int kWidth = static_cast<int>(sizeof(T));
+
+  void SetUp() override {
+    encode_funcs_.push_back({"reference", &ReferenceEncode});
+    encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar<T>});
+    decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar<T>});
+#if defined(ARROW_HAVE_SIMD_SPLIT)
+    encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd<T>});
+    decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd<T>});
+#endif
+  }
+
+  void TestRoundtrip(int64_t num_values) {
+    // Test one-shot roundtrip among all encode/decode function combinations
+    ARROW_SCOPED_TRACE("num_values = ", num_values);
+    const auto input = MakeRandomInput(num_values);
+    std::vector<uint8_t> encoded(num_values * kWidth);
+    std::vector<T> decoded(num_values);
+
+    for (const auto& encode_func : encode_funcs_) {
+      ARROW_SCOPED_TRACE("encode_func = ", encode_func);
+      encoded.assign(encoded.size(), 0);
+      encode_func.func(reinterpret_cast<const uint8_t*>(input.data()), num_values,
+                       encoded.data());
+      for (const auto& decode_func : decode_funcs_) {
+        ARROW_SCOPED_TRACE("decode_func = ", decode_func);
+        decoded.assign(decoded.size(), T{});
+        decode_func.func(encoded.data(), num_values, /*stride=*/num_values,
+                         decoded.data());
+        ASSERT_EQ(decoded, input);
+      }
+    }
+  }
+
+  void TestPiecewiseDecode(int64_t num_values) {
+    // Test chunked decoding against the reference encode function
+    ARROW_SCOPED_TRACE("num_values = ", num_values);
+    const auto input = MakeRandomInput(num_values);
+    std::vector<uint8_t> encoded(num_values * kWidth);
+    ReferenceEncode(reinterpret_cast<const uint8_t*>(input.data()), num_values,
+                    encoded.data());
+    std::vector<T> decoded(num_values);
+
+    std::default_random_engine gen(seed_++);
+    std::uniform_int_distribution<int64_t> chunk_size_dist(1, 123);
+
+    for (const auto& decode_func : decode_funcs_) {
+      ARROW_SCOPED_TRACE("decode_func = ", decode_func);
+      decoded.assign(decoded.size(), T{});
+
+      int64_t offset = 0;
+      while (offset < num_values) {
+        auto chunk_size = std::min(num_values - offset, chunk_size_dist(gen));
+        decode_func.func(encoded.data() + offset, chunk_size, /*stride=*/num_values,
+                         decoded.data() + offset);
+        offset += chunk_size;
+      }
+      ASSERT_EQ(offset, num_values);
+      ASSERT_EQ(decoded, input);
+    }
+  }
+
+ protected:
+  static void ReferenceEncode(const uint8_t* raw_values, const int64_t num_values,
+                              uint8_t* output_buffer_raw) {
+    RefererenceByteStreamSplitEncode(raw_values, kWidth, num_values, output_buffer_raw);
+  }
+
+  static std::vector<T> MakeRandomInput(int64_t num_values) {
+    std::vector<T> input(num_values);
+    random_bytes(kWidth * num_values, seed_++, reinterpret_cast<uint8_t*>(input.data()));
+    // Avoid NaNs to ease comparison
+    for (auto& value : input) {
+      if (std::isnan(value)) {
+        value = nan_replacement_++;
+      }
+    }
+    return input;
+  }
+
+  std::vector<EncodeFunc> encode_funcs_;
+  std::vector<DecodeFunc> decode_funcs_;
+
+  static inline uint32_t seed_ = 42;
+  static inline T nan_replacement_ = 0;
+};
+
+TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes);
+
+TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripSmall) {
+  for (int64_t num_values : {1, 5, 7, 12, 19, 31, 32}) {
+    this->TestRoundtrip(num_values);
+  }
+}
+
+TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripMidsized) {
+  for (int64_t num_values : {126, 127, 128, 129, 133, 200}) {
+    this->TestRoundtrip(num_values);
+  }
+}
+
+TYPED_TEST(TestByteStreamSplitSpecialized, PiecewiseDecode) {
+  this->TestPiecewiseDecode(/*num_values=*/500);
+}
+
+}  // namespace arrow::util::internal
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 5221f2588c0d3..1bb487c20d3e2 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -37,7 +37,7 @@
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap_ops.h"
 #include "arrow/util/bitmap_writer.h"
-#include "arrow/util/byte_stream_split.h"
+#include "arrow/util/byte_stream_split_internal.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/hashing.h"
 #include "arrow/util/int_util_overflow.h"
@@ -850,8 +850,8 @@ std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
       AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
   uint8_t* output_buffer_raw = output_buffer->mutable_data();
   const uint8_t* raw_values = sink_.data();
-  ::arrow::util::internal::ByteStreamSplitEncode<T>(
-      raw_values, static_cast<size_t>(num_values_in_buffer_), output_buffer_raw);
+  ::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_,
+                                                    output_buffer_raw);
   sink_.Reset();
   num_values_in_buffer_ = 0;
   return std::move(output_buffer);
diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc
index 717c716330563..b5b6cc8d93e03 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -24,7 +24,7 @@
 #include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
 #include "arrow/type.h"
-#include "arrow/util/byte_stream_split.h"
+#include "arrow/util/byte_stream_split_internal.h"
 #include "arrow/visit_data_inline.h"
 
 #include "parquet/encoding.h"