Skip to content

Commit

Permalink
[cherry-pick](branch-3.0) support text hive (#41656)
Browse files Browse the repository at this point in the history
## Proposed changes
pick from master:
  #40315
  #40291
  #40183
  #38549
  • Loading branch information
suxiaogang223 authored Oct 10, 2024
1 parent 6883d86 commit eb419c2
Show file tree
Hide file tree
Showing 40 changed files with 2,740 additions and 215 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,8 @@ DEFINE_Validator(tablet_meta_serialize_size_limit,
[](const int64_t config) -> bool { return config < 1717986918; });

DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
DEFINE_mInt32(snappy_compression_block_size, "262144");
DEFINE_mInt32(lz4_compression_block_size, "262144");

DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,9 @@ DECLARE_mInt16(topn_agg_limit_multiplier);
DECLARE_mInt64(tablet_meta_serialize_size_limit);

DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
// To be compatible with hadoop's block compression
DECLARE_mInt32(snappy_compression_block_size);
DECLARE_mInt32(lz4_compression_block_size);

DECLARE_mBool(enable_pipeline_task_leakage_detect);

Expand Down
212 changes: 142 additions & 70 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Status Decompressor::create_decompressor(CompressType type,
case CompressType::BZIP2:
decompressor->reset(new Bzip2Decompressor());
break;
case CompressType::ZSTD:
decompressor->reset(new ZstdDecompressor());
break;
case CompressType::LZ4FRAME:
decompressor->reset(new Lz4FrameDecompressor());
break;
Expand Down Expand Up @@ -86,6 +89,9 @@ Status Decompressor::create_decompressor(TFileCompressType::type type,
case TFileCompressType::BZ2:
compress_type = CompressType::BZIP2;
break;
case TFileCompressType::ZSTD:
compress_type = CompressType::ZSTD;
break;
case TFileCompressType::LZ4FRAME:
compress_type = CompressType::LZ4FRAME;
break;
Expand Down Expand Up @@ -300,6 +306,50 @@ std::string Bzip2Decompressor::debug_info() {
return ss.str();
}

ZstdDecompressor::~ZstdDecompressor() {
    // Release the streaming decompression context. ZSTD_freeDStream accepts a
    // null pointer (it is a no-op), so this is safe even if init() was never
    // called or ZSTD_createDStream() failed.
    ZSTD_freeDStream(_zstd_strm);
}

// Create and initialize the zstd streaming decompression context.
// Must be called once before the first decompress() call.
// Returns InternalError if the context cannot be allocated or initialized.
Status ZstdDecompressor::init() {
    _zstd_strm = ZSTD_createDStream();
    if (!_zstd_strm) {
        // Allocation failure; zstd gives no further detail here.
        return Status::InternalError("ZSTD_dctx creation error");
    }
    auto ret = ZSTD_initDStream(_zstd_strm);
    if (ZSTD_isError(ret)) {
        return Status::InternalError("ZSTD_initDStream error: {}", ZSTD_getErrorName(ret));
    }
    return Status::OK();
}

// Decompress one step of a zstd stream.
//
// input/input_len:      compressed bytes available to consume.
// input_bytes_read:     out, number of input bytes actually consumed.
// output/output_max_len: destination buffer and its capacity.
// decompressed_len:     out, number of bytes written to output.
// stream_end:           out, true when a whole zstd frame has been fully
//                       decoded and flushed.
// more_input_bytes / more_output_bytes: not used by this codec; the caller
//                       is expected to loop while *stream_end is false.
Status ZstdDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, size_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    // 1. set input and output
    ZSTD_inBuffer inputBuffer = {input, input_len, 0};
    ZSTD_outBuffer outputBuffer = {output, output_max_len, 0};

    // 2. decompress.
    // ZSTD_decompressStream() returns a size_t: an error code (detected via
    // ZSTD_isError), 0 when the frame is complete, or a hint of how many more
    // input bytes it expects. Keep it as size_t — narrowing it to int could
    // mangle the value before the error check.
    size_t ret = ZSTD_decompressStream(_zstd_strm, &outputBuffer, &inputBuffer);

    // Report progress even on error so the caller's bookkeeping stays correct.
    *input_bytes_read = inputBuffer.pos;
    *decompressed_len = outputBuffer.pos;

    if (ZSTD_isError(ret)) {
        return Status::InternalError("Failed to zstd decompress: {}", ZSTD_getErrorName(ret));
    }

    // A return value of 0 means the current frame is fully decoded and flushed.
    *stream_end = ret == 0;
    return Status::OK();
}

// Short human-readable identifier used in error/log messages.
std::string ZstdDecompressor::debug_info() {
    return "ZstdDecompressor.";
}

// Lz4Frame
// Lz4 version: 1.7.5
// define LZ4F_VERSION = 100
Expand Down Expand Up @@ -468,7 +518,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
}

std::size_t decompressed_large_block_len = 0;
do {
while (remaining_decompressed_large_block_len > 0) {
// Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
Expand Down Expand Up @@ -505,8 +555,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;

} while (remaining_decompressed_large_block_len > 0);
};

if (*more_input_bytes != 0) {
// Need more input buffer
Expand Down Expand Up @@ -535,90 +584,113 @@ Status SnappyBlockDecompressor::init() {
return Status::OK();
}

// Hadoop snappycodec source :
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
// Example:
// OriginData(The original data will be divided into several large data blocks.) :
// large data block1 | large data block2 | large data block3 | ....
// Each large data block will be divided into several small data blocks.
// Suppose a large data block is divided into three small blocks:
// large data block1: | small block1 | small block2 | small block3 |
// CompressData: <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
//
// A : original (uncompressed) length of the current large data block.
// sizeof(A) = 4 bytes.
// A = length(small block1) + length(small block2) + length(small block3)
// Bx : length of small data block bx.
// sizeof(Bx) = 4 bytes.
// Bx = length(compress(small blockx))
Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len,
bool* stream_end, size_t* more_input_bytes,
size_t* more_output_bytes) {
uint8_t* src = input;
size_t remaining_input_size = input_len;
int64_t uncompressed_total_len = 0;
*input_bytes_read = 0;
auto* input_ptr = input;
auto* output_ptr = output;

// The hadoop snappy codec is as:
// <4 byte big endian uncompressed size>
// <4 byte big endian compressed size>
// <snappy compressed block>
// ....
// <4 byte big endian uncompressed size>
// <4 byte big endian compressed size>
// <snappy compressed block>
//
// See:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
while (remaining_input_size > 0) {
if (remaining_input_size < 4) {
*more_input_bytes = 4 - remaining_input_size;
break;
while (input_len > 0) {
// if decompression of this large block fails to make progress, fall back to the beginning of the large block
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

if (input_len < sizeof(uint32_t)) {
return Status::InvalidArgument(strings::Substitute(
"fail to do hadoop-snappy decompress, input_len=$0", input_len));
}
// Read uncompressed size
uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
int64_t remaining_output_len = output_max_len - uncompressed_total_len;
if (remaining_output_len < uncompressed_block_len) {

uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

input_ptr += sizeof(uint32_t);
input_len -= sizeof(uint32_t);

std::size_t remaining_output_len = output_max_len - *decompressed_len;

if (remaining_output_len < remaining_decompressed_large_block_len) {
// Need more output buffer
*more_output_bytes = uncompressed_block_len - remaining_output_len;
break;
}
*more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
input_ptr = large_block_input_ptr;
output_ptr = large_block_output_ptr;

if (uncompressed_block_len == 0) {
remaining_input_size -= sizeof(uint32_t);
break;
}

if (remaining_input_size <= 2 * sizeof(uint32_t)) {
// The remaining input size should be larger then <uncompressed size><compressed size><compressed data>
// +1 means we need at least 1 bytes of compressed data.
*more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size;
break;
}
std::size_t decompressed_large_block_len = 0;
while (remaining_decompressed_large_block_len > 0) {
// Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
break;
}

// Read compressed size
size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t);
size_t compressed_len = _read_int32(src + sizeof(uint32_t));
if (compressed_len > tmp_remaining_size) {
// Need more input data
*more_input_bytes = compressed_len - tmp_remaining_size;
break;
}
// Read the length of the next snappy compressed block.
size_t compressed_small_block_len = BigEndian::Load32(input_ptr);

src += 2 * sizeof(uint32_t);
remaining_input_size -= 2 * sizeof(uint32_t);

// ATTN: the uncompressed len from GetUncompressedLength() is same as
// uncompressed_block_len, so I think it is unnecessary to get it again.
// Get uncompressed len from snappy
// size_t uncompressed_len;
// if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(src),
// compressed_len, &uncompressed_len)) {
// return Status::InternalError("snappy block decompress failed to get uncompressed len");
// }

// Decompress
if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len,
reinterpret_cast<char*>(output))) {
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
uncompressed_block_len, compressed_len);
input_ptr += sizeof(uint32_t);
input_len -= sizeof(uint32_t);

if (compressed_small_block_len == 0) {
continue;
}

if (compressed_small_block_len > input_len) {
// Need more input buffer
*more_input_bytes = compressed_small_block_len - input_len;
break;
}

// Decompress this block.
size_t decompressed_small_block_len;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
compressed_small_block_len,
&decompressed_small_block_len)) {
return Status::InternalError(
"snappy block decompress failed to get uncompressed len");
}
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
compressed_small_block_len,
reinterpret_cast<char*>(output_ptr))) {
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
decompressed_small_block_len, compressed_small_block_len);
}
input_ptr += compressed_small_block_len;
input_len -= compressed_small_block_len;

