Skip to content

Commit

Permalink
feat: timeplus external stream (#894)
Browse files Browse the repository at this point in the history
  • Loading branch information
zliang-min authored Jan 17, 2025
1 parent 051b614 commit 0860399
Show file tree
Hide file tree
Showing 14 changed files with 510 additions and 8 deletions.
41 changes: 41 additions & 0 deletions src/Databases/DDLDependencyVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Poco/String.h>
/// proton: starts
#include <Common/parseRemoteDescription.h>
#include <Storages/ExternalStream/ExternalStreamSettings.h>
/// proton: ends

namespace DB
{
Expand Down Expand Up @@ -83,10 +87,47 @@ void DDLDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments & dict_s
data.dependencies.emplace(std::move(info->table_name));
}

/// proton: starts
namespace
{
bool hasLocalAddress(const String & hosts, bool secure)
{
auto global_context = Context::getGlobalContextInstance();
UInt16 default_port = secure ? global_context->getTCPPortSecure().value_or(0) : global_context->getTCPPort();
auto addresses = parseRemoteDescriptionForExternalDatabase(hosts, /*max_addresses=*/ 10, /*default_port=*/ default_port);
for (const auto & addr : addresses)
if (isLocalAddress({addr.first, addr.second}, default_port))
return true;

return false;
}
}
/// proton: ends

void DDLDependencyVisitor::visit(const ASTStorage & storage, Data & data)
{
if (!storage.engine)
return;

/// proton: starts
/// A Timeplus external stream needs to fetch the structure of its target stream,
/// so it depends on that target stream. Thus, if a Timeplus external stream points to
/// a local stream, add the target stream to the dependencies to make sure that the
/// target stream is loaded before the external stream.
if (storage.engine->name == "ExternalStream")
{
ExternalStreamSettings settings;
settings.loadFromQuery(const_cast<ASTStorage &>(storage));
if (settings.type.value == "timeplus" && hasLocalAddress(settings.hosts, settings.secure))
{
QualifiedTableName name{settings.db, settings.stream};
data.dependencies.emplace(std::move(name));
}

return;
}
/// proton: ends

if (storage.engine->name != "Dictionary")
return;

Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/ClusterProxy/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
new_settings.limit = 0;
new_settings.limit.changed = false;
}
/// proton: starts
if (settings.query_mode.value == "table")
new_settings.query_mode = settings.query_mode;
/// proton: ends

auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
Expand Down
7 changes: 6 additions & 1 deletion src/Processors/Sources/RemoteSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace DB
{

RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_)
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_, std::optional<bool> is_streaming_)
: ISource(executor->getHeader(), false, ProcessorID::RemoteSourceID)
, add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor))
, async_read(async_read_)
Expand All @@ -18,6 +18,11 @@ RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation
for (auto & type : sample.getDataTypes())
if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
add_aggregation_info = true;

/// proton: starts
if (is_streaming_)
setStreaming(is_streaming_.value());
/// proton: ends
}

RemoteSource::~RemoteSource() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Sources/RemoteSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class RemoteSource final : public ISource
/// Flag add_aggregation_info tells if AggregatedChunkInfo should be added to result chunk.
/// AggregatedChunkInfo stores the bucket number used for two-level aggregation.
/// This flag should be typically enabled for queries with GROUP BY which are executed till WithMergeableState.
RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_);
RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_, std::optional<bool> is_streaming_=std::nullopt); /// proton: added is_streaming_
~RemoteSource() override;

Status prepare() override;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ExternalStream/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ endif()

add_headers_and_sources(external_stream Kafka)
add_headers_and_sources(external_stream Pulsar)
add_headers_and_sources(external_stream Timeplus)

add_library(external_stream ${external_stream_headers} ${external_stream_sources})

Expand Down
10 changes: 9 additions & 1 deletion src/Storages/ExternalStream/ExternalStreamSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,19 @@ class ASTStorage;
M(UInt64, memory_limit, 0, "Configure a limit on the amount of memory that will be allocated by this external stream. Setting this to 0 will disable the limit. By default this is disabled.", 0) \
M(UInt64, io_threads, 1, "Set the number of IO threads to be used by the Pulsar client. Default is 1 thread.", 0)

