Skip to content

Commit

Permalink
feat: new SSL related kafka external stream settings (#635)
Browse files Browse the repository at this point in the history
* feat: support top-level setting for skipping cert check for external stream
* added setting ssl_ca_pem
  • Loading branch information
zliang-min authored Apr 5, 2024
1 parent aa9d940 commit b38f2ce
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/Storages/ExternalStream/ExternalStreamSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class ASTStorage;
M(String, password, "", "The password of external logstore", 0) \
M(String, sasl_mechanism, "PLAIN", "SASL mechanism to use for authentication. Supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. Default to PLAIN when SASL is enabled.", 0) \
M(String, ssl_ca_cert_file, "", "The path of ssl ca cert file", 0) \
M(String, ssl_ca_pem, "", "CA certificate string (PEM format) for verifying the server's key.", 0) \
M(Bool, skip_ssl_cert_check, false, "If set to true, the server's certification won't be verified.", 0) \
M(String, properties, "", "A semi-colon-separated key-value pairs for configuring the kafka client used by the external stream. A key-value pair is separated by a equal sign. Example: 'client.id=my-client-id;group.id=my-group-id'. Note, not all properties are supported, please check the document for supported properties.", 0) \
M(UInt64, poll_waittime_ms, 500, "How long (in milliseconds) should poll waits.", 0) \
M(String, sharding_expr, "", "An expression which will be evaluated on each row of data returned by the query to calculate the an integer which will be used to determine the ID of the partition to which the row of data will be sent. If not set, data are sent to any partition randomly.", 0) \
Expand Down
28 changes: 23 additions & 5 deletions src/Storages/ExternalStream/Kafka/Kafka.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <IO/WriteBufferFromFile.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
Expand All @@ -21,6 +22,7 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>

#include <filesystem>
#include <ranges>

namespace DB
Expand Down Expand Up @@ -185,8 +187,12 @@ Kafka::ConfPtr createConfFromSettings(const KafkaExternalStreamSettings & settin
conf_set("sasl.password", settings.password.value);
}

if (settings.usesSecureConnection() && !settings.ssl_ca_cert_file.value.empty())
conf_set("ssl.ca.location", settings.ssl_ca_cert_file.value);
if (settings.usesSecureConnection())
{
conf_set("enable.ssl.certificate.verification", settings.skip_ssl_cert_check ? "false" : "true");
if (!settings.ssl_ca_cert_file.value.empty())
conf_set("ssl.ca.location", settings.ssl_ca_cert_file.value);
}

return conf;
}
Expand All @@ -195,13 +201,25 @@ Kafka::ConfPtr createConfFromSettings(const KafkaExternalStreamSettings & settin

const String Kafka::VIRTUAL_COLUMN_MESSAGE_KEY = "_message_key";

/// Builds the Kafka client configuration for this external stream.
///
/// `settings_` is taken by value on purpose: when `ssl_ca_pem` is set, the PEM text is written to
/// `broker_ca.pem` inside the stream's temporary directory and `ssl_ca_cert_file` of this local copy
/// is pointed at that file (replacing whatever path was configured there) before the configuration
/// is created. The caller's settings are left untouched.
///
/// Side effects: may create the temporary directory, writes the CA file, and updates `broker_ca_file`.
Kafka::ConfPtr Kafka::createRdConf(KafkaExternalStreamSettings settings_)
{
    if (const auto & ca_pem = settings_.ssl_ca_pem.value; !ca_pem.empty())
    {
        createTempDirIfNotExists();
        broker_ca_file = tmpdir / "broker_ca.pem";
        {
            WriteBufferFromFile wb{broker_ca_file};
            wb.write(ca_pem.data(), ca_pem.size());
            /// Flush explicitly instead of relying on the destructor, so that a failed write is
            /// reported as a regular exception here rather than during destruction.
            wb.finalize();
        }
        /// The buffer is closed at this point, so the file content is complete before its path is used.
        settings_.ssl_ca_cert_file = broker_ca_file;
    }
    return createConfFromSettings(settings_);
}

Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context)
: StorageExternalStreamImpl(std::move(settings_))
, storage_id(storage->getStorageID())
: StorageExternalStreamImpl(storage, std::move(settings_), context)
, engine_args(engine_args_)
, data_format(StorageExternalStreamImpl::dataFormat())
, external_stream_counter(external_stream_counter_)
, conf(createConfFromSettings(settings->getKafkaSettings()))
, conf(createRdConf(settings->getKafkaSettings()))
, logger(&Poco::Logger::get(getLoggerName()))
{
assert(settings->type.value == StreamTypes::KAFKA || settings->type.value == StreamTypes::REDPANDA);
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/ExternalStream/Kafka/Kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Kafka final : public StorageExternalStreamImpl
producer_topic.reset();
if (producer)
producer.reset();
tryRemoveTempDir(logger);
}
bool supportsSubcolumns() const override { return true; }
NamesAndTypesList getVirtuals() const override;
Expand Down Expand Up @@ -81,13 +82,13 @@ class Kafka final : public StorageExternalStreamImpl
String getLoggerName() const { return storage_id.getDatabaseName() == "default" ? storage_id.getTableName() : storage_id.getFullNameNotQuoted(); }

