Skip to content

Commit

Permalink
Merge pull request #18802 from ClickHouse/backport/20.12/18776
Browse files Browse the repository at this point in the history
Backport #18776 to 20.12: Fix the issue with async Distributed INSERTs and network_compression_method
  • Loading branch information
alexey-milovidov authored Jan 7, 2021
2 parents 461fb29 + a55dd0c commit d6bd1bd
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 5 deletions.
4 changes: 2 additions & 2 deletions src/Compression/CompressedReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class CompressedReadBuffer : public CompressedReadBufferBase, public BufferWithO
bool nextImpl() override;

public:
CompressedReadBuffer(ReadBuffer & in_)
: CompressedReadBufferBase(&in_), BufferWithOwnMemory<ReadBuffer>(0)
CompressedReadBuffer(ReadBuffer & in_, bool allow_different_codecs_ = false)
: CompressedReadBufferBase(&in_, allow_different_codecs_), BufferWithOwnMemory<ReadBuffer>(0)
{
}

Expand Down
3 changes: 3 additions & 0 deletions src/Compression/CompressionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std

void CompressionCodecFactory::validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const
{
if (family_name.empty())
throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS);

if (level)
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
Expand Down
7 changes: 5 additions & 2 deletions src/Server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ void TCPHandler::receiveUnexpectedData()
std::shared_ptr<ReadBuffer> maybe_compressed_in;

if (last_block_in.compression == Protocol::Compression::Enable)
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
else
maybe_compressed_in = in;

Expand All @@ -1118,8 +1118,11 @@ void TCPHandler::initBlockInput()
{
if (!state.block_in)
{
/// 'allow_different_codecs' is set to true, because some parts of compressed data can be precompressed in advance
/// with another codec that the rest of the data. Example: data sent by Distributed tables.

if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
else
state.maybe_compressed_in = in;

Expand Down
13 changes: 12 additions & 1 deletion src/Storages/Distributed/DistributedBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,17 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_

void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context.getSettingsRef();

std::string compression_method = Poco::toUpper(settings.network_compression_method.toString());
std::optional<int> compression_level;

if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level;

CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);

/// tmp directory is used to ensure atomicity of transactions
/// and keep monitor thread out from reading incomplete data
std::string first_file_tmp_path{};
Expand All @@ -588,7 +599,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// Write batch to temporary location
{
WriteBufferFromFile out{first_file_tmp_path};
CompressedWriteBuffer compress{out};
CompressedWriteBuffer compress{out, compression_codec};
NativeBlockOutputStream stream{compress, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};

/// Prepare the header.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
256
512
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
DROP TABLE IF EXISTS local;
DROP TABLE IF EXISTS distributed;

CREATE TABLE local (x UInt8) ENGINE = Memory;
CREATE TABLE distributed AS local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), local, x);

SET insert_distributed_sync = 0, network_compression_method = 'zstd';

INSERT INTO distributed SELECT number FROM numbers(256);
SYSTEM FLUSH DISTRIBUTED distributed;

SELECT count() FROM local;
SELECT count() FROM distributed;

DROP TABLE local;
DROP TABLE distributed;

0 comments on commit d6bd1bd

Please sign in to comment.