Skip to content

Commit

Permalink
9
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Dec 5, 2023
1 parent f1b4bbe commit b120981
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
5 changes: 3 additions & 2 deletions be/src/http/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include "http/http_status.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "olap/wal_manager.h"
#include "runtime/exec_env.h"
#include "util/path_util.h"
#include "util/url_coding.h"

Expand Down Expand Up @@ -199,8 +201,7 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) {
// these blocks within the limited space. So we need to set group_commit = false to avoid dead lock.
if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
// TODO(Yukang): change it to WalManager::wal_limit
return !(body_bytes > config::wal_max_disk_size * 0.8);
return !(body_bytes > ExecEnv::GetInstance()->wal_mgr()->get_wal_limit() * 0.8);
} else {
return false;
}
Expand Down
37 changes: 29 additions & 8 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/wal_manager.h"

#include <glog/logging.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <atomic>
Expand All @@ -31,6 +32,7 @@
#include "io/fs/local_file_system.h"
#include "olap/wal_writer.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/stream_load/stream_load_context.h"
Expand All @@ -41,8 +43,20 @@

namespace doris {
WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
: _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) {
: _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false), _wal_limit(0) {
doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
CHECK_GE(_wal_dirs.size(), 1);
std::vector<std::string> tmp_dirs;
for (const std::string& wal_dir : _wal_dirs) {
if (std::filesystem::path(wal_dir).is_absolute()) {
tmp_dirs.emplace_back(wal_dir);
} else {
for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) {
tmp_dirs.emplace_back(path.path + wal_dir);
}
}
}
_wal_dirs = tmp_dirs;
_all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
_cv = std::make_shared<std::condition_variable>();
}
Expand Down Expand Up @@ -85,19 +99,22 @@ Status WalManager::init() {
}

Status WalManager::init_wal_limit() {
size_t available_bytes;
size_t disk_capacity_bytes;
// Get the root path available space.
RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info("./", &disk_capacity_bytes,
&available_bytes));
size_t available_bytes =0;
for (const std::string& path : _wal_dirs) {
size_t disk_capacity_bytes;
size_t tmp_bytes;
RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(path, &disk_capacity_bytes,
&tmp_bytes));
available_bytes+=tmp_bytes;
}
bool is_percent = true;
int64_t wal_disk_limit =
ParseUtil::parse_mem_spec(config::wal_max_disk_limit, -1, available_bytes, &is_percent);
if (wal_disk_limit < 0 || wal_disk_limit > available_bytes) {
return Status::InternalError(
"wal_max_disk_limit config is wrong, please check your config!");
}
WalManager::wal_limit = wal_disk_limit;
_wal_limit = wal_disk_limit;
return Status::OK();
}

Expand All @@ -110,10 +127,14 @@ Status WalManager::init_wal_limit(size_t available_bytes) {
return Status::InternalError(
"wal_max_disk_limit config is wrong, please check your config!");
}
WalManager::wal_limit = wal_disk_limit;
_wal_limit = wal_disk_limit;
return Status::OK();
}

size_t WalManager::get_wal_limit() const {
return _wal_limit;
}

void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
LOG(INFO) << "add wal queue "
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class WalManager {
Status init_wal_limit();
// Just used for ut.
Status init_wal_limit(size_t available_bytes);
inline static size_t wal_limit;
size_t get_wal_limit() const;

private:
ExecEnv* _exec_env = nullptr;
Expand All @@ -94,5 +94,6 @@ class WalManager {
std::atomic<bool> _stop;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
std::shared_ptr<std::condition_variable> _cv;
size_t _wal_limit;
};
} // namespace doris
6 changes: 4 additions & 2 deletions be/src/olap/wal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
if (_is_first_append_blocks) {
_is_first_append_blocks = false;
std::unique_lock l(_mutex);
while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > WalManager::wal_limit) {
while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
ExecEnv::GetInstance()->wal_mgr()->get_wal_limit()) {
LOG(INFO) << "First time to append blocks to wal file " << _file_name
<< ". Currently, all wal disk space usage is "
<< _all_wal_disk_bytes->load(std::memory_order_relaxed)
<< ", larger than the maximum limit " << WalManager::wal_limit
<< ", larger than the maximum limit "
<< ExecEnv::GetInstance()->wal_mgr()->get_wal_limit()
<< ", so we need to wait. When any other load finished, that wal will be "
"removed, the space used by that wal will be free.";
cv->wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
Expand Down

0 comments on commit b120981

Please sign in to comment.