Skip to content

Commit

Permalink
Merge branch 'branch-2.0' into topn-fix-2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaokang authored Oct 21, 2023
2 parents 1bb9744 + a5d5a8b commit 2433982
Show file tree
Hide file tree
Showing 35 changed files with 296 additions and 109 deletions.
7 changes: 5 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1528,14 +1528,17 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
std::map<TTableId, int64_t> table_id_to_num_delta_rows;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
succ_tablets.clear();
error_tablet_ids.clear();
table_id_to_num_delta_rows.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablets, &discontinuous_version_tablets);
&succ_tablets, &discontinuous_version_tablets,
&table_id_to_num_delta_rows);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
Expand Down Expand Up @@ -1620,7 +1623,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
finish_task_request.__set_succ_tablets(succ_tablets);
finish_task_request.__set_error_tablet_ids(
std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));

finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
Expand Down
12 changes: 6 additions & 6 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1311,15 +1311,15 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
} else if (desc->type == TRuntimeFilterType::MIN_MAX) {
if (desc->__isset.min_max_type) {
switch (desc->min_max_type) {
case TMinMaxRuntimeFilterType::MIN: {
case TMinMaxRuntimeFilterType::MIN:
_runtime_filter_type = RuntimeFilterType::MIN_FILTER;
}
case TMinMaxRuntimeFilterType::MAX: {
break;
case TMinMaxRuntimeFilterType::MAX:
_runtime_filter_type = RuntimeFilterType::MAX_FILTER;
}
case TMinMaxRuntimeFilterType::MIN_MAX: {
break;
case TMinMaxRuntimeFilterType::MIN_MAX:
_runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
}
break;
}
} else {
_runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
Expand Down
25 changes: 21 additions & 4 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include "common/logging.h"
Expand Down Expand Up @@ -72,11 +73,13 @@ void TabletPublishStatistics::record_in_bvar() {
EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids,
std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets,
std::map<TTableId, int64_t>* table_id_to_num_delta_rows)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets) {}
_discontinuous_version_tablets(discontinuous_version_tablets),
_table_id_to_num_delta_rows(table_id_to_num_delta_rows) {}

void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
Expand All @@ -91,7 +94,7 @@ Status EnginePublishVersionTask::finish() {
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);

std::unordered_map<int64_t, int64_t> tablet_id_to_num_delta_rows;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
Expand Down Expand Up @@ -188,6 +191,11 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}

auto rowset_meta_ptr = rowset->rowset_meta();
tablet_id_to_num_delta_rows.insert(
{rowset_meta_ptr->tablet_id(), rowset_meta_ptr->num_rows()});

auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
Expand All @@ -204,7 +212,6 @@ Status EnginePublishVersionTask::finish() {
std::set<TabletInfo> partition_related_tablet_infos;
StorageEngine::instance()->tablet_manager()->get_partition_related_tablets(
partition_id, &partition_related_tablet_infos);

Version version(par_ver_info.version, par_ver_info.version);
for (auto& tablet_info : partition_related_tablet_infos) {
TabletSharedPtr tablet =
Expand Down Expand Up @@ -241,6 +248,7 @@ Status EnginePublishVersionTask::finish() {
}
}
}
_calculate_tbl_num_delta_rows(tablet_id_to_num_delta_rows);

if (!res.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
LOG(INFO) << "finish to publish version on transaction."
Expand All @@ -252,6 +260,15 @@ Status EnginePublishVersionTask::finish() {
return res;
}

void EnginePublishVersionTask::_calculate_tbl_num_delta_rows(
const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows) {
for (const auto& kv : tablet_id_to_num_delta_rows) {
auto table_id =
StorageEngine::instance()->tablet_manager()->get_tablet(kv.first)->get_table_id();
(*_table_id_to_num_delta_rows)[table_id] += kv.second;
}
}

TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task,
TabletSharedPtr tablet, RowsetSharedPtr rowset,
int64_t partition_id, int64_t transaction_id,
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,26 @@ class EnginePublishVersionTask : public EngineTask {
EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets);
~EnginePublishVersionTask() {}
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets,
std::map<TTableId, int64_t>* table_id_to_num_delta_rows);
~EnginePublishVersionTask() override = default;

virtual Status finish() override;
Status finish() override;

void add_error_tablet_id(int64_t tablet_id);

int64_t finish_task();

private:
void _calculate_tbl_num_delta_rows(
const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows);

const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
std::set<TTabletId>* _error_tablet_ids;
std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
std::map<TTableId, int64_t>* _table_id_to_num_delta_rows;
};

