
Commit

zclllyybb committed Dec 21, 2023
1 parent d6217c0 commit dd1e9e0
Showing 23 changed files with 80 additions and 102 deletions.
4 changes: 2 additions & 2 deletions be/src/agent/task_worker_pool.cpp
@@ -273,8 +273,8 @@ Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateR
// check local disk capacity
int64_t tablet_size = tablet->tablet_local_size();
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
}
return Status::OK();
}
23 changes: 3 additions & 20 deletions be/src/common/config.cpp
@@ -108,10 +108,6 @@ DEFINE_mInt32(hash_table_double_grow_degree, "31");
DEFINE_mInt32(max_fill_rate, "2");

DEFINE_mInt32(double_resize_threshold, "23");
// Expand the hash table before inserting data, the maximum expansion size.
// There are fewer duplicate keys, reducing the number of resize hash tables
// There are many duplicate keys, and the hash table filled bucket is far less than the hash table build bucket.
DEFINE_mInt64(hash_table_pre_expanse_max_rows, "65535");

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G,
// actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much memory on machines
@@ -848,16 +844,6 @@ DEFINE_String(function_service_protocol, "h2:grpc");
// use which load balancer to select server to connect
DEFINE_String(rpc_load_balancer, "rr");

// The maximum buffer/queue size to collect span. After the size is reached, spans are dropped.
// An export will be triggered when the number of spans in the queue reaches half of the maximum.
DEFINE_Int32(max_span_queue_size, "2048");

// The maximum batch size of every export spans. It must be smaller or equal to max_queue_size.
DEFINE_Int32(max_span_export_batch_size, "512");

// The time interval between two consecutive export spans.
DEFINE_Int32(export_span_schedule_delay_millis, "500");

// a soft limit of string type length, the hard limit is 2GB - 4, but if too long will cause very low performance,
// so we set a soft limit, default is 1MB
DEFINE_mInt32(string_type_length_soft_limit_bytes, "1048576");
@@ -870,10 +856,6 @@ DEFINE_mInt32(jsonb_type_length_soft_limit_bytes, "1048576");
DEFINE_Validator(jsonb_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });

// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
DEFINE_Int32(object_pool_buffer_size, "100");

// Threshold of reading a small file into memory
DEFINE_mInt32(in_memory_file_size, "1048576"); // 1MB

@@ -975,8 +957,6 @@ DEFINE_Bool(enable_debug_points, "false");
DEFINE_Int32(pipeline_executor_size, "0");
DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");

// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
@@ -1157,6 +1137,9 @@ DEFINE_Bool(enable_snapshot_action, "false");

DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");

// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");

// clang-format off
#ifdef BE_TEST
// test s3
19 changes: 0 additions & 19 deletions be/src/common/config.h
@@ -152,11 +152,6 @@ DECLARE_mInt32(max_fill_rate);

DECLARE_mInt32(double_resize_threshold);

// Expand the hash table before inserting data, the maximum expansion size.
// There are fewer duplicate keys, reducing the number of resize hash tables
// There are many duplicate keys, and the hash table filled bucket is far less than the hash table build bucket.
DECLARE_mInt64(hash_table_pre_expanse_max_rows);

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G,
// actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much memory on machines
// with large memory larger than 16G.
@@ -911,26 +906,12 @@ DECLARE_String(function_service_protocol);
// use which load balancer to select server to connect
DECLARE_String(rpc_load_balancer);

// The maximum buffer/queue size to collect span. After the size is reached, spans are dropped.
// An export will be triggered when the number of spans in the queue reaches half of the maximum.
DECLARE_Int32(max_span_queue_size);

// The maximum batch size of every export spans. It must be smaller or equal to max_queue_size.
DECLARE_Int32(max_span_export_batch_size);

// The time interval between two consecutive export spans.
DECLARE_Int32(export_span_schedule_delay_millis);

// a soft limit of string type length, the hard limit is 2GB - 4, but if too long will cause very low performance,
// so we set a soft limit, default is 1MB
DECLARE_mInt32(string_type_length_soft_limit_bytes);

DECLARE_mInt32(jsonb_type_length_soft_limit_bytes);

// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
DECLARE_Int32(object_pool_buffer_size);

// Threshold for reading a small file into memory
DECLARE_mInt32(in_memory_file_size);

3 changes: 2 additions & 1 deletion be/src/common/status.h
@@ -8,8 +8,8 @@
#include <gen_cpp/Status_types.h> // for TStatus
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <stdint.h>

