Skip to content

Commit

Permalink
Code review feedback. Parallelize compression also
Browse files Browse the repository at this point in the history
  • Loading branch information
wesm committed Mar 31, 2020
1 parent aea4d68 commit aa28280
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 14 deletions.
11 changes: 11 additions & 0 deletions cpp/src/arrow/ipc/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,16 @@ IpcWriteOptions IpcWriteOptions::Defaults() { return IpcWriteOptions(); }

IpcReadOptions IpcReadOptions::Defaults() { return IpcReadOptions(); }

namespace internal {

/// \brief Validate that a codec is permitted for IPC compression
///
/// \param[in] codec the compression type to validate
/// \return Status::OK() when codec is LZ4_FRAME or ZSTD, otherwise
/// Status::Invalid
Status CheckCompressionSupported(Compression::type codec) {
  const bool is_lz4_frame = (codec == Compression::LZ4_FRAME);
  const bool is_zstd = (codec == Compression::ZSTD);

  // Guard clause: accept the two permitted codecs up front
  if (is_lz4_frame || is_zstd) {
    return Status::OK();
  }
  return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
}

} // namespace internal

} // namespace ipc
} // namespace arrow
9 changes: 9 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ struct ARROW_EXPORT IpcWriteOptions {
Compression::type compression = Compression::UNCOMPRESSED;
int compression_level = Compression::kUseDefaultCompressionLevel;

/// \brief Use global CPU thread pool to parallelize any computational tasks
/// like compression
bool use_threads = true;

static IpcWriteOptions Defaults();
};

Expand All @@ -87,5 +91,10 @@ struct ARROW_EXPORT IpcReadOptions {
static IpcReadOptions Defaults();
};

namespace internal {

/// \brief Check whether a codec is accepted for IPC compression
///
/// \param[in] codec the compression type to validate
/// \return Status::OK() if codec is Compression::LZ4_FRAME or
/// Compression::ZSTD, Status::Invalid for any other value
Status CheckCompressionSupported(Compression::type codec);

} // namespace internal
} // namespace ipc
} // namespace arrow
11 changes: 6 additions & 5 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -505,14 +505,15 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
if (!util::Codec::IsAvailable(codec)) {
continue;
}
IpcWriteOptions options = IpcWriteOptions::Defaults();
options.compression = codec;
CheckRoundtrip(*batch, options);
IpcWriteOptions write_options = IpcWriteOptions::Defaults();
write_options.compression = codec;
CheckRoundtrip(*batch, write_options);

// Check non-parallel read
// Check non-parallel read and write
IpcReadOptions read_options = IpcReadOptions::Defaults();
write_options.use_threads = false;
read_options.use_threads = false;
CheckRoundtrip(*batch, options, read_options);
CheckRoundtrip(*batch, write_options, read_options);
}

std::vector<Compression::type> disallowed_codecs = {
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,15 @@ Status DecompressBuffers(const std::vector<std::shared_ptr<ArrayData>>& fields,
if (arr->buffers[i]->size() == 0) {
continue;
}
if (arr->buffers[i]->size() < 8) {
return Status::Invalid(
"Likely corrupted message, compressed buffers "
"are larger than 8 bytes by construction");
}
const uint8_t* data = arr->buffers[i]->data();
int64_t compressed_size = arr->buffers[i]->size() - sizeof(int64_t);
int64_t uncompressed_size = util::SafeLoadAs<int64_t>(data);
int64_t uncompressed_size =
BitUtil::FromLittleEndian(util::SafeLoadAs<int64_t>(data));

std::shared_ptr<Buffer> uncompressed;
RETURN_NOT_OK(
Expand All @@ -385,8 +391,8 @@ Status DecompressBuffers(const std::vector<std::shared_ptr<ArrayData>>& fields,
for (int i = 0; i < static_cast<int>(fields.size()); ++i) {
RETURN_NOT_OK(DecompressOne(i));
}
return Status::OK();
}
return Status::OK();
}

Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
Expand Down Expand Up @@ -464,6 +470,7 @@ Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
ARROW_ASSIGN_OR_RAISE(*out,
util::Codec::GetCompressionType(metadata->value(index)));
}
RETURN_NOT_OK(internal::CheckCompressionSupported(*out));
}
return Status::OK();
}
Expand Down
22 changes: 15 additions & 7 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/parallel.h"
#include "arrow/visitor_inline.h"

namespace arrow {
Expand Down Expand Up @@ -182,25 +183,32 @@ class RecordBatchSerializer {
Status CompressBodyBuffers() {
std::unique_ptr<util::Codec> codec;

if (!(options_.compression == Compression::LZ4_FRAME ||
options_.compression == Compression::ZSTD)) {
return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
}
RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression));

// TODO check allowed values for compression?
AppendCustomMetadata("ARROW:experimental_compression",
util::Codec::GetCodecAsString(options_.compression));

ARROW_ASSIGN_OR_RAISE(
codec, util::Codec::Create(options_.compression, options_.compression_level));
// TODO: Parallelize buffer compression
for (size_t i = 0; i < out_->body_buffers.size(); ++i) {

auto CompressOne = [&](size_t i) {
if (out_->body_buffers[i]->size() > 0) {
RETURN_NOT_OK(
CompressBuffer(*out_->body_buffers[i], codec.get(), &out_->body_buffers[i]));
}
return Status::OK();
};

if (options_.use_threads) {
return ::arrow::internal::ParallelFor(static_cast<int>(out_->body_buffers.size()),
CompressOne);
} else {
for (size_t i = 0; i < out_->body_buffers.size(); ++i) {
RETURN_NOT_OK(CompressOne(i));
}
return Status::OK();
}
return Status::OK();
}

Status Assemble(const RecordBatch& batch) {
Expand Down

0 comments on commit aa28280

Please sign in to comment.