output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;
};

if (*more_input_bytes != 0) {
// Need more input buffer
input_ptr = large_block_input_ptr;
output_ptr = large_block_output_ptr;
break;
}

output += uncompressed_block_len;
src += compressed_len;
remaining_input_size -= compressed_len;
uncompressed_total_len += uncompressed_block_len;
*decompressed_len += decompressed_large_block_len;
}

*input_bytes_read += (input_len - remaining_input_size);
*decompressed_len = uncompressed_total_len;
*input_bytes_read += (input_ptr - input);
// If no more input and output need, means this is the end of a compressed block
*stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

Expand Down
32 changes: 31 additions & 1 deletion be/src/exec/decompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <stddef.h>
#include <stdint.h>
#include <zlib.h>
#include <zstd.h>

#include <memory>
#include <string>
Expand All @@ -34,7 +35,17 @@

namespace doris {

enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP, LZ4BLOCK, SNAPPYBLOCK };
// Compression codecs understood by Decompressor::create_decompressor().
// NOTE(review): inserting ZSTD in the middle shifts the numeric values of the
// enumerators after it — verify nothing serializes these values.
enum CompressType {
    UNCOMPRESSED,
    GZIP,
    DEFLATE,
    BZIP2,
    ZSTD,
    LZ4FRAME,
    LZOP,
    LZ4BLOCK,
    SNAPPYBLOCK
};

class Decompressor {
public:
Expand Down Expand Up @@ -126,6 +137,25 @@ class Bzip2Decompressor : public Decompressor {
bz_stream _bz_strm;
};

// Streaming zstd decompressor backed by a ZSTD_DStream context.
// Instances are created through Decompressor::create_decompressor() (the
// constructor is private and only the Decompressor factory may call it).
class ZstdDecompressor : public Decompressor {
public:
    ~ZstdDecompressor() override;

    // Decompress one step of the stream; see Decompressor::decompress for the
    // meaning of the in/out parameters.
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
                      size_t* more_input_bytes, size_t* more_output_bytes) override;

    std::string debug_info() override;

private:
    friend class Decompressor;
    ZstdDecompressor() : Decompressor(CompressType::ZSTD) {}
    // Allocates and initializes _zstd_strm; must succeed before decompress().
    Status init() override;

private:
    // Owned streaming context; freed in the destructor.
    ZSTD_DStream* _zstd_strm {nullptr};
};

class Lz4FrameDecompressor : public Decompressor {
public:
~Lz4FrameDecompressor() override;
Expand Down
Loading

0 comments on commit eb419c2

Please sign in to comment.