Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[charry-pick](branch3.0) support text hive #41656

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,8 @@ DEFINE_Validator(tablet_meta_serialize_size_limit,
[](const int64_t config) -> bool { return config < 1717986918; });

DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
DEFINE_mInt32(snappy_compression_block_size, "262144");
DEFINE_mInt32(lz4_compression_block_size, "262144");

DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,9 @@ DECLARE_mInt16(topn_agg_limit_multiplier);
DECLARE_mInt64(tablet_meta_serialize_size_limit);

DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
// To be compatible with hadoop's block compression
DECLARE_mInt32(snappy_compression_block_size);
DECLARE_mInt32(lz4_compression_block_size);

DECLARE_mBool(enable_pipeline_task_leakage_detect);

Expand Down
212 changes: 142 additions & 70 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Status Decompressor::create_decompressor(CompressType type,
case CompressType::BZIP2:
decompressor->reset(new Bzip2Decompressor());
break;
case CompressType::ZSTD:
decompressor->reset(new ZstdDecompressor());
break;
case CompressType::LZ4FRAME:
decompressor->reset(new Lz4FrameDecompressor());
break;
Expand Down Expand Up @@ -86,6 +89,9 @@ Status Decompressor::create_decompressor(TFileCompressType::type type,
case TFileCompressType::BZ2:
compress_type = CompressType::BZIP2;
break;
case TFileCompressType::ZSTD:
compress_type = CompressType::ZSTD;
break;
case TFileCompressType::LZ4FRAME:
compress_type = CompressType::LZ4FRAME;
break;
Expand Down Expand Up @@ -300,6 +306,50 @@ std::string Bzip2Decompressor::debug_info() {
return ss.str();
}

ZstdDecompressor::~ZstdDecompressor() {
ZSTD_freeDStream(_zstd_strm);
}

Status ZstdDecompressor::init() {
_zstd_strm = ZSTD_createDStream();
if (!_zstd_strm) {
std::stringstream ss;
return Status::InternalError("ZSTD_dctx creation error");
}
auto ret = ZSTD_initDStream(_zstd_strm);
if (ZSTD_isError(ret)) {
return Status::InternalError("ZSTD_initDStream error: {}", ZSTD_getErrorName(ret));
}
return Status::OK();
}

Status ZstdDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
uint8_t* output, size_t output_max_len,
size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) {
// 1. set input and output
ZSTD_inBuffer inputBuffer = {input, input_len, 0};
ZSTD_outBuffer outputBuffer = {output, output_max_len, 0};

// decompress
int ret = ZSTD_decompressStream(_zstd_strm, &outputBuffer, &inputBuffer);
*input_bytes_read = inputBuffer.pos;
*decompressed_len = outputBuffer.pos;

if (ZSTD_isError(ret)) {
return Status::InternalError("Failed to zstd decompress: {}", ZSTD_getErrorName(ret));
}

*stream_end = ret == 0;
return Status::OK();
}

std::string ZstdDecompressor::debug_info() {
std::stringstream ss;
ss << "ZstdDecompressor.";
return ss.str();
}

// Lz4Frame
// Lz4 version: 1.7.5
// define LZ4F_VERSION = 100
Expand Down Expand Up @@ -468,7 +518,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
}

std::size_t decompressed_large_block_len = 0;
do {
while (remaining_decompressed_large_block_len > 0) {
// Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
Expand Down Expand Up @@ -505,8 +555,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;

} while (remaining_decompressed_large_block_len > 0);
};

