Skip to content

Commit

Permalink
[fix] disable transfer data large than 2GB by brpc (apache#9770)
Browse files Browse the repository at this point in the history
because of brpc and protobuf cannot transfer data large than 2GB, if large than 2GB will overflow, so add a check before send
  • Loading branch information
yangzhg authored and yinzhijian committed May 26, 2022
1 parent afe6e78 commit 643de5a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
11 changes: 2 additions & 9 deletions be/src/runtime/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,6 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
try {
// Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
_compression_scratch.resize(max_compressed_size);
} catch (const std::bad_alloc& e) {
can_compress = false;
LOG(WARNING) << "Try to alloc " << max_compressed_size
<< " bytes for compression scratch failed. " << e.what();
} catch (...) {
can_compress = false;
std::exception_ptr p = std::current_exception();
Expand Down Expand Up @@ -309,11 +305,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
*compressed_size = pb_size;
if (pb_size > std::numeric_limits<int32_t>::max()) {
// the protobuf has a hard limit of 2GB for serialized data.
return Status::InternalError(
fmt::format("The rowbatch is large than 2GB({}), can not send by Protobuf. "
"please set BE config 'transfer_data_by_brpc_attachment' to true "
"and restart BE.",
pb_size));
return Status::InternalError(fmt::format(
"The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
}
} else {
*uncompressed_size = pb_size + tuple_byte_size;
Expand Down
35 changes: 28 additions & 7 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,16 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp

// serialize data values
// when data type is HLL, content_uncompressed_size maybe larger than real size.
allocated_buf->resize(content_uncompressed_size);
try {
allocated_buf->resize(content_uncompressed_size);
} catch (...) {
std::exception_ptr p = std::current_exception();
std::string msg = fmt::format("Try to alloc {} bytes for allocated_buf failed. reason {}",
content_uncompressed_size,
p ? p.__cxa_exception_type()->name() : "null");
LOG(WARNING) << msg;
return Status::BufferAllocFailed(msg);
}
char* buf = allocated_buf->data();
for (const auto& c : *this) {
buf = c.type->serialize(*(c.column), buf);
Expand All @@ -678,12 +687,21 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp

// compress
if (config::compress_rowbatches && content_uncompressed_size > 0) {
// Try compressing the content to compression_scratch,
// swap if compressed data is smaller
size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
std::string compression_scratch;
uint32_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
compression_scratch.resize(max_compressed_size);

try {
// Try compressing the content to compression_scratch,
// swap if compressed data is smaller
// Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
compression_scratch.resize(max_compressed_size);
} catch (...) {
std::exception_ptr p = std::current_exception();
std::string msg =
fmt::format("Try to alloc {} bytes for compression scratch failed. reason {}",
max_compressed_size, p ? p.__cxa_exception_type()->name() : "null");
LOG(WARNING) << msg;
return Status::BufferAllocFailed(msg);
}
size_t compressed_size = 0;
char* compressed_output = compression_scratch.data();
snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
Expand All @@ -701,7 +719,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
VLOG_ROW << "uncompressed size: " << content_uncompressed_size
<< ", compressed size: " << compressed_size;
}

if (*compressed_bytes >= std::numeric_limits<int32_t>::max()) {
return Status::InternalError(fmt::format(
"The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes));
}
return Status::OK();
}

Expand Down

0 comments on commit 643de5a

Please sign in to comment.