private:
Kafka::ConfPtr createRdConf(KafkaExternalStreamSettings settings_);
void calculateDataFormat(const IStorage * storage);
void cacheVirtualColumnNamesAndTypes();
std::vector<Int64> getOffsets(const SeekToInfoPtr & seek_to_info, const std::vector<int32_t> & shards_to_query) const;
void validateMessageKey(const String & message_key, IStorage * storage, const ContextPtr & context);
void validate() const;

StorageID storage_id;
ASTs engine_args;
String data_format;
ExternalStreamCounterPtr external_stream_counter;
Expand All @@ -97,6 +98,7 @@ class Kafka final : public StorageExternalStreamImpl
ASTPtr message_key_ast;
Int32 topic_refresh_interval_ms = 0;
std::vector<Int32> shards_from_settings;
fs::path broker_ca_file;

ConfPtr conf;
/// The Producer instance and Topic instance can be used by multiple sinks at the same time, thus we only need one of each.
Expand Down
12 changes: 5 additions & 7 deletions src/Storages/ExternalStream/Log/FileLog.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#include "FileLog.h"
#include "FileLogSource.h"
#include "fileLastModifiedTime.h"

#include <Interpreters/Context.h>
#include <NativeLog/Base/Stds.h>
#include <NativeLog/Record/Record.h>
#include <Storages/ExternalStream/ExternalStreamTypes.h>
#include <Storages/ExternalStream/Log/FileLog.h>
#include <Storages/ExternalStream/Log/FileLogSource.h>
#include <Storages/ExternalStream/Log/fileLastModifiedTime.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/Streaming/storageUtil.h>
Expand All @@ -22,9 +21,8 @@ extern const int INVALID_SETTING_VALUE;
extern const int CANNOT_FSTAT;
}

FileLog::FileLog(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_)
: StorageExternalStreamImpl(std::move(settings_))
, storage_id(storage->getStorageID())
FileLog::FileLog(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, ContextPtr context)
: StorageExternalStreamImpl(storage, std::move(settings_), context)
, timestamp_regex(std::make_unique<re2::RE2>(settings->timestamp_regex.value))
, linebreaker_regex(std::make_unique<re2::RE2>(settings->row_delimiter.value))
, log(&Poco::Logger::get("External-FileLog"))
Expand Down
8 changes: 1 addition & 7 deletions src/Storages/ExternalStream/Log/FileLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ namespace re2
class RE2;
}

namespace Poco
{
class Logger;
}

