Skip to content

Commit

Permalink
Rename Rowset to SegmentGroup (#364)
Browse files Browse the repository at this point in the history
* Rename Rowset to SegmentGroup

* Modify protobuf related rowset to SegmentGroup
  • Loading branch information
kangpinghuang authored and imay committed Nov 29, 2018
1 parent 5694bcb commit 85d0996
Show file tree
Hide file tree
Showing 34 changed files with 861 additions and 926 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ add_library(Olap STATIC
reader.cpp
row_block.cpp
row_cursor.cpp
rowset.cpp
segment_group.cpp
run_length_byte_reader.cpp
run_length_byte_writer.cpp
run_length_integer_reader.cpp
Expand Down
20 changes: 10 additions & 10 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "olap/column_data.h"
#include "olap/olap_engine.h"
#include "olap/olap_header.h"
#include "olap/rowset.h"
#include "olap/segment_group.h"
#include "olap/olap_table.h"
#include "olap/utils.h"
#include "util/doris_metrics.h"
Expand Down Expand Up @@ -119,7 +119,7 @@ OLAPStatus BaseCompaction::run() {
DorisMetrics::base_compaction_deltas_total.increment(_need_merged_versions.size());
int64_t merge_bytes = 0;
for (ColumnData* i_data : base_data_sources) {
merge_bytes += i_data->olap_index()->data_size();
merge_bytes += i_data->segment_group()->data_size();
}
DorisMetrics::base_compaction_bytes_total.increment(merge_bytes);
}
Expand Down Expand Up @@ -148,7 +148,7 @@ OLAPStatus BaseCompaction::run() {
// 4. make new versions visable.
// If success, remove files belong to old versions;
// If fail, gc files belong to new versions.
vector<Rowset*> unused_olap_indices;
vector<SegmentGroup*> unused_olap_indices;
res = _update_header(row_count, &unused_olap_indices);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to update header. table=" << _table->full_name() << ", "
Expand Down Expand Up @@ -323,12 +323,12 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
vector<ColumnData*>* base_data_sources,
uint64_t* row_count) {
// 1. 生成新base文件对应的olap index
Rowset* new_base = new (std::nothrow) Rowset(_table.get(),
SegmentGroup* new_base = new (std::nothrow) SegmentGroup(_table.get(),
_new_base_version,
new_base_version_hash,
false, 0, 0);
if (new_base == NULL) {
OLAP_LOG_WARNING("fail to new Rowset.");
OLAP_LOG_WARNING("fail to new SegmentGroup.");
return OLAP_ERR_MALLOC_ERROR;
}

Expand Down Expand Up @@ -397,7 +397,7 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
// Check row num changes
uint64_t source_rows = 0;
for (ColumnData* i_data : *base_data_sources) {
source_rows += i_data->olap_index()->num_rows();
source_rows += i_data->segment_group()->num_rows();
}
bool row_nums_check = config::row_nums_check;
if (row_nums_check) {
Expand All @@ -422,7 +422,7 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
return OLAP_SUCCESS;
}

OLAPStatus BaseCompaction::_update_header(uint64_t row_count, vector<Rowset*>* unused_olap_indices) {
OLAPStatus BaseCompaction::_update_header(uint64_t row_count, vector<SegmentGroup*>* unused_olap_indices) {
WriteLock wrlock(_table->get_header_lock_ptr());
vector<Version> unused_versions;
_get_unused_versions(&unused_versions);
Expand Down Expand Up @@ -463,11 +463,11 @@ OLAPStatus BaseCompaction::_update_header(uint64_t row_count, vector<Rowset*>* u
return OLAP_SUCCESS;
}

void BaseCompaction::_delete_old_files(vector<Rowset*>* unused_indices) {
void BaseCompaction::_delete_old_files(vector<SegmentGroup*>* unused_indices) {
if (!unused_indices->empty()) {
OLAPEngine* unused_index = OLAPEngine::get_instance();

for (vector<Rowset*>::iterator it = unused_indices->begin();
for (vector<SegmentGroup*>::iterator it = unused_indices->begin();
it != unused_indices->end(); ++it) {
unused_index->add_unused_index(*it);
}
Expand All @@ -476,7 +476,7 @@ void BaseCompaction::_delete_old_files(vector<Rowset*>* unused_indices) {

void BaseCompaction::_garbage_collection() {
// 清理掉已生成的版本文件
for (vector<Rowset*>::iterator it = _new_olap_indices.begin();
for (vector<SegmentGroup*>::iterator it = _new_olap_indices.begin();
it != _new_olap_indices.end(); ++it) {
(*it)->delete_all_files();
SAFE_DELETE(*it);
Expand Down
16 changes: 8 additions & 8 deletions be/src/olap/base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
namespace doris {

class ColumnData;
class Rowset;
class SegmentGroup;

// @brief 实现对START_BASE_COMPACTION命令的处理逻辑,并返回处理结果
class BaseCompaction {
Expand Down Expand Up @@ -94,23 +94,23 @@ class BaseCompaction {

// 更新Header使得修改对外可见
// 输出参数:
// - unused_olap_indices: 需要被物理删除的Rowset*
// - unused_olap_indices: 需要被物理删除的SegmentGroup*
//
// 返回值:
// - 如果执行成功,则返回OLAP_SUCCESS;
// - 其它情况下,返回相应的错误码
OLAPStatus _update_header(uint64_t row_count,
std::vector<Rowset*>* unused_olap_indices);
std::vector<SegmentGroup*>* unused_olap_indices);

// 删除不再使用的Rowset文件
// 删除不再使用的SegmentGroup文件
//
// 输入参数:
// - unused_olap_indices: 需要被物理删除的Rowset*
// - unused_olap_indices: 需要被物理删除的SegmentGroup*
//
// 返回值:
// - 如果执行成功,则返回OLAP_SUCCESS;
// - 其它情况下,返回相应的错误码
void _delete_old_files(std::vector<Rowset*>* unused_indices);
void _delete_old_files(std::vector<SegmentGroup*>* unused_indices);

// 其它函数执行失败时,调用该函数进行清理工作
void _garbage_collection();
Expand Down Expand Up @@ -173,8 +173,8 @@ class BaseCompaction {
Version _latest_cumulative;
// 在此次base compaction执行过程中,将被合并的cumulative文件版本
std::vector<Version> _need_merged_versions;
// 需要新增的版本对应的Rowset
std::vector<Rowset*> _new_olap_indices;
// 需要新增的版本对应的SegmentGroup
std::vector<SegmentGroup*> _new_olap_indices;

bool _base_compaction_locked;

Expand Down
62 changes: 31 additions & 31 deletions be/src/olap/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

namespace doris {

ColumnData* ColumnData::create(Rowset* index) {
ColumnData* ColumnData::create(SegmentGroup* segment_group) {
ColumnData* data = NULL;
DataFileType file_type = index->table()->data_file_type();
DataFileType file_type = segment_group->table()->data_file_type();

switch (file_type) {
case COLUMN_ORIENTED_FILE:
data = new(std::nothrow) ColumnData(index);
data = new(std::nothrow) ColumnData(segment_group);
break;

default:
Expand All @@ -40,29 +40,29 @@ ColumnData* ColumnData::create(Rowset* index) {
return data;
}

ColumnData::ColumnData(Rowset* olap_index)
ColumnData::ColumnData(SegmentGroup* segment_group)
: _data_file_type(COLUMN_ORIENTED_FILE),
_olap_index(olap_index),
_segment_group(segment_group),
_eof(false),
_conditions(NULL),
_col_predicates(NULL),
_delete_status(DEL_NOT_SATISFIED),
_runtime_state(NULL),
_is_using_cache(false),
_segment_reader(NULL) {
_table = olap_index->table();
_table = segment_group->table();
_num_rows_per_block = _table->num_rows_per_row_block();
}

ColumnData::~ColumnData() {
_olap_index->release();
_segment_group->release();
SAFE_DELETE(_segment_reader);
}

OLAPStatus ColumnData::init() {
_olap_index->acquire();
_segment_group->acquire();

auto res = _short_key_cursor.init(_olap_index->short_key_fields());
auto res = _short_key_cursor.init(_segment_group->short_key_fields());
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "key cursor init failed, table:" << _table->id()
<< ", res:" << res;
Expand Down Expand Up @@ -105,7 +105,7 @@ OLAPStatus ColumnData::_next_row(const RowCursor** row, bool without_filter) {
} else {
DCHECK(_read_block->block_status() == DEL_PARTIAL_SATISFIED);
bool row_del_filter = _delete_handler.is_filter_data(
_olap_index->version().second, _cursor);
_segment_group->version().second, _cursor);
if (!row_del_filter) {
*row = &_cursor;
return OLAP_SUCCESS;
Expand All @@ -130,16 +130,16 @@ OLAPStatus ColumnData::_seek_to_block(const RowBlockPosition& block_pos, bool wi
// TODO(zc): _segment_readers???
// open segment reader if needed
if (_segment_reader == nullptr || block_pos.segment != _current_segment) {
if (block_pos.segment >= _olap_index->num_segments() ||
if (block_pos.segment >= _segment_group->num_segments() ||
(_end_key_is_set && block_pos.segment > _end_segment)) {
_eof = true;
return OLAP_ERR_DATA_EOF;
}
SAFE_DELETE(_segment_reader);
std::string file_name;
file_name = olap_index()->construct_data_file_path(olap_index()->rowset_id(), block_pos.segment);
file_name = segment_group()->construct_data_file_path(segment_group()->segment_group_id(), block_pos.segment);
_segment_reader = new(std::nothrow) SegmentReader(
file_name, _table, olap_index(), block_pos.segment,
file_name, _table, segment_group(), block_pos.segment,
_seek_columns, _load_bf_columns, _conditions,
_col_predicates, _delete_handler, _delete_status, _runtime_state, _stats);
if (_segment_reader == nullptr) {
Expand Down Expand Up @@ -170,7 +170,7 @@ OLAPStatus ColumnData::_seek_to_block(const RowBlockPosition& block_pos, bool wi
OLAPStatus ColumnData::_find_position_by_short_key(
const RowCursor& key, bool find_last_key, RowBlockPosition *position) {
RowBlockPosition tmp_pos;
auto res = _olap_index->find_short_key(key, &_short_key_cursor, find_last_key, &tmp_pos);
auto res = _segment_group->find_short_key(key, &_short_key_cursor, find_last_key, &tmp_pos);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
res = OLAP_ERR_DATA_EOF;
Expand All @@ -179,7 +179,7 @@ OLAPStatus ColumnData::_find_position_by_short_key(
}
return res;
}
res = olap_index()->find_prev_point(tmp_pos, position);
res = segment_group()->find_prev_point(tmp_pos, position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res);
return res;
Expand All @@ -190,7 +190,7 @@ OLAPStatus ColumnData::_find_position_by_short_key(
OLAPStatus ColumnData::_find_position_by_full_key(
const RowCursor& key, bool find_last_key, RowBlockPosition *position) {
RowBlockPosition tmp_pos;
auto res = _olap_index->find_short_key(key, &_short_key_cursor, false, &tmp_pos);
auto res = _segment_group->find_short_key(key, &_short_key_cursor, false, &tmp_pos);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
res = OLAP_ERR_DATA_EOF;
Expand All @@ -200,14 +200,14 @@ OLAPStatus ColumnData::_find_position_by_full_key(
return res;
}
RowBlockPosition start_position;
res = olap_index()->find_prev_point(tmp_pos, &start_position);
res = segment_group()->find_prev_point(tmp_pos, &start_position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res);
return res;
}

RowBlockPosition end_position;
res = _olap_index->find_short_key(key, &_short_key_cursor, true, &end_position);
res = _segment_group->find_short_key(key, &_short_key_cursor, true, &end_position);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
res = OLAP_ERR_DATA_EOF;
Expand All @@ -226,7 +226,7 @@ OLAPStatus ColumnData::_find_position_by_full_key(
OLAPIndexOffset index_offset;
index_offset.segment = _end_segment;
index_offset.offset = _end_block;
res = olap_index()->get_row_block_position(index_offset, &end_position);
res = segment_group()->get_row_block_position(index_offset, &end_position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to get row block position. [res=%d]", res);
return res;
Expand All @@ -235,15 +235,15 @@ OLAPStatus ColumnData::_find_position_by_full_key(
}

// ????end_position
uint32_t distance = olap_index()->compute_distance(start_position, end_position);
uint32_t distance = segment_group()->compute_distance(start_position, end_position);

BinarySearchIterator it_start(0u);
BinarySearchIterator it_end(distance + 1);
BinarySearchIterator it_result(0u);
ColumnDataComparator comparator(
start_position,
this,
olap_index());
segment_group());
try {
if (!find_last_key) {
it_result = std::lower_bound(it_start, it_end, key, comparator);
Expand All @@ -261,7 +261,7 @@ OLAPStatus ColumnData::_find_position_by_full_key(
it_result -= 1;
}

if (OLAP_SUCCESS != (res = olap_index()->advance_row_block(*it_result,
if (OLAP_SUCCESS != (res = segment_group()->advance_row_block(*it_result,
&start_position))) {
OLAP_LOG_WARNING("fail to advance row_block. [res=%d it_offset=%u "
"start_pos='%s']", res, *it_result,
Expand Down Expand Up @@ -490,16 +490,16 @@ OLAPStatus ColumnData::get_first_row_block(RowBlock** row_block) {
return res;
}

// to be same with OLAPData, we use olap_index.
// to be same with OLAPData, we use segment_group.
RowBlockPosition block_pos;
res = olap_index()->find_first_row_block(&block_pos);
res = segment_group()->find_first_row_block(&block_pos);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
*row_block = nullptr;
_eof = true;
return res;
}
OLAP_LOG_WARNING("fail to find first row block with Rowset.");
OLAP_LOG_WARNING("fail to find first row block with SegmentGroup.");
return res;
}

Expand Down Expand Up @@ -545,11 +545,11 @@ bool ColumnData::delta_pruning_filter() {
return true;
}

if (!_olap_index->has_column_statistics()) {
if (!_segment_group->has_column_statistics()) {
return false;
}

return _conditions->delta_pruning_filter(_olap_index->get_column_statistics());
return _conditions->delta_pruning_filter(_segment_group->get_column_statistics());
}

int ColumnData::delete_pruning_filter() {
Expand All @@ -559,9 +559,9 @@ int ColumnData::delete_pruning_filter() {
return DEL_NOT_SATISFIED;
}

if (false == _olap_index->has_column_statistics()) {
if (false == _segment_group->has_column_statistics()) {
/*
* if olap_index has no column statistics, we cannot judge whether the data can be filtered or not
* if segment_group has no column statistics, we cannot judge whether the data can be filtered or not
*/
return DEL_PARTIAL_SATISFIED;
}
Expand All @@ -576,12 +576,12 @@ int ColumnData::delete_pruning_filter() {
bool del_partial_stastified = false;
bool del_stastified = false;
for (auto& delete_condtion : _delete_handler.get_delete_conditions()) {
if (delete_condtion.filter_version <= _olap_index->version().first) {
if (delete_condtion.filter_version <= _segment_group->version().first) {
continue;
}

Conditions* del_cond = delete_condtion.del_cond;
int del_ret = del_cond->delete_pruning_filter(_olap_index->get_column_statistics());
int del_ret = del_cond->delete_pruning_filter(_segment_group->get_column_statistics());
if (DEL_SATISFIED == del_ret) {
del_stastified = true;
break;
Expand Down
Loading

0 comments on commit 85d0996

Please sign in to comment.