Skip to content

Commit

Permalink
[performance](variant) support topn 2phase read for variant column
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Dec 13, 2023
1 parent ad483ef commit 1c68d1c
Show file tree
Hide file tree
Showing 17 changed files with 195 additions and 82 deletions.
72 changes: 67 additions & 5 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
#include "util/slice.h" // Slice
#include "vec/columns/column.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_object.h"
#include "vec/json/path_in_data.h"
#include "vec/olap/vgeneric_iterators.h"

namespace doris {
Expand Down Expand Up @@ -332,17 +334,18 @@ Status Segment::_load_index_impl() {

// Return the storage datatype of related column to field.
// Return nullptr meaning no such storage information for this column
vectorized::DataTypePtr Segment::get_data_type_of(const Field& field, bool ignore_children) const {
vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInData path, bool is_nullable,
bool ignore_children) const {
// Path has higher priority
if (!field.path().empty()) {
auto node = _sub_column_tree.find_leaf(field.path());
if (!path.empty()) {
auto node = _sub_column_tree.find_leaf(path);
if (node) {
if (ignore_children || node->children.empty()) {
return node->data.file_column_type;
}
}
// it contains children or column missing in storage, so treat it as variant
return field.is_nullable()
return is_nullable
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>())
: std::make_shared<vectorized::DataTypeObject>();
}
Expand Down Expand Up @@ -686,7 +689,8 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {

bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
bool ignore_children) const {
auto file_column_type = get_data_type_of(*schema.column(cid), ignore_children);
auto file_column_type = get_data_type_of(schema.column(cid)->path(),
schema.column(cid)->is_nullable(), ignore_children);
auto expected_type = Schema::get_data_type_ptr(*schema.column(cid));
#ifndef NDEBUG
if (file_column_type && !file_column_type->equals(*expected_type)) {
Expand All @@ -700,5 +704,63 @@ bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
return same;
}

// Reads the single row `row_id` of the column described by `slot` from this
// segment and appends its value to `result`.
//
// schema:        tablet schema used to resolve `slot` to a column.
// slot:          slot descriptor of the column to read. When it carries column
//                paths, the value is read through a path-based (variant
//                sub-column) iterator, otherwise through a regular column
//                iterator.
// row_id:        row ordinal inside this segment.
// result:        destination column; exactly one value is appended on success.
// stats:         passed to the column iterator via ColumnIteratorOptions.
// iterator_hint: iterator cache owned by the caller. If it is null a new
//                iterator is created and initialized into it; otherwise the
//                existing iterator is reused without re-initialization.
//
// Returns InternalError if the slot cannot be mapped to a column of `schema`
// or no storage type can be determined; errors from creating, initializing or
// reading the iterator are propagated unchanged.
Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
                                       uint32_t row_id, vectorized::MutableColumnPtr& result,
                                       OlapReaderStatistics& stats,
                                       std::unique_ptr<ColumnIterator>& iterator_hint) {
    StorageReadOptions storage_read_opt;
    storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY;
    segment_v2::ColumnIteratorOptions opt {
            .use_page_cache = !config::disable_storage_page_cache,
            .file_reader = file_reader().get(),
            .stats = &stats,
            .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY},
    };
    std::vector<segment_v2::rowid_t> single_row_loc {row_id};
    if (!slot->column_paths().empty()) {
        // Resolve the root column name once; it is needed both for the storage
        // type lookup and for building the materialized sub-column.
        const std::string root_name =
                schema.column_by_uid(slot->col_unique_id()).name_lower_case();
        vectorized::PathInData path(root_name, slot->column_paths());
        auto storage_type = get_data_type_of(path, slot->is_nullable(), false);
        // Validate before the first dereference; the guard below keeps release
        // builds (where DCHECK is compiled out) from dereferencing null.
        DCHECK(storage_type != nullptr);
        if (storage_type == nullptr) {
            return Status::InternalError("storage type not found for variant sub column");
        }
        vectorized::MutableColumnPtr file_storage_column = storage_type->create_column();
        TabletColumn column = TabletColumn::create_materialized_variant_column(
                root_name, slot->column_paths(), slot->col_unique_id());
        if (iterator_hint == nullptr) {
            RETURN_IF_ERROR(
                    new_column_iterator_with_path(column, &iterator_hint, &storage_read_opt));
            RETURN_IF_ERROR(iterator_hint->init(opt));
        }
        RETURN_IF_ERROR(
                iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
        // Get its inner field, for JSONB case
        vectorized::Field field = remove_nullable(storage_type)->get_default();
        file_storage_column->get(0, field);
        result->insert(field);
    } else {
        // With a valid unique id (light schema change enabled) the column must
        // be located by that id; the name is only a fallback.
        const int index = slot->col_unique_id() >= 0
                                  ? schema.field_index(slot->col_unique_id())
                                  : schema.field_index(slot->col_name());
        if (index < 0) {
            std::stringstream ss;
            ss << "field name is invalid. field=" << slot->col_name()
               << ", field_name_to_index=" << schema.get_all_field_names();
            return Status::InternalError(ss.str());
        }
        if (iterator_hint == nullptr) {
            RETURN_IF_ERROR(
                    new_column_iterator(schema.column(index), &iterator_hint, &storage_read_opt));
            RETURN_IF_ERROR(iterator_hint->init(opt));
        }
        RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 1, result));
    }
    return Status::OK();
}

} // namespace segment_v2
} // namespace doris
14 changes: 11 additions & 3 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
#include "olap/rowset/segment_v2/page_handle.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "util/once.h"
#include "util/slice.h"
#include "vec/columns/column.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/json/path_in_data.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -123,6 +126,10 @@ class Segment : public std::enable_shared_from_this<Segment> {

