From 0934cf798aac4e145f3830d20f5ac428fac0ac26 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 4 Jan 2019 15:17:13 -0600 Subject: [PATCH] PARQUET-690: [C++] Reuse Thrift resources when serializing metadata structures This patch should yield fewer memory allocations on the Parquet write path, using the same approach from Apache Impala. Before we were allocating a new buffer for each Thrift object serialization. Since a ColumnChunk generally will contain many data page headers, this is a bit wasteful Author: Wes McKinney Closes #3268 from wesm/PARQUET-690 and squashes the following commits: a5303f826 Fix lint issues 47de8356c Reuse Thrift resources when serializing metadata structures --- cpp/src/parquet/column_writer.cc | 9 ++- cpp/src/parquet/file-deserialize-test.cc | 4 +- cpp/src/parquet/metadata.cc | 6 +- cpp/src/parquet/thrift.h | 93 +++++++++++++++--------- 4 files changed, 69 insertions(+), 43 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 37fce9c036b31..dfb65f1969777 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -141,6 +141,7 @@ class SerializedPageWriter : public PageWriter { total_uncompressed_size_(0), total_compressed_size_(0) { compressor_ = GetCodecFromArrow(codec); + thrift_serializer_.reset(new ThriftSerializer); } int64_t WriteDictionaryPage(const DictionaryPage& page) override { @@ -171,8 +172,7 @@ class SerializedPageWriter : public PageWriter { if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; } - int64_t header_size = - SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); + int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_); sink_->Write(compressed_data->data(), compressed_data->size()); total_uncompressed_size_ += uncompressed_size + header_size; @@ -237,8 +237,7 @@ class SerializedPageWriter : public PageWriter { data_page_offset_ = start_pos; } - int64_t header_size = - SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); + int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_); sink_->Write(compressed_data->data(), compressed_data->size()); total_uncompressed_size_ += uncompressed_size + header_size; @@ -270,6 +269,8 @@ class SerializedPageWriter : public PageWriter { int64_t total_uncompressed_size_; int64_t total_compressed_size_; + std::unique_ptr thrift_serializer_; + // Compression codec to use. std::unique_ptr<::arrow::util::Codec> compressor_; }; diff --git a/cpp/src/parquet/file-deserialize-test.cc b/cpp/src/parquet/file-deserialize-test.cc index f1c17240439fb..4db338b4bcb54 100644 --- a/cpp/src/parquet/file-deserialize-test.cc +++ b/cpp/src/parquet/file-deserialize-test.cc @@ -85,8 +85,8 @@ class TestPageSerde : public ::testing::Test { page_header_.compressed_page_size = compressed_size; page_header_.type = format::PageType::DATA_PAGE; - ASSERT_NO_THROW( - SerializeThriftMsg(&page_header_, max_serialized_len, out_stream_.get())); + ThriftSerializer serializer; + ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get())); } void ResetStream() { out_stream_.reset(new InMemoryOutputStream); } diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index f05918d9fd7f0..cc0bfec6321cd 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -361,7 +361,8 @@ class FileMetaData::FileMetaDataImpl { const ApplicationVersion& writer_version() const { return writer_version_; } void WriteTo(OutputStream* dst) const { - SerializeThriftMsg(metadata_.get(), 1024, dst); + ThriftSerializer serializer; + serializer.Serialize(metadata_.get(), dst); } std::unique_ptr RowGroup(int i) { @@ -667,7 +668,8 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { } void WriteTo(OutputStream* sink) { - SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink); + ThriftSerializer serializer; + serializer.Serialize(column_chunk_, sink); } const ColumnDescriptor* descr() const { return column_; } diff --git a/cpp/src/parquet/thrift.h b/cpp/src/parquet/thrift.h index 9c665acfac4ff..1afd9bf436550 100644 --- a/cpp/src/parquet/thrift.h +++ b/cpp/src/parquet/thrift.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_THRIFT_UTIL_H -#define PARQUET_THRIFT_UTIL_H +#pragma once #include "arrow/util/windows_compatibility.h" @@ -28,6 +27,7 @@ #else #include #endif +#include // TCompactProtocol requires some #defines to work right. #define SIGNED_RIGHT_SHIFT_IS 1 @@ -105,18 +105,18 @@ static inline format::CompressionCodec::type ToThrift(Compression::type type) { // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities +using ThriftBuffer = apache::thrift::transport::TMemoryBuffer; + // Deserialize a thrift message from buf/len. buf/len must at least contain // all the bytes needed to store the thrift message. On return, len will be // set to the actual length of the header. template inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { // Deserialize msg bytes into c++ thrift msg using memory transport. - shared_ptr tmem_transport( - new apache::thrift::transport::TMemoryBuffer(const_cast(buf), *len)); - apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> - tproto_factory; - shared_ptr tproto = + shared_ptr tmem_transport( + new ThriftBuffer(const_cast(buf), *len)); + apache::thrift::protocol::TCompactProtocolFactoryT tproto_factory; + shared_ptr tproto = // tproto_factory.getProtocol(tmem_transport); try { deserialized_msg->read(tproto.get()); @@ -129,34 +129,57 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali *len = *len - bytes_left; } -// Serialize obj into a buffer. The result is returned as a string. -// The arguments are the object to be serialized and -// the expected size of the serialized object -template -inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { - shared_ptr mem_buffer( - new apache::thrift::transport::TMemoryBuffer(len)); - apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> - tproto_factory; - shared_ptr tproto = - tproto_factory.getProtocol(mem_buffer); - try { - mem_buffer->resetBuffer(); - obj->write(tproto.get()); - } catch (std::exception& e) { - std::stringstream ss; - ss << "Couldn't serialize thrift: " << e.what() << "\n"; - throw ParquetException(ss.str()); +/// Utility class to serialize thrift objects to a binary format. This object +/// should be reused if possible to reuse the underlying memory. +/// Note: thrift will encode NULLs into the serialized buffer so it is not valid +/// to treat it as a string. +class ThriftSerializer { + public: + explicit ThriftSerializer(int initial_buffer_size = 1024) + : mem_buffer_(new ThriftBuffer(initial_buffer_size)) { + apache::thrift::protocol::TCompactProtocolFactoryT factory; + protocol_ = factory.getProtocol(mem_buffer_); } - uint8_t* out_buffer; - uint32_t out_length; - mem_buffer->getBuffer(&out_buffer, &out_length); - out->Write(out_buffer, out_length); - return out_length; -} + /// Serialize obj into a memory buffer. The result is returned in buffer/len. The + /// memory returned is owned by this object and will be invalid when another object + /// is serialized. + template + void SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) { + SerializeObject(obj); + mem_buffer_->getBuffer(buffer, len); + } -} // namespace parquet + template + void SerializeToString(const T* obj, std::string* result) { + SerializeObject(obj); + *result = mem_buffer_->getBufferAsString(); + } + + template + int64_t Serialize(const T* obj, OutputStream* out) { + uint8_t* out_buffer; + uint32_t out_length; + SerializeToBuffer(obj, &out_length, &out_buffer); + out->Write(out_buffer, out_length); + return static_cast(out_length); + } -#endif // PARQUET_THRIFT_UTIL_H + private: + template + void SerializeObject(const T* obj) { + try { + mem_buffer_->resetBuffer(); + obj->write(protocol_.get()); + } catch (std::exception& e) { + std::stringstream ss; + ss << "Couldn't serialize thrift: " << e.what() << "\n"; + throw ParquetException(ss.str()); + } + } + + shared_ptr mem_buffer_; + shared_ptr protocol_; +}; + +} // namespace parquet