Skip to content

Commit

Permalink
Merge branch 'master' into fix-minmax-rf
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly authored Oct 22, 2023
2 parents 6e810fa + fbc4485 commit c92530f
Show file tree
Hide file tree
Showing 198 changed files with 3,238 additions and 5,411 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-extension.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ jobs:
run: |
git clone https://github.com/apache/doris-website.git website
cd website
echo "[\"current\"]" > versions.json
mkdir -p docs
cp -R ../docs/en/docs/* docs/
cp -R ../docs/sidebars.json sidebars.json
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ be/tags
be/test/olap/test_data/tablet_meta_test.hdr
be/.devcontainer/
be/src/apache-orc/
zoneinfo/

## tools
tools/ssb-tools/ssb-data/
Expand All @@ -121,3 +122,6 @@ lru_cache_test
/conf/log4j2-spring.xml
/fe/fe-core/src/test/resources/real-help-resource.zip
/ui/dist

# other
compile_commands.json
18 changes: 18 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/Status_types.h>
Expand Down Expand Up @@ -49,6 +50,7 @@
#include "gutil/strings/numbers.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_system.h"
Expand Down Expand Up @@ -1203,6 +1205,22 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
.tag("s3_conf", s3_conf.to_string());
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else if (resource.__isset.hdfs_storage_param) {
Status st;
std::shared_ptr<io::HdfsFileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::HdfsFileSystem::create(resource.hdfs_storage_param, "", nullptr, &fs);
} else {
fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
}
if (!st.ok()) {
LOG(WARNING) << "update hdfs resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name);
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else {
LOG(WARNING) << "unknown resource=" << resource;
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,7 @@ DEFINE_Int32(group_commit_sync_wal_batch, "10");

// the number of threads used for group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_mInt32(group_commit_interval_seconds, "10");
DEFINE_mInt32(group_commit_interval_ms, "10000");

DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
Expand All @@ -1114,6 +1114,9 @@ DEFINE_Bool(enable_cpu_hard_limit, "false");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");

// Dir of default timezone files
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ DECLARE_Int32(group_commit_sync_wal_batch);

// This config can be set to limit the number of threads in the group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_interval_seconds);
DECLARE_mInt32(group_commit_interval_ms);

// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
Expand All @@ -1184,6 +1184,9 @@ DECLARE_Bool(enable_flush_file_cache_async);
// Remove predicate that is always true for a segment.
DECLARE_Bool(ignore_always_true_predicate_for_segment);

// Dir of default timezone files
DECLARE_String(default_tzfiles_path);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
17 changes: 16 additions & 1 deletion be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "common/config.h"
#include "vec/sink/async_writer_sink.h"
#include "vec/sink/group_commit_block_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmemory_scratch_sink.h"
Expand Down Expand Up @@ -163,6 +164,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status));
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine");
}
Expand Down Expand Up @@ -296,7 +304,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
_has_inverted_index(thrift_sink.olap_table_sink)) {
!_has_inverted_index(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
Expand All @@ -319,6 +327,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status));
RETURN_IF_ERROR(status);
break;
}
default: {
std::stringstream error_msg;
std::map<int, const char*>::const_iterator i =
Expand Down
28 changes: 11 additions & 17 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Status get_int_value(const rapidjson::Value& col, PrimitiveType type, void* slot
template <typename T, typename RT>
Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool is_date_str,
RT* slot, const cctz::time_zone& time_zone) {
constexpr bool is_datetime_v1 = std::is_same_v<T, vectorized::VecDateTimeValue>;
constexpr bool is_datetime_v1 = std::is_same_v<T, VecDateTimeValue>;
T dt_val;
if (is_date_str) {
const std::string str_date = col.GetString();
Expand Down Expand Up @@ -656,18 +656,16 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,

case TYPE_DATE:
case TYPE_DATETIME:
RETURN_IF_ERROR((fill_date_int<vectorized::VecDateTimeValue, int64_t>(
col, type, pure_doc_value, col_ptr, time_zone)));
RETURN_IF_ERROR((fill_date_int<VecDateTimeValue, int64_t>(col, type, pure_doc_value,
col_ptr, time_zone)));
break;
case TYPE_DATEV2:
RETURN_IF_ERROR(
(fill_date_int<vectorized::DateV2Value<vectorized::DateV2ValueType>, uint32_t>(
col, type, pure_doc_value, col_ptr, time_zone)));
RETURN_IF_ERROR((fill_date_int<DateV2Value<DateV2ValueType>, uint32_t>(
col, type, pure_doc_value, col_ptr, time_zone)));
break;
case TYPE_DATETIMEV2: {
RETURN_IF_ERROR(
(fill_date_int<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(col, type, pure_doc_value, col_ptr, time_zone)));
RETURN_IF_ERROR((fill_date_int<DateV2Value<DateTimeV2ValueType>, uint64_t>(
col, type, pure_doc_value, col_ptr, time_zone)));
break;
}
case TYPE_ARRAY: {
Expand Down Expand Up @@ -773,19 +771,15 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
// No need to support date and datetime types.
case TYPE_DATEV2: {
uint32_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateV2ValueType>,
uint32_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
RETURN_IF_ERROR((get_date_int<DateV2Value<DateV2ValueType>, uint32_t>(
sub_col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(data);
break;
}
case TYPE_DATETIMEV2: {
uint64_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
RETURN_IF_ERROR((get_date_int<DateV2Value<DateTimeV2ValueType>, uint64_t>(
sub_col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(data);
break;
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "exec/olap_utils.h"
#include "olap/olap_common.h"
#include "olap/olap_tuple.h"
#include "runtime/datetime_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/type_limit.h"
Expand All @@ -60,9 +59,8 @@ std::string cast_to_string(T value, int scale) {
} else if constexpr (primitive_type == TYPE_LARGEINT) {
return vectorized::int128_to_string(value);
} else if constexpr (primitive_type == TYPE_DATETIMEV2) {
doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType> datetimev2_val =
static_cast<doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>>(
value);
DateV2Value<DateTimeV2ValueType> datetimev2_val =
static_cast<DateV2Value<DateTimeV2ValueType>>(value);
char buf[30];
datetimev2_val.to_string(buf);
std::stringstream ss;
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/olap_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

#include "common/logging.h"
#include "olap/olap_tuple.h"
#include "runtime/datetime_value.h"
#include "runtime/primitive_type.h"

namespace doris {
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/schema_scanner/schema_tables_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
{ static_cast<void>(fill_dest_column_for_range(block, 13, null_datas)); }
// creation_time
{
vectorized::VecDateTimeValue srcs[table_num];
VecDateTimeValue srcs[table_num];
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.create_time) {
Expand All @@ -274,7 +274,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
}
// update_time
{
vectorized::VecDateTimeValue srcs[table_num];
VecDateTimeValue srcs[table_num];
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.update_time) {
Expand All @@ -293,7 +293,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
}
// check_time
{
vectorized::VecDateTimeValue srcs[table_num];
VecDateTimeValue srcs[table_num];
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.last_check_time) {
Expand Down
17 changes: 6 additions & 11 deletions be/src/exec/table_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_p
fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const double*>(item));
break;
case TYPE_DATE: {
vectorized::VecDateTimeValue value =
binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(*(int64_t*)item);
VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);

char buf[64];
char* pos = value.to_string(buf);
Expand All @@ -167,8 +166,7 @@ Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_p
break;
}
case TYPE_DATETIME: {
vectorized::VecDateTimeValue value =
binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(*(int64_t*)item);
VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);

char buf[64];
char* pos = value.to_string(buf);
Expand All @@ -177,9 +175,8 @@ Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_p
break;
}
case TYPE_DATEV2: {
vectorized::DateV2Value<vectorized::DateV2ValueType> value =
binary_cast<uint32_t, doris::vectorized::DateV2Value<vectorized::DateV2ValueType>>(
*(int32_t*)item);
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item);

char buf[64];
char* pos = value.to_string(buf);
Expand All @@ -188,10 +185,8 @@ Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_p
break;
}
case TYPE_DATETIMEV2: {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> value =
binary_cast<uint64_t,
doris::vectorized::DateV2Value<vectorized::DateTimeV2ValueType>>(
*(int64_t*)item);
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item);

char buf[64];
char* pos = value.to_string(buf, type.scale);
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
switch (t_expr.node_type) {
case TExprNodeType::DATE_LITERAL: {
if (TypeDescriptor::from_thrift(t_expr.type).is_date_v2_type()) {
vectorized::DateV2Value<doris::vectorized::DateV2ValueType> dt;
DateV2Value<DateV2ValueType> dt;
if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
t_expr.date_literal.value.size())) {
std::stringstream ss;
Expand All @@ -498,7 +498,7 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
}
column->insert_data(reinterpret_cast<const char*>(&dt), 0);
} else if (TypeDescriptor::from_thrift(t_expr.type).is_datetime_v2_type()) {
vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType> dt;
DateV2Value<DateTimeV2ValueType> dt;
if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
t_expr.date_literal.value.size())) {
std::stringstream ss;
Expand All @@ -507,7 +507,7 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
}
column->insert_data(reinterpret_cast<const char*>(&dt), 0);
} else {
vectorized::VecDateTimeValue dt;
VecDateTimeValue dt;
if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
t_expr.date_literal.value.size())) {
std::stringstream ss;
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/text_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ bool TextConverter::_write_data(const TypeDescriptor& type_desc,
break;
}
case TYPE_DATE: {
vectorized::VecDateTimeValue ts_slot;
VecDateTimeValue ts_slot;
if (!ts_slot.from_date_str(data, len)) {
parse_result = StringParser::PARSE_FAILURE;
break;
Expand All @@ -205,7 +205,7 @@ bool TextConverter::_write_data(const TypeDescriptor& type_desc,
break;
}
case TYPE_DATEV2: {
vectorized::DateV2Value<vectorized::DateV2ValueType> ts_slot;
DateV2Value<DateV2ValueType> ts_slot;
if (!ts_slot.from_date_str(data, len)) {
parse_result = StringParser::PARSE_FAILURE;
break;
Expand All @@ -217,7 +217,7 @@ bool TextConverter::_write_data(const TypeDescriptor& type_desc,
break;
}
case TYPE_DATETIME: {
vectorized::VecDateTimeValue ts_slot;
VecDateTimeValue ts_slot;
if (!ts_slot.from_date_str(data, len)) {
parse_result = StringParser::PARSE_FAILURE;
break;
Expand All @@ -229,7 +229,7 @@ bool TextConverter::_write_data(const TypeDescriptor& type_desc,
break;
}
case TYPE_DATETIMEV2: {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> ts_slot;
DateV2Value<DateTimeV2ValueType> ts_slot;
if (!ts_slot.from_date_str(data, len)) {
parse_result = StringParser::PARSE_FAILURE;
break;
Expand Down
12 changes: 6 additions & 6 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,26 +416,26 @@ struct FixedStringFindOp : public StringFindOp {
}
};

struct DateTimeFindOp : public CommonFindOp<vectorized::VecDateTimeValue> {
struct DateTimeFindOp : public CommonFindOp<VecDateTimeValue> {
bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
vectorized::VecDateTimeValue value;
VecDateTimeValue value;
value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
return bloom_filter.test(Slice((char*)&value, sizeof(vectorized::VecDateTimeValue)));
return bloom_filter.test(Slice((char*)&value, sizeof(VecDateTimeValue)));
}
};

// avoid violating C/C++ aliasing rules.
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101684

struct DateFindOp : public CommonFindOp<vectorized::VecDateTimeValue> {
struct DateFindOp : public CommonFindOp<VecDateTimeValue> {
bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
uint24_t date = *static_cast<const uint24_t*>(data);
uint64_t value = uint32_t(date);

vectorized::VecDateTimeValue date_value;
VecDateTimeValue date_value;
date_value.from_olap_date(value);

return bloom_filter.test(Slice((char*)&date_value, sizeof(vectorized::VecDateTimeValue)));
return bloom_filter.test(Slice((char*)&date_value, sizeof(VecDateTimeValue)));
}
};

Expand Down
Loading

0 comments on commit c92530f

Please sign in to comment.