/// Settings of Timeplus external streams: where the target stream lives (`hosts`, `db`, `stream`)
/// and how to connect to it (`user`, `secure`).
/// Each entry is M(type, name, default value, description, flags).
/// `ALIAS` is accepted but not used by any entry in this list.
#define TIMEPLUS_EXTERNAL_STREAM_SETTINGS(M, ALIAS) \
    M(String, hosts, "", "A remote server address or an expression that generates multiple addresses of remote servers. Format: host or host:port.", 0) \
    M(String, db, "default", "Database name.", 0) \
    M(String, stream, "", "Stream name.", 0) \
    M(String, user, "", "User name. If not specified, `default` is used.", 0) \
    M(Bool, secure, false, "Use secure connection.", 0)

#define ALL_EXTERNAL_STREAM_SETTINGS(M) \
M(String, type, "", "External stream type", 0) \
KAFKA_EXTERNAL_STREAM_SETTINGS(M) \
LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \
PULSAR_EXTERNAL_STREAM_SETTINGS(M)
PULSAR_EXTERNAL_STREAM_SETTINGS(M) \
TIMEPLUS_EXTERNAL_STREAM_SETTINGS(M, ALIAS)

#define LIST_OF_EXTERNAL_STREAM_SETTINGS(M) \
ALL_EXTERNAL_STREAM_SETTINGS(M) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ExternalStream/ExternalStreamTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace StreamTypes
const String KAFKA = "kafka";
const String REDPANDA = "redpanda";
const String PULSAR = "pulsar";
const String TIMEPLUS = "timeplus";
const String LOG = "log";
}
}
11 changes: 9 additions & 2 deletions src/Storages/ExternalStream/StorageExternalStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Storages/ExternalStream/ExternalStreamTypes.h>
#include <Storages/ExternalStream/Kafka/Kafka.h>
#include <Storages/ExternalStream/Pulsar/Pulsar.h>
#include <Storages/ExternalStream/Timeplus/Timeplus.h>
#ifdef OS_LINUX
# include <Storages/ExternalStream/Log/FileLog.h>
#endif
Expand Down Expand Up @@ -66,6 +67,7 @@ StoragePtr createExternalStream(
IStorage * storage,
ExternalStreamSettingsPtr settings,
const ASTs & engine_args,
StorageInMemoryMetadata & storage_metadata,
bool attach,
ExternalStreamCounterPtr external_stream_counter,
ContextPtr context_)
Expand All @@ -79,6 +81,9 @@ StoragePtr createExternalStream(
if (type == StreamTypes::KAFKA || type == StreamTypes::REDPANDA)
return std::make_unique<Kafka>(storage, std::move(settings), engine_args, attach, external_stream_counter, std::move(context_));

if (type == StreamTypes::TIMEPLUS)
return std::make_unique<ExternalStream::Timeplus>(storage, storage_metadata, std::move(settings), attach, std::move(context_));

#ifdef OS_LINUX
if (type == StreamTypes::LOG && context_->getSettingsRef()._tp_enable_log_stream_expr.value)
return std::make_unique<FileLog>(storage, std::move(settings), std::move(context_));
Expand Down Expand Up @@ -121,7 +126,7 @@ StorageExternalStream::StorageExternalStream(
}
}

if (columns_.empty())
if (columns_.empty() && external_stream_settings->type.value != StreamTypes::TIMEPLUS)
/// This is the same error reported by InterpreterCreateQuery
throw Exception(
ErrorCodes::INCORRECT_QUERY, "Incorrect CREATE query: required list of column descriptions or AS section or SELECT.");
Expand All @@ -145,8 +150,10 @@ StorageExternalStream::StorageExternalStream(

auto metadata = getInMemoryMetadata();
auto stream = createExternalStream(
this, std::move(external_stream_settings), engine_args, attach, external_stream_counter, std::move(context_));
this, std::move(external_stream_settings), engine_args, metadata, attach, external_stream_counter, std::move(context_));
external_stream.swap(stream);
/// Some external streams fetch the structure in other ways, thus need to set the metadata again here in case it's updated.
setInMemoryMetadata(metadata);
}

void StorageExternalStream::read(
Expand Down
Loading

0 comments on commit 0860399

Please sign in to comment.