Merge branch 'master' into hive_text_write_and_compression
suxiaogang223 authored Aug 20, 2024
2 parents 5a4b968 + 872aab7 commit f69e658
Showing 77 changed files with 3,392 additions and 489 deletions.
1 change: 1 addition & 0 deletions be/src/http/action/check_rpc_channel_action.cpp
@@ -39,6 +39,7 @@ namespace doris {
CheckRPCChannelAction::CheckRPCChannelAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type)
: HttpHandlerWithAuth(exec_env, hier, type) {}

void CheckRPCChannelAction::handle(HttpRequest* req) {
std::string req_ip = req->param("ip");
std::string req_port = req->param("port");
3 changes: 0 additions & 3 deletions be/src/http/action/check_rpc_channel_action.h
@@ -31,8 +31,5 @@ class CheckRPCChannelAction : public HttpHandlerWithAuth {
~CheckRPCChannelAction() override = default;

void handle(HttpRequest* req) override;

private:
ExecEnv* _exec_env;
};
} // namespace doris
6 changes: 4 additions & 2 deletions be/src/http/action/download_action.cpp
@@ -199,8 +199,10 @@ Status DownloadAction::check_token(HttpRequest* req) {
return Status::NotAuthorized("token is not specified.");
}

if (token_str != _exec_env->token()) {
return Status::NotAuthorized("invalid token.");
const std::string& local_token = _exec_env->token();
if (token_str != local_token) {
LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token;
return Status::NotAuthorized("invalid token {}", token_str);
}

return Status::OK();
6 changes: 4 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
@@ -244,8 +244,10 @@ Status DownloadBinlogAction::_check_token(HttpRequest* req) {
return Status::InternalError("token is not specified.");
}

if (token_str != _exec_env->token()) {
return Status::InternalError("invalid token.");
const std::string& local_token = _exec_env->token();
if (token_str != local_token) {
LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token;
return Status::NotAuthorized("invalid token {}", token_str);
}

return Status::OK();
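Both download handlers now apply the same check: compare the request token against the backend's local token, log a warning carrying both values, and return a NotAuthorized status that interpolates the offending token fmt-style. A standalone sketch of the pattern, with a plain struct standing in for Doris's Status type (an assumed shape, for illustration only):

#include <iostream>
#include <string>

// Stand-in for Doris's Status type; assumed shape for illustration only.
struct Status {
    bool ok;
    std::string msg;
    static Status OK() { return {true, ""}; }
    static Status NotAuthorized(std::string m) { return {false, std::move(m)}; }
};

// Mirrors the token-check flow above: empty token, mismatch, or success.
Status check_token(const std::string& token_str, const std::string& local_token) {
    if (token_str.empty()) {
        return Status::NotAuthorized("token is not specified.");
    }
    if (token_str != local_token) {
        // log both sides before rejecting, as the LOG(WARNING) above does
        std::cerr << "invalid download token: " << token_str
                  << ", local token: " << local_token << '\n';
        return Status::NotAuthorized("invalid token " + token_str);
    }
    return Status::OK();
}

int main() {
    Status s = check_token("abc", "xyz");
    std::cout << (s.ok ? "ok" : s.msg) << '\n'; // prints: invalid token abc
}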
3 changes: 2 additions & 1 deletion be/src/http/action/reset_rpc_channel_action.cpp
@@ -34,7 +34,8 @@
namespace doris {
ResetRPCChannelAction::ResetRPCChannelAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type)
: HttpHandlerWithAuth(exec_env, hier, type), _exec_env(exec_env) {}
: HttpHandlerWithAuth(exec_env, hier, type) {}

void ResetRPCChannelAction::handle(HttpRequest* req) {
std::string endpoints = req->param("endpoints");
if (iequal(endpoints, "all")) {
3 changes: 0 additions & 3 deletions be/src/http/action/reset_rpc_channel_action.h
@@ -31,8 +31,5 @@ class ResetRPCChannelAction : public HttpHandlerWithAuth {
~ResetRPCChannelAction() override = default;

void handle(HttpRequest* req) override;

private:
ExecEnv* _exec_env;
};
} // namespace doris
4 changes: 3 additions & 1 deletion be/src/http/http_handler_with_auth.h
@@ -51,8 +51,10 @@ class HttpHandlerWithAuth : public HttpHandler {
return true;
}

private:
protected:
ExecEnv* _exec_env;

private:
TPrivilegeHier::type _hier;
TPrivilegeType::type _type;
};
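Promoting _exec_env from private to protected is what lets CheckRPCChannelAction and ResetRPCChannelAction above drop their shadowing private copies: each subclass now reads the single base-class member instead of storing a duplicate pointer. A minimal sketch of the pattern with simplified stand-in types (names here are illustrative, not Doris's):

// Before: each handler stored its own ExecEnv* alongside the base's private one.
// After: the base exposes one protected member that all handlers share.
struct ExecEnv {};

class HttpHandlerBase {
public:
    explicit HttpHandlerBase(ExecEnv* exec_env) : _exec_env(exec_env) {}
protected:
    ExecEnv* _exec_env; // was private; protected lets subclasses use it directly
};

class CheckChannelHandler : public HttpHandlerBase {
public:
    explicit CheckChannelHandler(ExecEnv* exec_env) : HttpHandlerBase(exec_env) {}
    // handle() can reference _exec_env without a duplicate member
    ExecEnv* env() { return _exec_env; }
};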
35 changes: 15 additions & 20 deletions be/src/olap/accept_null_predicate.h
@@ -34,6 +34,7 @@ namespace doris {
* but pass (set/return true) for NULL value rows.
*
* At parent, it's used for topn runtime predicate.
* E.g.: the original input indexes are '1,2,3,7,8,9' and the value at index 9 is null; the nested predicate outputs '1,2,3', but we finally output '1,2,3,9'
*/
class AcceptNullPredicate : public ColumnPredicate {
ENABLE_FACTORY_CREATOR(AcceptNullPredicate);
@@ -44,8 +45,6 @@ class AcceptNullPredicate : public ColumnPredicate {

PredicateType type() const override { return _nested->type(); }

void set_nested(ColumnPredicate* nested) { _nested.reset(nested); }

Status evaluate(BitmapIndexIterator* iterator, uint32_t num_rows,
roaring::Roaring* roaring) const override {
return _nested->evaluate(iterator, num_rows, roaring);
@@ -64,11 +63,14 @@
void evaluate_and(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
bool* flags) const override {
if (column.has_null()) {
std::vector<uint8_t> original_flags(size);
memcpy(original_flags.data(), flags, size);

const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
_nested->evaluate_and(nullable_col.get_nested_column(), sel, size, flags);
const auto& nullmap = nullable_col.get_null_map_data();
for (uint16_t i = 0; i < size; ++i) {
flags[i] |= nullmap[sel[i]];
flags[i] |= (original_flags[i] && nullmap[sel[i]]);
}
} else {
_nested->evaluate_and(column, sel, size, flags);
@@ -77,20 +79,7 @@

void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
bool* flags) const override {
if (column.has_null()) {
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
_nested->evaluate_or(nullable_col.get_nested_column(), sel, size, flags);

// call evaluate_or and set true for NULL rows
for (uint16_t i = 0; i < size; ++i) {
uint16_t idx = sel[i];
if (!flags[i] && nullable_col.is_null_at(idx)) {
flags[i] = true;
}
}
} else {
_nested->evaluate_or(column, sel, size, flags);
}
DCHECK(false) << "should not reach here";
}

bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
@@ -158,21 +147,27 @@
}
// create selected_flags
uint16_t max_idx = sel[size - 1];
std::vector<uint16_t> old_sel(size);
memcpy(old_sel.data(), sel, sizeof(uint16_t) * size);

const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
// call nested predicate evaluate
uint16_t new_size = _nested->evaluate(nullable_col.get_nested_column(), sel, size);

// process NULL values
if (new_size < size) {
std::vector<uint8_t> selected(max_idx + 1);
memcpy(selected.data(), nullable_col.get_null_map_data().data(),
(max_idx + 1) * sizeof(bool));
std::vector<uint8_t> selected(max_idx + 1, 0);
const auto* nullmap = nullable_col.get_null_map_data().data();
// add rows selected by _nested->evaluate
for (uint16_t i = 0; i < new_size; ++i) {
uint16_t row_idx = sel[i];
selected[row_idx] = true;
}
// reset null from original data
for (uint16_t i = 0; i < size; ++i) {
uint16_t row_idx = old_sel[i];
selected[row_idx] |= nullmap[row_idx];
}

// recalculate new_size and sel array
new_size = 0;
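The essential fix in evaluate_and above is the snapshot of the incoming flags: under AND semantics a NULL row may remain selected only if it was already selected before this predicate ran, whereas the old code unconditionally re-selected every NULL row. A self-contained sketch of that flag logic, with plain vectors standing in for Doris's column and null-map types (assumed shapes, for illustration):

#include <cstdint>
#include <iostream>
#include <vector>

// Sketch of AcceptNullPredicate::evaluate_and's flag handling.
// nested_result[row] is what the wrapped predicate says about the row;
// nullmap[row] is 1 when the row's value is NULL.
void evaluate_and_accept_null(const std::vector<uint8_t>& nested_result,
                              const std::vector<uint8_t>& nullmap,
                              const std::vector<uint16_t>& sel,
                              std::vector<uint8_t>& flags) {
    std::vector<uint8_t> original_flags = flags; // snapshot before AND-ing
    for (size_t i = 0; i < sel.size(); ++i) {
        flags[i] = flags[i] && nested_result[sel[i]]; // nested evaluate_and
    }
    for (size_t i = 0; i < sel.size(); ++i) {
        // a NULL row passes only if it was selected on entry
        flags[i] |= (original_flags[i] && nullmap[sel[i]]);
    }
}

int main() {
    std::vector<uint8_t> nested = {1, 0, 0};  // row 2's nested result is moot: it is NULL
    std::vector<uint8_t> nullmap = {0, 0, 1};
    std::vector<uint16_t> sel = {0, 1, 2};
    std::vector<uint8_t> flags = {1, 1, 1};   // all rows selected on entry
    evaluate_and_accept_null(nested, nullmap, sel, flags);
    for (auto f : flags) std::cout << int(f) << ' '; // prints: 1 0 1
}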
3 changes: 1 addition & 2 deletions be/src/olap/rowset/segment_creator.cpp
@@ -68,8 +68,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
}
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
if (config::enable_vertical_segment_writer &&
_context.tablet_schema->cluster_key_idxes().empty()) {
if (config::enable_vertical_segment_writer) {
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
94 changes: 49 additions & 45 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -94,36 +94,36 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
_mem_tracker(std::make_unique<MemTracker>(segment_mem_tracker_name(segment_id))),
_mow_context(std::move(opts.mow_ctx)) {
CHECK_NOTNULL(file_writer);
_num_key_columns = _tablet_schema->num_key_columns();
_num_sort_key_columns = _tablet_schema->num_key_columns();
_num_short_key_columns = _tablet_schema->num_short_key_columns();
if (_tablet_schema->cluster_key_idxes().empty()) {
DCHECK(_num_key_columns >= _num_short_key_columns)
if (!_is_mow_with_cluster_key()) {
DCHECK(_num_sort_key_columns >= _num_short_key_columns)
<< ", table_id=" << _tablet_schema->table_id()
<< ", num_key_columns=" << _num_key_columns
<< ", num_key_columns=" << _num_sort_key_columns
<< ", num_short_key_columns=" << _num_short_key_columns
<< ", cluster_key_columns=" << _tablet_schema->cluster_key_idxes().size();
}
for (size_t cid = 0; cid < _num_key_columns; ++cid) {
for (size_t cid = 0; cid < _num_sort_key_columns; ++cid) {
const auto& column = _tablet_schema->column(cid);
_key_coders.push_back(get_key_coder(column.type()));
_key_index_size.push_back(column.index_length());
}
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow()) {
// encode the sequence id into the primary key index
if (_tablet_schema->has_sequence_col()) {
const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
_seq_coder = get_key_coder(column.type());
}
// encode the rowid into the primary key index
if (!_tablet_schema->cluster_key_idxes().empty()) {
if (_is_mow_with_cluster_key()) {
const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
_rowid_coder = get_key_coder(type_info->type());
// primary keys
_primary_key_coders.swap(_key_coders);
// cluster keys
_key_coders.clear();
_key_index_size.clear();
_num_key_columns = _tablet_schema->cluster_key_idxes().size();
_num_sort_key_columns = _tablet_schema->cluster_key_idxes().size();
for (auto cid : _tablet_schema->cluster_key_idxes()) {
const auto& column = _tablet_schema->column(cid);
_key_coders.push_back(get_key_coder(column.type()));
@@ -284,14 +284,14 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {

// we don't need the short key index for unique key merge on write table.
if (_has_key) {
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow()) {
size_t seq_col_length = 0;
if (_tablet_schema->has_sequence_col()) {
seq_col_length =
_tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
}
size_t rowid_length = 0;
if (!_tablet_schema->cluster_key_idxes().empty()) {
if (_is_mow_with_cluster_key()) {
rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
_short_key_index_builder.reset(
new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
@@ -478,7 +478,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
block->columns(), _tablet_schema->num_key_columns(),
_tablet_schema->num_columns()));
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);
DCHECK(_is_mow());

DCHECK(_opts.rowset_ctx->partial_update_info);
// find missing column cids
@@ -507,7 +507,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
if (!converted_result.first.ok()) {
return converted_result.first;
}
if (cid < _num_key_columns) {
if (cid < _num_sort_key_columns) {
key_columns.push_back(converted_result.second);
} else if (_tablet_schema->has_sequence_col() &&
cid == _tablet_schema->sequence_col_idx()) {
@@ -906,19 +906,9 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
converted_result.second->get_data(), num_rows));
}
if (_has_key) {
// for now we don't need to query short key index for CLUSTER BY feature,
// but we still write the index for future usage.
bool need_primary_key_indexes = (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
bool need_short_key_indexes =
!need_primary_key_indexes ||
(need_primary_key_indexes && !_tablet_schema->cluster_key_idxes().empty());
if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys
RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
num_rows, false));
} else if (!need_primary_key_indexes && need_short_key_indexes) { // other tables
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else if (need_primary_key_indexes && need_short_key_indexes) { // mow with cluster keys
if (_is_mow_with_cluster_key()) {
// for now we don't need to query short key index for CLUSTER BY feature,
// but we still write the index for future usage.
// 1. generate primary key index, the key_columns is primary_key_columns
RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns,
seq_column, num_rows, true));
@@ -938,6 +928,11 @@
}
}
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else if (_is_mow()) {
RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
num_rows, false));
} else {
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
}
}

@@ -967,8 +962,9 @@ int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) {
std::string SegmentWriter::_full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first) {
assert(_key_index_size.size() == _num_key_columns);
assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns);
assert(_key_index_size.size() == _num_sort_key_columns);
assert(key_columns.size() == _num_sort_key_columns &&
_key_coders.size() == _num_sort_key_columns);
return _full_encode_keys(_key_coders, key_columns, pos, null_first);
}

@@ -1047,15 +1043,18 @@ Status SegmentWriter::append_row(const RowType& row) {
RETURN_IF_ERROR(_column_writers[cid]->append(cell));
}
std::string full_encoded_key;
encode_key<RowType, true>(&full_encoded_key, row, _num_key_columns);
encode_key<RowType, true>(&full_encoded_key, row, _num_sort_key_columns);
if (_tablet_schema->has_sequence_col()) {
full_encoded_key.push_back(KEY_NORMAL_MARKER);
auto cid = _tablet_schema->sequence_col_idx();
auto cell = row.cell(cid);
row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &full_encoded_key);
}

if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow_with_cluster_key()) {
return Status::InternalError(
"SegmentWriter::append_row does not support mow tables with cluster key");
} else if (_is_mow()) {
RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key));
} else {
// At the beginning of one block, so add a short key index entry
@@ -1082,7 +1081,9 @@ uint64_t SegmentWriter::estimate_segment_size() {
for (auto& column_writer : _column_writers) {
size += column_writer->estimate_buffer_size();
}
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow_with_cluster_key()) {
size += _primary_key_index_builder->size() + _short_key_index_builder->size();
} else if (_is_mow()) {
size += _primary_key_index_builder->size();
} else {
size += _short_key_index_builder->size();
@@ -1126,19 +1127,17 @@ Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {

*index_size = _file_writer->bytes_appended() - index_start;
if (_has_key) {
bool write_short_key_index = _tablet_schema->keys_type() != UNIQUE_KEYS ||
(_tablet_schema->keys_type() == UNIQUE_KEYS &&
!_opts.enable_unique_key_merge_on_write) ||
(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write &&
!_tablet_schema->cluster_key_idxes().empty());
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow_with_cluster_key()) {
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _file_writer->bytes_appended() - index_start;
RETURN_IF_ERROR(_write_primary_key_index());
*index_size += _primary_key_index_builder->disk_size();
} else if (_is_mow()) {
RETURN_IF_ERROR(_write_primary_key_index());
// IndexedColumnWriter write data pages mixed with segment data, we should use
// the stat from primary key index builder.
*index_size += _primary_key_index_builder->disk_size();
}
if (write_short_key_index) {
} else {
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _file_writer->bytes_appended() - index_start;
}
@@ -1297,14 +1296,12 @@ Status SegmentWriter::_write_raw_data(const std::vector<Slice>& slices) {
}

Slice SegmentWriter::min_encoded_key() {
return (_primary_key_index_builder == nullptr || !_tablet_schema->cluster_key_idxes().empty())
? Slice(_min_key.data(), _min_key.size())
: _primary_key_index_builder->min_key();
return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), _min_key.size())
: _primary_key_index_builder->min_key();
}
Slice SegmentWriter::max_encoded_key() {
return (_primary_key_index_builder == nullptr || !_tablet_schema->cluster_key_idxes().empty())
? Slice(_max_key.data(), _max_key.size())
: _primary_key_index_builder->max_key();
return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), _max_key.size())
: _primary_key_index_builder->max_key();
}

void SegmentWriter::set_min_max_key(const Slice& key) {
@@ -1400,5 +1397,12 @@ int64_t SegmentWriter::get_inverted_index_total_size() {
return 0;
}

inline bool SegmentWriter::_is_mow() {
return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write;
}

inline bool SegmentWriter::_is_mow_with_cluster_key() {
return _is_mow() && !_tablet_schema->cluster_key_idxes().empty();
}
} // namespace segment_v2
} // namespace doris
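With the two helpers above, the scattered keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write checks collapse into a three-way decision that appears in append_block, estimate_segment_size, and finalize_columns_index alike. A condensed sketch of that decision (the enum and function names here are illustrative, not Doris's):

// The refactored index-selection logic, reduced to its three cases.
// MOW = merge-on-write unique-key table.
enum class IndexPlan { ShortKeyOnly, PrimaryKeyOnly, PrimaryAndShortKey };

IndexPlan choose_index_plan(bool is_mow, bool has_cluster_keys) {
    if (is_mow && has_cluster_keys) {
        // cluster keys re-sort the data, so the short key index is kept
        // alongside the primary key index for future use
        return IndexPlan::PrimaryAndShortKey;
    }
    if (is_mow) {
        return IndexPlan::PrimaryKeyOnly; // primary key index only
    }
    return IndexPlan::ShortKeyOnly; // all other table types
}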