Skip to content

Commit

Permalink
PARQUET-690: [C++] Reuse Thrift resources when serializing metadata s…
Browse files Browse the repository at this point in the history
…tructures

This patch should yield fewer memory allocations on the Parquet write path, using the same approach as Apache Impala. Previously, we allocated a new buffer for each Thrift object serialization. Since a ColumnChunk will generally contain many data page headers, this was a bit wasteful.

Author: Wes McKinney <wesm+git@apache.org>

Closes apache#3268 from wesm/PARQUET-690 and squashes the following commits:

a5303f8 <Wes McKinney> Fix lint issues
47de835 <Wes McKinney> Reuse Thrift resources when serializing metadata structures
  • Loading branch information
wesm authored and emkornfield committed Jan 10, 2019
1 parent 5f064c7 commit 0934cf7
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 43 deletions.
9 changes: 5 additions & 4 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class SerializedPageWriter : public PageWriter {
total_uncompressed_size_(0),
total_compressed_size_(0) {
compressor_ = GetCodecFromArrow(codec);
thrift_serializer_.reset(new ThriftSerializer);
}

int64_t WriteDictionaryPage(const DictionaryPage& page) override {
Expand Down Expand Up @@ -171,8 +172,7 @@ class SerializedPageWriter : public PageWriter {
if (dictionary_page_offset_ == 0) {
dictionary_page_offset_ = start_pos;
}
int64_t header_size =
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_);
sink_->Write(compressed_data->data(), compressed_data->size());

total_uncompressed_size_ += uncompressed_size + header_size;
Expand Down Expand Up @@ -237,8 +237,7 @@ class SerializedPageWriter : public PageWriter {
data_page_offset_ = start_pos;
}

int64_t header_size =
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_);
sink_->Write(compressed_data->data(), compressed_data->size());

total_uncompressed_size_ += uncompressed_size + header_size;
Expand Down Expand Up @@ -270,6 +269,8 @@ class SerializedPageWriter : public PageWriter {
int64_t total_uncompressed_size_;
int64_t total_compressed_size_;

std::unique_ptr<ThriftSerializer> thrift_serializer_;

// Compression codec to use.
std::unique_ptr<::arrow::util::Codec> compressor_;
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/file-deserialize-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ class TestPageSerde : public ::testing::Test {
page_header_.compressed_page_size = compressed_size;
page_header_.type = format::PageType::DATA_PAGE;

ASSERT_NO_THROW(
SerializeThriftMsg(&page_header_, max_serialized_len, out_stream_.get()));
ThriftSerializer serializer;
ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
}

void ResetStream() { out_stream_.reset(new InMemoryOutputStream); }
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ class FileMetaData::FileMetaDataImpl {
const ApplicationVersion& writer_version() const { return writer_version_; }

void WriteTo(OutputStream* dst) const {
SerializeThriftMsg(metadata_.get(), 1024, dst);
ThriftSerializer serializer;
serializer.Serialize(metadata_.get(), dst);
}

std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
Expand Down Expand Up @@ -667,7 +668,8 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
}

void WriteTo(OutputStream* sink) {
SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink);
ThriftSerializer serializer;
serializer.Serialize(column_chunk_, sink);
}

const ColumnDescriptor* descr() const { return column_; }
Expand Down
93 changes: 58 additions & 35 deletions cpp/src/parquet/thrift.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_THRIFT_UTIL_H
#define PARQUET_THRIFT_UTIL_H
#pragma once

#include "arrow/util/windows_compatibility.h"

Expand All @@ -28,6 +27,7 @@
#else
#include <memory>
#endif
#include <string>

// TCompactProtocol requires some #defines to work right.
#define SIGNED_RIGHT_SHIFT_IS 1
Expand Down Expand Up @@ -105,18 +105,18 @@ static inline format::CompressionCodec::type ToThrift(Compression::type type) {
// ----------------------------------------------------------------------
// Thrift struct serialization / deserialization utilities

using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;

// Deserialize a thrift message from buf/len. buf/len must at least contain
// all the bytes needed to store the thrift message. On return, len will be
// set to the actual length of the header.
template <class T>
inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) {
// Deserialize msg bytes into c++ thrift msg using memory transport.
shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
apache::thrift::protocol::TCompactProtocolFactoryT<
apache::thrift::transport::TMemoryBuffer>
tproto_factory;
shared_ptr<apache::thrift::protocol::TProtocol> tproto =
shared_ptr<ThriftBuffer> tmem_transport(
new ThriftBuffer(const_cast<uint8_t*>(buf), *len));
apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
shared_ptr<apache::thrift::protocol::TProtocol> tproto = //
tproto_factory.getProtocol(tmem_transport);
try {
deserialized_msg->read(tproto.get());
Expand All @@ -129,34 +129,57 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
*len = *len - bytes_left;
}

// Serialize obj into a buffer. The result is returned as a string.
// The arguments are the object to be serialized and
// the expected size of the serialized object
template <class T>
inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
new apache::thrift::transport::TMemoryBuffer(len));
apache::thrift::protocol::TCompactProtocolFactoryT<
apache::thrift::transport::TMemoryBuffer>
tproto_factory;
shared_ptr<apache::thrift::protocol::TProtocol> tproto =
tproto_factory.getProtocol(mem_buffer);
try {
mem_buffer->resetBuffer();
obj->write(tproto.get());
} catch (std::exception& e) {
std::stringstream ss;
ss << "Couldn't serialize thrift: " << e.what() << "\n";
throw ParquetException(ss.str());
/// Utility class to serialize thrift objects to a binary format. Keep a single
/// instance around and reuse it where possible so that the underlying memory
/// is reused across serializations.
/// Note: thrift will encode NUL bytes into the serialized buffer, so it is not
/// valid to treat it as a null-terminated string.
class ThriftSerializer {
public:
explicit ThriftSerializer(int initial_buffer_size = 1024)
: mem_buffer_(new ThriftBuffer(initial_buffer_size)) {
apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> factory;
protocol_ = factory.getProtocol(mem_buffer_);
}

uint8_t* out_buffer;
uint32_t out_length;
mem_buffer->getBuffer(&out_buffer, &out_length);
out->Write(out_buffer, out_length);
return out_length;
}
/// Serialize obj into a memory buffer. The result is returned in buffer/len. The
/// memory returned is owned by this object and will be invalid when another object
/// is serialized.
template <class T>
void SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) {
SerializeObject(obj);
mem_buffer_->getBuffer(buffer, len);
}

} // namespace parquet
/// Serialize obj and store the serialized bytes in *result, replacing its
/// previous contents.
template <class T>
void SerializeToString(const T* obj, std::string* result) {
  SerializeObject(obj);
  std::string serialized = mem_buffer_->getBufferAsString();
  result->swap(serialized);
}

/// Serialize obj and write the resulting bytes to out.
///
/// \return the number of serialized bytes passed to out->Write
template <class T>
int64_t Serialize(const T* obj, OutputStream* out) {
  uint8_t* data = nullptr;
  uint32_t num_bytes = 0;
  SerializeToBuffer(obj, &num_bytes, &data);
  out->Write(data, num_bytes);
  return static_cast<int64_t>(num_bytes);
}

#endif // PARQUET_THRIFT_UTIL_H
private:
/// Reset the internal buffer and write obj into it through protocol_.
///
/// Any std::exception raised while doing so is rethrown as a ParquetException
/// whose message carries the original what() text.
template <class T>
void SerializeObject(const T* obj) {
  try {
    mem_buffer_->resetBuffer();
    obj->write(protocol_.get());
  } catch (const std::exception& e) {
    throw ParquetException(std::string("Couldn't serialize thrift: ") + e.what() +
                           "\n");
  }
}

shared_ptr<ThriftBuffer> mem_buffer_;
shared_ptr<apache::thrift::protocol::TProtocol> protocol_;
};

} // namespace parquet

0 comments on commit 0934cf7

Please sign in to comment.