Skip to content

Commit

Permalink
[Improvement](functions)Optimized some datetime function's return val…
Browse files Browse the repository at this point in the history
…ue (apache#18369)
  • Loading branch information
zclllyybb authored and Reminiscent committed May 15, 2023
1 parent 55ce653 commit 38133dc
Show file tree
Hide file tree
Showing 31 changed files with 463 additions and 280 deletions.
16 changes: 10 additions & 6 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ class BeExecVersionManager {
static const int min_be_exec_version;
};

// When we have some breaking change for execute engine, we should update be_exec_version.
// 0: not contain be_exec_version.
// 1: start from doris 1.2
// a. remove ColumnString terminating zero.
// b. runtime filter use new hash method.
inline const int BeExecVersionManager::max_be_exec_version = 1;
/*
* Whenever a breaking change is made to the execution engine, be_exec_version must be updated.
* 0: be_exec_version is not present.
* 1: starting from doris 1.2
* a. removed ColumnString's terminating zero.
* b. runtime filter uses a new hash method.
* 2: starting from doris 2.0
* a. the return types of functions month/day/hour/minute/second were changed to smaller types.
*/
inline const int BeExecVersionManager::max_be_exec_version = 2;
inline const int BeExecVersionManager::min_be_exec_version = 0;

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
ObjectPool pool;
RuntimeState* state = pool.add(new RuntimeState());
state->set_desc_tbl(&_desc_tbl);
state->set_be_exec_version(_fe_compatible_version);
RowDescriptor row_desc =
RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);