#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
@@ -93,6 +93,7 @@ namespace ErrorCode {
E(VERSION_NOT_EXIST, -214, false); \
E(TABLE_NOT_FOUND, -215, true); \
E(TRY_LOCK_FAILED, -216, false); \
E(EXCEEDED_LIMIT, -217, false); \
E(OUT_OF_BOUND, -218, false); \
E(INVALID_ROOT_PATH, -222, true); \
E(NO_AVAILABLE_ROOT_PATH, -223, true); \
2 changes: 1 addition & 1 deletion be/src/http/action/http_stream.cpp
@@ -194,7 +194,7 @@ int HttpStreamAction::on_header(HttpRequest* req) {
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
st = Status::InternalError("Http load size too large.");
st = Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
}
}
}
2 changes: 1 addition & 1 deletion be/src/http/action/stream_load.cpp
@@ -219,7 +219,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
st = Status::InternalError("Stream load size too large.");
st = Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
}
}
}
5 changes: 4 additions & 1 deletion be/src/http/action/tablet_migration_action.cpp
@@ -23,6 +23,7 @@
#include <string>

#include "common/config.h"
#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
@@ -209,7 +210,9 @@ Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, int32_t
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
LOG(WARNING) << "reach the capacity limit of path: " << (*dest_store)->path()
<< ", tablet size: " << tablet_size;
return Status::InternalError("Insufficient disk capacity");
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, tablet_size={}", (*dest_store)->path(),
tablet_size);
}

return Status::OK();
31 changes: 14 additions & 17 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -21,12 +21,14 @@
#include <CLucene/search/IndexSearcher.h>
#include <CLucene/util/bkd/bkd_reader.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <string.h>
#include <sys/resource.h>

#include <cerrno> // IWYU pragma: keep
#include <cstring>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <iostream>
#include <memory>

#include "common/logging.h"
#include "olap/olap_common.h"
@@ -38,8 +40,7 @@
#include "util/defer_op.h"
#include "util/runtime_profile.h"

