Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the disk usage to avoid running out of disk capacity #1702

Merged
merged 8 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/agent/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ enum AgentStatus {
DORIS_PUSH_HAD_LOADED = -504,
DORIS_TIMEOUT = -901,
DORIS_INTERNAL_ERROR = -902,
DORIS_DISK_REACH_CAPACITY_LIMIT = -903,
};
} // namespace doris
#endif // DORIS_BE_SRC_AGENT_STATUS_H
20 changes: 17 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,21 +433,35 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) {
vector<string> error_msgs;
TStatus task_status;

OLAPStatus create_status =
worker_pool_this->_env->storage_engine()->create_tablet(create_tablet_req);
std::vector<TTabletInfo> finish_tablet_infos;
OLAPStatus create_status = worker_pool_this->_env->storage_engine()->create_tablet(create_tablet_req);
if (create_status != OLAPStatus::OLAP_SUCCESS) {
OLAP_LOG_WARNING("create table failed. status: %d, signature: %ld",
create_status, agent_task_req.signature);
// TODO liutao09 distinguish the OLAPStatus
status_code = TStatusCode::RUNTIME_ERROR;
} else {
++_s_report_version;
// get path hash of the created tablet
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash);
DCHECK(tablet != nullptr);
TTabletInfo tablet_info;
tablet_info.tablet_id = tablet->table_id();
tablet_info.schema_hash = tablet->schema_hash();
tablet_info.version = create_tablet_req.version;
tablet_info.version_hash = create_tablet_req.version_hash;
tablet_info.row_count = 0;
tablet_info.data_size = 0;
tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
finish_tablet_infos.push_back(tablet_info);
}

task_status.__set_status_code(status_code);
task_status.__set_error_msgs(error_msgs);

TFinishTaskRequest finish_task_request;
finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
finish_task_request.__set_backend(worker_pool_this->_backend);
finish_task_request.__set_report_version(_s_report_version);
finish_task_request.__set_task_type(agent_task_req.task_type);
Expand Down Expand Up @@ -1252,7 +1266,7 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
}
#endif
vector<DataDirInfo> data_dir_infos;
worker_pool_this->_env->storage_engine()->get_all_data_dir_info(&data_dir_infos);
worker_pool_this->_env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */);