if (*more_input_bytes != 0) {
// Need more input buffer
Expand Down Expand Up @@ -535,90 +584,113 @@ Status SnappyBlockDecompressor::init() {
return Status::OK();
}

// Hadoop snappycodec source :
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
// Example:
// OriginData(The original data will be divided into several large data block.) :
// large data block1 | large data block2 | large data block3 | ....
// The large data block will be divided into several small data block.
// Suppose a large data block is divided into three small blocks:
// large data block1: | small block1 | small block2 | small block3 |
// CompressData: <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]>
//
// A : original length of the current block of large data block.
// sizeof(A) = 4 bytes.
// A = length(small block1) + length(small block2) + length(small block3)
// Bx : length of small data block bx.
// sizeof(Bx) = 4 bytes.
// Bx = length(compress(small blockx))
Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len,
bool* stream_end, size_t* more_input_bytes,
size_t* more_output_bytes) {
uint8_t* src = input;
size_t remaining_input_size = input_len;
int64_t uncompressed_total_len = 0;
*input_bytes_read = 0;
auto* input_ptr = input;
auto* output_ptr = output;

// The hadoop snappy codec is as:
// <4 byte big endian uncompressed size>
// <4 byte big endian compressed size>
// <snappy compressed block>
// ....
// <4 byte big endian uncompressed size>
// <4 byte big endian compressed size>
// <snappy compressed block>
//
// See:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
while (remaining_input_size > 0) {
if (remaining_input_size < 4) {
*more_input_bytes = 4 - remaining_input_size;
break;
while (input_len > 0) {
//if faild , fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

if (input_len < sizeof(uint32_t)) {
return Status::InvalidArgument(strings::Substitute(
"fail to do hadoop-snappy decompress, input_len=$0", input_len));
}
// Read uncompressed size
uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
int64_t remaining_output_len = output_max_len - uncompressed_total_len;
if (remaining_output_len < uncompressed_block_len) {

uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

input_ptr += sizeof(uint32_t);
input_len -= sizeof(uint32_t);

std::size_t remaining_output_len = output_max_len - *decompressed_len;

if (remaining_output_len < remaining_decompressed_large_block_len) {
// Need more output buffer
*more_output_bytes = uncompressed_block_len - remaining_output_len;
break;
}
*more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
input_ptr = large_block_input_ptr;
output_ptr = large_block_output_ptr;

if (uncompressed_block_len == 0) {
remaining_input_size -= sizeof(uint32_t);
break;
}

if (remaining_input_size <= 2 * sizeof(uint32_t)) {
// The remaining input size should be larger then <uncompressed size><compressed size><compressed data>
// +1 means we need at least 1 bytes of compressed data.
*more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size;
break;
}
std::size_t decompressed_large_block_len = 0;
while (remaining_decompressed_large_block_len > 0) {
// Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
break;
}

// Read compressed size
size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t);
size_t compressed_len = _read_int32(src + sizeof(uint32_t));
if (compressed_len > tmp_remaining_size) {
// Need more input data
*more_input_bytes = compressed_len - tmp_remaining_size;
break;
}
// Read the length of the next snappy compressed block.
size_t compressed_small_block_len = BigEndian::Load32(input_ptr);

src += 2 * sizeof(uint32_t);
remaining_input_size -= 2 * sizeof(uint32_t);

// ATTN: the uncompressed len from GetUncompressedLength() is same as
// uncompressed_block_len, so I think it is unnecessary to get it again.
// Get uncompressed len from snappy
// size_t uncompressed_len;
// if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(src),
// compressed_len, &uncompressed_len)) {
// return Status::InternalError("snappy block decompress failed to get uncompressed len");
// }

// Decompress
if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len,
reinterpret_cast<char*>(output))) {
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
uncompressed_block_len, compressed_len);
input_ptr += sizeof(uint32_t);
input_len -= sizeof(uint32_t);

if (compressed_small_block_len == 0) {
continue;
}

if (compressed_small_block_len > input_len) {
// Need more input buffer
*more_input_bytes = compressed_small_block_len - input_len;
break;
}

// Decompress this block.
size_t decompressed_small_block_len;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
compressed_small_block_len,
&decompressed_small_block_len)) {
return Status::InternalError(
"snappy block decompress failed to get uncompressed len");
}
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
compressed_small_block_len,
reinterpret_cast<char*>(output_ptr))) {
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
decompressed_small_block_len, compressed_small_block_len);
}
input_ptr += compressed_small_block_len;
input_len -= compressed_small_block_len;

output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;
};

if (*more_input_bytes != 0) {
// Need more input buffer
input_ptr = large_block_input_ptr;
output_ptr = large_block_output_ptr;
break;
}

output += uncompressed_block_len;
src += compressed_len;
remaining_input_size -= compressed_len;
uncompressed_total_len += uncompressed_block_len;
*decompressed_len += decompressed_large_block_len;
}

*input_bytes_read += (input_len - remaining_input_size);
*decompressed_len = uncompressed_total_len;
*input_bytes_read += (input_ptr - input);
// If no more input and output need, means this is the end of a compressed block
*stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

Expand Down
32 changes: 31 additions & 1 deletion be/src/exec/decompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <stddef.h>
#include <stdint.h>
#include <zlib.h>
#include <zstd.h>

#include <memory>
#include <string>
Expand All @@ -34,7 +35,17 @@

namespace doris {

enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP, LZ4BLOCK, SNAPPYBLOCK };
enum CompressType {
UNCOMPRESSED,
GZIP,
DEFLATE,
BZIP2,
ZSTD,
LZ4FRAME,
LZOP,
LZ4BLOCK,
SNAPPYBLOCK
};

class Decompressor {
public:
Expand Down Expand Up @@ -126,6 +137,25 @@ class Bzip2Decompressor : public Decompressor {
bz_stream _bz_strm;
};

class ZstdDecompressor : public Decompressor {
public:
~ZstdDecompressor() override;

Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) override;

std::string debug_info() override;

private:
friend class Decompressor;
ZstdDecompressor() : Decompressor(CompressType::ZSTD) {}
Status init() override;

private:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]

Suggested change
private:
Additional context

be/src/exec/decompressor.h:149: previously declared here

private:
^

ZSTD_DStream* _zstd_strm {nullptr};
};

class Lz4FrameDecompressor : public Decompressor {
public:
~Lz4FrameDecompressor() override;
Expand Down
Loading
Loading