Status read_key_by_rowid(uint32_t row_id, std::string* key);

Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, uint32_t row_id,
vectorized::MutableColumnPtr& result, OlapReaderStatistics& stats,
std::unique_ptr<ColumnIterator>& iterator_hint);

Status load_index();

Status load_pk_index_and_bf();
Expand All @@ -146,7 +153,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
// ignore_children set to false will treat field as variant
// when it contains children with field paths.
// nullptr will be returned if storage type does not contain such column
std::shared_ptr<const vectorized::IDataType> get_data_type_of(const Field& filed,
std::shared_ptr<const vectorized::IDataType> get_data_type_of(vectorized::PathInData path,
bool is_nullable,
bool ignore_children) const;

// Check is schema read type equals storage column type
Expand All @@ -157,8 +165,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
bool can_apply_predicate_safely(int cid, Predicate* pred, const Schema& schema,
ReaderType read_type) const {
const Field* col = schema.column(cid);
vectorized::DataTypePtr storage_column_type =
get_data_type_of(*col, read_type != ReaderType::READER_QUERY);
vectorized::DataTypePtr storage_column_type = get_data_type_of(
col->path(), col->is_nullable(), read_type != ReaderType::READER_QUERY);
if (storage_column_type == nullptr) {
// Default column iterator
return true;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
const Field* col = _schema->column(i);
if (col) {
auto storage_type = _segment->get_data_type_of(
*col, _opts.io_ctx.reader_type != ReaderType::READER_QUERY);
col->path(), col->is_nullable(),
_opts.io_ctx.reader_type != ReaderType::READER_QUERY);
if (storage_type == nullptr) {
storage_type = vectorized::DataTypeFactory::instance().create_data_type(*col);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ class SegmentIterator : public RowwiseIterator {
if (block_cid >= block->columns()) {
continue;
}
vectorized::DataTypePtr storage_type =
_segment->get_data_type_of(*_schema->column(cid), false);
vectorized::DataTypePtr storage_type = _segment->get_data_type_of(
_schema->column(cid)->path(), _schema->column(cid)->is_nullable(), false);
if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) {
// Do additional cast
vectorized::MutableColumnPtr tmp = storage_type->create_column();
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,20 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
}
}

// Builds a nullable VARIANT column describing the sub-path `paths` under the
// column named `root`. The returned column has unique id -1, records
// `parent_unique_id` as its parent, and takes both its path info and its name
// from the PathInData built out of `root` and `paths`.
TabletColumn TabletColumn::create_materialized_variant_column(const std::string& root,
                                                              const std::vector<std::string>& paths,
                                                              int32_t parent_unique_id) {
    TabletColumn materialized;

    // Type-level attributes.
    materialized.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
    materialized.set_is_nullable(true);

    // Identity: no unique id of its own, only a reference to the parent column.
    materialized.set_unique_id(-1);
    materialized.set_parent_unique_id(parent_unique_id);

    // Path and name are both derived from the same PathInData.
    vectorized::PathInData full_path(root, paths);
    materialized.set_path_info(full_path);
    materialized.set_name(full_path.get_path());

    return materialized;
}

void TabletColumn::to_schema_pb(ColumnPB* column) const {
column->set_unique_id(_unique_id);
column->set_name(_col_name);
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "gutil/stringprintf.h"
#include "olap/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/string_utils/string_utils.h"
Expand Down Expand Up @@ -91,6 +92,11 @@ class TabletColumn {
_type == FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE ||
_type == FieldType::OLAP_FIELD_TYPE_AGG_STATE;
}
// Such columns do not exist in the frontend schema info, so we need to
// add them into tablet_schema for later column indexing.
static TabletColumn create_materialized_variant_column(const std::string& root,
const std::vector<std::string>& paths,
int32_t parent_unique_id);
bool has_default_value() const { return _has_default_value; }
std::string default_value() const { return _default_value; }
size_t length() const { return _length; }
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_is_materialized(pdesc.is_materialized()),
_is_key(pdesc.is_key()),
_need_materialize(true),
_column_paths(pdesc.column_paths().begin(), pdesc.column_paths().end()),
_is_auto_increment(pdesc.is_auto_increment()) {}

void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
Expand All @@ -103,6 +104,9 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_is_key(_is_key);
pslot->set_is_auto_increment(_is_auto_increment);
pslot->set_col_type(_col_type);
for (const std::string& path : _column_paths) {
pslot->add_column_paths(path);
}
}

vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
Expand Down
80 changes: 50 additions & 30 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <butil/errno.h>
#include <butil/iobuf.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
Expand All @@ -46,6 +47,7 @@
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -1721,6 +1723,34 @@ auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {
return res;
}

// Key of the per-request column iterator cache: one entry per
// (tablet, rowset, segment, slot) combination.
struct IteratorKey {
    int64_t tablet_id;
    RowsetId rowset_id;
    uint64_t segment_id;
    int slot_id;

    // Equality used by std::unordered_map (std::equal_to): two keys are equal
    // only if all four components are equal.
    bool operator==(const IteratorKey& rhs) const {
        if (tablet_id != rhs.tablet_id) {
            return false;
        }
        if (!(rowset_id == rhs.rowset_id)) {
            return false;
        }
        if (segment_id != rhs.segment_id) {
            return false;
        }
        return slot_id == rhs.slot_id;
    }
};

// Hash functor for IteratorKey, suitable as the Hash parameter of
// std::unordered_map. The four key components are hashed individually and
// folded together in declaration order.
struct HashOfIteratorKey {
    size_t operator()(const IteratorKey& key) const {
        size_t seed = 0;
        // boost::hash_combine-style mixing step, shared by all components.
        const auto combine = [&seed](size_t part) {
            seed ^= part + 0x9e3779b9 + (seed << 6) + (seed >> 2);
        };
        // Hashers are spelled with the exact field types so that no implicit
        // narrowing happens on platforms where long is 32 bits wide.
        combine(std::hash<int64_t> {}(key.tablet_id));
        combine(HashOfRowsetId()(key.rowset_id));
        combine(std::hash<uint64_t> {}(key.segment_id));
        combine(std::hash<int> {}(key.slot_id));
        return seed;
    }
};

Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
PMultiGetResponse* response) {
OlapReaderStatistics stats;
Expand All @@ -1745,6 +1775,9 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
full_read_schema.append_column(TabletColumn(column_pb));
}

std::unordered_map<IteratorKey, std::unique_ptr<ColumnIterator>, HashOfIteratorKey>
iterator_map;

// read row by row
for (size_t i = 0; i < request.row_locs_size(); ++i) {
const auto& row_loc = request.row_locs(i);
Expand Down Expand Up @@ -1812,37 +1845,22 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
if (result_block.is_empty_column()) {
result_block = vectorized::Block(desc.slots(), request.row_locs().size());
}
VLOG_DEBUG << "Read row location "
<< fmt::format("{}, {}, {}, {}", row_location.tablet_id,
row_location.row_location.rowset_id.to_string(),
row_location.row_location.segment_id,
row_location.row_location.row_id);
for (int x = 0; x < desc.slots().size(); ++x) {
int index = -1;
if (desc.slots()[x]->col_unique_id() >= 0) {
// light sc enabled
index = full_read_schema.field_index(desc.slots()[x]->col_unique_id());
} else {
index = full_read_schema.field_index(desc.slots()[x]->col_name());
}
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << desc.slots()[x]->col_name()
<< ", field_name_to_index=" << full_read_schema.get_all_field_names();
return Status::InternalError(ss.str());
}
std::unique_ptr<segment_v2::ColumnIterator> column_iterator;
auto row_id = static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
vectorized::MutableColumnPtr column =
result_block.get_by_position(x).column->assume_mutable();
StorageReadOptions storage_read_opt;
storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY;
RETURN_IF_ERROR(segment->new_column_iterator(full_read_schema.column(index),
&column_iterator, &storage_read_opt));
segment_v2::ColumnIteratorOptions opt {
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = segment->file_reader().get(),
.stats = &stats,
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY},
};
static_cast<void>(column_iterator->init(opt));
std::vector<segment_v2::rowid_t> single_row_loc {
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
RETURN_IF_ERROR(column_iterator->read_by_rowids(single_row_loc.data(), 1, column));
IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
.rowset_id = rowset_id,
.segment_id = row_loc.segment_id(),
.slot_id = desc.slots()[x]->id()};
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema, desc.slots()[x],
row_id, column, stats,
iterator_map[iterator_key]));
}
}
// serialize block if not empty
Expand All @@ -1862,11 +1880,13 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
"hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
"io_latency:{}ns, "
"uncompressed_bytes_read:{},"
"bytes_read:{},"
"acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, "
"lookup_row_data_ms:{}",
stats.cached_pages_num, stats.total_pages_num, stats.compressed_bytes_read,
stats.io_ns, stats.uncompressed_bytes_read, acquire_tablet_ms,
acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms);
stats.io_ns, stats.uncompressed_bytes_read, stats.bytes_read,
acquire_tablet_ms, acquire_rowsets_ms, acquire_segments_ms,
lookup_row_data_ms);
return Status::OK();
}

Expand Down
9 changes: 5 additions & 4 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,11 +639,12 @@ void ColumnObject::for_each_subcolumn(ColumnCallback callback) {
}

void ColumnObject::insert_from(const IColumn& src, size_t n) {
const auto& src_v = assert_cast<const ColumnObject&>(src);
const auto* src_v = check_and_get_column<ColumnObject>(src);
// optimize when src and this column are scalar variant, since try_insert is inefficient
if (src_v.is_scalar_variant() && is_scalar_variant() &&
src_v.get_root_type()->equals(*get_root_type()) && src_v.is_finalized() && is_finalized()) {
assert_cast<ColumnNullable&>(*get_root()).insert_from(*src_v.get_root(), n);
if (src_v != nullptr && src_v->is_scalar_variant() && is_scalar_variant() &&
src_v->get_root_type()->equals(*get_root_type()) && src_v->is_finalized() &&
is_finalized()) {
assert_cast<ColumnNullable&>(*get_root()).insert_from(*src_v->get_root(), n);
++num_rows;
return;
}
Expand Down
Loading

0 comments on commit 1c68d1c

Please sign in to comment.