Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduced memory overhead of preparing LZ4-compressed data for server. #110

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clickhouse/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
SET ( clickhouse-cpp-lib-src
base/coded.cpp
base/compressed.cpp
base/input.cpp
base/output.cpp
base/platform.cpp
base/socket.cpp
base/wire_format.cpp

columns/array.cpp
columns/date.cpp
Expand Down
100 changes: 0 additions & 100 deletions clickhouse/base/coded.cpp

This file was deleted.

65 changes: 0 additions & 65 deletions clickhouse/base/coded.h

This file was deleted.

92 changes: 84 additions & 8 deletions clickhouse/base/compressed.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
#include "compressed.h"
#include "wire_format.h"
#include "output.h"

#include <cityhash/city.h>
#include <lz4/lz4.h>
#include <stdexcept>
#include <system_error>

#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB
namespace {
constexpr size_t HEADER_SIZE = 9;
// see DB::CompressionMethodByte::LZ4 from src/Compression/CompressionInfo.h of ClickHouse project
constexpr uint8_t COMPRESSION_METHOD = 0x82;
// Documentation says that compression is faster when output buffer is larger than LZ4_compressBound estimation.
constexpr size_t EXTRA_COMPRESS_BUFFER_SIZE = 4096;
constexpr size_t DBMS_MAX_COMPRESSED_SIZE = 0x40000000ULL; // 1GB
}

namespace clickhouse {

CompressedInput::CompressedInput(CodedInputStream* input)
CompressedInput::CompressedInput(InputStream* input)
: input_(input)
{
}
Expand All @@ -22,7 +30,7 @@ CompressedInput::~CompressedInput() {
#else
if (!std::uncaught_exceptions()) {
#endif
throw std::runtime_error("some data was not readed");
throw std::runtime_error("some data was not read");
}
}
}
Expand Down Expand Up @@ -50,9 +58,8 @@ bool CompressedInput::Decompress() {
return false;
}

if (method != 0x82) {
throw std::runtime_error("unsupported compression method " +
std::to_string(int(method)));
if (method != COMPRESSION_METHOD) {
throw std::runtime_error("unsupported compression method " + std::to_string(int(method)));
} else {
if (!WireFormat::ReadFixed(input_, &compressed)) {
return false;
Expand All @@ -75,7 +82,7 @@ bool CompressedInput::Decompress() {
out.Write(&original, sizeof(original));
}

if (!WireFormat::ReadBytes(input_, tmp.data() + 9, compressed - 9)) {
if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
return false;
} else {
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
Expand All @@ -85,7 +92,7 @@ bool CompressedInput::Decompress() {

data_ = Buffer(original);

if (LZ4_decompress_safe((const char*)tmp.data() + 9, (char*)data_.data(), compressed - 9, original) < 0) {
if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), compressed - HEADER_SIZE, original) < 0) {
throw std::runtime_error("can't decompress data");
} else {
mem_.Reset(data_.data(), original);
Expand All @@ -95,4 +102,73 @@ bool CompressedInput::Decompress() {
return true;
}


CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
: destination_(destination)
, max_compressed_chunk_size_(max_compressed_chunk_size)
{
PreallocateCompressBuffer(max_compressed_chunk_size);
}

CompressedOutput::~CompressedOutput() {
Flush();
}

size_t CompressedOutput::DoWrite(const void* data, size_t len) {
const size_t original_len = len;
// what if len > max_compressed_chunk_size_ ?
const size_t max_chunk_size = max_compressed_chunk_size_ > 0 ? max_compressed_chunk_size_ : len;
if (max_chunk_size > max_compressed_chunk_size_) {
PreallocateCompressBuffer(len);
}

while (len > 0) {
auto to_compress = std::min(len, max_chunk_size);
Compress(data, to_compress);

len -= to_compress;
data = reinterpret_cast<const char*>(data) + to_compress;
}

return original_len - len;
}

void CompressedOutput::DoFlush() {
destination_->Flush();
}

void CompressedOutput::Compress(const void * data, size_t len) {
const auto compressed_size = LZ4_compress_default(
(const char*)data,
(char*)compressed_buffer_.data() + HEADER_SIZE,
len,
compressed_buffer_.size() - HEADER_SIZE);
if (compressed_size <= 0)
throw std::runtime_error("Failed to compress chunk of " + std::to_string(len) + " bytes, "
"LZ4 error: " + std::to_string(compressed_size));

{
auto header = compressed_buffer_.data();
WriteUnaligned(header, COMPRESSION_METHOD);
// Compressed data size with header
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
// Original data size
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
}

WireFormat::WriteFixed(destination_, CityHash128(
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);

destination_->Flush();
}

void CompressedOutput::PreallocateCompressBuffer(size_t input_size) {
const auto estimated_compressed_buffer_size = LZ4_compressBound(input_size);
if (estimated_compressed_buffer_size <= 0)
throw std::runtime_error("Failed to estimate compressed buffer size, LZ4 error: " + std::to_string(estimated_compressed_buffer_size));

compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a low hanging fruit for optimization: resize without initialization.

}

}
27 changes: 24 additions & 3 deletions clickhouse/base/compressed.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include "coded.h"
#include "input.h"
#include "output.h"
#include "buffer.h"

namespace clickhouse {

class CompressedInput : public ZeroCopyInput {
public:
CompressedInput(CodedInputStream* input);
CompressedInput(InputStream* input);
~CompressedInput();

protected:
Expand All @@ -15,10 +17,29 @@ class CompressedInput : public ZeroCopyInput {
bool Decompress();

private:
CodedInputStream* const input_;
InputStream* const input_;

Buffer data_;
ArrayInput mem_;
};

class CompressedOutput : public OutputStream {
public:
CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0);
~CompressedOutput();

protected:
size_t DoWrite(const void* data, size_t len) override;
void DoFlush() override;

private:
void Compress(const void * data, size_t len);
void PreallocateCompressBuffer(size_t input_size);

Enmk marked this conversation as resolved.
Show resolved Hide resolved
private:
OutputStream * destination_;
const size_t max_compressed_chunk_size_;
Buffer compressed_buffer_;
};

}
15 changes: 15 additions & 0 deletions clickhouse/base/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@

namespace clickhouse {

bool ZeroCopyInput::Skip(size_t bytes) {
while (bytes > 0) {
const void* ptr;
Enmk marked this conversation as resolved.
Show resolved Hide resolved
size_t len = Next(&ptr, bytes);

if (len == 0) {
return false;
}

bytes -= len;
}

return true;
}

size_t ZeroCopyInput::DoRead(void* buf, size_t len) {
const void* ptr;
size_t result = DoNext(&ptr, len);
Expand Down
Loading