class AsyncTabletPublishTask {
Expand Down
17 changes: 12 additions & 5 deletions be/src/vec/aggregate_functions/aggregate_function_collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ struct AggregateFunctionCollectSetData {

void merge(const SelfType& rhs) {
if constexpr (HasLimit::value) {
DCHECK(max_size == -1 || max_size == rhs.max_size);
max_size = rhs.max_size;
if (max_size == -1) {
max_size = rhs.max_size;
}

for (auto& rhs_elem : rhs.data_set) {
if (size() >= max_size) {
Expand Down Expand Up @@ -131,7 +132,9 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> {
void merge(const SelfType& rhs, Arena* arena) {
bool inserted;
Set::LookupResult it;
DCHECK(max_size == -1 || max_size == rhs.max_size);
if (max_size == -1) {
max_size = rhs.max_size;
}
max_size = rhs.max_size;

for (auto& rhs_elem : rhs.data_set) {
Expand Down Expand Up @@ -192,7 +195,9 @@ struct AggregateFunctionCollectListData {

void merge(const SelfType& rhs) {
if constexpr (HasLimit::value) {
DCHECK(max_size == -1 || max_size == rhs.max_size);
if (max_size == -1) {
max_size = rhs.max_size;
}
max_size = rhs.max_size;
for (auto& rhs_elem : rhs.data) {
if (size() >= max_size) {
Expand Down Expand Up @@ -244,7 +249,9 @@ struct AggregateFunctionCollectListData<StringRef, HasLimit> {

void merge(const AggregateFunctionCollectListData& rhs) {
if constexpr (HasLimit::value) {
DCHECK(max_size == -1 || max_size == rhs.max_size);
if (max_size == -1) {
max_size = rhs.max_size;
}
max_size = rhs.max_size;

data->insert_range_from(*rhs.data, 0,
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,11 +725,12 @@ Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_bloc
_probe_columns.resize(probe_expr_ctxs_sz);

std::vector<int> res_col_ids(probe_expr_ctxs_sz);
RETURN_IF_ERROR(
_do_evaluate(*input_block, _probe_expr_ctxs, *_probe_expr_call_timer, res_col_ids));
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_probe_column_convert_to_null = _convert_block_to_null(*input_block);
}
RETURN_IF_ERROR(
_do_evaluate(*input_block, _probe_expr_ctxs, *_probe_expr_call_timer, res_col_ids));

// TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc`
// so we have to initialize this flag by the first probe block.
if (!_has_set_need_null_map_for_probe) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/functions/function_quantile_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ class FunctionToQuantileState : public IFunction {
status = execute_internal<ColumnString, true>(column, data_type, column_result);
} else if (nested_which.is_int64()) {
status = execute_internal<ColumnInt64, true>(column, data_type, column_result);
} else if (which.is_float32()) {
} else if (nested_which.is_float32()) {
status = execute_internal<ColumnFloat32, true>(column, data_type, column_result);
} else if (which.is_float64()) {
} else if (nested_which.is_float64()) {
status = execute_internal<ColumnFloat64, true>(column, data_type, column_result);
} else {
return type_error();
Expand Down
2 changes: 1 addition & 1 deletion docker/thirdparties/run-thirdparties-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ STOP=0

if [[ "$#" == 1 ]]; then
# default
COMPONENTS="mysql,pg,oracle,sqlserver,clickhouse,hive,iceberg,kafka,hudi,trino"
COMPONENTS="mysql,es,hive,pg,oracle,sqlserver,clickhouse"
else
while true; do
case "$1" in
Expand Down
2 changes: 1 addition & 1 deletion docs/dev.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"version.label": {
"message": "dev",
"message": "2.0",
"description": "The label for version current"
},
"sidebar.docs.category.Getting Started": {
Expand Down
7 changes: 6 additions & 1 deletion docs/sidebars.json
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@
"sql-manual/sql-functions/date-time-functions/date_format",
"sql-manual/sql-functions/date-time-functions/datediff",
"sql-manual/sql-functions/date-time-functions/microseconds_add",
"sql-manual/sql-functions/date-time-functions/microseconds-diff",
"sql-manual/sql-functions/date-time-functions/microseconds-sub",
"sql-manual/sql-functions/date-time-functions/milliseconds-add",
"sql-manual/sql-functions/date-time-functions/milliseconds-diff",
"sql-manual/sql-functions/date-time-functions/milliseconds-sub",
"sql-manual/sql-functions/date-time-functions/minutes_add",
"sql-manual/sql-functions/date-time-functions/minutes_diff",
"sql-manual/sql-functions/date-time-functions/minutes_sub",
Expand Down Expand Up @@ -613,7 +618,7 @@
"sql-manual/sql-functions/json-functions/json_unquote",
"sql-manual/sql-functions/json-functions/json_valid",
"sql-manual/sql-functions/json-functions/json_contains",
"sql-manual/sql-functions/json-functions/json_length",
"sql-manual/sql-functions/json-functions/json-length",
"sql-manual/sql-functions/json-functions/get_json_double",
"sql-manual/sql-functions/json-functions/get_json_int",
"sql-manual/sql-functions/json-functions/get_json_string"
Expand Down
16 changes: 0 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,22 +312,6 @@ public void analyze(boolean isOlap) throws AnalysisException {
}
FeNameFormat.checkColumnName(name);

// When string type length is not assigned, it needs to be assigned to 1.
if (typeDef.getType().isScalarType()) {
final ScalarType targetType = (ScalarType) typeDef.getType();
if (targetType.getPrimitiveType().isStringType() && !targetType.isLengthSet()) {
if (targetType.getPrimitiveType() == PrimitiveType.VARCHAR) {
// always set varchar length MAX_VARCHAR_LENGTH
targetType.setLength(ScalarType.MAX_VARCHAR_LENGTH);
} else if (targetType.getPrimitiveType() == PrimitiveType.STRING) {
// always set text length MAX_STRING_LENGTH
targetType.setLength(ScalarType.MAX_STRING_LENGTH);
} else {
targetType.setLength(1);
}
}
}

typeDef.analyze(null);

Type type = typeDef.getType();
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ private void analyzeNestedType(Type parent, ScalarType child) throws AnalysisExc
private void analyzeScalarType(ScalarType scalarType)
throws AnalysisException {
PrimitiveType type = scalarType.getPrimitiveType();
// When string type length is not assigned, it needs to be assigned to 1.
if (scalarType.getPrimitiveType().isStringType() && !scalarType.isLengthSet()) {
if (scalarType.getPrimitiveType() == PrimitiveType.VARCHAR) {
// always set varchar length MAX_VARCHAR_LENGTH
scalarType.setLength(ScalarType.MAX_VARCHAR_LENGTH);
} else if (scalarType.getPrimitiveType() == PrimitiveType.STRING) {
// always set text length MAX_STRING_LENGTH
scalarType.setLength(ScalarType.MAX_STRING_LENGTH);
} else {
scalarType.setLength(1);
}
}
switch (type) {
case CHAR:
case VARCHAR: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class AggregateFunction extends Function {
FunctionSet.INTERSECT_COUNT, FunctionSet.ORTHOGONAL_BITMAP_UNION_COUNT,
FunctionSet.COUNT, "approx_count_distinct", "ndv", FunctionSet.BITMAP_UNION_INT,
FunctionSet.BITMAP_UNION_COUNT, "ndv_no_finalize", FunctionSet.WINDOW_FUNNEL, FunctionSet.RETENTION,
FunctionSet.SEQUENCE_MATCH, FunctionSet.SEQUENCE_COUNT, FunctionSet.MAP_AGG, FunctionSet.BITMAP_AGG);
FunctionSet.SEQUENCE_MATCH, FunctionSet.SEQUENCE_COUNT, FunctionSet.MAP_AGG, FunctionSet.BITMAP_AGG,
FunctionSet.COLLECT_LIST, FunctionSet.COLLECT_SET);

public static ImmutableSet<String> ALWAYS_NULLABLE_AGGREGATE_FUNCTION_NAME_SET =
ImmutableSet.of("stddev_samp", "variance_samp", "var_samp", "percentile_approx");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
// not remove the task from queue and be will retry
return;
}
if (request.isSetTableIdToDeltaNumRows()) {
publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows());
}
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
publishVersionTask.getTaskType(),
publishVersionTask.getSignature());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.doris.nereids.trees.expressions.functions.BoundFunction;
import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.NullableAggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Array;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ConnectionId;
import org.apache.doris.nereids.trees.expressions.functions.scalar.CurrentCatalog;
Expand Down Expand Up @@ -510,8 +509,10 @@ private boolean argsHasNullLiteral(Expression expression) {
}

private Optional<Expression> preProcess(Expression expression) {
if (expression instanceof PropagateNullable && !(expression instanceof NullableAggregateFunction)
&& argsHasNullLiteral(expression)) {
if (expression instanceof AggregateFunction) {
return Optional.of(expression);
}
if (expression instanceof PropagateNullable && argsHasNullLiteral(expression)) {
return Optional.of(new NullLiteral(expression.getDataType()));
}
if (!allArgsIsAllLiteral(expression)) {
Expand Down
Loading

0 comments on commit 2433982

Please sign in to comment.