map<string, TDisk> disks;
for (auto& root_path_info : data_dir_infos) {
Expand Down
10 changes: 8 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,11 @@ namespace config {
// inc_rowset expired interval
CONF_Int32(inc_rowset_expired_sec, "1800");
// garbage sweep policy
CONF_Int32(max_garbage_sweep_interval, "14400");
CONF_Int32(max_garbage_sweep_interval, "3600");
CONF_Int32(min_garbage_sweep_interval, "180");
CONF_Int32(snapshot_expire_time_sec, "172800");
// 仅仅是建议值,当磁盘空间不足时,trash下的文件保存期可不遵守这个参数
CONF_Int32(trash_file_expire_time_sec, "259200");
CONF_Int32(disk_capacity_insufficient_percentage, "90");
// check row nums for BE/CE and schema change. true is open, false is closed.
CONF_Bool(row_nums_check, "true")
//file descriptors cache, by default, cache 30720 descriptors
Expand Down Expand Up @@ -439,6 +438,13 @@ namespace config {
CONF_Int32(path_gc_check_step, "1000");
CONF_Int32(path_gc_check_step_interval_ms, "10");
CONF_Int32(path_scan_interval_second, "86400");

// The following 2 configs limit the max usage of disk capacity of a data dir.
// If both of these 2 threshold reached, no more data can be writen into that data dir.
// The percent of max used capacity of a data dir
CONF_Int32(storage_flood_stage_usage_percent, "95"); // 95%
// The min bytes that should be left of a data dir
CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB
} // namespace config

} // namespace doris
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

namespace doris {

class DataDir;
class Merger;

// This class is a base class for compaction.
Expand Down
40 changes: 36 additions & 4 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
TabletManager* tablet_manager, TxnManager* txn_manager)
: _path(path),
_capacity_bytes(capacity_bytes),
_available_bytes(0),
_disk_capacity_bytes(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_capacity_bytes and _disk_capacity_bytes are redundant ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, _capacity_bytes is user specified disk capacity, but it currently not used.
_disk_capacity_bytes is the capacity of the disk

_is_used(false),
_tablet_manager(tablet_manager),
_txn_manager(txn_manager),
_cluster_id(-1),
_available_bytes(0),
_used_bytes(0),
_current_shard(0),
_is_used(false),
_to_be_deleted(false),
_current_shard(0),
_test_file_read_buf(nullptr),
_test_file_write_buf(nullptr),
_meta(nullptr) {
Expand Down Expand Up @@ -100,6 +100,7 @@ Status DataDir::init() {
return Status::InternalError("invalid root path: ");
}

RETURN_IF_ERROR(update_capacity());
RETURN_IF_ERROR(_init_cluster_id());
RETURN_IF_ERROR(_init_extension_and_capacity());
RETURN_IF_ERROR(_init_file_system());
Expand Down Expand Up @@ -1057,4 +1058,35 @@ void DataDir::_remove_check_paths_no_lock(const std::set<std::string>& paths) {
}
}

Status DataDir::update_capacity() {
try {
boost::filesystem::path path_name(_path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::filesystem

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++14 not support std::filesystem

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++14 not support std::filesystem

boost::filesystem::space_info path_info = boost::filesystem::space(path_name);
_available_bytes = path_info.available;
if (_disk_capacity_bytes == 0) {
// disk capacity only need to be set once
_disk_capacity_bytes = path_info.capacity;
}
} catch (boost::filesystem::filesystem_error& e) {
LOG(WARNING) << "get space info failed. path: " << _path << " erro:" << e.what();
return Status::InternalError("get path available capacity failed");
}
LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes
<< ", available capacity: " << _available_bytes;

return Status::OK();
}

bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
double used_pct = (_available_bytes + incoming_data_size) / (double) _disk_capacity_bytes;
int64_t left_bytes = _disk_capacity_bytes - _available_bytes - incoming_data_size;

if (used_pct >= config::storage_flood_stage_usage_percent / 100.0
&& left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
LOG(WARNING) << "reach capacity limit. used pct: " << used_pct << ", left bytes: " << left_bytes
<< ", path: " << _path;
return true;
}
return false;
}
} // namespace doris
36 changes: 28 additions & 8 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ class DataDir {
bool is_used() const { return _is_used; }
void set_is_used(bool is_used) { _is_used = is_used; }
int32_t cluster_id() const { return _cluster_id; }

DataDirInfo get_dir_info() {
DataDirInfo info;
info.path = _path;
info.path_hash = _path_hash;
info.is_used = _is_used;
info.capacity = _capacity_bytes;
info.available = _available_bytes;
info.is_used = _is_used;
info.storage_medium = _storage_medium;
return info;
}

Expand Down Expand Up @@ -121,6 +124,16 @@ class DataDir {

OLAPStatus set_convert_finished();

// check if the capacity reach the limit after adding the incoming data
// return true if limit reached, otherwise, return false.
// TODO(cmy): for now we can not precisely calculate the capacity Doris used,
// so in order to avoid running out of disk capacity, we currenty use the actual
// disk avaiable capacity and total capacity to do the calculation.
// So that the capacity Doris actually used may exceeds the user specified capacity.
bool reach_capacity_limit(int64_t incoming_data_size);

Status update_capacity();

private:
std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
Status _init_cluster_id();
Expand All @@ -146,23 +159,30 @@ class DataDir {

private:
std::string _path;
size_t _path_hash;
int64_t _path_hash;
// user specified capacity
int64_t _capacity_bytes;
// the actual avaiable capacity of the disk of this data dir
// NOTICE that _available_byte smay be larger than _capacity_bytes, if capacity is set
// by user, not the disk's actual capacity
int64_t _available_bytes;
// the actual capacity of the disk of this data dir
int64_t _disk_capacity_bytes;
TStorageMedium::type _storage_medium;
bool _is_used;

uint32_t _rand_seed;

std::string _file_system;
int64_t _capacity_bytes;
TabletManager* _tablet_manager;
TxnManager* _txn_manager;
int32_t _cluster_id;
int64_t _available_bytes;
int64_t _used_bytes;
uint64_t _current_shard;
bool _is_used;
// This flag will be set true if this store was not in root path when reloading
bool _to_be_deleted;

// used to protect _current_shard and _tablet_set
std::mutex _mutex;
TStorageMedium::type _storage_medium; // 存储介质类型:SSD|HDD
uint64_t _current_shard;
std::set<TabletInfo> _tablet_set;

static const size_t TEST_FILE_BUF_SIZE = 4096;
Expand Down
5 changes: 1 addition & 4 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ OLAPStatus DeltaWriter::init() {

// TODO: new RowsetBuilder according to tablet storage type
_rowset_writer.reset(new AlphaRowsetWriter());
status = _rowset_writer->init(writer_context);
if (status != OLAP_SUCCESS) {
return OLAP_ERR_ROWSET_WRITER_INIT;
}
RETURN_NOT_OK(_rowset_writer->init(writer_context));

const std::vector<SlotDescriptor*>& slots = _req.tuple_desc->slots();
const TabletSchema& schema = _tablet->tablet_schema();
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ enum OLAPStatus {
OLAP_ERR_TRANSACTION_ALREADY_VISIBLE = -229,
OLAP_ERR_VERSION_ALREADY_MERGED = -230,
OLAP_ERR_LZO_DISABLED = -231,
OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232,

// CommandExecutor
// [-300, -400)
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d
// cgroup is not initialized at this time
// add tid to cgroup
CgroupsMgr::apply_system_cgroup();
perform_base_compaction(data_dir);
if (!data_dir->reach_capacity_limit(0)) {
perform_base_compaction(data_dir);
}

usleep(interval * 1000000);
}
Expand Down Expand Up @@ -249,7 +251,9 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir*
// cgroup is not initialized at this time
// add tid to cgroup
CgroupsMgr::apply_system_cgroup();
perform_cumulative_compaction(data_dir);
if (!data_dir->reach_capacity_limit(0)) {
perform_cumulative_compaction(data_dir);
}
usleep(interval * 1000000);
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/column_data_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ OLAPStatus ColumnDataWriter::_flush_segment_with_verfication() {
OLAPStatus res = _finalize_segment();
if (OLAP_SUCCESS != res) {
OLAP_LOG_WARNING("fail to finalize segment. [res=%d]", res);
return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
return res;
}

_new_segment_created = false;
Expand All @@ -292,12 +292,12 @@ OLAPStatus ColumnDataWriter::_finalize_segment() {
OLAPStatus res = OLAP_SUCCESS;
uint32_t data_segment_size;

if (OLAP_SUCCESS != _segment_writer->finalize(&data_segment_size)) {
if ((res = _segment_writer->finalize(&data_segment_size)) != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to finish segment from olap_data.");
return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
return res;
}

if (OLAP_SUCCESS != _segment_group->finalize_segment(data_segment_size, _num_rows)) {
if ((res != _segment_group->finalize_segment(data_segment_size, _num_rows)) != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to finish segment from olap_index.");
return OLAP_ERR_WRITER_INDEX_WRITE_ERROR;
}
Expand Down
18 changes: 12 additions & 6 deletions be/src/olap/rowset/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,25 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) {
boost::filesystem::path data_dir_path = tablet_path.parent_path().parent_path().parent_path().parent_path();
std::string data_dir_string = data_dir_path.string();
DataDir* data_dir = StorageEngine::instance()->get_store(data_dir_string);
data_dir->add_pending_ids(ROWSET_ID_PREFIX + std::to_string(_segment_group->rowset_id()));
if (OLAP_SUCCESS != (res = file_handle.open_with_mode(
_file_name, O_CREAT | O_EXCL | O_WRONLY , S_IRUSR | S_IWUSR))) {
LOG(WARNING) << "fail to open file. [file_name=" << _file_name << "]";
return res;
}

res = _make_file_header(file_header.mutable_message());
if (OLAP_SUCCESS != res) {
OLAP_LOG_WARNING("fail to make file header. [res=%d]", res);
return res;
}

// check disk capacity
if (data_dir->reach_capacity_limit((int64_t) file_header.file_length())) {
return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT;
}

data_dir->add_pending_ids(ROWSET_ID_PREFIX + std::to_string(_segment_group->rowset_id()));
if (OLAP_SUCCESS != (res = file_handle.open_with_mode(
_file_name, O_CREAT | O_EXCL | O_WRONLY , S_IRUSR | S_IWUSR))) {
LOG(WARNING) << "fail to open file. [file_name=" << _file_name << "]";
return res;
}

res = file_header.prepare(&file_handle);
if (OLAP_SUCCESS != res) {
OLAP_LOG_WARNING("write file header error. [err=%m]");
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,6 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// set status for monitor
// 只要有一个new_table为running,ref table就设置为running
// NOTE 如果第一个sub_table先fail,这里会继续按正常走

RowsetId rowset_id = 0;
TabletSharedPtr new_tablet = sc_params.new_tablet;
res = sc_params.new_tablet->next_rowset_id(&rowset_id);
Expand Down
Loading