namespace doris {
namespace segment_v2 {
namespace doris::segment_v2 {

Status FulltextIndexSearcherBuilder::build(DorisCompoundReader* directory,
OptionalIndexSearcherPtr& output_searcher) {
@@ -109,8 +110,7 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t

if (config::enable_inverted_index_cache_check_timestamp) {
auto get_last_visit_time = [](const void* value) -> int64_t {
InvertedIndexSearcherCache::CacheValue* cache_value =
(InvertedIndexSearcherCache::CacheValue*)value;
auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value;
return cache_value->last_visit_time;
};
_cache = std::unique_ptr<Cache>(
@@ -146,8 +146,7 @@ Status InvertedIndexSearcherCache::get_index_searcher(
cache_handle->owned = !use_cache;
IndexSearcherPtr index_searcher;
std::unique_ptr<IndexSearcherBuilder> index_builder = nullptr;
auto mem_tracker =
std::unique_ptr<MemTracker>(new MemTracker("InvertedIndexSearcherCacheWithRead"));
auto mem_tracker = std::make_unique<MemTracker>("InvertedIndexSearcherCacheWithRead");
#ifndef BE_TEST
{
bool exists = false;
@@ -280,7 +279,7 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
cache_value->index_searcher = std::move(index_searcher);
cache_value->size = mem_tracker->consumption();
cache_value->last_visit_time = UnixMillis();
auto lru_handle = _insert(cache_key, cache_value.release());
auto* lru_handle = _insert(cache_key, cache_value.release());
_cache->release(lru_handle);
return Status::OK();
}
@@ -300,7 +299,7 @@ int64_t InvertedIndexSearcherCache::mem_consumption() {

bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key,
InvertedIndexCacheHandle* handle) {
auto lru_handle = _cache->lookup(key.index_file_path);
auto* lru_handle = _cache->lookup(key.index_file_path);
if (lru_handle == nullptr) {
return false;
}
@@ -311,8 +310,7 @@ bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::Cache
Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCache::CacheKey& key,
CacheValue* value) {
auto deleter = [](const doris::CacheKey& key, void* value) {
InvertedIndexSearcherCache::CacheValue* cache_value =
(InvertedIndexSearcherCache::CacheValue*)value;
auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value;
delete cache_value;
};

@@ -325,7 +323,7 @@ bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCach
if (key.encode().empty()) {
return false;
}
auto lru_handle = _cache->lookup(key.encode());
auto* lru_handle = _cache->lookup(key.encode());
if (lru_handle == nullptr) {
return false;
}
@@ -348,8 +346,8 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin
return;
}

auto lru_handle = _cache->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
auto* lru_handle = _cache->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle);
}

@@ -360,5 +358,4 @@ int64_t InvertedIndexQueryCache::mem_consumption() {
return 0L;
}

} // namespace segment_v2
} // namespace doris
} // namespace doris::segment_v2
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1005,8 +1005,8 @@ Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size
timer.start();
// check disk capacity
if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit, path: {}",
_data_dir->path_hash(), _data_dir->path());
return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
_data_dir->path(), estimate_segment_size());
}
// write data
RETURN_IF_ERROR(finalize_columns_data());
9 changes: 5 additions & 4 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -696,8 +696,9 @@ Status VerticalSegmentWriter::write_batch() {
}
if (_data_dir != nullptr &&
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
_data_dir->path_hash());
return Status::Error<EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, tablet_size={}", _data_dir->path(),
_column_writers[cid]->estimate_buffer_size());
}
RETURN_IF_ERROR(_column_writers[cid]->finish());
RETURN_IF_ERROR(_column_writers[cid]->write_data());
@@ -874,8 +875,8 @@ Status VerticalSegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* in
// check disk capacity
if (_data_dir != nullptr &&
_data_dir->reach_capacity_limit((int64_t)_estimated_remaining_size())) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
_data_dir->path_hash());
return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
_data_dir->path(), _estimated_remaining_size());
}
_row_count = _num_rows_written;
_num_rows_written = 0;
8 changes: 4 additions & 4 deletions be/src/olap/schema_change.cpp
@@ -1339,10 +1339,10 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
Status SchemaChangeHandler::_init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema,
const std::string& value) {
column_mapping->default_value = WrapperField::create(column_schema);

if (column_mapping->default_value == nullptr) {
return Status::Error<MEM_ALLOC_FAILED>("column_mapping->default_value is nullptr");
if (auto field = WrapperField::create(column_schema); field.has_value()) {
column_mapping->default_value = field.value();
} else {
return field.error();
}

if (column_schema.is_nullable() && value.length() == 0) {
4 changes: 3 additions & 1 deletion be/src/olap/single_replica_compaction.cpp
@@ -437,7 +437,9 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, get_file_size_cb));
// check disk capacity
if (data_dir->reach_capacity_limit(file_size)) {
return Status::InternalError("Disk reach capacity limit");
return Status::Error<EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, file_size={}", data_dir->path(),
file_size);
}

total_file_size += file_size;
4 changes: 3 additions & 1 deletion be/src/olap/storage_engine.cpp
@@ -402,7 +402,9 @@ Status StorageEngine::_check_file_descriptor_number() {
LOG(ERROR) << "File descriptor number is less than " << config::min_file_descriptor_number
<< ". Please use (ulimit -n) to set a value equal or greater than "
<< config::min_file_descriptor_number;
return Status::InternalError("file descriptors limit is too small");
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"file descriptors limit {} is small than {}", l.rlim_cur,
config::min_file_descriptor_number);
}
return Status::OK();
}
6 changes: 4 additions & 2 deletions be/src/olap/task/engine_batch_load_task.cpp
@@ -110,8 +110,10 @@ Status EngineBatchLoadTask::_init() {

// check disk capacity
if (_push_req.push_type == TPushType::LOAD_V2) {
if (tablet->data_dir()->reach_capacity_limit(_push_req.__isset.http_file_size)) {
return Status::IOError("Disk does not have enough capacity");
if (tablet->data_dir()->reach_capacity_limit(_push_req.http_file_size)) {
return Status::Error<EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, file_size={}", tablet->data_dir()->path(),
_push_req.http_file_size);
}
}

4 changes: 3 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
@@ -515,7 +515,9 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re
HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, get_file_size_cb));
// check disk capacity
if (data_dir->reach_capacity_limit(file_size)) {
return Status::InternalError("Disk reach capacity limit");
return Status::Error<EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, file_size={}", data_dir->path(),
file_size);
}

total_file_size += file_size;
2 changes: 1 addition & 1 deletion be/src/olap/types.h
@@ -1444,7 +1444,7 @@ struct FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_STRING>
const int scale) {
size_t value_len = scan_key.length();
if (value_len > config::string_type_length_soft_limit_bytes) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"the len of value string is too long, len={}, max_len={}", value_len,
config::string_type_length_soft_limit_bytes);
}