From b148348da6947e0f4cf1f94fd64f5c073379dafb Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Wed, 27 Oct 2021 22:14:32 +0300 Subject: [PATCH 1/3] Reduced memory overhead of preparing LZ4-compressed data for server. Do not compress a whole serialized block, but instead only a reasonable-sized chunk. This removes some temporary buffers and reduces memory pressure. Also minor refactoring: - moved all serialization-format code to WireFormat class. - removed CodedOutputStream and CodedInputStream classes. --- clickhouse/CMakeLists.txt | 1 + clickhouse/base/coded.cpp | 102 ++-------- clickhouse/base/coded.h | 55 ------ clickhouse/base/compressed.cpp | 76 +++++++- clickhouse/base/compressed.h | 25 ++- clickhouse/base/input.cpp | 15 ++ clickhouse/base/input.h | 5 + clickhouse/base/output.cpp | 13 +- clickhouse/base/output.h | 19 +- clickhouse/base/socket.cpp | 10 +- clickhouse/base/socket.h | 3 +- clickhouse/base/sslsocket.cpp | 4 +- clickhouse/base/sslsocket.h | 5 +- clickhouse/base/streamstack.h | 59 ++++++ clickhouse/base/wire_format.cpp | 105 ++++++++++ clickhouse/base/wire_format.h | 112 ++++------- clickhouse/client.cpp | 213 +++++++++------------ clickhouse/client.h | 10 + clickhouse/columns/array.cpp | 5 +- clickhouse/columns/array.h | 5 +- clickhouse/columns/column.h | 9 +- clickhouse/columns/date.cpp | 12 +- clickhouse/columns/date.h | 12 +- clickhouse/columns/decimal.cpp | 4 +- clickhouse/columns/decimal.h | 4 +- clickhouse/columns/enum.cpp | 12 +- clickhouse/columns/enum.h | 4 +- clickhouse/columns/ip4.cpp | 4 +- clickhouse/columns/ip4.h | 4 +- clickhouse/columns/ip6.cpp | 4 +- clickhouse/columns/ip6.h | 4 +- clickhouse/columns/lowcardinality.cpp | 6 +- clickhouse/columns/lowcardinality.h | 4 +- clickhouse/columns/lowcardinalityadaptor.h | 6 +- clickhouse/columns/nothing.h | 4 +- clickhouse/columns/nullable.cpp | 4 +- clickhouse/columns/nullable.h | 4 +- clickhouse/columns/numeric.cpp | 10 +- clickhouse/columns/numeric.h | 4 +- clickhouse/columns/string.cpp | 8 +- clickhouse/columns/string.h | 8 +- clickhouse/columns/tuple.cpp | 4 +- clickhouse/columns/tuple.h | 4 +- clickhouse/columns/uuid.cpp | 4 +- clickhouse/columns/uuid.h | 4 +- ut/columns_ut.cpp | 20 +- ut/performance_tests.cpp | 6 +- ut/stream_ut.cpp | 12 +- ut/utils.h | 1 + 49 files changed, 566 insertions(+), 462 deletions(-) create mode 100644 clickhouse/base/streamstack.h create mode 100644 clickhouse/base/wire_format.cpp diff --git a/clickhouse/CMakeLists.txt b/clickhouse/CMakeLists.txt index 3bef4142..beb411d3 100644 --- a/clickhouse/CMakeLists.txt +++ b/clickhouse/CMakeLists.txt @@ -5,6 +5,7 @@ SET ( clickhouse-cpp-lib-src base/output.cpp base/platform.cpp base/socket.cpp + base/wire_format.cpp columns/array.cpp columns/date.cpp diff --git a/clickhouse/base/coded.cpp b/clickhouse/base/coded.cpp index fbd29a21..35a23b9c 100644 --- a/clickhouse/base/coded.cpp +++ b/clickhouse/base/coded.cpp @@ -4,97 +4,27 @@ namespace clickhouse { -static const int MAX_VARINT_BYTES = 10; +//static const int MAX_VARINT_BYTES = 10; -CodedInputStream::CodedInputStream(ZeroCopyInput* input) - : input_(input) -{ -} - -bool CodedInputStream::ReadRaw(void* buffer, size_t size) { - uint8_t* p = static_cast(buffer); - - while (size > 0) { - const void* ptr; - size_t len = input_->Next(&ptr, size); - - memcpy(p, ptr, len); - - p += len; - size -= len; - } - - return true; -} - -bool CodedInputStream::Skip(size_t count) { - while (count > 0) { - const void* ptr; - size_t len = input_->Next(&ptr, count); - - if (len == 0) { - return false; - } - - count -= len; - } - - return true; -} - -bool CodedInputStream::ReadVarint64(uint64_t* value) { - *value = 0; +//CodedInputStream::CodedInputStream(ZeroCopyInput* input) +// : input_(input) +//{ +//} - for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) { - uint8_t byte; +//bool CodedInputStream::ReadRaw(void* buffer, size_t size) { +// uint8_t* p = static_cast(buffer); - if (!input_->ReadByte(&byte)) { - return false; - } else { - *value |= uint64_t(byte & 0x7F) << (7 * i); +// while (size > 0) { +// const void* ptr; +// size_t len = input_->Next(&ptr, size); - if (!(byte & 0x80)) { - return true; - } - } - } +// memcpy(p, ptr, len); - // TODO skip invalid - return false; -} - - -CodedOutputStream::CodedOutputStream(ZeroCopyOutput* output) - : output_(output) -{ -} +// p += len; +// size -= len; +// } -void CodedOutputStream::Flush() { - output_->Flush(); -} - -void CodedOutputStream::WriteRaw(const void* buffer, int size) { - output_->Write(buffer, size); -} - -void CodedOutputStream::WriteVarint64(uint64_t value) { - uint8_t bytes[MAX_VARINT_BYTES]; - int size = 0; - - for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) { - uint8_t byte = value & 0x7F; - if (value > 0x7F) - byte |= 0x80; - - bytes[size++] = byte; - - value >>= 7; - if (!value) { - break; - } - } - - WriteRaw(bytes, size); -} +// return true; +//} } diff --git a/clickhouse/base/coded.h b/clickhouse/base/coded.h index a171ac5e..b6b6ca7c 100644 --- a/clickhouse/base/coded.h +++ b/clickhouse/base/coded.h @@ -7,59 +7,4 @@ namespace clickhouse { -/** - * Class which reads and decodes binary data which is composed of varint- - * encoded integers and fixed-width pieces. - */ -class CodedInputStream { -public: - /// Create a CodedInputStream that reads from the given ZeroCopyInput. - explicit CodedInputStream(ZeroCopyInput* input); - - // Read an unsigned integer with Varint encoding, truncating to 32 bits. - // Reading a 32-bit value is equivalent to reading a 64-bit one and casting - // it to uint32, but may be more efficient. - bool ReadVarint32(uint32_t* value); - - // Read an unsigned integer with Varint encoding. - bool ReadVarint64(uint64_t* value); - - // Read raw bytes, copying them into the given buffer. - bool ReadRaw(void* buffer, size_t size); - - // Like ReadRaw, but reads into a string. - // - // Implementation Note: ReadString() grows the string gradually as it - // reads in the data, rather than allocating the entire requested size - // upfront. This prevents denial-of-service attacks in which a client - // could claim that a string is going to be MAX_INT bytes long in order to - // crash the server because it can't allocate this much space at once. - bool ReadString(std::string* buffer, int size); - - // Skips a number of bytes. Returns false if an underlying read error - // occurs. - bool Skip(size_t count); - -private: - ZeroCopyInput* input_; -}; - - -class CodedOutputStream { -public: - /// Create a CodedInputStream that writes to the given ZeroCopyOutput. - explicit CodedOutputStream(ZeroCopyOutput* output); - - void Flush(); - - // Write raw bytes, copying them from the given buffer. - void WriteRaw(const void* buffer, int size); - - /// Write an unsigned integer with Varint encoding. - void WriteVarint64(const uint64_t value); - -private: - ZeroCopyOutput* output_; -}; - } diff --git a/clickhouse/base/compressed.cpp b/clickhouse/base/compressed.cpp index 40599f7b..0239c6f2 100644 --- a/clickhouse/base/compressed.cpp +++ b/clickhouse/base/compressed.cpp @@ -1,16 +1,24 @@ #include "compressed.h" #include "wire_format.h" +#include "output.h" #include #include #include #include +#include + +namespace { +static const size_t HEADER_SIZE = 9; +static const size_t EXTRA_PREALLOCATE_COMPRESS_BUFFER = 15; +static const uint8_t COMPRESSION_METHOD = 0x82; #define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB +} namespace clickhouse { -CompressedInput::CompressedInput(CodedInputStream* input) +CompressedInput::CompressedInput(InputStream* input) : input_(input) { } @@ -50,7 +58,7 @@ bool CompressedInput::Decompress() { return false; } - if (method != 0x82) { + if (method != COMPRESSION_METHOD) { throw std::runtime_error("unsupported compression method " + std::to_string(int(method))); } else { @@ -75,7 +83,7 @@ bool CompressedInput::Decompress() { out.Write(&original, sizeof(original)); } - if (!WireFormat::ReadBytes(input_, tmp.data() + 9, compressed - 9)) { + if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) { return false; } else { if (hash != CityHash128((const char*)tmp.data(), compressed)) { @@ -85,7 +93,7 @@ bool CompressedInput::Decompress() { data_ = Buffer(original); - if (LZ4_decompress_safe((const char*)tmp.data() + 9, (char*)data_.data(), compressed - 9, original) < 0) { + if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), compressed - HEADER_SIZE, original) < 0) { throw std::runtime_error("can't decompress data"); } else { mem_.Reset(data_.data(), original); @@ -95,4 +103,64 @@ bool CompressedInput::Decompress() { return true; } + +CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size) + : destination_(destination), + max_compressed_chunk_size_(max_compressed_chunk_size) +{ +} + +CompressedOutput::~CompressedOutput() { + Flush(); +} + +size_t CompressedOutput::DoWrite(const void* data, size_t len) { + const size_t original_len = len; + const size_t max_chunk_size = max_compressed_chunk_size_ ? max_compressed_chunk_size_ : len; + + while (len > 0) + { + auto to_compress = std::min(len, max_chunk_size); + if (!Compress(data, to_compress)) + break; + + len -= to_compress; + data = reinterpret_cast(data) + to_compress; + } + + return original_len - len; +} + +void CompressedOutput::DoFlush() { + destination_->Flush(); +} + +bool CompressedOutput::Compress(const void * data, size_t len) { + + const size_t expected_out_size = LZ4_compressBound(len); + compressed_buffer_.resize(std::max(compressed_buffer_.size(), expected_out_size + HEADER_SIZE + EXTRA_PREALLOCATE_COMPRESS_BUFFER)); + + const int compressed_size = LZ4_compress_default( + (const char*)data, + (char*)compressed_buffer_.data() + HEADER_SIZE, + len, + compressed_buffer_.size() - HEADER_SIZE); + + { + auto header = compressed_buffer_.data(); + WriteUnaligned(header, COMPRESSION_METHOD); + // Compressed data size with header + WriteUnaligned(header + 1, static_cast(compressed_size + HEADER_SIZE)); + // Original data size + WriteUnaligned(header + 5, static_cast(len)); + } + + WireFormat::WriteFixed(destination_, CityHash128( + (const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE)); + WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE); + + destination_->Flush(); + return true; +} + } diff --git a/clickhouse/base/compressed.h b/clickhouse/base/compressed.h index 8c1b461e..390d3ab9 100644 --- a/clickhouse/base/compressed.h +++ b/clickhouse/base/compressed.h @@ -1,12 +1,14 @@ #pragma once -#include "coded.h" +#include "input.h" +#include "output.h" +#include "buffer.h" namespace clickhouse { class CompressedInput : public ZeroCopyInput { public: - CompressedInput(CodedInputStream* input); + CompressedInput(InputStream* input); ~CompressedInput(); protected: @@ -15,10 +17,27 @@ class CompressedInput : public ZeroCopyInput { bool Decompress(); private: - CodedInputStream* const input_; + InputStream* const input_; Buffer data_; ArrayInput mem_; }; +class CompressedOutput : public OutputStream { +public: + CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0); + ~CompressedOutput(); + +protected: + size_t DoWrite(const void* data, size_t len) override; + void DoFlush() override; + bool Compress(const void * data, size_t len); + + +private: + OutputStream * destination_; + Buffer compressed_buffer_; + size_t max_compressed_chunk_size_; +}; + } diff --git a/clickhouse/base/input.cpp b/clickhouse/base/input.cpp index f7b7ff61..e1c409da 100644 --- a/clickhouse/base/input.cpp +++ b/clickhouse/base/input.cpp @@ -5,6 +5,21 @@ namespace clickhouse { +bool ZeroCopyInput::Skip(size_t bytes) { + while (bytes > 0) { + const void* ptr; + size_t len = Next(&ptr, bytes); + + if (len == 0) { + return false; + } + + bytes -= len; + } + + return true; +} + size_t ZeroCopyInput::DoRead(void* buf, size_t len) { const void* ptr; size_t result = DoNext(&ptr, len); diff --git a/clickhouse/base/input.h b/clickhouse/base/input.h index 052fab85..9f35ddda 100644 --- a/clickhouse/base/input.h +++ b/clickhouse/base/input.h @@ -21,6 +21,9 @@ class InputStream { return DoRead(buf, len); } + // Skips a number of bytes. Returns false if an underlying read error occurs. + virtual bool Skip(size_t bytes) = 0; + protected: virtual size_t DoRead(void* buf, size_t len) = 0; }; @@ -32,6 +35,8 @@ class ZeroCopyInput : public InputStream { return DoNext(buf, len); } + bool Skip(size_t bytes) override; + protected: virtual size_t DoNext(const void** ptr, size_t len) = 0; diff --git a/clickhouse/base/output.cpp b/clickhouse/base/output.cpp index dcd25ef6..fd751167 100644 --- a/clickhouse/base/output.cpp +++ b/clickhouse/base/output.cpp @@ -6,7 +6,8 @@ namespace clickhouse { -void ZeroCopyOutput::DoWrite(const void* data, size_t len) { +size_t ZeroCopyOutput::DoWrite(const void* data, size_t len) { + const size_t original_len = len; while (len > 0) { void* ptr; size_t result = DoNext(&ptr, len); @@ -19,12 +20,15 @@ void ZeroCopyOutput::DoWrite(const void* data, size_t len) { break; } } + + return original_len - len; } ArrayOutput::ArrayOutput(void* buf, size_t len) : buf_(static_cast(buf)) , end_(buf_ + len) + , buffer_size_(len) { } @@ -105,17 +109,16 @@ size_t BufferedOutput::DoNext(void** data, size_t len) { } -void BufferedOutput::DoWrite(const void* data, size_t len) { +size_t BufferedOutput::DoWrite(const void* data, size_t len) { if (array_output_.Avail() < len) { Flush(); if (len > buffer_.size() / 2) { - slave_->Write(data, len); - return; + return slave_->Write(data, len); } } - array_output_.Write(data, len); + return array_output_.Write(data, len); } } diff --git a/clickhouse/base/output.h b/clickhouse/base/output.h index e53aadf4..b23cb08b 100644 --- a/clickhouse/base/output.h +++ b/clickhouse/base/output.h @@ -18,14 +18,14 @@ class OutputStream { DoFlush(); } - inline void Write(const void* data, size_t len) { - DoWrite(data, len); + inline size_t Write(const void* data, size_t len) { + return DoWrite(data, len); } protected: virtual void DoFlush() { } - virtual void DoWrite(const void* data, size_t len) = 0; + virtual size_t DoWrite(const void* data, size_t len) = 0; }; @@ -41,7 +41,7 @@ class ZeroCopyOutput : public OutputStream { // be written to the output. virtual size_t DoNext(void** data, size_t len) = 0; - void DoWrite(const void* data, size_t len) override; + size_t DoWrite(const void* data, size_t len) override; }; @@ -72,6 +72,12 @@ class ArrayOutput : public ZeroCopyOutput { inline void Reset(void* buf, size_t len) noexcept { buf_ = static_cast(buf); end_ = buf_ + len; + buffer_size_ = len; + } + + /// Number of bytes written to the buffer. + inline size_t Size() const noexcept { + return buffer_size_ - Avail(); } protected: @@ -80,11 +86,12 @@ class ArrayOutput : public ZeroCopyOutput { private: uint8_t* buf_; uint8_t* end_; + size_t buffer_size_; }; /** - * A ZeroCopyOutput stream backed by an vector of bytes. + * A ZeroCopyOutput stream backed by a vector. */ class BufferOutput : public ZeroCopyOutput { public: @@ -110,7 +117,7 @@ class BufferedOutput : public ZeroCopyOutput { protected: void DoFlush() override; size_t DoNext(void** data, size_t len) override; - void DoWrite(const void* data, size_t len) override; + size_t DoWrite(const void* data, size_t len) override; private: OutputStream* const slave_; diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 285f98ba..703b0e5a 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -271,6 +271,10 @@ size_t SocketInput::DoRead(void* buf, size_t len) { ); } +bool SocketInput::Skip(size_t /*bytes*/) { + return false; +} + SocketOutput::SocketOutput(SOCKET s) : s_(s) @@ -279,7 +283,7 @@ SocketOutput::SocketOutput(SOCKET s) SocketOutput::~SocketOutput() = default; -void SocketOutput::DoWrite(const void* data, size_t len) { +size_t SocketOutput::DoWrite(const void* data, size_t len) { #if defined (_linux_) static const int flags = MSG_NOSIGNAL; #else @@ -288,9 +292,11 @@ void SocketOutput::DoWrite(const void* data, size_t len) { if (::send(s_, (const char*)data, (int)len, flags) != (int)len) { throw std::system_error( - errno, std::system_category(), "fail to send data" + errno, std::system_category(), "fail to send " + std::to_string(len) + " bytes of data" ); } + + return len; } diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index ac5cda5a..00bda042 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -84,6 +84,7 @@ class SocketInput : public InputStream { ~SocketInput(); protected: + bool Skip(size_t bytes) override; size_t DoRead(void* buf, size_t len) override; private: @@ -96,7 +97,7 @@ class SocketOutput : public OutputStream { ~SocketOutput(); protected: - void DoWrite(const void* data, size_t len) override; + size_t DoWrite(const void* data, size_t len) override; private: SOCKET s_; diff --git a/clickhouse/base/sslsocket.cpp b/clickhouse/base/sslsocket.cpp index d6301525..6a78405f 100644 --- a/clickhouse/base/sslsocket.cpp +++ b/clickhouse/base/sslsocket.cpp @@ -203,8 +203,8 @@ SSLSocketOutput::SSLSocketOutput(SSL *ssl) : ssl_(ssl) {} -void SSLSocketOutput::DoWrite(const void* data, size_t len) { - HANDLE_SSL_ERROR(ssl_, SSL_write(ssl_, data, len)); +size_t SSLSocketOutput::DoWrite(const void* data, size_t len) { + return static_cast(HANDLE_SSL_ERROR(ssl_, SSL_write(ssl_, data, len))); } #undef HANDLE_SSL_ERROR diff --git a/clickhouse/base/sslsocket.h b/clickhouse/base/sslsocket.h index 7e213d1b..19d42be9 100644 --- a/clickhouse/base/sslsocket.h +++ b/clickhouse/base/sslsocket.h @@ -61,6 +61,9 @@ class SSLSocketInput : public InputStream { explicit SSLSocketInput(SSL *ssl); ~SSLSocketInput() = default; + bool Skip(size_t /*bytes*/) override { + return false; + } protected: size_t DoRead(void* buf, size_t len) override; @@ -75,7 +78,7 @@ class SSLSocketOutput : public OutputStream { ~SSLSocketOutput() = default; protected: - void DoWrite(const void* data, size_t len) override; + size_t DoWrite(const void* data, size_t len) override; private: // Not owning diff --git a/clickhouse/base/streamstack.h b/clickhouse/base/streamstack.h new file mode 100644 index 00000000..a8bfeee5 --- /dev/null +++ b/clickhouse/base/streamstack.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include + +namespace clickhouse { + +/** Collection of owned OutputStream or InputStream instances. + * Simplifies building chains or trees of streams, like: + * + * A => B => C => F + * ^ + * / + * D ====> E + * + * Streams are destroyed in LIFO order, allowing proper flushing of internal buffers. + */ +template +class Streams +{ +public: + Streams() = default; + Streams(Streams&&) = default; + Streams& operator=(Streams&&) = default; + + ~Streams() { + while (!streams_.empty()) { + streams_.pop(); + } + } + + template + inline ConcreteStreamType * Add(std::unique_ptr && stream) { + auto ret = stream.get(); + streams_.emplace(std::move(stream)); + return ret; + } + + template + inline ConcreteStreamType * AddNew(Args&&... args) { + return Add(std::make_unique(std::forward(args)...)); + } + + inline StreamType * Top() const { + return streams_.top().get(); + } + +private: + std::stack> streams_; +}; + +class OutputStream; +class InputStream; + +using OutputStreams = Streams; +using InputStreams = Streams; + +} diff --git a/clickhouse/base/wire_format.cpp b/clickhouse/base/wire_format.cpp new file mode 100644 index 00000000..d44730b0 --- /dev/null +++ b/clickhouse/base/wire_format.cpp @@ -0,0 +1,105 @@ +#include "wire_format.h" + +#include "input.h" +#include "output.h" + +#include + +namespace { +static const int MAX_VARINT_BYTES = 10; +} + +namespace clickhouse { + +bool WireFormat::ReadAll(InputStream * input, void* buf, size_t len) { + uint8_t* p = static_cast(buf); + + size_t read_previously = 1; // 1 to execute loop at least once + while (len > 0 && read_previously) { + read_previously = input->Read(p, len); + + p += read_previously; + len -= read_previously; + } + + // true if all was read successfully + return !len; +// if (len) { +// throw std::runtime_error("Failed to read " + std::to_string(original_len) +// + " bytes, only read " + std::to_string(original_len - len)); +// } +} + +void WireFormat::WriteAll(OutputStream* output, const void* buf, size_t len) { + const size_t original_len = len; + const uint8_t* p = static_cast(buf); + + size_t written_previously = 1; // 1 to execute loop at least once + while (len > 0 && written_previously) { + written_previously = output->Write(p, len); + + p += written_previously; + len -= written_previously; + } + + if (len) { + throw std::runtime_error("Failed to write " + std::to_string(original_len) + + " bytes, only written " + std::to_string(original_len - len)); + } +} + +bool WireFormat::ReadVarint64(InputStream* input, uint64_t* value) { + *value = 0; + + for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) { + uint8_t byte; + + if (!input->ReadByte(&byte)) { + return false; + } else { + *value |= uint64_t(byte & 0x7F) << (7 * i); + + if (!(byte & 0x80)) { + return true; + } + } + } + + // TODO skip invalid + return false; +} + +void WireFormat::WriteVarint64(OutputStream* output, uint64_t value) { + uint8_t bytes[MAX_VARINT_BYTES]; + int size = 0; + + for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) { + uint8_t byte = value & 0x7F; + if (value > 0x7F) + byte |= 0x80; + + bytes[size++] = byte; + + value >>= 7; + if (!value) { + break; + } + } + + WriteAll(output, bytes, size); +} + +bool WireFormat::SkipString(InputStream* input) { + uint64_t len; + + if (ReadVarint64(input, &len)) { + if (len > 0x00FFFFFFULL) + return false; + + return input->Skip((size_t)len); + } + + return false; +} + +} diff --git a/clickhouse/base/wire_format.h b/clickhouse/base/wire_format.h index 89fe915a..a2c803f4 100644 --- a/clickhouse/base/wire_format.h +++ b/clickhouse/base/wire_format.h @@ -1,117 +1,77 @@ #pragma once -#include "coded.h" - #include namespace clickhouse { +class InputStream; +class OutputStream; + class WireFormat { public: template - static bool ReadFixed(CodedInputStream* input, T* value); - - static bool ReadString(CodedInputStream* input, std::string* value); - static bool SkipString(CodedInputStream* input); - - static bool ReadBytes(CodedInputStream* input, void* buf, size_t len); - - static bool ReadUInt64(CodedInputStream* input, uint64_t* value); - + static bool ReadFixed(InputStream* input, T* value); + static bool ReadString(InputStream* input, std::string* value); + static bool SkipString(InputStream* input); + static bool ReadBytes(InputStream* input, void* buf, size_t len); + static bool ReadUInt64(InputStream* input, uint64_t* value); + static bool ReadVarint64(InputStream* output, uint64_t* value); template - static void WriteFixed(CodedOutputStream* output, const T& value); - - static void WriteBytes(CodedOutputStream* output, const void* buf, size_t len); - - static void WriteString(CodedOutputStream* output, std::string_view value); - - static void WriteUInt64(CodedOutputStream* output, const uint64_t value); + static void WriteFixed(OutputStream* output, const T& value); + static void WriteBytes(OutputStream* output, const void* buf, size_t len); + static void WriteString(OutputStream* output, std::string_view value); + static void WriteUInt64(OutputStream* output, const uint64_t value); + static void WriteVarint64(OutputStream* output, uint64_t value); + +private: + static bool ReadAll(InputStream * input, void* buf, size_t len); + static void WriteAll(OutputStream* output, const void* buf, size_t len); }; template -inline bool WireFormat::ReadFixed( - CodedInputStream* input, - T* value) -{ - return input->ReadRaw(value, sizeof(T)); +inline bool WireFormat::ReadFixed(InputStream* input, T* value) { + return ReadAll(input, value, sizeof(T)); } -inline bool WireFormat::ReadString( - CodedInputStream* input, - std::string* value) -{ +inline bool WireFormat::ReadString(InputStream* input, std::string* value) { uint64_t len; - if (input->ReadVarint64(&len)) { + if (ReadVarint64(input, &len)) { if (len > 0x00FFFFFFULL) { return false; } value->resize((size_t)len); - return input->ReadRaw(&(*value)[0], (size_t)len); - } - - return false; -} - -inline bool WireFormat::SkipString( - CodedInputStream* input) -{ - uint64_t len; - - if (input->ReadVarint64(&len)) { - if (len > 0x00FFFFFFULL) { - return false; - } - return input->Skip((size_t)len); + return ReadAll(input, value->data(), (size_t)len); } return false; } -inline bool WireFormat::ReadBytes( - CodedInputStream* input, void* buf, size_t len) -{ - return input->ReadRaw(buf, len); +inline bool WireFormat::ReadBytes(InputStream* input, void* buf, size_t len) { + return ReadAll(input, buf, len); } -inline bool WireFormat::ReadUInt64( - CodedInputStream* input, - uint64_t* value) -{ - return input->ReadVarint64(value); +inline bool WireFormat::ReadUInt64(InputStream* input, uint64_t* value) { + return ReadVarint64(input, value); } - template -inline void WireFormat::WriteFixed( - CodedOutputStream* output, - const T& value) -{ - output->WriteRaw(&value, sizeof(T)); +inline void WireFormat::WriteFixed(OutputStream* output, const T& value) { + WriteAll(output, &value, sizeof(T)); } -inline void WireFormat::WriteBytes( - CodedOutputStream* output, - const void* buf, - size_t len) -{ - output->WriteRaw(buf, len); +inline void WireFormat::WriteBytes(OutputStream* output, const void* buf, size_t len) { + WriteAll(output, buf, len); } -inline void WireFormat::WriteString( - CodedOutputStream* output, - std::string_view value) -{ - output->WriteVarint64(value.size()); - output->WriteRaw(value.data(), value.size()); +inline void WireFormat::WriteString(OutputStream* output, std::string_view value) { + WriteVarint64(output, value.size()); + WriteAll(output, value.data(), value.size()); } -inline void WireFormat::WriteUInt64( - CodedOutputStream* output, - const uint64_t value) -{ - output->WriteVarint64(value); +inline void WireFormat::WriteUInt64(OutputStream* output, const uint64_t value) { + WriteVarint64(output, value); } } diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 25b24938..47f5c370 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -3,13 +3,11 @@ #include "base/compressed.h" #include "base/socket.h" +#include "base/streamstack.h" #include "base/wire_format.h" #include "columns/factory.h" -#include -#include - #include #include #include @@ -33,10 +31,10 @@ #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 #define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060 //#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226 +#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337 #define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372 #define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401 #define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405 -#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337 #define REVISION DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE @@ -112,7 +110,7 @@ class Client::Impl { bool SendHello(); - bool ReadBlock(Block* block, CodedInputStream* input); + bool ReadBlock(Block* block, InputStream* input); bool ReceiveHello(); @@ -122,7 +120,7 @@ class Client::Impl { /// Reads exception packet form input stream. bool ReceiveException(bool rethrow = false); - void WriteBlock(const Block& block, CodedOutputStream* output); + void WriteBlock(const Block& block, OutputStream* output); private: /// In case of network errors tries to reconnect to server and @@ -156,13 +154,11 @@ class Client::Impl { QueryEvents* events_; int compression_ = CompressionState::Disable; - std::unique_ptr socket_input_; - std::unique_ptr buffered_input_; - std::unique_ptr input_; + InputStreams input_streams_; + InputStream* input_; - std::unique_ptr socket_output_; - std::unique_ptr buffered_output_; - std::unique_ptr output_; + OutputStreams output_streams_; + OutputStream* output_; std::unique_ptr socket_; @@ -288,7 +284,7 @@ void Client::Impl::Insert(const std::string& table_name, const Block& block) { } void Client::Impl::Ping() { - WireFormat::WriteUInt64(output_.get(), ClientCodes::Ping); + WireFormat::WriteUInt64(output_, ClientCodes::Ping); output_->Flush(); uint64_t server_packet; @@ -340,20 +336,19 @@ void Client::Impl::ResetConnection() { socket->SetTcpNoDelay(options_.tcp_nodelay); } - auto socket_input = socket->makeInputStream(); - auto socket_output = socket->makeOutputStream(); - auto buffered_input = std::make_unique(socket_input.get()); - auto buffered_output = std::make_unique(socket_output.get()); - auto input = std::make_unique(buffered_input.get()); - auto output = std::make_unique(buffered_output.get()); + OutputStreams output_streams; + auto socket_output = output_streams.Add(socket->makeOutputStream()); + auto output = output_streams.AddNew(socket_output); - std::swap(socket_input, socket_input_); - std::swap(socket_output, socket_output_); - std::swap(buffered_input, buffered_input_); - std::swap(buffered_output, buffered_output_); - std::swap(input, input_); - std::swap(output, output_); + InputStreams input_streams; + auto socket_input = input_streams.Add(socket->makeInputStream()); + auto input = input_streams.AddNew(socket_input); + + std::swap(output_streams, output_streams_); + std::swap(input_streams, input_streams_); std::swap(socket, socket_); + output_ = output; + input_ = input; #if defined(WITH_OPENSSL) std::swap(ssl_context_, ssl_context); @@ -381,7 +376,7 @@ bool Client::Impl::Handshake() { bool Client::Impl::ReceivePacket(uint64_t* server_packet) { uint64_t packet_type = 0; - if (!input_->ReadVarint64(&packet_type)) { + if (!WireFormat::ReadVarint64(input_, &packet_type)) { return false; } if (server_packet) { @@ -404,22 +399,22 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { case ServerCodes::ProfileInfo: { Profile profile; - if (!WireFormat::ReadUInt64(input_.get(), &profile.rows)) { + if (!WireFormat::ReadUInt64(input_, &profile.rows)) { return false; } - if (!WireFormat::ReadUInt64(input_.get(), &profile.blocks)) { + if (!WireFormat::ReadUInt64(input_, &profile.blocks)) { return false; } - if (!WireFormat::ReadUInt64(input_.get(), &profile.bytes)) { + if (!WireFormat::ReadUInt64(input_, &profile.bytes)) { return false; } - if (!WireFormat::ReadFixed(input_.get(), &profile.applied_limit)) { + if (!WireFormat::ReadFixed(input_, &profile.applied_limit)) { return false; } - if (!WireFormat::ReadUInt64(input_.get(), &profile.rows_before_limit)) { + if (!WireFormat::ReadUInt64(input_, &profile.rows_before_limit)) { return false; } - if (!WireFormat::ReadFixed(input_.get(), &profile.calculated_rows_before_limit)) { + if (!WireFormat::ReadFixed(input_, &profile.calculated_rows_before_limit)) { return false; } @@ -433,14 +428,14 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { case ServerCodes::Progress: { Progress info; - if (!WireFormat::ReadUInt64(input_.get(), &info.rows)) { + if (!WireFormat::ReadUInt64(input_, &info.rows)) { return false; } - if (!WireFormat::ReadUInt64(input_.get(), &info.bytes)) { + if (!WireFormat::ReadUInt64(input_, &info.bytes)) { return false; } if (REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) { - if (!WireFormat::ReadUInt64(input_.get(), &info.total_rows)) { + if (!WireFormat::ReadUInt64(input_, &info.total_rows)) { return false; } } @@ -471,7 +466,7 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { return false; } -bool Client::Impl::ReadBlock(Block* block, CodedInputStream* input) { +bool Client::Impl::ReadBlock(Block* block, InputStream* input) { // Additional information about block. if (REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { uint64_t num; @@ -538,20 +533,18 @@ bool Client::Impl::ReceiveData() { Block block; if (REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { - if (!WireFormat::SkipString(input_.get())) { + if (!WireFormat::SkipString(input_)) { return false; } } if (compression_ == CompressionState::Enable) { - CompressedInput compressed(input_.get()); - CodedInputStream coded(&compressed); - - if (!ReadBlock(&block, &coded)) { + CompressedInput compressed(input_); + if (!ReadBlock(&block, &compressed)) { return false; } } else { - if (!ReadBlock(&block, input_.get())) { + if (!ReadBlock(&block, input_)) { return false; } } @@ -574,23 +567,23 @@ bool Client::Impl::ReceiveException(bool rethrow) { do { bool has_nested = false; - if (!WireFormat::ReadFixed(input_.get(), ¤t->code)) { + if (!WireFormat::ReadFixed(input_, ¤t->code)) { exception_received = false; break; } - if (!WireFormat::ReadString(input_.get(), ¤t->name)) { + if (!WireFormat::ReadString(input_, ¤t->name)) { exception_received = false; break; } - if (!WireFormat::ReadString(input_.get(), ¤t->display_text)) { + if (!WireFormat::ReadString(input_, ¤t->display_text)) { exception_received = false; break; } - if (!WireFormat::ReadString(input_.get(), ¤t->stack_trace)) { + if (!WireFormat::ReadString(input_, ¤t->stack_trace)) { exception_received = false; break; } - if (!WireFormat::ReadFixed(input_.get(), &has_nested)) { + if (!WireFormat::ReadFixed(input_, &has_nested)) { exception_received = false; break; } @@ -615,13 +608,13 @@ bool Client::Impl::ReceiveException(bool rethrow) { } void Client::Impl::SendCancel() { - WireFormat::WriteUInt64(output_.get(), ClientCodes::Cancel); + WireFormat::WriteUInt64(output_, ClientCodes::Cancel); output_->Flush(); } void Client::Impl::SendQuery(const std::string& query) { - WireFormat::WriteUInt64(output_.get(), ClientCodes::Query); - WireFormat::WriteString(output_.get(), std::string()); + WireFormat::WriteUInt64(output_, ClientCodes::Query); + WireFormat::WriteString(output_, std::string()); /// Client info. if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) { @@ -634,23 +627,23 @@ void Client::Impl::SendQuery(const std::string& query) { info.client_revision = REVISION; - WireFormat::WriteFixed(output_.get(), info.query_kind); - WireFormat::WriteString(output_.get(), info.initial_user); - WireFormat::WriteString(output_.get(), info.initial_query_id); - WireFormat::WriteString(output_.get(), info.initial_address); - WireFormat::WriteFixed(output_.get(), info.iface_type); + WireFormat::WriteFixed(output_, info.query_kind); + WireFormat::WriteString(output_, info.initial_user); + WireFormat::WriteString(output_, info.initial_query_id); + WireFormat::WriteString(output_, info.initial_address); + WireFormat::WriteFixed(output_, info.iface_type); - WireFormat::WriteString(output_.get(), info.os_user); - WireFormat::WriteString(output_.get(), info.client_hostname); - WireFormat::WriteString(output_.get(), info.client_name); - WireFormat::WriteUInt64(output_.get(), info.client_version_major); - WireFormat::WriteUInt64(output_.get(), info.client_version_minor); - WireFormat::WriteUInt64(output_.get(), info.client_revision); + WireFormat::WriteString(output_, info.os_user); + WireFormat::WriteString(output_, info.client_hostname); + WireFormat::WriteString(output_, info.client_name); + WireFormat::WriteUInt64(output_, info.client_version_major); + WireFormat::WriteUInt64(output_, info.client_version_minor); + WireFormat::WriteUInt64(output_, info.client_revision); if (server_info_.revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) - WireFormat::WriteString(output_.get(), info.quota_key); + WireFormat::WriteString(output_, info.quota_key); if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) { - WireFormat::WriteUInt64(output_.get(), info.client_version_patch); + WireFormat::WriteUInt64(output_, info.client_version_patch); } } @@ -658,11 +651,11 @@ void Client::Impl::SendQuery(const std::string& query) { //if (settings) // settings->serialize(*out); //else - WireFormat::WriteString(output_.get(), std::string()); + WireFormat::WriteString(output_, std::string()); - WireFormat::WriteUInt64(output_.get(), Stages::Complete); - WireFormat::WriteUInt64(output_.get(), compression_); - WireFormat::WriteString(output_.get(), query); + WireFormat::WriteUInt64(output_, Stages::Complete); + WireFormat::WriteUInt64(output_, compression_); + WireFormat::WriteString(output_, query); // Send empty block as marker of // end of data SendData(Block()); @@ -671,13 +664,13 @@ void Client::Impl::SendQuery(const std::string& query) { } -void Client::Impl::WriteBlock(const Block& block, CodedOutputStream* output) { +void Client::Impl::WriteBlock(const Block& block, OutputStream* output) { // Additional information about block. if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { WireFormat::WriteUInt64(output, 1); - WireFormat::WriteFixed (output, block.Info().is_overflows); + WireFormat::WriteFixed(output, block.Info().is_overflows); WireFormat::WriteUInt64(output, 2); - WireFormat::WriteFixed (output, block.Info().bucket_num); + WireFormat::WriteFixed(output, block.Info().bucket_num); WireFormat::WriteUInt64(output, 0); } @@ -690,69 +683,37 @@ void Client::Impl::WriteBlock(const Block& block, CodedOutputStream* output) { bi.Column()->Save(output); } + output->Flush(); } void Client::Impl::SendData(const Block& block) { - WireFormat::WriteUInt64(output_.get(), ClientCodes::Data); + WireFormat::WriteUInt64(output_, ClientCodes::Data); if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { - WireFormat::WriteString(output_.get(), std::string()); + WireFormat::WriteString(output_, std::string()); } if (compression_ == CompressionState::Enable) { - switch (options_.compression_method) { - case CompressionMethod::None: { - assert(false); - break; - } - - case CompressionMethod::LZ4: { - Buffer tmp; - // Serialize block's data - { - BufferOutput out(&tmp); - CodedOutputStream coded(&out); - WriteBlock(block, &coded); - } - // Reserver space for data - Buffer buf; - buf.resize(9 + LZ4_compressBound(tmp.size())); - - // Compress data - int size = LZ4_compress_default((const char*)tmp.data(), (char*)buf.data() + 9, tmp.size(), buf.size() - 9); - buf.resize(9 + size); - - // Fill header - uint8_t* p = buf.data(); - // Compression method - WriteUnaligned(p, (uint8_t)0x82); p += 1; - // Compressed data size with header - WriteUnaligned(p, (uint32_t)buf.size()); p += 4; - // Original data size - WriteUnaligned(p, (uint32_t)tmp.size()); - - WireFormat::WriteFixed(output_.get(), CityHash128( - (const char*)buf.data(), buf.size())); - WireFormat::WriteBytes(output_.get(), buf.data(), buf.size()); - break; - } - } + assert(options_.compression_method == CompressionMethod::LZ4); + CompressedOutput compressed_ouput(output_, options_.max_compression_chunk_size); + BufferedOutput buffered(&compressed_ouput, options_.max_compression_chunk_size); + WriteBlock(block, &buffered); } else { - WriteBlock(block, output_.get()); + WriteBlock(block, output_); } output_->Flush(); } bool Client::Impl::SendHello() { - WireFormat::WriteUInt64(output_.get(), ClientCodes::Hello); - WireFormat::WriteString(output_.get(), std::string(DBMS_NAME) + " client"); - WireFormat::WriteUInt64(output_.get(), DBMS_VERSION_MAJOR); - WireFormat::WriteUInt64(output_.get(), DBMS_VERSION_MINOR); - WireFormat::WriteUInt64(output_.get(), REVISION); - WireFormat::WriteString(output_.get(), options_.default_database); - WireFormat::WriteString(output_.get(), options_.user); - WireFormat::WriteString(output_.get(), options_.password); + WireFormat::WriteUInt64(output_, ClientCodes::Hello); + WireFormat::WriteString(output_, std::string(DBMS_NAME) + " client"); + WireFormat::WriteUInt64(output_, DBMS_VERSION_MAJOR); + WireFormat::WriteUInt64(output_, DBMS_VERSION_MINOR); + WireFormat::WriteUInt64(output_, REVISION); + WireFormat::WriteString(output_, options_.default_database); + WireFormat::WriteString(output_, options_.user); + WireFormat::WriteString(output_, options_.password); output_->Flush(); @@ -762,38 +723,38 @@ bool Client::Impl::SendHello() { bool Client::Impl::ReceiveHello() { uint64_t packet_type = 0; - if (!input_->ReadVarint64(&packet_type)) { + if (!WireFormat::ReadVarint64(input_, &packet_type)) { return false; } if (packet_type == ServerCodes::Hello) { - if (!WireFormat::ReadString(input_.get(), &server_info_.name)) { + if (!WireFormat::ReadString(input_, &server_info_.name)) { return false; } - if (!WireFormat::ReadUInt64(input_.get(), &server_info_.version_major)) { + if (!WireFormat::ReadUInt64(input_, &server_info_.version_major)) { return false; } - if (!WireFormat::ReadUInt64(input_.get(), &server_info_.version_minor)) { + if (!WireFormat::ReadUInt64(input_, &server_info_.version_minor)) { return false; } - if (!WireFormat::ReadUInt64(input_.get(), &server_info_.revision)) { + if (!WireFormat::ReadUInt64(input_, &server_info_.revision)) { return false; } if (server_info_.revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) { - if (!WireFormat::ReadString(input_.get(), &server_info_.timezone)) { + if (!WireFormat::ReadString(input_, &server_info_.timezone)) { return false; } } if (server_info_.revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) { - if (!WireFormat::ReadString(input_.get(), &server_info_.display_name)) { + if (!WireFormat::ReadString(input_, &server_info_.display_name)) { return false; } } if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) { - if (!WireFormat::ReadUInt64(input_.get(), &server_info_.version_patch)) { + if (!WireFormat::ReadUInt64(input_, &server_info_.version_patch)) { return false; } } diff --git a/clickhouse/client.h b/clickhouse/client.h index 0e759319..58611f49 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -95,6 +95,16 @@ struct ClientOptions { */ DECLARE_FIELD(backward_compatibility_lowcardinality_as_wrapped_column, bool, SetBakcwardCompatibilityFeatureLowCardinalityAsWrappedColumn, true); + /** Set max size data to compress if compression enabled. + * + * Allows choosing tradeoff betwen RAM\CPU: + * - Lower value reduces RAM usage, but slightly increases CPU usage. + * - Higher value increases RAM usage but slightly decreases CPU usage. + * + * Default is 0, use natural implementation-defined chunk size. + */ + DECLARE_FIELD(max_compression_chunk_size, unsigned int, SetMaxCompressionChunkSize, 65535); + #if defined(WITH_OPENSSL) struct SSLOptions { bool use_ssl = true; // not expected to be set manually. diff --git a/clickhouse/columns/array.cpp b/clickhouse/columns/array.cpp index 473e36e6..e9d5a4d3 100644 --- a/clickhouse/columns/array.cpp +++ b/clickhouse/columns/array.cpp @@ -1,4 +1,5 @@ #include "array.h" +#include "numeric.h" #include namespace clickhouse { @@ -54,7 +55,7 @@ void ColumnArray::Append(ColumnRef column) { } } -bool ColumnArray::Load(CodedInputStream* input, size_t rows) { +bool ColumnArray::Load(InputStream* input, size_t rows) { if (!rows) { return true; } @@ -67,7 +68,7 @@ bool ColumnArray::Load(CodedInputStream* input, size_t rows) { return true; } -void ColumnArray::Save(CodedOutputStream* output) { +void ColumnArray::Save(OutputStream* output) { offsets_->Save(output); data_->Save(output); } diff --git a/clickhouse/columns/array.h b/clickhouse/columns/array.h index b20645ea..e96e70c4 100644 --- a/clickhouse/columns/array.h +++ b/clickhouse/columns/array.h @@ -1,5 +1,6 @@ #pragma once +#include "column.h" #include "numeric.h" namespace clickhouse { @@ -24,10 +25,10 @@ class ColumnArray : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/column.h b/clickhouse/columns/column.h index 9fbf2efa..19e50988 100644 --- a/clickhouse/columns/column.h +++ b/clickhouse/columns/column.h @@ -1,7 +1,5 @@ #pragma once -#include "../base/coded.h" -#include "../base/input.h" #include "../types/types.h" #include "../columns/itemview.h" @@ -10,6 +8,9 @@ namespace clickhouse { +class InputStream; +class OutputStream; + using ColumnRef = std::shared_ptr; /** @@ -41,10 +42,10 @@ class Column : public std::enable_shared_from_this { virtual void Append(ColumnRef column) = 0; /// Loads column data from input stream. - virtual bool Load(CodedInputStream* input, size_t rows) = 0; + virtual bool Load(InputStream* input, size_t rows) = 0; /// Saves column data to output stream. - virtual void Save(CodedOutputStream* output) = 0; + virtual void Save(OutputStream* output) = 0; /// Clear column data . virtual void Clear() = 0; diff --git a/clickhouse/columns/date.cpp b/clickhouse/columns/date.cpp index fcf0df0a..1301c2ff 100644 --- a/clickhouse/columns/date.cpp +++ b/clickhouse/columns/date.cpp @@ -27,11 +27,11 @@ void ColumnDate::Append(ColumnRef column) { } } -bool ColumnDate::Load(CodedInputStream* input, size_t rows) { +bool ColumnDate::Load(InputStream* input, size_t rows) { return data_->Load(input, rows); } -void ColumnDate::Save(CodedOutputStream* output) { +void ColumnDate::Save(OutputStream* output) { data_->Save(output); } @@ -89,11 +89,11 @@ void ColumnDateTime::Append(ColumnRef column) { } } -bool ColumnDateTime::Load(CodedInputStream* input, size_t rows) { +bool ColumnDateTime::Load(InputStream* input, size_t rows) { return data_->Load(input, rows); } -void ColumnDateTime::Save(CodedOutputStream* output) { +void ColumnDateTime::Save(OutputStream* output) { data_->Save(output); } @@ -162,11 +162,11 @@ void ColumnDateTime64::Append(ColumnRef column) { } } -bool ColumnDateTime64::Load(CodedInputStream* input, size_t rows) { +bool ColumnDateTime64::Load(InputStream* input, size_t rows) { return data_->Load(input, rows); } -void ColumnDateTime64::Save(CodedOutputStream* output) { +void ColumnDateTime64::Save(OutputStream* output) { data_->Save(output); } diff --git a/clickhouse/columns/date.h b/clickhouse/columns/date.h index 6923a4be..62ca4e05 100644 --- a/clickhouse/columns/date.h +++ b/clickhouse/columns/date.h @@ -26,10 +26,10 @@ class ColumnDate : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; @@ -70,13 +70,13 @@ class ColumnDateTime : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Clear column data . void Clear() override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Returns count of rows in the column. size_t Size() const override; @@ -118,13 +118,13 @@ class ColumnDateTime64 : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Clear column data . void Clear() override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Returns count of rows in the column. size_t Size() const override; diff --git a/clickhouse/columns/decimal.cpp b/clickhouse/columns/decimal.cpp index 7334bb12..5a90da1a 100644 --- a/clickhouse/columns/decimal.cpp +++ b/clickhouse/columns/decimal.cpp @@ -197,11 +197,11 @@ void ColumnDecimal::Append(ColumnRef column) { } } -bool ColumnDecimal::Load(CodedInputStream* input, size_t rows) { +bool ColumnDecimal::Load(InputStream * input, size_t rows) { return data_->Load(input, rows); } -void ColumnDecimal::Save(CodedOutputStream* output) { +void ColumnDecimal::Save(OutputStream* output) { data_->Save(output); } diff --git a/clickhouse/columns/decimal.h b/clickhouse/columns/decimal.h index c88ac37f..b28699ae 100644 --- a/clickhouse/columns/decimal.h +++ b/clickhouse/columns/decimal.h @@ -21,8 +21,8 @@ class ColumnDecimal : public Column { public: void Append(ColumnRef column) override; - bool Load(CodedInputStream* input, size_t rows) override; - void Save(CodedOutputStream* output) override; + bool Load(InputStream* input, size_t rows) override; + void Save(OutputStream* output) override; void Clear() override; size_t Size() const override; ColumnRef Slice(size_t begin, size_t len) const override; diff --git a/clickhouse/columns/enum.cpp b/clickhouse/columns/enum.cpp index d2747f20..bc908dda 100644 --- a/clickhouse/columns/enum.cpp +++ b/clickhouse/columns/enum.cpp @@ -1,6 +1,10 @@ #include "enum.h" #include "utils.h" +#include "../base/input.h" +#include "../base/output.h" +#include "../base/wire_format.h" + namespace clickhouse { template @@ -70,14 +74,14 @@ void ColumnEnum::Append(ColumnRef column) { } template -bool ColumnEnum::Load(CodedInputStream* input, size_t rows) { +bool ColumnEnum::Load(InputStream* input, size_t rows) { data_.resize(rows); - return input->ReadRaw(data_.data(), data_.size() * sizeof(T)); + return WireFormat::ReadBytes(input, data_.data(), data_.size() * sizeof(T)); } template -void ColumnEnum::Save(CodedOutputStream* output) { - output->WriteRaw(data_.data(), data_.size() * sizeof(T)); +void ColumnEnum::Save(OutputStream* output) { + WireFormat::WriteBytes(output, data_.data(), data_.size() * sizeof(T)); } template diff --git a/clickhouse/columns/enum.h b/clickhouse/columns/enum.h index 2f1a6e4c..34c672f6 100644 --- a/clickhouse/columns/enum.h +++ b/clickhouse/columns/enum.h @@ -33,10 +33,10 @@ class ColumnEnum : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/ip4.cpp b/clickhouse/columns/ip4.cpp index fd78305e..70a81d75 100644 --- a/clickhouse/columns/ip4.cpp +++ b/clickhouse/columns/ip4.cpp @@ -62,11 +62,11 @@ void ColumnIPv4::Append(ColumnRef column) { } } -bool ColumnIPv4::Load(CodedInputStream* input, size_t rows) { +bool ColumnIPv4::Load(InputStream * input, size_t rows) { return data_->Load(input, rows); } -void ColumnIPv4::Save(CodedOutputStream* output) { +void ColumnIPv4::Save(OutputStream* output) { data_->Save(output); } diff --git a/clickhouse/columns/ip4.h b/clickhouse/columns/ip4.h index 442942a3..ec6e51ba 100644 --- a/clickhouse/columns/ip4.h +++ b/clickhouse/columns/ip4.h @@ -29,10 +29,10 @@ class ColumnIPv4 : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/ip6.cpp b/clickhouse/columns/ip6.cpp index f7c30bd0..6b532d25 100644 --- a/clickhouse/columns/ip6.cpp +++ b/clickhouse/columns/ip6.cpp @@ -62,11 +62,11 @@ void ColumnIPv6::Append(ColumnRef column) { } } -bool ColumnIPv6::Load(CodedInputStream* input, size_t rows) { +bool ColumnIPv6::Load(InputStream* input, size_t rows) { return data_->Load(input, rows); } -void ColumnIPv6::Save(CodedOutputStream* output) { +void ColumnIPv6::Save(OutputStream* output) { data_->Save(output); } diff --git a/clickhouse/columns/ip6.h b/clickhouse/columns/ip6.h index 48e8c507..e620ef90 100644 --- a/clickhouse/columns/ip6.h +++ b/clickhouse/columns/ip6.h @@ -28,10 +28,10 @@ class ColumnIPv6 : public Column{ void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/lowcardinality.cpp b/clickhouse/columns/lowcardinality.cpp index 8b5c33e7..bcbbeaaf 100644 --- a/clickhouse/columns/lowcardinality.cpp +++ b/clickhouse/columns/lowcardinality.cpp @@ -191,7 +191,7 @@ void ColumnLowCardinality::Append(ColumnRef col) { namespace { -auto Load(ColumnRef new_dictionary_column, CodedInputStream* input, size_t rows) { +auto Load(ColumnRef new_dictionary_column, InputStream* input, size_t rows) { // This code tries to follow original implementation of ClickHouse's LowCardinality serialization with // NativeBlockOutputStream::writeData() for DataTypeLowCardinality // (see corresponding serializeBinaryBulkStateSuffix, serializeBinaryBulkStatePrefix, and serializeBinaryBulkWithMultipleStreams), @@ -250,7 +250,7 @@ auto Load(ColumnRef new_dictionary_column, CodedInputStream* input, size_t rows) } -bool ColumnLowCardinality::Load(CodedInputStream* input, size_t rows) { +bool ColumnLowCardinality::Load(InputStream* input, size_t rows) { try { auto [new_dictionary, new_index, new_unique_items_map] = ::Load(dictionary_column_->Slice(0, 0), input, rows); @@ -264,7 +264,7 @@ bool ColumnLowCardinality::Load(CodedInputStream* input, size_t rows) { } } -void ColumnLowCardinality::Save(CodedOutputStream* output) { +void ColumnLowCardinality::Save(OutputStream* output) { // prefix const uint64_t version = static_cast(KeySerializationVersion::SharedDictionariesWithAdditionalKeys); WireFormat::WriteFixed(output, version); diff --git a/clickhouse/columns/lowcardinality.h b/clickhouse/columns/lowcardinality.h index 05486e0c..6d834428 100644 --- a/clickhouse/columns/lowcardinality.h +++ b/clickhouse/columns/lowcardinality.h @@ -55,10 +55,10 @@ class ColumnLowCardinality : public Column { void Append(ColumnRef /*column*/) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data. void Clear() override; diff --git a/clickhouse/columns/lowcardinalityadaptor.h b/clickhouse/columns/lowcardinalityadaptor.h index 6477e47c..70261a51 100644 --- a/clickhouse/columns/lowcardinalityadaptor.h +++ b/clickhouse/columns/lowcardinalityadaptor.h @@ -7,7 +7,7 @@ namespace clickhouse { -class CodedOutputStream; +class OutputStream; class CodedInputStream; /** Adapts any ColumnType to be serialized\deserialized as LowCardinality, @@ -28,7 +28,7 @@ class LowCardinalitySerializationAdaptor : public AdaptedColumnType using AdaptedColumnType::AdaptedColumnType; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override { + bool Load(InputStream* input, size_t rows) override { auto new_data_column = this->Slice(0, 0)->template As(); ColumnLowCardinalityT low_cardinality_col(new_data_column); @@ -46,7 +46,7 @@ class LowCardinalitySerializationAdaptor : public AdaptedColumnType } /// Saves column data to output stream. - void Save(CodedOutputStream* output) override { + void Save(OutputStream* output) override { ColumnLowCardinalityT(this->template As()).Save(output); } }; diff --git a/clickhouse/columns/nothing.h b/clickhouse/columns/nothing.h index 75b3e526..591c1ecd 100644 --- a/clickhouse/columns/nothing.h +++ b/clickhouse/columns/nothing.h @@ -50,14 +50,14 @@ class ColumnNothing : public Column { } /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override { + bool Load(InputStream* input, size_t rows) override { input->Skip(rows); size_ += rows; return true; } /// Saves column data to output stream. - void Save(CodedOutputStream*) override { + void Save(OutputStream*) override { throw std::runtime_error("method Save is not supported for Nothing column"); } diff --git a/clickhouse/columns/nullable.cpp b/clickhouse/columns/nullable.cpp index 6c043cab..1f0f1efb 100644 --- a/clickhouse/columns/nullable.cpp +++ b/clickhouse/columns/nullable.cpp @@ -50,7 +50,7 @@ void ColumnNullable::Clear() { nulls_->Clear(); } -bool ColumnNullable::Load(CodedInputStream* input, size_t rows) { +bool ColumnNullable::Load(InputStream* input, size_t rows) { if (!nulls_->Load(input, rows)) { return false; } @@ -60,7 +60,7 @@ bool ColumnNullable::Load(CodedInputStream* input, size_t rows) { return true; } -void ColumnNullable::Save(CodedOutputStream* output) { +void ColumnNullable::Save(OutputStream* output) { nulls_->Save(output); nested_->Save(output); } diff --git a/clickhouse/columns/nullable.h b/clickhouse/columns/nullable.h index cf726448..8cde2781 100644 --- a/clickhouse/columns/nullable.h +++ b/clickhouse/columns/nullable.h @@ -29,10 +29,10 @@ class ColumnNullable : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/numeric.cpp b/clickhouse/columns/numeric.cpp index 68cc495d..b101281c 100644 --- a/clickhouse/columns/numeric.cpp +++ b/clickhouse/columns/numeric.cpp @@ -1,6 +1,8 @@ #include "numeric.h" #include "utils.h" +#include "../base/wire_format.h" + namespace clickhouse { template @@ -59,15 +61,15 @@ void ColumnVector::Append(ColumnRef column) { } template -bool ColumnVector::Load(CodedInputStream* input, size_t rows) { +bool ColumnVector::Load(InputStream* input, size_t rows) { data_.resize(rows); - return input->ReadRaw(data_.data(), data_.size() * sizeof(T)); + return WireFormat::ReadBytes(input, data_.data(), data_.size() * sizeof(T)); } template -void ColumnVector::Save(CodedOutputStream* output) { - output->WriteRaw(data_.data(), data_.size() * sizeof(T)); +void ColumnVector::Save(OutputStream* output) { + WireFormat::WriteBytes(output, data_.data(), data_.size() * sizeof(T)); } template diff --git a/clickhouse/columns/numeric.h b/clickhouse/columns/numeric.h index 1bb5b985..65b21300 100644 --- a/clickhouse/columns/numeric.h +++ b/clickhouse/columns/numeric.h @@ -35,10 +35,10 @@ class ColumnVector : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/string.cpp b/clickhouse/columns/string.cpp index 3f10ad6d..ff042ce5 100644 --- a/clickhouse/columns/string.cpp +++ b/clickhouse/columns/string.cpp @@ -77,7 +77,7 @@ void ColumnFixedString::Append(ColumnRef column) { } } -bool ColumnFixedString::Load(CodedInputStream* input, size_t rows) { +bool ColumnFixedString::Load(InputStream * input, size_t rows) { data_.resize(string_size_ * rows); if (!WireFormat::ReadBytes(input, &data_[0], data_.size())) { return false; @@ -86,7 +86,7 @@ bool ColumnFixedString::Load(CodedInputStream* input, size_t rows) { return true; } -void ColumnFixedString::Save(CodedOutputStream* output) { +void ColumnFixedString::Save(OutputStream* output) { WireFormat::WriteBytes(output, data_.data(), data_.size()); } @@ -220,7 +220,7 @@ void ColumnString::Append(ColumnRef column) { } } -bool ColumnString::Load(CodedInputStream* input, size_t rows) { +bool ColumnString::Load(InputStream* input, size_t rows) { items_.clear(); blocks_.clear(); @@ -245,7 +245,7 @@ bool ColumnString::Load(CodedInputStream* input, size_t rows) { return true; } -void ColumnString::Save(CodedOutputStream* output) { +void ColumnString::Save(OutputStream* output) { for (const auto & item : items_) { WireFormat::WriteString(output, item); } diff --git a/clickhouse/columns/string.h b/clickhouse/columns/string.h index 35c1f32f..d1cec652 100644 --- a/clickhouse/columns/string.h +++ b/clickhouse/columns/string.h @@ -43,10 +43,10 @@ class ColumnFixedString : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; @@ -95,10 +95,10 @@ class ColumnString : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/tuple.cpp b/clickhouse/columns/tuple.cpp index 9e4b4838..301fcf94 100644 --- a/clickhouse/columns/tuple.cpp +++ b/clickhouse/columns/tuple.cpp @@ -24,7 +24,7 @@ size_t ColumnTuple::Size() const { return columns_.empty() ? 0 : columns_[0]->Size(); } -bool ColumnTuple::Load(CodedInputStream* input, size_t rows) { +bool ColumnTuple::Load(InputStream* input, size_t rows) { for (auto ci = columns_.begin(); ci != columns_.end(); ++ci) { if (!(*ci)->Load(input, rows)) { return false; @@ -34,7 +34,7 @@ bool ColumnTuple::Load(CodedInputStream* input, size_t rows) { return true; } -void ColumnTuple::Save(CodedOutputStream* output) { +void ColumnTuple::Save(OutputStream* output) { for (auto ci = columns_.begin(); ci != columns_.end(); ++ci) { (*ci)->Save(output); } diff --git a/clickhouse/columns/tuple.h b/clickhouse/columns/tuple.h index 3c993a7c..bf84d1b6 100644 --- a/clickhouse/columns/tuple.h +++ b/clickhouse/columns/tuple.h @@ -25,10 +25,10 @@ class ColumnTuple : public Column { void Append(ColumnRef) override { } /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/clickhouse/columns/uuid.cpp b/clickhouse/columns/uuid.cpp index 63d3f9a2..b69ab9c6 100644 --- a/clickhouse/columns/uuid.cpp +++ b/clickhouse/columns/uuid.cpp @@ -43,11 +43,11 @@ void ColumnUUID::Append(ColumnRef column) { } } -bool ColumnUUID::Load(CodedInputStream* input, size_t rows) { +bool ColumnUUID::Load(InputStream* input, size_t rows) { return data_->Load(input, rows * 2); } -void ColumnUUID::Save(CodedOutputStream* output) { +void ColumnUUID::Save(OutputStream* output) { data_->Save(output); } diff --git a/clickhouse/columns/uuid.h b/clickhouse/columns/uuid.h index b52ca4e8..40319459 100644 --- a/clickhouse/columns/uuid.h +++ b/clickhouse/columns/uuid.h @@ -30,10 +30,10 @@ class ColumnUUID : public Column { void Append(ColumnRef column) override; /// Loads column data from input stream. - bool Load(CodedInputStream* input, size_t rows) override; + bool Load(InputStream* input, size_t rows) override; /// Saves column data to output stream. - void Save(CodedOutputStream* output) override; + void Save(OutputStream* output) override; /// Clear column data . void Clear() override; diff --git a/ut/columns_ut.cpp b/ut/columns_ut.cpp index 0229ba66..02983ba8 100644 --- a/ut/columns_ut.cpp +++ b/ut/columns_ut.cpp @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include "utils.h" @@ -546,9 +548,8 @@ TEST(ColumnsCase, ColumnLowCardinalityString_Load) { const auto & data = LOWCARDINALITY_STRING_FOOBAR_10_ITEMS_BINARY; ArrayInput buffer(data.data(), data.size()); - CodedInputStream stream(&buffer); - EXPECT_TRUE(col.Load(&stream, items_count)); + ASSERT_TRUE(col.Load(&buffer, items_count)); for (size_t i = 0; i < items_count; ++i) { EXPECT_EQ(col.At(i), FooBarSeq(i)) << " at pos: " << i; @@ -565,24 +566,23 @@ TEST(ColumnsCase, DISABLED_ColumnLowCardinalityString_Save) { } ArrayOutput output(0, 0); - CodedOutputStream output_stream(&output); const size_t expected_output_size = LOWCARDINALITY_STRING_FOOBAR_10_ITEMS_BINARY.size(); // Enough space to account for possible overflow from both right and left sides. - char buffer[expected_output_size * 10] = {'\0'}; + std::string buffer(expected_output_size * 10, '\0');// = {'\0'}; const char margin_content[sizeof(buffer)] = {'\0'}; const size_t left_margin_size = 10; const size_t right_margin_size = sizeof(buffer) - left_margin_size - expected_output_size; // Since overflow from left side is less likely to happen, leave only tiny margin there. - auto write_pos = buffer + left_margin_size; - const auto left_margin = buffer; + auto write_pos = buffer.data() + left_margin_size; + const auto left_margin = buffer.data(); const auto right_margin = write_pos + expected_output_size; output.Reset(write_pos, expected_output_size); - EXPECT_NO_THROW(col.Save(&output_stream)); + EXPECT_NO_THROW(col.Save(&output)); // Left margin should be blank EXPECT_EQ(std::string_view(margin_content, left_margin_size), std::string_view(left_margin, left_margin_size)); @@ -606,8 +606,7 @@ TEST(ColumnsCase, ColumnLowCardinalityString_SaveAndLoad) { char buffer[256] = {'\0'}; // about 3 times more space than needed for this set of values. { ArrayOutput output(buffer, sizeof(buffer)); - CodedOutputStream output_stream(&output); - EXPECT_NO_THROW(col.Save(&output_stream)); + EXPECT_NO_THROW(col.Save(&output)); } col.Clear(); @@ -615,8 +614,7 @@ TEST(ColumnsCase, ColumnLowCardinalityString_SaveAndLoad) { { // Load the data back ArrayInput input(buffer, sizeof(buffer)); - CodedInputStream input_stream(&input); - EXPECT_TRUE(col.Load(&input_stream, items.size())); + EXPECT_TRUE(col.Load(&input, items.size())); } for (size_t i = 0; i < items.size(); ++i) { diff --git a/ut/performance_tests.cpp b/ut/performance_tests.cpp index a2d7ace7..ce668824 100644 --- a/ut/performance_tests.cpp +++ b/ut/performance_tests.cpp @@ -127,8 +127,7 @@ TYPED_TEST_P(ColumnPerformanceTest, SaveAndLoad) { for (int i = 0; i < LOAD_AND_SAVE_REPEAT_TIMES; ++i) { buffer.clear(); - BufferOutput bufferOutput(&buffer); - CodedOutputStream ostr(&bufferOutput); + BufferOutput ostr(&buffer); Timer timer; column.Save(&ostr); @@ -147,8 +146,7 @@ TYPED_TEST_P(ColumnPerformanceTest, SaveAndLoad) { Timer::DurationType total{0}; for (int i = 0; i < LOAD_AND_SAVE_REPEAT_TIMES; ++i) { - ArrayInput arrayInput(buffer.data(), buffer.size()); - CodedInputStream istr(&arrayInput); + ArrayInput istr(buffer.data(), buffer.size()); column.Clear(); Timer timer; diff --git a/ut/stream_ut.cpp b/ut/stream_ut.cpp index b27efb27..7cd20421 100644 --- a/ut/stream_ut.cpp +++ b/ut/stream_ut.cpp @@ -1,4 +1,7 @@ -#include +#include +#include +#include + #include using namespace clickhouse; @@ -8,16 +11,13 @@ TEST(CodedStreamCase, Varint64) { { BufferOutput output(&buf); - CodedOutputStream coded(&output); - coded.WriteVarint64(18446744071965638648ULL); + WireFormat::WriteVarint64(&output, 18446744071965638648ULL); } - { ArrayInput input(buf.data(), buf.size()); - CodedInputStream coded(&input); uint64_t value; - ASSERT_TRUE(coded.ReadVarint64(&value)); + ASSERT_TRUE(WireFormat::ReadVarint64(&input, &value)); ASSERT_EQ(value, 18446744071965638648ULL); } } diff --git a/ut/utils.h b/ut/utils.h index 1e1a2100..2dc81b1a 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -2,6 +2,7 @@ #include #include +#include #include #include From 61e8d2a682f2ae9497687abc2f7b927c87bca4ff Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Sat, 13 Nov 2021 11:57:27 +0200 Subject: [PATCH 2/3] Fixed issues discovered during PR review --- clickhouse/CMakeLists.txt | 1 - clickhouse/base/coded.cpp | 30 ------------------ clickhouse/base/coded.h | 10 ------ clickhouse/base/compressed.cpp | 56 +++++++++++++++++++-------------- clickhouse/base/compressed.h | 8 +++-- clickhouse/base/sslsocket.h | 1 + clickhouse/base/wire_format.cpp | 11 ++----- clickhouse/base/wire_format.h | 3 +- ut/stream_ut.cpp | 2 +- 9 files changed, 43 insertions(+), 79 deletions(-) delete mode 100644 clickhouse/base/coded.cpp delete mode 100644 clickhouse/base/coded.h diff --git a/clickhouse/CMakeLists.txt b/clickhouse/CMakeLists.txt index beb411d3..7862415d 100644 --- a/clickhouse/CMakeLists.txt +++ b/clickhouse/CMakeLists.txt @@ -1,5 +1,4 @@ SET ( clickhouse-cpp-lib-src - base/coded.cpp base/compressed.cpp base/input.cpp base/output.cpp diff --git a/clickhouse/base/coded.cpp b/clickhouse/base/coded.cpp deleted file mode 100644 index 35a23b9c..00000000 --- a/clickhouse/base/coded.cpp +++ /dev/null @@ -1,30 +0,0 @@ -#include "coded.h" - -#include - -namespace clickhouse { - -//static const int MAX_VARINT_BYTES = 10; - -//CodedInputStream::CodedInputStream(ZeroCopyInput* input) -// : input_(input) -//{ -//} - -//bool CodedInputStream::ReadRaw(void* buffer, size_t size) { -// uint8_t* p = static_cast(buffer); - -// while (size > 0) { -// const void* ptr; -// size_t len = input_->Next(&ptr, size); - -// memcpy(p, ptr, len); - -// p += len; -// size -= len; -// } - -// return true; -//} - -} diff --git a/clickhouse/base/coded.h b/clickhouse/base/coded.h deleted file mode 100644 index b6b6ca7c..00000000 --- a/clickhouse/base/coded.h +++ /dev/null @@ -1,10 +0,0 @@ -#pragma once - -#include "input.h" -#include "output.h" - -#include - -namespace clickhouse { - -} diff --git a/clickhouse/base/compressed.cpp b/clickhouse/base/compressed.cpp index 0239c6f2..946a59ad 100644 --- a/clickhouse/base/compressed.cpp +++ b/clickhouse/base/compressed.cpp @@ -7,13 +7,13 @@ #include #include -#include - namespace { -static const size_t HEADER_SIZE = 9; -static const size_t EXTRA_PREALLOCATE_COMPRESS_BUFFER = 15; -static const uint8_t COMPRESSION_METHOD = 0x82; -#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB +constexpr size_t HEADER_SIZE = 9; +// see DB::CompressionMethodByte::LZ4 from src/Compression/CompressionInfo.h of ClickHouse project +constexpr uint8_t COMPRESSION_METHOD = 0x82; +// Documentation says that compression is faster when output buffer is larger than LZ4_compressBound estimation. +constexpr size_t EXTRA_COMPRESS_BUFFER_SIZE = 4096; +constexpr size_t DBMS_MAX_COMPRESSED_SIZE = 0x40000000ULL; // 1GB } namespace clickhouse { @@ -30,7 +30,7 @@ CompressedInput::~CompressedInput() { #else if (!std::uncaught_exceptions()) { #endif - throw std::runtime_error("some data was not readed"); + throw std::runtime_error("some data was not read"); } } } @@ -59,8 +59,7 @@ bool CompressedInput::Decompress() { } if (method != COMPRESSION_METHOD) { - throw std::runtime_error("unsupported compression method " + - std::to_string(int(method))); + throw std::runtime_error("unsupported compression method " + std::to_string(int(method))); } else { if (!WireFormat::ReadFixed(input_, &compressed)) { return false; @@ -105,24 +104,27 @@ bool CompressedInput::Decompress() { CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size) - : destination_(destination), - max_compressed_chunk_size_(max_compressed_chunk_size) + : destination_(destination) + , max_compressed_chunk_size_(max_compressed_chunk_size) { + PreallocateCompressBuffer(max_compressed_chunk_size); } CompressedOutput::~CompressedOutput() { - Flush(); + Flush(); } size_t CompressedOutput::DoWrite(const void* data, size_t len) { const size_t original_len = len; - const size_t max_chunk_size = max_compressed_chunk_size_ ? max_compressed_chunk_size_ : len; + // what if len > max_compressed_chunk_size_ ? + const size_t max_chunk_size = max_compressed_chunk_size_ > 0 ? max_compressed_chunk_size_ : len; + if (len > max_compressed_chunk_size_) { + PreallocateCompressBuffer(len); + } - while (len > 0) - { + while (len > 0) { auto to_compress = std::min(len, max_chunk_size); - if (!Compress(data, to_compress)) - break; + Compress(data, to_compress); len -= to_compress; data = reinterpret_cast(data) + to_compress; @@ -135,16 +137,15 @@ void CompressedOutput::DoFlush() { destination_->Flush(); } -bool CompressedOutput::Compress(const void * data, size_t len) { - - const size_t expected_out_size = LZ4_compressBound(len); - compressed_buffer_.resize(std::max(compressed_buffer_.size(), expected_out_size + HEADER_SIZE + EXTRA_PREALLOCATE_COMPRESS_BUFFER)); - - const int compressed_size = LZ4_compress_default( +void CompressedOutput::Compress(const void * data, size_t len) { + const auto compressed_size = LZ4_compress_default( (const char*)data, (char*)compressed_buffer_.data() + HEADER_SIZE, len, compressed_buffer_.size() - HEADER_SIZE); + if (compressed_size <= 0) + throw std::runtime_error("Failed to compress chunk of " + std::to_string(len) + " bytes, " + "LZ4 error: " + std::to_string(compressed_size)); { auto header = compressed_buffer_.data(); @@ -160,7 +161,14 @@ bool CompressedOutput::Compress(const void * data, size_t len) { WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE); destination_->Flush(); - return true; +} + +void CompressedOutput::PreallocateCompressBuffer(size_t input_size) { + const auto estimated_compressed_buffer_size = LZ4_compressBound(input_size); + if (estimated_compressed_buffer_size <= 0) + throw std::runtime_error("Failed to estimate compressed buffer size, LZ4 error: " + std::to_string(estimated_compressed_buffer_size)); + + compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE); } } diff --git a/clickhouse/base/compressed.h b/clickhouse/base/compressed.h index 390d3ab9..0b40d0be 100644 --- a/clickhouse/base/compressed.h +++ b/clickhouse/base/compressed.h @@ -8,7 +8,7 @@ namespace clickhouse { class CompressedInput : public ZeroCopyInput { public: - CompressedInput(InputStream* input); + CompressedInput(InputStream* input); ~CompressedInput(); protected: @@ -31,13 +31,15 @@ class CompressedOutput : public OutputStream { protected: size_t DoWrite(const void* data, size_t len) override; void DoFlush() override; - bool Compress(const void * data, size_t len); +private: + void Compress(const void * data, size_t len); + void PreallocateCompressBuffer(size_t input_size); private: OutputStream * destination_; + const size_t max_compressed_chunk_size_; Buffer compressed_buffer_; - size_t max_compressed_chunk_size_; }; } diff --git a/clickhouse/base/sslsocket.h b/clickhouse/base/sslsocket.h index 19d42be9..0a85eef0 100644 --- a/clickhouse/base/sslsocket.h +++ b/clickhouse/base/sslsocket.h @@ -64,6 +64,7 @@ class SSLSocketInput : public InputStream { bool Skip(size_t /*bytes*/) override { return false; } + protected: size_t DoRead(void* buf, size_t len) override; diff --git a/clickhouse/base/wire_format.cpp b/clickhouse/base/wire_format.cpp index d44730b0..c0f09ecc 100644 --- a/clickhouse/base/wire_format.cpp +++ b/clickhouse/base/wire_format.cpp @@ -6,7 +6,7 @@ #include namespace { -static const int MAX_VARINT_BYTES = 10; +constexpr int MAX_VARINT_BYTES = 10; } namespace clickhouse { @@ -22,12 +22,7 @@ bool WireFormat::ReadAll(InputStream * input, void* buf, size_t len) { len -= read_previously; } - // true if all was read successfully return !len; -// if (len) { -// throw std::runtime_error("Failed to read " + std::to_string(original_len) -// + " bytes, only read " + std::to_string(original_len - len)); -// } } void WireFormat::WriteAll(OutputStream* output, const void* buf, size_t len) { @@ -52,7 +47,7 @@ bool WireFormat::ReadVarint64(InputStream* input, uint64_t* value) { *value = 0; for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) { - uint8_t byte; + uint8_t byte = 0; if (!input->ReadByte(&byte)) { return false; @@ -90,7 +85,7 @@ void WireFormat::WriteVarint64(OutputStream* output, uint64_t value) { } bool WireFormat::SkipString(InputStream* input) { - uint64_t len; + uint64_t len = 0; if (ReadVarint64(input, &len)) { if (len > 0x00FFFFFFULL) diff --git a/clickhouse/base/wire_format.h b/clickhouse/base/wire_format.h index a2c803f4..ade785df 100644 --- a/clickhouse/base/wire_format.h +++ b/clickhouse/base/wire_format.h @@ -35,8 +35,7 @@ inline bool WireFormat::ReadFixed(InputStream* input, T* value) { } inline bool WireFormat::ReadString(InputStream* input, std::string* value) { - uint64_t len; - + uint64_t len = 0; if (ReadVarint64(input, &len)) { if (len > 0x00FFFFFFULL) { return false; diff --git a/ut/stream_ut.cpp b/ut/stream_ut.cpp index 7cd20421..a16b8bb8 100644 --- a/ut/stream_ut.cpp +++ b/ut/stream_ut.cpp @@ -16,7 +16,7 @@ TEST(CodedStreamCase, Varint64) { { ArrayInput input(buf.data(), buf.size()); - uint64_t value; + uint64_t value = 0; ASSERT_TRUE(WireFormat::ReadVarint64(&input, &value)); ASSERT_EQ(value, 18446744071965638648ULL); } From 83c91db2e267f1c2b5b201c1b917f54bb33c58e0 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Sun, 14 Nov 2021 16:33:59 +0200 Subject: [PATCH 3/3] Reduced memory usage --- clickhouse/base/compressed.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse/base/compressed.cpp b/clickhouse/base/compressed.cpp index 946a59ad..a240f3d3 100644 --- a/clickhouse/base/compressed.cpp +++ b/clickhouse/base/compressed.cpp @@ -118,7 +118,7 @@ size_t CompressedOutput::DoWrite(const void* data, size_t len) { const size_t original_len = len; // what if len > max_compressed_chunk_size_ ? const size_t max_chunk_size = max_compressed_chunk_size_ > 0 ? max_compressed_chunk_size_ : len; - if (len > max_compressed_chunk_size_) { + if (max_chunk_size > max_compressed_chunk_size_) { PreallocateCompressBuffer(len); }