Expand Down Expand Up @@ -1087,6 +1088,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
sc_params.ref_rowset_readers = rs_readers;
sc_params.delete_handler = &delete_handler;
sc_params.base_tablet_schema = base_tablet_schema;
sc_params.be_exec_version = request.be_exec_version;
DCHECK(request.__isset.alter_tablet_type);
switch (request.alter_tablet_type) {
case TAlterTabletType::SCHEMA_CHANGE:
Expand Down Expand Up @@ -1651,6 +1653,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
BlockChanger* changer, bool* sc_sorting,
bool* sc_directly) {
changer->set_type(sc_params.alter_tablet_type);
changer->set_compatible_version(sc_params.be_exec_version);

TabletSharedPtr base_tablet = sc_params.base_tablet;
TabletSharedPtr new_tablet = sc_params.new_tablet;
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <cstdint>

#include "common/status.h"
#include "gen_cpp/AgentService_types.h"
#include "olap/column_mapping.h"
Expand Down Expand Up @@ -48,6 +50,8 @@ class BlockChanger {

void set_type(AlterTabletType type) { _type = type; }

// Stores the given version in _fe_compatible_version.
void set_compatible_version(int32_t version) noexcept {
    _fe_compatible_version = version;
}

bool has_where() const { return _where_expr != nullptr; }

private:
Expand All @@ -62,6 +66,8 @@ class BlockChanger {
std::shared_ptr<TExpr> _where_expr;

AlterTabletType _type;

int32_t _fe_compatible_version = -1;
};

class SchemaChange {
Expand Down Expand Up @@ -263,6 +269,7 @@ class SchemaChangeHandler {
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
DescriptorTbl* desc_tbl = nullptr;
ObjectPool pool;
int32_t be_exec_version;
};

static Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ class RuntimeState {
return 0;
}

// Overwrites the be_exec_version field held in this state's query options.
void set_be_exec_version(int32_t version) noexcept {
    _query_options.be_exec_version = version;
}

private:
Status create_error_log_file();

Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exprs/vectorized_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ doris::Status VectorizedFnCall::prepare(doris::RuntimeState* state,
"and restart be.");
}
} else {
_function = SimpleFunctionFactory::instance().get_function(_fn.name.function_name,
argument_template, _data_type);
// get the function. won't prepare function.
_function = SimpleFunctionFactory::instance().get_function(
_fn.name.function_name, argument_template, _data_type, state->be_exec_version());
}
if (_function == nullptr) {
std::string type_str;
Expand Down
15 changes: 13 additions & 2 deletions be/src/vec/functions/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,14 @@ class IFunctionBuilder {

using FunctionBuilderPtr = std::shared_ptr<IFunctionBuilder>;

/// Used by the function factory: registering a function stores a builder, and getting a function first gets that builder.
/// DefaultFunctionBuilder is used as the builder at registration unless another one is explicitly specified.
class FunctionBuilderImpl : public IFunctionBuilder {
public:
FunctionBasePtr build(const ColumnsWithTypeAndName& arguments,
const DataTypePtr& return_type) const final {
const DataTypePtr& func_return_type = get_return_type(arguments);
// check return types equal.
DCHECK(return_type->equals(*func_return_type) ||
// For null constant argument, `get_return_type` would return
// Nullable<DataTypeNothing> when `use_default_implementation_for_nulls` is true.
Expand Down Expand Up @@ -371,6 +374,7 @@ class FunctionBuilderImpl : public IFunctionBuilder {
/// If it isn't, will convert all ColumnLowCardinality arguments to full columns.
virtual bool can_be_executed_on_low_cardinality_dictionary() const { return true; }

/// return a real function object to execute. called in build(...).
virtual FunctionBasePtr build_impl(const ColumnsWithTypeAndName& arguments,
const DataTypePtr& return_type) const = 0;

Expand Down Expand Up @@ -449,7 +453,7 @@ class IFunction : public std::enable_shared_from_this<IFunction>,
}
};

/// Wrappers over IFunction.
/// Wrappers over IFunction. When DefaultFunction is used as the wrapper (the default), all function execution goes through this.

class DefaultExecutable final : public PreparedFunctionImpl {
public:
Expand Down Expand Up @@ -485,6 +489,10 @@ class DefaultExecutable final : public PreparedFunctionImpl {
std::shared_ptr<IFunction> function;
};

/*
* When a registered function does not specify its own base (i.e. it simply inherits from IFunction), this class is used as its wrapper.
* It keeps the real implementation in `function`.
*/
class DefaultFunction final : public IFunctionBase {
public:
DefaultFunction(std::shared_ptr<IFunction> function_, DataTypes arguments_,
Expand All @@ -498,6 +506,7 @@ class DefaultFunction final : public IFunctionBase {
const DataTypes& get_argument_types() const override { return arguments; }
const DataTypePtr& get_return_type() const override { return return_type; }

// return a default wrapper for IFunction.
PreparedFunctionPtr prepare(FunctionContext* context, const Block& /*sample_block*/,
const ColumnNumbers& /*arguments*/,
size_t /*result*/) const override {
Expand Down Expand Up @@ -599,7 +608,9 @@ class DefaultFunctionBuilder : public FunctionBuilderImpl {
/// Collects the type of every argument and wraps `function` in a DefaultFunction built from
/// those argument types and the requested return type.
FunctionBasePtr build_impl(const ColumnsWithTypeAndName& arguments,
                           const DataTypePtr& return_type) const override {
    DataTypes data_types(arguments.size());
    // One pass is enough: each slot only needs to receive its argument's type once.
    for (size_t i = 0; i < arguments.size(); ++i) {
        data_types[i] = arguments[i].type;
    }
    return std::make_shared<DefaultFunction>(function, data_types, return_type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ using FunctionCurrentTime = FunctionCurrentDateOrDateTime<CurrentTimeImpl<Curren
using FunctionUtcTimeStamp = FunctionCurrentDateOrDateTime<UtcTimestampImpl>;
using FunctionTimeToSec = FunctionCurrentDateOrDateTime<TimeToSecImpl>;

/// @TEMPORARY: for be_exec_version=2
using FunctionToWeekTwoArgsOld =
FunctionDateOrDateTimeComputation<ToWeekTwoArgsImplOld<DataTypeDateTime>>;

void register_function_date_time_computation(SimpleFunctionFactory& factory) {
factory.register_function<FunctionAddSeconds>();
factory.register_function<FunctionAddMinutes>();
Expand Down Expand Up @@ -178,6 +182,8 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory) {
factory.register_alias("days_add", "date_add");
factory.register_alias("days_add", "adddate");
factory.register_alias("months_add", "add_months");
/// @TEMPORARY: for be_exec_version=2
factory.register_alternative_function<FunctionToWeekTwoArgsOld>();
}

} // namespace doris::vectorized
11 changes: 7 additions & 4 deletions be/src/vec/functions/function_date_or_datetime_computation.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ TIME_DIFF_FUNCTION_IMPL(HoursDiffImpl, hours_diff, HOUR);
TIME_DIFF_FUNCTION_IMPL(MintueSDiffImpl, minutes_diff, MINUTE);
TIME_DIFF_FUNCTION_IMPL(SecondsDiffImpl, seconds_diff, SECOND);

#define TIME_FUNCTION_TWO_ARGS_IMPL(CLASS, NAME, FUNCTION) \
#define TIME_FUNCTION_TWO_ARGS_IMPL(CLASS, NAME, FUNCTION, RETURN_TYPE) \
template <typename DateType> \
struct CLASS { \
using ArgType = std::conditional_t< \
Expand All @@ -280,7 +280,7 @@ TIME_DIFF_FUNCTION_IMPL(SecondsDiffImpl, seconds_diff, SECOND);
std::is_same_v<DateType, DataTypeDateV2>, DateV2Value<DateV2ValueType>, \
std::conditional_t<std::is_same_v<DateType, DataTypeDateTimeV2>, \
DateV2Value<DateTimeV2ValueType>, VecDateTimeValue>>; \
using ReturnType = DataTypeInt32; \
using ReturnType = RETURN_TYPE; \
static constexpr auto name = #NAME; \
static constexpr auto is_nullable = false; \
static inline ReturnType::FieldType execute(const ArgType& t0, const Int32 mode, \
Expand All @@ -294,8 +294,11 @@ TIME_DIFF_FUNCTION_IMPL(SecondsDiffImpl, seconds_diff, SECOND);
} \
}

TIME_FUNCTION_TWO_ARGS_IMPL(ToYearWeekTwoArgsImpl, yearweek, year_week(mysql_week_mode(mode)));
TIME_FUNCTION_TWO_ARGS_IMPL(ToWeekTwoArgsImpl, week, week(mysql_week_mode(mode)));
TIME_FUNCTION_TWO_ARGS_IMPL(ToYearWeekTwoArgsImpl, yearweek, year_week(mysql_week_mode(mode)),
DataTypeInt32);
TIME_FUNCTION_TWO_ARGS_IMPL(ToWeekTwoArgsImpl, week, week(mysql_week_mode(mode)), DataTypeInt8);
/// @TEMPORARY: for be_exec_version=2
TIME_FUNCTION_TWO_ARGS_IMPL(ToWeekTwoArgsImplOld, week, week(mysql_week_mode(mode)), DataTypeInt32);

template <typename FromType1, typename FromType2, typename ToType, typename Transform>
struct DateTimeOp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ using FunctionDatetimeV2ToYearWeekTwoArgs =
using FunctionDatetimeV2ToWeekTwoArgs =
FunctionDateOrDateTimeComputation<ToWeekTwoArgsImpl<DataTypeDateTimeV2>>;

/// @TEMPORARY: for be_exec_version=2
using FunctionDatetimeV2ToWeekTwoArgsOld =
FunctionDateOrDateTimeComputation<ToWeekTwoArgsImplOld<DataTypeDateTimeV2>>;

void register_function_date_time_computation_v2(SimpleFunctionFactory& factory) {
factory.register_function<FunctionAddSecondsV2>();
factory.register_function<FunctionAddMinutesV2>();
Expand Down Expand Up @@ -168,6 +172,9 @@ void register_function_date_time_computation_v2(SimpleFunctionFactory& factory)
factory.register_function<FunctionToWeekTwoArgsV2>();
factory.register_function<FunctionDatetimeV2ToYearWeekTwoArgs>();
factory.register_function<FunctionDatetimeV2ToWeekTwoArgs>();

/// @TEMPORARY: for be_exec_version=2
factory.register_alternative_function<FunctionDatetimeV2ToWeekTwoArgsOld>();
}

} // namespace doris::vectorized
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ class FunctionDateOrDateTimeToSomething : public IFunction {
bool is_variadic() const override { return true; }
size_t get_number_of_arguments() const override { return 0; }
/// Returns Transform::get_variadic_argument_types() when has_variadic_argument is true;
/// otherwise returns an empty type list.
DataTypes get_variadic_argument_types_impl() const override {
    // A single compile-time branch suffices; a second identical check would never be reached.
    if constexpr (has_variadic_argument) {
        return Transform::get_variadic_argument_types();
    }
    return {};
}

Expand Down
36 changes: 31 additions & 5 deletions be/src/vec/functions/simple_function_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <mutex>
#include <string>

#include "agent/be_exec_version_manager.h"
#include "udf/udf.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/functions/function.h"

Expand Down Expand Up @@ -99,6 +101,8 @@ class SimpleFunctionFactory {
using Creator = std::function<FunctionBuilderPtr()>;
using FunctionCreators = phmap::flat_hash_map<std::string, Creator>;
using FunctionIsVariadic = phmap::flat_hash_set<std::string>;
/// @TEMPORARY: for be_exec_version=2
constexpr static int DATETIME_FUNCTION_NEW = 2;

public:
void register_function(const std::string& name, const Creator& ptr) {
Expand Down Expand Up @@ -135,14 +139,25 @@ class SimpleFunctionFactory {
function_alias[alias] = name;
}

/// @TEMPORARY: for be_exec_version=2
template <class Function>
void register_alternative_function() {
static std::string suffix {"_old_for_version_before_2_0"};
function_to_replace[Function::name] = Function::name + suffix;
register_function(Function::name + suffix, &createDefaultFunction<Function>);
}

FunctionBasePtr get_function(const std::string& name, const ColumnsWithTypeAndName& arguments,
const DataTypePtr& return_type) {
const DataTypePtr& return_type,
int be_version = BeExecVersionManager::get_newest_version()) {
std::string key_str = name;

if (function_alias.count(name)) {
key_str = function_alias[name];
}

temporary_function_update(be_version, key_str);

// if function is variadic, added types_str as key
if (function_variadic_set.count(key_str)) {
for (auto& arg : arguments) {
Expand All @@ -155,24 +170,35 @@ class SimpleFunctionFactory {
}

auto iter = function_creators.find(key_str);
if (iter != function_creators.end()) {
return iter->second()->build(arguments, return_type);
if (iter == function_creators.end()) {
LOG(WARNING) << fmt::format("Function signature {} is not found", key_str);
return nullptr;
}

LOG(WARNING) << fmt::format("Function signature {} is not found", key_str);
return nullptr;
return iter->second()->build(arguments, return_type);
}

private:
FunctionCreators function_creators;
FunctionIsVariadic function_variadic_set;
std::unordered_map<std::string, std::string> function_alias;
/// @TEMPORARY: for be_exec_version=2. replace function to old version.
std::unordered_map<std::string, std::string> function_to_replace;

/// Creator passed to register_function: wraps the object returned by Function::create()
/// in a DefaultFunctionBuilder.
template <typename Function>
static FunctionBuilderPtr createDefaultFunction() {
    FunctionBuilderPtr builder = std::make_shared<DefaultFunctionBuilder>(Function::create());
    return builder;
}

/// @TEMPORARY: for be_exec_version=2
/// Rewrites `name` in place to the replacement recorded in function_to_replace when
/// `fe_version_now` is lower than DATETIME_FUNCTION_NEW and a replacement exists for `name`.
/// In every other case `name` is left untouched.
void temporary_function_update(int fe_version_now, std::string& name) {
    if (fe_version_now >= DATETIME_FUNCTION_NEW) {
        return;
    }
    // Look the name up once and reuse the iterator, rather than find() followed by operator[].
    if (const auto replacement = function_to_replace.find(name);
        replacement != function_to_replace.end()) {
        name = replacement->second;
    }
}

public:
static SimpleFunctionFactory& instance() {
static std::once_flag oc;
Expand Down
Loading

0 comments on commit 38133dc

Please sign in to comment.