Merge branch 'master' into fix-0807
felixwluo authored Aug 13, 2024
2 parents 868c22e + 66f6243 commit cdb162c
Showing 236 changed files with 4,514 additions and 1,200 deletions.
2 changes: 1 addition & 1 deletion be/src/agent/be_exec_version_manager.h
@@ -82,7 +82,7 @@ class BeExecVersionManager {
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
*/
-constexpr inline int BeExecVersionManager::max_be_exec_version = 6;
+constexpr inline int BeExecVersionManager::max_be_exec_version = 7;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;

/// functional
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
@@ -268,8 +268,6 @@ DEFINE_mInt32(doris_scan_range_row_count, "524288");
DEFINE_mInt32(doris_scan_range_max_mb, "1024");
// max bytes number for single scan block, used in segmentv2
DEFINE_mInt32(doris_scan_block_max_mb, "67108864");
-// size of scanner queue between scanner thread and compute thread
-DEFINE_mInt32(doris_scanner_queue_size, "1024");
// single read execute fragment row number
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
2 changes: 0 additions & 2 deletions be/src/common/config.h
@@ -319,8 +319,6 @@ DECLARE_mInt32(doris_scan_range_row_count);
DECLARE_mInt32(doris_scan_range_max_mb);
// max bytes number for single scan block, used in segmentv2
DECLARE_mInt32(doris_scan_block_max_mb);
-// size of scanner queue between scanner thread and compute thread
-DECLARE_mInt32(doris_scanner_queue_size);
// single read execute fragment row number
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
25 changes: 13 additions & 12 deletions be/src/exec/es/es_scroll_parser.cpp
@@ -100,9 +100,9 @@ static const std::string ERROR_COL_DATA_IS_ARRAY =
static const std::string INVALID_NULL_VALUE =
"Invalid null value occurs: Non-null column `$0` contains NULL";

-#define RETURN_ERROR_IF_COL_IS_ARRAY(col, type) \
+#define RETURN_ERROR_IF_COL_IS_ARRAY(col, type, is_array) \
do { \
-    if (col.IsArray()) { \
+    if (col.IsArray() == is_array) { \
std::stringstream ss; \
ss << "Expected value of type: " << type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
@@ -167,7 +167,7 @@ Status get_int_value(const rapidjson::Value& col, PrimitiveType type, void* slot
return Status::OK();
}

-    RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);

StringParser::ParseResult result;
@@ -294,7 +294,7 @@ Status get_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_d
return get_date_value_int<T, RT>(col[0], type, false, slot, time_zone);
} else {
// this would happened just only when `enable_docvalue_scan = false`, and field has string format date from _source
-        RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+        RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
return get_date_value_int<T, RT>(col, type, true, slot, time_zone);
}
@@ -322,7 +322,7 @@ Status get_float_value(const rapidjson::Value& col, PrimitiveType type, void* sl
return Status::OK();
}

-    RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);

StringParser::ParseResult result;
@@ -351,7 +351,7 @@ Status insert_float_value(const rapidjson::Value& col, PrimitiveType type,
return Status::OK();
}

-    RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);

StringParser::ParseResult result;
@@ -390,7 +390,7 @@ Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
return Status::OK();
}

-    RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);

StringParser::ParseResult result;
@@ -543,7 +543,7 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
val = col[0].GetString();
}
} else {
-            RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+            RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
@@ -623,7 +623,7 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,

const rapidjson::Value& str_col = is_nested_str ? col[0] : col;

-            RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+            RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);

const std::string& val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
@@ -649,7 +649,7 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
val = col[0].GetString();
}
} else {
-            RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
+            RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
@@ -679,13 +679,14 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
case TYPE_ARRAY: {
vectorized::Array array;
const auto& sub_type = tuple_desc->slots()[i]->type().children[0].type;
-        for (auto& sub_col : col.GetArray()) {
+        RETURN_ERROR_IF_COL_IS_ARRAY(col, type, false);
+        for (const auto& sub_col : col.GetArray()) {
switch (sub_type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
std::string val;
-                RETURN_ERROR_IF_COL_IS_ARRAY(sub_col, sub_type);
+                RETURN_ERROR_IF_COL_IS_ARRAY(sub_col, sub_type, true);
if (!sub_col.IsString()) {
val = json_value_to_string(sub_col);
} else {
9 changes: 7 additions & 2 deletions be/src/exprs/json_functions.cpp
@@ -24,6 +24,7 @@
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <re2/re2.h>
+#include <simdjson/error.h>
#include <simdjson/simdjson.h> // IWYU pragma: keep
#include <stdlib.h>

@@ -259,13 +260,17 @@ Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
const std::vector<JsonPath>& jsonpath,
simdjson::ondemand::value* value) noexcept {
// Return DataQualityError when it's a malformed json.
-    // Otherwise the path was not found, due to array out of bound or not exist
+    // Otherwise the path was not found, due to
+    // 1. array out of bound
+    // 2. not exist such field in object
+    // 3. the input type is not object but could be null or other types and lead to simdjson::INCORRECT_TYPE
#define HANDLE_SIMDJSON_ERROR(err, msg) \
do { \
const simdjson::error_code& _err = err; \
const std::string& _msg = msg; \
if (UNLIKELY(_err)) { \
-        if (_err == simdjson::NO_SUCH_FIELD || _err == simdjson::INDEX_OUT_OF_BOUNDS) { \
+        if (_err == simdjson::NO_SUCH_FIELD || _err == simdjson::INDEX_OUT_OF_BOUNDS || \
+            _err == simdjson::INCORRECT_TYPE) { \
return Status::NotFound<false>( \
fmt::format("Not found target filed, err: {}, msg: {}", \
simdjson::error_message(_err), _msg)); \
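A standalone sketch of the case the new `INCORRECT_TYPE` branch covers — hypothetical input, assuming the stock simdjson on-demand API and a `<simdjson/...>` install layout as used in the patch: extracting a path like `$.a.b` when `a` holds `null` fails with `INCORRECT_TYPE` rather than `NO_SUCH_FIELD`, and with this change is reported as `NotFound` (a NULL result) instead of a data-quality error.

```cpp
#include <simdjson/simdjson.h>
#include <iostream>

using namespace simdjson;

int main() {
    ondemand::parser parser;
    padded_string json = R"({"a": null})"_padded;
    auto doc = parser.iterate(json);

    // Equivalent of walking "$.a.b": "a" exists but holds null, so looking up
    // a field inside it yields INCORRECT_TYPE (the value is not an object).
    ondemand::value out;
    auto err = doc["a"]["b"].get(out);

    if (err == NO_SUCH_FIELD || err == INDEX_OUT_OF_BOUNDS || err == INCORRECT_TYPE) {
        std::cout << "path not found -> NULL\n";  // behavior after this change
    } else if (err) {
        std::cout << "malformed json -> data quality error\n";
    }
    return 0;
}
```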
116 changes: 51 additions & 65 deletions be/src/io/fs/s3_file_writer.cpp
@@ -157,6 +157,50 @@ Status S3FileWriter::close(bool non_block) {
return _st;
}

bool S3FileWriter::_complete_part_task_callback(Status s) {
bool ret = false;
if (!s.ok()) [[unlikely]] {
VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
<< ", status: " << s.to_string();
std::unique_lock<std::mutex> _lck {_completed_lock};
_failed = true;
ret = true;
_st = std::move(s);
}
// After the signal, there is a scenario where the previous invocation of _wait_until_finish
// returns to the caller, and subsequently, the S3 file writer is destructed.
// This means that accessing _failed afterwards would result in a heap use after free vulnerability.
_countdown_event.signal();
return ret;
}

Status S3FileWriter::_build_upload_buffer() {
auto builder = FileBufferBuilder();
builder.set_type(BufferType::UPLOAD)
.set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
})
.set_file_offset(_bytes_appended)
.set_sync_after_complete_task([this](auto&& PH1) {
return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1));
})
.set_is_cancelled([this]() { return _failed.load(); });
if (_cache_builder != nullptr) {
// We would load the data into file cache asynchronously which indicates
// that this instance of S3FileWriter might have been destructed when we
// try to do writing into file cache, so we make the lambda capture the variable
// we need by value to extend their lifetime
builder.set_allocate_file_blocks_holder(
[builder = *_cache_builder, offset = _bytes_appended]() -> FileBlocksHolderPtr {
return builder.allocate_cache_holder(offset, config::s3_write_buffer_size);
});
}
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
return Status::OK();
}

Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

@@ -165,35 +209,13 @@ Status S3FileWriter::_close_impl() {
}

if (_bytes_appended == 0) {
DCHECK(_cur_part_num == 1);
// No data written, but need to create an empty file
auto builder = FileBufferBuilder();
builder.set_type(BufferType::UPLOAD)
.set_upload_callback([this](UploadFileBuffer& buf) { _put_object(buf); })
.set_sync_after_complete_task([this](Status s) {
bool ret = false;
if (!s.ok()) [[unlikely]] {
VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
<< ", status: " << s.to_string();
std::unique_lock<std::mutex> _lck {_completed_lock};
_failed = true;
ret = true;
this->_st = std::move(s);
}
// After the signal, there is a scenario where the previous invocation of _wait_until_finish
// returns to the caller, and subsequently, the S3 file writer is destructed.
// This means that accessing _failed afterwards would result in a heap use after free vulnerability.
_countdown_event.signal();
return ret;
})
.set_is_cancelled([this]() { return _failed.load(); });
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_build_upload_buffer());
if (!_used_by_s3_committer) {
auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); });
} else {
RETURN_IF_ERROR(_create_multi_upload_request());
}
}
@@ -225,43 +247,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
return _st;
}
if (!_pending_buf) {
auto builder = FileBufferBuilder();
builder.set_type(BufferType::UPLOAD)
.set_upload_callback(
[part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
})
.set_file_offset(_bytes_appended)
.set_sync_after_complete_task([this, part_num = _cur_part_num](Status s) {
bool ret = false;
if (!s.ok()) [[unlikely]] {
VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
<< ", load part " << part_num << ", st " << s;
std::unique_lock<std::mutex> _lck {_completed_lock};
_failed = true;
ret = true;
this->_st = std::move(s);
}
// After the signal, there is a scenario where the previous invocation of _wait_until_finish
// returns to the caller, and subsequently, the S3 file writer is destructed.
// This means that accessing _failed afterwards would result in a heap use after free vulnerability.
_countdown_event.signal();
return ret;
})
.set_is_cancelled([this]() { return _failed.load(); });
if (_cache_builder != nullptr) {
// We would load the data into file cache asynchronously which indicates
// that this instance of S3FileWriter might have been destructed when we
// try to do writing into file cache, so we make the lambda capture the variable
// we need by value to extend their lifetime
builder.set_allocate_file_blocks_holder(
[builder = *_cache_builder,
offset = _bytes_appended]() -> FileBlocksHolderPtr {
return builder.allocate_cache_holder(offset,
config::s3_write_buffer_size);
});
}
RETURN_IF_ERROR(builder.build(&_pending_buf));
RETURN_IF_ERROR(_build_upload_buffer());
}
// we need to make sure all parts except the last one to be 5MB or more
// and shouldn't be larger than buf
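The comment inside `_build_upload_buffer` about capturing `*_cache_builder` and `_bytes_appended` by value is the load-bearing detail here: the cache-holder callback can run after this `S3FileWriter` is gone, so it must not reach back through `this`. A minimal standalone sketch of that pattern, with hypothetical names (not the patch's actual types):

```cpp
#include <functional>
#include <iostream>
#include <string>

// Sketch: an async task copies what it needs instead of capturing `this`, so
// it stays valid even if its owner is destroyed before the task runs.
struct Writer {
    std::string key = "s3://bucket/object";
    size_t offset = 4096;

    std::function<void()> make_task() const {
        // Init-capture copies of key/offset; capturing `this` here would dangle.
        return [key = key, offset = offset] {
            std::cout << "upload " << key << " at offset " << offset << "\n";
        };
    }
};

int main() {
    std::function<void()> task;
    {
        Writer w;
        task = w.make_task();
    }           // w is destroyed here
    task();     // still safe: the closure owns its own copies
    return 0;
}
```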
2 changes: 2 additions & 0 deletions be/src/io/fs/s3_file_writer.h
@@ -84,6 +84,8 @@ class S3FileWriter final : public FileWriter {
Status _set_upload_to_remote_less_than_buffer_size();
void _put_object(UploadFileBuffer& buf);
void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);
+    bool _complete_part_task_callback(Status s);
+    Status _build_upload_buffer();

ObjectStoragePathOptions _obj_storage_path_opts;

4 changes: 3 additions & 1 deletion be/src/olap/base_tablet.cpp
@@ -36,6 +36,7 @@
#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/jsonb/serialize.h"
@@ -1030,7 +1031,8 @@ Status BaseTablet::generate_new_block_for_partial_update(
if (rs_column.has_default_value()) {
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
} else if (rs_column.is_nullable()) {
-            assert_cast<vectorized::ColumnNullable*>(mutable_column.get())
+            assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
+                    mutable_column.get())
->insert_null_elements(1);
} else {
mutable_column->insert_default();
1 change: 1 addition & 0 deletions be/src/olap/bloom_filter_predicate.h
@@ -25,6 +25,7 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/predicate_column.h"
#include "vec/common/assert_cast.h"
#include "vec/exprs/vruntimefilter_wrapper.h"

namespace doris {
3 changes: 3 additions & 0 deletions be/src/olap/compaction.cpp
@@ -686,6 +686,9 @@ Status Compaction::do_inverted_index_compaction() {
<< st;
return st;
}
+        for (const auto& writer : inverted_index_file_writers) {
+            writer->set_file_writer_opts(ctx.get_file_writer_options());
+        }
}

// use tmp file dir to store index files
8 changes: 1 addition & 7 deletions be/src/olap/rowset/beta_rowset_writer.cpp
@@ -846,13 +846,7 @@ Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {

Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
io::FileWriterPtr& file_writer) {
-    io::FileWriterOptions opts {
-            .write_file_cache = _context.write_file_cache,
-            .is_cold_data = _context.is_hot_data,
-            .file_cache_expiration =
-                    _context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0
-                            ? _context.newest_write_timestamp + _context.file_cache_ttl_sec
-                            : 0};
+    io::FileWriterOptions opts = _context.get_file_writer_options();
Status st = _context.fs()->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
10 changes: 10 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
@@ -140,6 +140,16 @@ struct RowsetWriterContext {
return *storage_resource->fs;
}
}

+    io::FileWriterOptions get_file_writer_options() const {
+        io::FileWriterOptions opts {
+                .write_file_cache = write_file_cache,
+                .is_cold_data = is_hot_data,
+                .file_cache_expiration = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
+                                                 ? newest_write_timestamp + file_cache_ttl_sec
+                                                 : 0};
+        return opts;
+    }
};

} // namespace doris
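The helper mainly centralizes the file-cache expiration rule that `BaseBetaRowsetWriter::_create_file_writer` used to compute inline (see the next file). A standalone sketch of just that arithmetic, with hypothetical values:

```cpp
#include <cstdint>
#include <iostream>

// Expiration = newest_write_timestamp + file_cache_ttl_sec when both are set,
// otherwise 0 (meaning "no expiration").
static int64_t cache_expiration(int64_t file_cache_ttl_sec, int64_t newest_write_timestamp) {
    return (file_cache_ttl_sec > 0 && newest_write_timestamp > 0)
                   ? newest_write_timestamp + file_cache_ttl_sec
                   : 0;
}

int main() {
    std::cout << cache_expiration(3600, 1700000000) << "\n"; // 1700003600
    std::cout << cache_expiration(0, 1700000000) << "\n";    // 0: TTL not set
    std::cout << cache_expiration(3600, 0) << "\n";          // 0: no newest write timestamp
    return 0;
}
```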
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -283,6 +283,7 @@ size_t InvertedIndexFileWriter::write_v1() {
ram_dir.close();

auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, idx_path.c_str());
+    out_dir->set_file_writer_opts(_opts);

auto* out = out_dir->createOutput(idx_name.c_str());
if (out == nullptr) {
@@ -348,6 +349,8 @@ size_t InvertedIndexFileWriter::write_v2() {
io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};

auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str());
+    out_dir->set_file_writer_opts(_opts);

std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
// idx v2 writer != nullptr means memtable on sink node now
if (_idx_v2_writer != nullptr) {