
[feature](hive) support hive write text table #38549

Merged
Changes from all commits
41 commits
04696bd
enable FORMAT_CSV_PLAIN in HiveTableSink
suxiaogang223 Jul 18, 2024
d68e2be
add FORMAT_CSV_PLAIN in vhive_partition_writer
suxiaogang223 Jul 18, 2024
1bb1160
support hive serde properties
suxiaogang223 Jul 19, 2024
504bfdb
keep THiveSerDeProperties as reference
suxiaogang223 Jul 20, 2024
f2f0590
support block_compression in vcsv_transformer
suxiaogang223 Jul 30, 2024
c41e77f
add hive_text_compression SessionVariable
suxiaogang223 Jul 31, 2024
4dee797
add more compress file extension name
suxiaogang223 Aug 1, 2024
918eca5
supoort HadoopLz4Codec and HadoopSnappyCodec
suxiaogang223 Aug 1, 2024
fe0d390
impl Bzip2BlockCompression
suxiaogang223 Aug 1, 2024
84df838
fix bug
suxiaogang223 Aug 5, 2024
8175f93
fix Gzip and Bzip2 bug
suxiaogang223 Aug 5, 2024
bc86cd3
hadoop block compression
suxiaogang223 Aug 5, 2024
7cd1c63
add regress-test
suxiaogang223 Aug 12, 2024
b70da8f
create table
suxiaogang223 Aug 12, 2024
9014501
fix regresstest
suxiaogang223 Aug 12, 2024
7a4468d
fix
suxiaogang223 Aug 12, 2024
13a2fb8
fix
suxiaogang223 Aug 12, 2024
f00fcb0
add collection_delim and mapkv_delim for complex types
suxiaogang223 Aug 13, 2024
8f7ae82
fix fe
suxiaogang223 Aug 13, 2024
77c340a
write the hive text table
suxiaogang223 Aug 13, 2024
d8ef300
fix
suxiaogang223 Aug 13, 2024
692d84a
fix
suxiaogang223 Aug 13, 2024
6ef9abb
fix regresstest
suxiaogang223 Aug 13, 2024
7730d63
fix snappy and lz4 compression bug
suxiaogang223 Aug 14, 2024
99e9286
fix the fucking bug :(
suxiaogang223 Aug 14, 2024
14b4ba5
update format_compressions in regression test
suxiaogang223 Aug 14, 2024
525ad4c
add config
suxiaogang223 Aug 14, 2024
51e18c8
change serialize_one_cell_to_hive_text return Status
suxiaogang223 Aug 14, 2024
8ecbf47
gen output
suxiaogang223 Aug 14, 2024
217cb56
remove deflate
suxiaogang223 Aug 14, 2024
7078d9d
fix
suxiaogang223 Aug 14, 2024
df4d1de
add escape_char and null_format
suxiaogang223 Aug 14, 2024
cb5fb78
refactor VCSVTransformer code
suxiaogang223 Aug 14, 2024
ac78e1b
support escape.delim and serialization.null.format
suxiaogang223 Aug 14, 2024
3d75c40
fix
suxiaogang223 Aug 14, 2024
589b501
fix build
suxiaogang223 Aug 15, 2024
57740f0
support escape_char and null_fromat for csv_reader
suxiaogang223 Aug 18, 2024
0bc5c91
Revert "support escape_char and null_fromat for csv_reader"
suxiaogang223 Aug 20, 2024
b88534f
fix complie
suxiaogang223 Aug 24, 2024
7299015
add regression test
suxiaogang223 Aug 26, 2024
8d2f9e1
fix snappy read bug
suxiaogang223 Aug 27, 2024
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -1314,6 +1314,8 @@ DEFINE_Validator(tablet_meta_serialize_size_limit,
[](const int64_t config) -> bool { return config < 1717986918; });

DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
DEFINE_mInt32(snappy_compression_block_size, "262144");
DEFINE_mInt32(lz4_compression_block_size, "262144");

// clang-format off
#ifdef BE_TEST
3 changes: 3 additions & 0 deletions be/src/common/config.h
@@ -1410,6 +1410,9 @@ DECLARE_mInt16(topn_agg_limit_multiplier);
DECLARE_mInt64(tablet_meta_serialize_size_limit);

DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
// To be compatible with Hadoop's block compression codecs
DECLARE_mInt32(snappy_compression_block_size);
DECLARE_mInt32(lz4_compression_block_size);

#ifdef BE_TEST
// test s3
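Both new BE options default to 262144 bytes (256 KiB), which lines up with Hadoop's usual buffer size for its snappy and LZ4 block codecs, so files written by Doris stay readable by Hive/Hadoop readers. As a hypothetical illustration (not code from this PR), a writer could consume the configured value roughly like this, chunking its buffer before handing each piece to the block codec; the helper name and surrounding plumbing are assumptions for the sketch:

// Hypothetical sketch: split a buffer into chunks no larger than the
// configured block size (e.g. config::snappy_compression_block_size,
// declared above) before compressing each chunk independently.
#include <algorithm>
#include <cstddef>
#include <string_view>
#include <vector>

std::vector<std::string_view> split_into_blocks(std::string_view data,
                                                size_t block_size /* e.g. 262144 */) {
    std::vector<std::string_view> blocks;
    for (size_t pos = 0; pos < data.size(); pos += block_size) {
        blocks.push_back(data.substr(pos, std::min(block_size, data.size() - pos)));
    }
    return blocks;
}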
158 changes: 91 additions & 67 deletions be/src/exec/decompressor.cpp
@@ -535,90 +535,114 @@ Status SnappyBlockDecompressor::init() {
return Status::OK();
}

// Hadoop SnappyCodec source:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
// Example:
// OriginData (the original data is divided into several large data blocks):
//      large data block1 | large data block2 | large data block3 | ....
// Each large data block is further divided into several small data blocks.
// Suppose a large data block is divided into three small blocks:
// large data block1: | small block1 | small block2 | small block3 |
// CompressData:   <A [B1 compress(small block1)] [B2 compress(small block2)] [B3 compress(small block3)]>
//
// A  : original (uncompressed) length of the current large data block.
//      sizeof(A) = 4 bytes.
//      A = length(small block1) + length(small block2) + length(small block3)
// Bx : length of compressed small data block x.
//      sizeof(Bx) = 4 bytes.
//      Bx = length(compress(small blockx))
Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len,
bool* stream_end, size_t* more_input_bytes,
size_t* more_output_bytes) {
uint8_t* src = input;
size_t remaining_input_size = input_len;
int64_t uncompressed_total_len = 0;
*input_bytes_read = 0;
auto* input_ptr = input;
auto* output_ptr = output;

// The hadoop snappy codec is as:
// <4 byte big endian uncompressed size>
// <4 byte big endian compressed size>
// <snappy compressed block>
// ....
// <4 byte big endian uncompressed size>
// <4 byte big endian compressed size>
// <snappy compressed block>
//
// See:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
while (remaining_input_size > 0) {
if (remaining_input_size < 4) {
*more_input_bytes = 4 - remaining_input_size;
break;
while (input_len > 0) {
// if this fails, fall back to the beginning of the large block
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

if (input_len < sizeof(uint32_t)) {
return Status::InvalidArgument(strings::Substitute(
"fail to do hadoop-snappy decompress, input_len=$0", input_len));
}
// Read uncompressed size
uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
int64_t remaining_output_len = output_max_len - uncompressed_total_len;
if (remaining_output_len < uncompressed_block_len) {

uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

input_ptr += sizeof(uint32_t);
input_len -= sizeof(uint32_t);

std::size_t remaining_output_len = output_max_len - *decompressed_len;

if (remaining_output_len < remaining_decompressed_large_block_len) {
// Need more output buffer
*more_output_bytes = uncompressed_block_len - remaining_output_len;
break;
}
*more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
input_ptr = large_block_input_ptr;
output_ptr = large_block_output_ptr;

if (uncompressed_block_len == 0) {
remaining_input_size -= sizeof(uint32_t);
break;
}

if (remaining_input_size <= 2 * sizeof(uint32_t)) {
// The remaining input size should be larger then <uncompressed size><compressed size><compressed data>
// +1 means we need at least 1 bytes of compressed data.
*more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size;
break;
}
std::size_t decompressed_large_block_len = 0;
do {
// Need at least 4 more bytes for the next compressed-block length.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
break;
}

// Read compressed size
size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t);
size_t compressed_len = _read_int32(src + sizeof(uint32_t));
if (compressed_len > tmp_remaining_size) {
// Need more input data
*more_input_bytes = compressed_len - tmp_remaining_size;
break;
}
// Read the length of the next snappy compressed block.
size_t compressed_small_block_len = BigEndian::Load32(input_ptr);

src += 2 * sizeof(uint32_t);
remaining_input_size -= 2 * sizeof(uint32_t);

// ATTN: the uncompressed len from GetUncompressedLength() is same as
// uncompressed_block_len, so I think it is unnecessary to get it again.
// Get uncompressed len from snappy
// size_t uncompressed_len;
// if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(src),
// compressed_len, &uncompressed_len)) {
// return Status::InternalError("snappy block decompress failed to get uncompressed len");
// }

// Decompress
if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len,
reinterpret_cast<char*>(output))) {
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
uncompressed_block_len, compressed_len);
input_ptr += sizeof(uint32_t);
input_len -= sizeof(uint32_t);

if (compressed_small_block_len == 0) {
continue;
}

if (compressed_small_block_len > input_len) {
// Need more input buffer
*more_input_bytes = compressed_small_block_len - input_len;
break;
}

// Decompress this block.
size_t decompressed_small_block_len;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
compressed_small_block_len,
&decompressed_small_block_len)) {
return Status::InternalError(
"snappy block decompress failed to get uncompressed len");
}
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
compressed_small_block_len,
reinterpret_cast<char*>(output_ptr))) {
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
decompressed_small_block_len, compressed_small_block_len);
}
input_ptr += compressed_small_block_len;
input_len -= compressed_small_block_len;

output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;

} while (remaining_decompressed_large_block_len > 0);

if (*more_input_bytes != 0) {
// Need more input buffer
input_ptr = large_block_input_ptr;
output_ptr = large_block_output_ptr;
break;
}

output += uncompressed_block_len;
src += compressed_len;
remaining_input_size -= compressed_len;
uncompressed_total_len += uncompressed_block_len;
*decompressed_len += decompressed_large_block_len;
}

*input_bytes_read += (input_len - remaining_input_size);
*decompressed_len = uncompressed_total_len;
*input_bytes_read += (input_ptr - input);
// If no more input or output is needed, we have reached the end of the compressed block.
*stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

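The decompressor above parses the Hadoop block framing: each large block starts with a 4-byte big-endian uncompressed length, followed by one or more pairs of <4-byte big-endian compressed length, snappy-compressed small block>. To illustrate the same framing from the writing side (which is what the Hive text sink ultimately needs), here is a minimal sketch of an encoder built on the public snappy API; it is not the writer added by this PR, just a demonstration of the format under the assumptions stated in the comments above:

// Minimal sketch of a Hadoop-compatible snappy block encoder, assuming the
// framing described in the decompressor comments; illustrative only.
#include <snappy.h>

#include <algorithm>
#include <cstdint>
#include <string>

namespace {

// Append a 32-bit value in big-endian byte order, as Hadoop's codec expects.
void put_be32(std::string* out, uint32_t v) {
    char buf[4] = {static_cast<char>(v >> 24), static_cast<char>(v >> 16),
                   static_cast<char>(v >> 8), static_cast<char>(v)};
    out->append(buf, 4);
}

} // namespace

// Encode `input` as one Hadoop-snappy "large block":
//   <4-byte BE uncompressed size> { <4-byte BE compressed size> <snappy data> }...
std::string hadoop_snappy_encode(const std::string& input, size_t block_size = 262144) {
    std::string out;
    put_be32(&out, static_cast<uint32_t>(input.size()));
    for (size_t pos = 0; pos < input.size(); pos += block_size) {
        size_t len = std::min(block_size, input.size() - pos);
        std::string compressed;
        snappy::Compress(input.data() + pos, len, &compressed);
        put_be32(&out, static_cast<uint32_t>(compressed.size()));
        out.append(compressed);
    }
    return out;
}

The block size argument mirrors the snappy_compression_block_size config added in be/src/common/config.cpp, so each small block stays within the size Hadoop readers expect.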