diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index bfef46036b76dd..9e146058fbbc56 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -38,6 +38,8 @@ #include "http/http_status.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" +#include "olap/wal_manager.h" +#include "runtime/exec_env.h" #include "util/path_util.h" #include "util/url_coding.h" @@ -199,8 +201,7 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) { // these blocks within the limited space. So we need to set group_commit = false to avoid dead lock. if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) { size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); - // TODO(Yukang): change it to WalManager::wal_limit - return !(body_bytes > config::wal_max_disk_size * 0.8); + return !(body_bytes > ExecEnv::GetInstance()->wal_mgr()->get_wal_limit() * 0.8); } else { return false; } diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index b213df68bbf055..a711c6154168cf 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -17,6 +17,7 @@ #include "olap/wal_manager.h" +#include #include #include @@ -31,6 +32,7 @@ #include "io/fs/local_file_system.h" #include "olap/wal_writer.h" #include "runtime/client_cache.h" +#include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/plan_fragment_executor.h" #include "runtime/stream_load/stream_load_context.h" @@ -41,8 +43,20 @@ namespace doris { WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) - : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) { + : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false), _wal_limit(0) { doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); + CHECK_GE(_wal_dirs.size(), 1); + std::vector tmp_dirs; + for (const std::string& wal_dir : _wal_dirs) { + if (std::filesystem::path(wal_dir).is_absolute()) { + tmp_dirs.emplace_back(wal_dir); + } else { + for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) { + tmp_dirs.emplace_back(path.path + wal_dir); + } + } + } + _wal_dirs = tmp_dirs; _all_wal_disk_bytes = std::make_shared(0); _cv = std::make_shared(); } @@ -85,11 +99,14 @@ Status WalManager::init() { } Status WalManager::init_wal_limit() { - size_t available_bytes; - size_t disk_capacity_bytes; - // Get the root path available space. - RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info("./", &disk_capacity_bytes, - &available_bytes)); + size_t available_bytes =0; + for (const std::string& path : _wal_dirs) { + size_t disk_capacity_bytes; + size_t tmp_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(path, &disk_capacity_bytes, + &tmp_bytes)); + available_bytes+=tmp_bytes; + } bool is_percent = true; int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::wal_max_disk_limit, -1, available_bytes, &is_percent); @@ -97,7 +114,7 @@ Status WalManager::init_wal_limit() { return Status::InternalError( "wal_max_disk_limit config is wrong, please check your config!"); } - WalManager::wal_limit = wal_disk_limit; + _wal_limit = wal_disk_limit; return Status::OK(); } @@ -110,10 +127,14 @@ Status WalManager::init_wal_limit(size_t available_bytes) { return Status::InternalError( "wal_max_disk_limit config is wrong, please check your config!"); } - WalManager::wal_limit = wal_disk_limit; + _wal_limit = wal_disk_limit; return Status::OK(); } +size_t WalManager::get_wal_limit() const { + return _wal_limit; +} + void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) { std::lock_guard wrlock(_wal_status_lock); LOG(INFO) << "add wal queue " diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index ae1bc551952d4e..6a63d0bef92ebb 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -76,7 +76,7 @@ class WalManager { Status init_wal_limit(); // Just used for ut. Status init_wal_limit(size_t available_bytes); - inline static size_t wal_limit; + size_t get_wal_limit() const; private: ExecEnv* _exec_env = nullptr; @@ -94,5 +94,6 @@ class WalManager { std::atomic _stop; std::unordered_map&> _wal_column_id_map; std::shared_ptr _cv; + size_t _wal_limit; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp index 9e4d28c4eadc54..ff63d626264416 100644 --- a/be/src/olap/wal_writer.cpp +++ b/be/src/olap/wal_writer.cpp @@ -63,11 +63,13 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { if (_is_first_append_blocks) { _is_first_append_blocks = false; std::unique_lock l(_mutex); - while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > WalManager::wal_limit) { + while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > + ExecEnv::GetInstance()->wal_mgr()->get_wal_limit()) { LOG(INFO) << "First time to append blocks to wal file " << _file_name << ". Currently, all wal disk space usage is " << _all_wal_disk_bytes->load(std::memory_order_relaxed) - << ", larger than the maximum limit " << WalManager::wal_limit + << ", larger than the maximum limit " + << ExecEnv::GetInstance()->wal_mgr()->get_wal_limit() << ", so we need to wait. When any other load finished, that wal will be " "removed, the space used by that wal will be free."; cv->wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));