namespace DB
{

Expand All @@ -25,7 +20,7 @@ class IStorage;
class FileLog final : public StorageExternalStreamImpl
{
public:
FileLog(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_);
FileLog(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, ContextPtr context);
~FileLog() override = default;

void startup() override { }
Expand All @@ -49,7 +44,6 @@ class FileLog final : public StorageExternalStreamImpl
FileLogSource::FileContainer searchForCandidates();

private:
StorageID storage_id;
std::vector<std::unique_ptr<re2::RE2>> file_regexes;
std::unique_ptr<re2::RE2> timestamp_regex;
std::unique_ptr<re2::RE2> linebreaker_regex;
Expand Down
7 changes: 4 additions & 3 deletions src/Storages/ExternalStream/StorageExternalStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int NOT_IMPLEMENTED;
extern const int TYPE_MISMATCH;
}

Expand Down Expand Up @@ -66,7 +66,7 @@ std::unique_ptr<StorageExternalStreamImpl> createExternalStream(

#ifdef OS_LINUX
else if (settings->type.value == StreamTypes::LOG && context->getSettingsRef()._tp_enable_log_stream_expr.value)
return std::make_unique<FileLog>(storage, std::move(settings));
return std::make_unique<FileLog>(storage, std::move(settings), std::move(context_));
else
#endif
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} external stream is not supported yet", settings->type.value);
Expand Down Expand Up @@ -144,7 +144,8 @@ StorageExternalStream::StorageExternalStream(
std::unique_ptr<ExternalStreamSettings> external_stream_settings_,
const String & comment,
bool attach)
: IStorage(table_id_), WithContext(context_->getGlobalContext())
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ExternalStream/StorageExternalStream.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#pragma once

#include <Storages/IStorage.h>
#include <base/shared_ptr_helper.h>
#include <Common/SettingsChanges.h>
#include <Storages/ExternalStream/ExternalStreamCounter.h>

#include <base/shared_ptr_helper.h>

namespace DB
{
struct ExternalStreamSettings;
Expand Down
27 changes: 27 additions & 0 deletions src/Storages/ExternalStream/StorageExternalStreamImpl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include <Storages/ExternalStream/StorageExternalStreamImpl.h>

namespace DB
{

namespace ErrorCodes
{
extern const int CANNOT_CREATE_DIRECTORY;
}

/// Makes sure the temporary directory of this external stream (`tmpdir`) exists, creating it
/// together with any missing parent directories.
///
/// Calling it when the directory already exists is not an error.
/// Throws an Exception with CANNOT_CREATE_DIRECTORY when the directory cannot be created.
void StorageExternalStreamImpl::createTempDirIfNotExists() const
{
    std::error_code err;
    /// The return value of create_directories only tells whether a NEW directory was created; it is
    /// also false when the directory already exists. Failure is therefore detected via the error code.
    fs::create_directories(tmpdir, err);
    if (err)
        throw Exception(ErrorCodes::CANNOT_CREATE_DIRECTORY, "Failed to create external stream temporary directory, error_code={}, error_message={}", err.value(), err.message());
}

/// Best-effort removal of the temporary directory of this external stream (`tmpdir`) and its content.
///
/// A failure to remove the directory is only logged via `logger`; it is not turned into an exception.
/// A directory that does not exist is not treated as a failure.
void StorageExternalStreamImpl::tryRemoveTempDir(Poco::Logger * logger) const
{
    LOG_INFO(logger, "Trying to remove external stream temporary directory {}", tmpdir.string());
    std::error_code err;
    /// remove_all returns the number of removed entries (0 when there was nothing to remove) and
    /// uintmax_t(-1) on failure, so the return value cannot be negated to detect errors; the error
    /// code is the reliable signal.
    fs::remove_all(tmpdir, err);
    if (err)
        LOG_ERROR(logger, "Failed to remove the temporary directory, error_code={}, error_message={}", err.value(), err.message());
}

}
14 changes: 13 additions & 1 deletion src/Storages/ExternalStream/StorageExternalStreamImpl.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <Common/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/ExternalStream/ExternalStreamSettings.h>
#include <Storages/ExternalStream/ExternalStreamCounter.h>
Expand All @@ -12,7 +14,11 @@ namespace DB
class StorageExternalStreamImpl : public std::enable_shared_from_this<StorageExternalStreamImpl>
{
public:
explicit StorageExternalStreamImpl(std::unique_ptr<ExternalStreamSettings> settings_): settings(std::move(settings_)) {
explicit StorageExternalStreamImpl(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ContextPtr & context)
: storage_id(storage->getStorageID())
, settings(std::move(settings_))
, tmpdir(fs::path(context->getConfigRef().getString("tmp_path", fs::temp_directory_path())) / "external_streams" / toString(storage_id.uuid))
{
/// Make it easier for people to ingest data from external streams. A lot of times people didn't see data coming
/// only because the external stream does not have all the fields.
if (!settings->input_format_skip_unknown_fields.changed)
Expand Down Expand Up @@ -54,7 +60,13 @@ class StorageExternalStreamImpl : public std::enable_shared_from_this<StorageExt
}

protected:
/// Creates a temporary directory for the external stream to store temporary data.
void createTempDirIfNotExists() const;
void tryRemoveTempDir(Poco::Logger * logger) const;

StorageID storage_id;
std::unique_ptr<ExternalStreamSettings> settings;
fs::path tmpdir;
};

}

0 comments on commit b38f2ce

Please sign in to comment.