diff --git a/be/src/gutil/endian.h b/be/src/gutil/endian.h index f6b2485b6d8bab..6b8a0bc772d414 100644 --- a/be/src/gutil/endian.h +++ b/be/src/gutil/endian.h @@ -32,6 +32,18 @@ inline uint64 gbswap_64(uint64 host_int) { #endif // bswap_64 } +inline unsigned __int128 gbswap_128(unsigned __int128 host_int) { + return static_cast(bswap_64(static_cast(host_int >> 64))) | + (static_cast(bswap_64(static_cast(host_int))) << 64); +} + +// Swap bytes of a 24-bit value. +inline uint32_t bswap_24(uint32_t x) { + return ((x & 0x0000ffULL) << 16) | + ((x & 0x00ff00ULL)) | + ((x & 0xff0000ULL) >> 16); +} + #ifdef IS_LITTLE_ENDIAN // Definitions for ntohl etc. that don't require us to include @@ -188,4 +200,145 @@ class LittleEndian { #define gntohll(x) ghtonll(x) #define ntohll(x) htonll(x) +// Utilities to convert numbers between the current hosts's native byte +// order and big-endian byte order (same as network byte order) +// +// Load/Store methods are alignment safe +class BigEndian { +public: +#ifdef IS_LITTLE_ENDIAN + + static uint16 FromHost16(uint16 x) { return bswap_16(x); } + static uint16 ToHost16(uint16 x) { return bswap_16(x); } + + static uint32 FromHost24(uint32 x) { return bswap_24(x); } + static uint32 ToHost24(uint32 x) { return bswap_24(x); } + + static uint32 FromHost32(uint32 x) { return bswap_32(x); } + static uint32 ToHost32(uint32 x) { return bswap_32(x); } + + static uint64 FromHost64(uint64 x) { return gbswap_64(x); } + static uint64 ToHost64(uint64 x) { return gbswap_64(x); } + + static unsigned __int128 FromHost128(unsigned __int128 x) { return gbswap_128(x); } + static unsigned __int128 ToHost128(unsigned __int128 x) { return gbswap_128(x); } + + static bool IsLittleEndian() { return true; } + +#elif defined IS_BIG_ENDIAN + + static uint16 FromHost16(uint16 x) { return x; } + static uint16 ToHost16(uint16 x) { return x; } + + static uint32 FromHost24(uint32 x) { return x; } + static uint32 ToHost24(uint32 x) { return x; } + + static uint32 
FromHost32(uint32 x) { return x; } + static uint32 ToHost32(uint32 x) { return x; } + + static uint64 FromHost64(uint64 x) { return x; } + static uint64 ToHost64(uint64 x) { return x; } + + static uint128 FromHost128(uint128 x) { return x; } + static uint128 ToHost128(uint128 x) { return x; } + + static bool IsLittleEndian() { return false; } + +#endif /* ENDIAN */ + // Functions to do unaligned loads and stores in little-endian order. + static uint16 Load16(const void *p) { + return ToHost16(UNALIGNED_LOAD16(p)); + } + + static void Store16(void *p, uint16 v) { + UNALIGNED_STORE16(p, FromHost16(v)); + } + + static uint32 Load32(const void *p) { + return ToHost32(UNALIGNED_LOAD32(p)); + } + + static void Store32(void *p, uint32 v) { + UNALIGNED_STORE32(p, FromHost32(v)); + } + + static uint64 Load64(const void *p) { + return ToHost64(UNALIGNED_LOAD64(p)); + } + + // Build a uint64 from 1-8 bytes. + // 8 * len least significant bits are loaded from the memory with + // BigEndian order. The 64 - 8 * len most significant bits are + // set all to 0. + // In latex-friendly words, this function returns: + // $\sum_{i=0}^{len-1} p[i] 256^{i}$, where p[i] is unsigned. + // + // This function is equivalent with: + // uint64 val = 0; + // memcpy(&val, p, len); + // return ToHost64(val); + // TODO(user): write a small benchmark and benchmark the speed + // of a memcpy based approach. + // + // For speed reasons this function does not work for len == 0. + // The caller needs to guarantee that 1 <= len <= 8. + static uint64 Load64VariableLength(const void * const p, int len) { + assert(len >= 1 && len <= 8); + uint64 val = Load64(p); + uint64 mask = 0; + --len; + do { + mask = (mask << 8) | 0xff; + // (--len >= 0) is about 10 % faster than (len--) in some benchmarks. 
+ } while (--len >= 0); + return val & mask; + } + + static void Store64(void *p, uint64 v) { + UNALIGNED_STORE64(p, FromHost64(v)); + } + + static uint128 Load128(const void *p) { + return uint128( + ToHost64(UNALIGNED_LOAD64(p)), + ToHost64(UNALIGNED_LOAD64(reinterpret_cast(p) + 1))); + } + + static void Store128(void *p, const uint128 v) { + UNALIGNED_STORE64(p, FromHost64(Uint128High64(v))); + UNALIGNED_STORE64(reinterpret_cast(p) + 1, + FromHost64(Uint128Low64(v))); + } + + // Build a uint128 from 1-16 bytes. + // 8 * len least significant bits are loaded from the memory with + // BigEndian order. The 128 - 8 * len most significant bits are + // set all to 0. + static uint128 Load128VariableLength(const void *p, int len) { + if (len <= 8) { + return uint128(Load64VariableLength(static_cast(p)+8, + len)); + } else { + return uint128( + Load64VariableLength(p, len-8), + Load64(static_cast(p)+8)); + } + } + + // Load & Store in machine's word size. + static uword_t LoadUnsignedWord(const void *p) { + if (sizeof(uword_t) == 8) + return Load64(p); + else + return Load32(p); + } + + static void StoreUnsignedWord(void *p, uword_t v) { + if (sizeof(uword_t) == 8) + Store64(p, v); + else + Store32(p, v); + } +}; // BigEndian + #endif // UTIL_ENDIAN_ENDIAN_H_ diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index c494c8d0f6c6a6..1790f9effa7f98 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -41,6 +41,7 @@ add_library(Olap STATIC hll.cpp in_list_predicate.cpp in_stream.cpp + key_coder.cpp lru_cache.cpp memtable.cpp merger.cpp @@ -63,6 +64,7 @@ add_library(Olap STATIC serialize.cpp storage_engine.cpp data_dir.cpp + short_key_index.cpp snapshot_manager.cpp stream_index_common.cpp stream_index_reader.cpp @@ -84,6 +86,9 @@ add_library(Olap STATIC rowset/segment_v2/encoding_info.cpp rowset/segment_v2/ordinal_page_index.cpp rowset/segment_v2/binary_dict_page.cpp + rowset/segment_v2/segment.cpp + 
rowset/segment_v2/segment_iterator.cpp + rowset/segment_v2/segment_writer.cpp rowset_factory.cpp task/engine_batch_load_task.cpp task/engine_checksum_task.cpp diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h index 1cebe4eab3e544..7b19d9c5f111ec 100644 --- a/be/src/olap/column_mapping.h +++ b/be/src/olap/column_mapping.h @@ -18,10 +18,10 @@ #ifndef DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H #define DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H -#include "olap/wrapper_field.h" - namespace doris { +class WrapperField; + struct ColumnMapping { ColumnMapping() : ref_column(-1), default_value(nullptr) {} virtual ~ColumnMapping() {} diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp index 70e7146247ecff..b6c36d64f6227e 100644 --- a/be/src/olap/delete_handler.cpp +++ b/be/src/olap/delete_handler.cpp @@ -29,6 +29,7 @@ #include "gen_cpp/olap_file.pb.h" #include "olap/olap_common.h" #include "olap/utils.h" +#include "olap/olap_cond.h" using apache::thrift::ThriftDebugString; using std::numeric_limits; diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h index 9e0f7689a34736..0c23a005ab91f6 100644 --- a/be/src/olap/delete_handler.h +++ b/be/src/olap/delete_handler.h @@ -23,14 +23,14 @@ #include "gen_cpp/AgentService_types.h" #include "gen_cpp/olap_file.pb.h" -#include "olap/field.h" -#include "olap/olap_cond.h" #include "olap/olap_define.h" -#include "olap/row_cursor.h" +#include "olap/tablet_schema.h" namespace doris { typedef google::protobuf::RepeatedPtrField DelPredicateArray; +class Conditions; +class RowCursor; class DeleteConditionHandler { public: diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index d593aef5921a54..d3f63478c4e967 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -18,6 +18,7 @@ #include "olap/delta_writer.h" #include "olap/schema.h" +#include "olap/memtable.h" #include "olap/data_dir.h" #include "olap/rowset/alpha_rowset_writer.h" #include 
"olap/rowset/rowset_meta_manager.h" diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 16db33d4aab312..c909147b075100 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -18,7 +18,6 @@ #ifndef DORIS_BE_SRC_DELTA_WRITER_H #define DORIS_BE_SRC_DELTA_WRITER_H -#include "olap/memtable.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/schema_change.h" @@ -30,6 +29,8 @@ namespace doris { class SegmentGroup; +class MemTable; +class Schema; enum WriteType { LOAD = 1, diff --git a/be/src/olap/field.h b/be/src/olap/field.h index d6eef2a7753075..460c7bd936277e 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -26,6 +26,7 @@ #include "olap/olap_define.h" #include "olap/tablet_schema.h" #include "olap/types.h" +#include "olap/key_coder.h" #include "olap/utils.h" #include "olap/row_cursor_cell.h" #include "runtime/mem_pool.h" @@ -57,12 +58,14 @@ class Field { Field(const TabletColumn& column) : _type_info(get_type_info(column.type())), _agg_info(get_aggregate_info(column.aggregation(), column.type())), + _key_coder(get_key_coder(column.type())), _index_size(column.index_length()), _is_nullable(column.is_nullable()) { } Field(FieldType type) : _type_info(get_type_info(type)), _agg_info(get_aggregate_info(OLAP_FIELD_AGGREGATION_NONE, type)), + _key_coder(get_key_coder(type)), _index_size(_type_info->size()), _is_nullable(true) { } @@ -70,6 +73,7 @@ class Field { Field(const FieldAggregationMethod& agg, const FieldType& type, bool is_nullable) : _type_info(get_type_info(type)), _agg_info(get_aggregate_info(agg, type)), + _key_coder(get_key_coder(type)), _index_size(-1), _is_nullable(is_nullable) { } @@ -77,6 +81,7 @@ class Field { Field(const FieldAggregationMethod& agg, const FieldType& type, size_t index_size, bool is_nullable) : _type_info(get_type_info(type)), _agg_info(get_aggregate_info(agg, type)), + _key_coder(get_key_coder(type)), _index_size(index_size), _is_nullable(is_nullable) { } @@ 
-233,10 +238,19 @@ class Field { FieldType type() const { return _type_info->type(); } const TypeInfo* type_info() const { return _type_info; } bool is_nullable() const { return _is_nullable; } + + void encode_ascending(const void* value, std::string* buf) const { + _key_coder->encode_ascending(value, _index_size, buf); + } + + Status decode_ascending(Slice* encoded_key, uint8_t* cell_ptr, Arena* arena) const { + return _key_coder->decode_ascending(encoded_key, _index_size, cell_ptr, arena); + } private: // Field的最大长度,单位为字节,通常等于length, 变长字符串不同 const TypeInfo* _type_info; const AggregateInfo* _agg_info; + const KeyCoder* _key_coder; uint16_t _index_size; bool _is_nullable; }; diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index d54cef13ff6840..bbb2ce62906b2e 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -17,14 +17,33 @@ #pragma once +#include + #include "common/status.h" namespace doris { +class RowCursor; class RowBlockV2; class Schema; struct StorageReadOptions { + // lower_bound defines the smallest key at which iterator will + // return data. + // If lower_bound is null, won't return + std::shared_ptr lower_bound; + + // If include_lower_bound is true, data equal with lower_bound will + // be read + bool include_lower_bound; + + // upper_bound defines the extend upto which the iterator can return + // data. + std::shared_ptr upper_bound; + + // If include_upper_bound is true, data equal with upper_bound will + // be read + bool include_upper_bound; }; // Used to read data in RowBlockV2 one by one diff --git a/be/src/olap/key_coder.cpp b/be/src/olap/key_coder.cpp new file mode 100644 index 00000000000000..68f2ace8961bcb --- /dev/null +++ b/be/src/olap/key_coder.cpp @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/key_coder.h" + +#include + +namespace doris { + +template +KeyCoder::KeyCoder(TraitsType traits) + : _encode_ascending(traits.encode_ascending), + _decode_ascending(traits.decode_ascending) { +} + +// Helper class used to get KeyCoder +class KeyCoderResolver { +public: + ~KeyCoderResolver() { + for (auto& iter : _coder_map) { + delete iter.second; + } + } + + static KeyCoderResolver* instance() { + static KeyCoderResolver s_instance; + return &s_instance; + } + + KeyCoder* get_coder(FieldType field_type) const { + auto it = _coder_map.find(field_type); + if (it != _coder_map.end()) { + return it->second; + } + return nullptr; + } + +private: + KeyCoderResolver() { + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + } + + template + void add_mapping() { + _coder_map.emplace(field_type, new KeyCoder(KeyCoderTraits())); + } + + std::unordered_map _coder_map; +}; + +const KeyCoder* get_key_coder(FieldType type) { + return KeyCoderResolver::instance()->get_coder(type); +} + +} diff --git a/be/src/olap/key_coder.h b/be/src/olap/key_coder.h new file mode 100644 index 00000000000000..773c4e9ddb3188 --- /dev/null +++ b/be/src/olap/key_coder.h @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "gutil/endian.h" +#include "gutil/strings/substitute.h" +#include "olap/types.h" +#include "util/arena.h" + +namespace doris { + +using strings::Substitute; + +using EncodeAscendingFunc = void (*)(const void* value, size_t index_size, std::string* buf); +using DecodeAscendingFunc = Status (*)(Slice* encoded_key, size_t index_size, uint8_t* cell_ptr, Arena* arena); + +// Helper class that is used to encode values from their in-memory format +// into a sortable binary form. For example, this class will encode unsigned +// integers in big-endian format, which can be compared with memcmp. 
+class KeyCoder { +public: + template + KeyCoder(TraitsType traits); + + void encode_ascending(const void* value, size_t index_size, std::string* buf) const { + _encode_ascending(value, index_size, buf); + } + Status decode_ascending(Slice* encoded_key, size_t index_size, uint8_t* cell_ptr, Arena* arena) const { + return _decode_ascending(encoded_key, index_size, cell_ptr, arena); + } + +private: + EncodeAscendingFunc _encode_ascending; + DecodeAscendingFunc _decode_ascending; +}; + +extern const KeyCoder* get_key_coder(FieldType type); + +template +class KeyCoderTraits { +}; + +template +class KeyCoderTraits::CppType>::value>::type> { +public: + using CppType = typename CppTypeTraits::CppType; + using UnsignedCppType = typename CppTypeTraits::UnsignedCppType; + +private: + // Swap value's endian from/to big endian + static UnsignedCppType swap_big_endian(UnsignedCppType val) { + switch (sizeof(UnsignedCppType)) { + case 1: return val; + case 2: return BigEndian::FromHost16(val); + case 4: return BigEndian::FromHost32(val); + case 8: return BigEndian::FromHost64(val); + case 16: return BigEndian::FromHost128(val); + default: LOG(FATAL) << "Invalid type to big endian, type=" << field_type + << ", size=" << sizeof(UnsignedCppType); + } + } + +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, value, sizeof(unsigned_val)); + // swap MSB to encode integer + if (std::is_signed::value) { + unsigned_val ^= (static_cast(1) << (sizeof(UnsignedCppType) * CHAR_BIT - 1)); + } + // make it bigendian + unsigned_val = swap_big_endian(unsigned_val); + + buf->append((char*)&unsigned_val, sizeof(unsigned_val)); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + if (encoded_key->size < sizeof(UnsignedCppType)) { + return Status::InvalidArgument( + Substitute("Key too short, need=$0 vs real=$1", + 
sizeof(UnsignedCppType), encoded_key->size)); + } + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, encoded_key->data, sizeof(UnsignedCppType)); + unsigned_val = swap_big_endian(unsigned_val); + if (std::is_signed::value) { + unsigned_val ^= (static_cast(1) << (sizeof(UnsignedCppType) * CHAR_BIT - 1)); + } + memcpy(cell_ptr, &unsigned_val, sizeof(UnsignedCppType)); + encoded_key->remove_prefix(sizeof(UnsignedCppType)); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + using CppType = typename CppTypeTraits::CppType; + using UnsignedCppType = typename CppTypeTraits::UnsignedCppType; + +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, value, sizeof(unsigned_val)); + // make it bigendian + unsigned_val = BigEndian::FromHost24(unsigned_val); + buf->append((char*)&unsigned_val, sizeof(unsigned_val)); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + if (encoded_key->size < sizeof(UnsignedCppType)) { + return Status::InvalidArgument( + Substitute("Key too short, need=$0 vs real=$1", + sizeof(UnsignedCppType), encoded_key->size)); + } + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, encoded_key->data, sizeof(UnsignedCppType)); + unsigned_val = BigEndian::FromHost24(unsigned_val); + memcpy(cell_ptr, &unsigned_val, sizeof(UnsignedCppType)); + encoded_key->remove_prefix(sizeof(UnsignedCppType)); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + decimal12_t decimal_val; + memcpy(&decimal_val, value, sizeof(decimal12_t)); + // encode integer + KeyCoderTraits::encode_ascending( + &decimal_val.integer, sizeof(decimal_val.integer), buf); + // encode integer + KeyCoderTraits::encode_ascending( + &decimal_val.fraction, sizeof(decimal_val.fraction), buf); 
+ } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + decimal12_t decimal_val; + RETURN_IF_ERROR(KeyCoderTraits::decode_ascending( + encoded_key, sizeof(decimal_val.integer), (uint8_t*)&decimal_val.integer, arena)); + RETURN_IF_ERROR(KeyCoderTraits::decode_ascending( + encoded_key, sizeof(decimal_val.fraction), (uint8_t*)&decimal_val.fraction, arena)); + memcpy(cell_ptr, &decimal_val, sizeof(decimal12_t)); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + const Slice* slice = (const Slice*)value; + CHECK(index_size <= slice->size) << "index size is larger than char size, index=" << index_size << ", char=" << slice->size; + buf->append(slice->data, index_size); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + if (encoded_key->size < index_size) { + return Status::InvalidArgument( + Substitute("Key too short, need=$0 vs real=$1", + index_size, encoded_key->size)); + } + Slice* slice = (Slice*)cell_ptr; + slice->data = arena->Allocate(index_size); + slice->size = index_size; + memcpy(slice->data, encoded_key->data, index_size); + encoded_key->remove_prefix(index_size); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + const Slice* slice = (const Slice*)value; + size_t copy_size = std::min(index_size, slice->size); + buf->append(slice->data, copy_size); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + CHECK(encoded_key->size <= index_size) + << "encoded_key size is larger than index_size, key_size=" << encoded_key->size + << ", index_size=" << index_size; + auto copy_size = encoded_key->size; + Slice* slice = (Slice*)cell_ptr; + slice->data 
= arena->Allocate(copy_size); + slice->size = copy_size; + memcpy(slice->data, encoded_key->data, copy_size); + encoded_key->remove_prefix(copy_size); + return Status::OK(); + } +}; + +} diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 0f60fc8e344a6b..57e3533b52e0f8 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -26,6 +26,7 @@ namespace doris { +class RowBlock; class RowsetReader; using RowsetReaderSharedPtr = std::shared_ptr; diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 9d516242ee6ee2..f2c51100f75fde 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -18,17 +18,17 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H #define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H -#include "olap/schema.h" #include "olap/column_predicate.h" -#include "olap/row_cursor.h" -#include "olap/row_block.h" #include "olap/lru_cache.h" -#include "olap/olap_cond.h" -#include "olap/delete_handler.h" #include "runtime/runtime_state.h" namespace doris { +class RowCursor; +class Conditions; +class DeleteHandler; +class TabletSchema; + struct RowsetReaderContext { RowsetReaderContext() : reader_type(READER_QUERY), tablet_schema(nullptr), diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 9d3eea05243f94..a637dae353e09c 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -20,14 +20,14 @@ #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer_context.h" -#include "olap/schema.h" -#include "olap/row_block.h" #include "gen_cpp/types.pb.h" #include "runtime/mem_pool.h" +#include "olap/column_mapping.h" namespace doris { class ContiguousRow; +class RowCursor; class RowsetWriter; using RowsetWriterSharedPtr = std::shared_ptr; diff --git a/be/src/olap/rowset/segment_v2/column_writer.h 
b/be/src/olap/rowset/segment_v2/column_writer.h index d137059e2141b7..2e7d778d8f1541 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -58,6 +58,18 @@ class ColumnWriter { ~ColumnWriter(); Status init(); + + template + Status append(const CellType& cell) { + if (_is_nullable) { + uint8_t nullmap = 0; + BitmapChange(&nullmap, 0, cell.is_null()); + return append_nullable(&nullmap, cell.cell_ptr(), 1); + } else { + return append(cell.cell_ptr(), 1); + } + } + // Now we only support append one by one, we should support append // multi rows in one call Status append(bool is_null, void* data) { diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp new file mode 100644 index 00000000000000..28c0b8f372371c --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/rowset/segment_v2/segment.h" + +#include "common/logging.h" // LOG +#include "env/env.h" // RandomAccessFile +#include "gutil/strings/substitute.h" +#include "olap/rowset/segment_v2/column_reader.h" // ColumnReader +#include "olap/rowset/segment_v2/segment_writer.h" // k_segment_magic_length +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "util/slice.h" // Slice +#include "olap/tablet_schema.h" + +namespace doris { +namespace segment_v2 { + +using strings::Substitute; + +Segment::Segment( + std::string fname, uint32_t segment_id, + const std::shared_ptr& tablet_schema, + size_t num_rows_per_block) + : _fname(std::move(fname)), + _segment_id(segment_id), + _tablet_schema(tablet_schema), + _num_rows_per_block(num_rows_per_block) { +} + +Segment::~Segment() { + for (auto reader : _column_readers) { + delete reader; + } +} + +Status Segment::open() { + RETURN_IF_ERROR(Env::Default()->new_random_access_file(_fname, &_input_file)); + RETURN_IF_ERROR(_input_file->size(&_file_size)); + + // 24: 1 * magic + 1 * checksum + 1 * footer length + if (_file_size < 12) { + return Status::Corruption( + Substitute("Bad segment, file size is too small, real=$0 vs need=$1", + _file_size, 12)); + } + + // check header's magic + RETURN_IF_ERROR(_check_magic(0)); + + // parse footer to get meta + RETURN_IF_ERROR(_parse_footer()); + // parse short key index + RETURN_IF_ERROR(_parse_index()); + // initial all column reader + RETURN_IF_ERROR(_initial_column_readers()); + return Status::OK(); +} + +Status Segment::new_iterator(const Schema& schema, std::unique_ptr* output) { + output->reset(new SegmentIterator(this->shared_from_this(), schema)); + return Status::OK(); +} + +// Read data at offset of input file, check if the file content match the magic +Status Segment::_check_magic(uint64_t offset) { + // read magic and length + uint8_t buf[k_segment_magic_length]; + Slice slice(buf, k_segment_magic_length); + RETURN_IF_ERROR(_input_file->read_at(offset, 
slice)); + + if (memcmp(slice.data, k_segment_magic, k_segment_magic_length) != 0) { + return Status::Corruption( + Substitute("Bad segment, file magic don't match, magic=$0 vs need=$1", + std::string((char*)buf, k_segment_magic_length), k_segment_magic)); + } + return Status::OK(); +} + +Status Segment::_parse_footer() { + uint64_t offset = _file_size - 8; + // read footer's length and checksum + uint8_t buf[8]; + Slice slice(buf, 8); + RETURN_IF_ERROR(_input_file->read_at(offset, slice)); + + uint32_t footer_length = decode_fixed32_le((uint8_t*)slice.data); + uint32_t checksum = decode_fixed32_le((uint8_t*)slice.data + 4); + + // check file size footer + if (offset < footer_length) { + return Status::Corruption( + Substitute("Bad segment, file size is too small, file_size=$0 vs footer_size=$1", + _file_size, footer_length)); + } + offset -= footer_length; + + std::string footer_buf; + footer_buf.resize(footer_length); + RETURN_IF_ERROR(_input_file->read_at(offset, footer_buf)); + + // TODO(zc): check footer's checksum + if (checksum != 0) { + return Status::Corruption( + Substitute("Bad segment, segment footer checksum not match, real=$0 vs expect=$1", + 0, checksum)); + } + + if (!_footer.ParseFromString(footer_buf)) { + return Status::Corruption("Bad segment, parse footer from PB failed"); + } + + return Status::OK(); +} + +// load and parse short key index +Status Segment::_parse_index() { + // read short key index content + _sk_index_buf.resize(_footer.short_key_index_page().size()); + Slice slice(_sk_index_buf.data(), _sk_index_buf.size()); + RETURN_IF_ERROR(_input_file->read_at(_footer.short_key_index_page().offset(), slice)); + + // Parse short key index + _sk_index_decoder.reset(new ShortKeyIndexDecoder(_sk_index_buf)); + RETURN_IF_ERROR(_sk_index_decoder->parse()); + return Status::OK(); +} + +Status Segment::_initial_column_readers() { + // Map from column unique id to column ordinal in footer's ColumnMetaPB + // If we can't find unique id, it means 
this segment is created + // with an old schema. So we should create a DefaultValueIterator + // for this column. + std::unordered_map unique_id_to_ordinal; + for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) { + auto& column_pb = _footer.columns(ordinal); + unique_id_to_ordinal.emplace(column_pb.unique_id(), ordinal); + } + // TODO(zc): Lazy init()? + // There may be too many columns, majority of them would not be used + // in query, so we should not init them here. + _column_readers.resize(_tablet_schema->columns().size(), nullptr); + + for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); ++ordinal) { + auto& column = _tablet_schema->columns()[ordinal]; + auto iter = unique_id_to_ordinal.find(column.unique_id()); + if (iter == unique_id_to_ordinal.end()) { + continue; + } + + ColumnReaderOptions opts; + std::unique_ptr reader( + new ColumnReader(opts, _footer.columns(iter->second), _input_file.get())); + RETURN_IF_ERROR(reader->init()); + + _column_readers[ordinal] = reader.release(); + } + return Status::OK(); +} + +Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) { + if (_column_readers[cid] == nullptr) { + // TODO(zc): create a DefaultValueIterator for this column + // create + } + return _column_readers[cid]->new_iterator(iter); +} + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h new file mode 100644 index 00000000000000..e69a10e7513ad0 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include // for unique_ptr +#include + +#include "common/status.h" // Status +#include "gen_cpp/segment_v2.pb.h" +#include "olap/rowset/segment_v2/common.h" // rowid_t +#include "olap/short_key_index.h" +#include "olap/tablet_schema.h" +#include "util/faststring.h" + +namespace doris { + +class RandomAccessFile; +class SegmentGroup; +class FieldInfo; +class TabletSchema; +class ShortKeyIndexDecoder; +class Schema; + +namespace segment_v2 { + +class ColumnReader; +class ColumnIterator; +class SegmentIterator; + +// A Segment represents a segment in its in-memory format. Once a segment is +// generated, it won't be modified, so this class is aimed at read operations. +// It will prepare all ColumnReaders to create ColumnIterators as needed. +// Users can create a SegmentIterator through the new_iterator function. +// +// NOTE: A Segment is bound to a specific TabletSchema. When the TabletSchema +// is changed, this segment can not be used any more. For example, after a schema +// change finishes, the client should disable all cached Segments for the old TabletSchema. 
+class Segment : public std::enable_shared_from_this {
+public:
+    // NOTE(review): the constructor body is not visible here; the arguments
+    // presumably initialize _fname, _segment_id, _tablet_schema and
+    // _num_rows_per_block -- confirm in segment.cpp.
+    Segment(std::string fname, uint32_t segment_id,
+            const std::shared_ptr& tablet_schema,
+            size_t num_rows_per_block);
+    ~Segment();
+
+    Status open();
+
+    // Creates a row iterator over this segment for the columns of `schema`
+    // and hands it back through `iter`.
+    Status new_iterator(const Schema& schema, std::unique_ptr* iter);
+
+    // Returns _segment_id (stored as uint32_t, widened to uint64_t here).
+    uint64_t id() const { return _segment_id; }
+
+    // Row count as recorded in _footer.
+    uint32_t num_rows() const { return _footer.num_rows(); }
+
+private:
+    // SegmentIterator uses the private helpers below (column iterators,
+    // short key index lookups).
+    friend class SegmentIterator;
+
+    // Asks the ColumnReader stored at index `cid` for a new iterator.
+    Status new_column_iterator(uint32_t cid, ColumnIterator** iter);
+    uint32_t num_rows_per_block() const { return _num_rows_per_block; }
+    // Number of short key columns, taken from the tablet schema.
+    size_t num_short_keys() const { return _tablet_schema->num_short_key_columns(); }
+
+    Status _check_magic(uint64_t offset);
+    Status _parse_footer();
+    Status _parse_index();
+    Status _initial_column_readers();
+
+    // Both lookups are forwarded to the short key index decoder.
+    ShortKeyIndexIterator lower_bound(const Slice& key) const {
+        return _sk_index_decoder->lower_bound(key);
+    }
+    ShortKeyIndexIterator upper_bound(const Slice& key) const {
+        return _sk_index_decoder->upper_bound(key);
+    }
+
+    // This will return the last row block in this segment.
+    // NOTE: Before calling this function, the client should assure that
+    // this segment is not empty.
+    // The block id is derived from the number of items in the short key index.
+    uint32_t last_block() const {
+        DCHECK(num_rows() > 0);
+        return _sk_index_decoder->num_items() - 1;
+    }
+
+private:
+    std::string _fname;
+    uint32_t _segment_id;
+    std::shared_ptr _tablet_schema;
+    uint32_t _num_rows_per_block;
+
+    SegmentFooterPB _footer;
+    std::unique_ptr _input_file;
+    uint64_t _file_size = 0;
+
+    // ColumnReader for each column in TabletSchema. If ColumnReader is nullptr,
+    // This means that this segment has no data for that column, which may be added
+    // after this segment is generated.
+    std::vector _column_readers;
+
+    // used to store short key index
+    faststring _sk_index_buf;
+
+    // short key index decoder
+    std::unique_ptr _sk_index_decoder;
+};
+
+}
+}
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
new file mode 100644
index 00000000000000..8ad70a1d525c34
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "olap/rowset/segment_v2/segment_iterator.h" + +#include + +#include "gutil/strings/substitute.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/column_reader.h" +#include "olap/row_block2.h" +#include "olap/row_cursor.h" +#include "olap/short_key_index.h" + +using strings::Substitute; + +namespace doris { +namespace segment_v2 { + +SegmentIterator::SegmentIterator(std::shared_ptr segment, + const Schema& schema) + : _segment(std::move(segment)), + _schema(schema), + _column_iterators(_schema.num_columns(), nullptr) { +} + +SegmentIterator::~SegmentIterator() { + for (auto iter : _column_iterators) { + delete iter; + } +} + +Status SegmentIterator::init(const StorageReadOptions& opts) { + _opts = opts; + RETURN_IF_ERROR(_init_short_key_range()); + RETURN_IF_ERROR(_init_column_iterators()); + return Status::OK(); +} + +// This function will use input key bounds to get a row range. +Status SegmentIterator::_init_short_key_range() { + _lower_rowid = 0; + _upper_rowid = num_rows(); + + // fast path for empty segment + if (_upper_rowid == 0) { + return Status::OK(); + } + + if (_opts.lower_bound == nullptr && _opts.upper_bound == nullptr) { + return Status::OK(); + } + + RETURN_IF_ERROR(_prepare_seek()); + + // init row range with short key range + if (_opts.upper_bound != nullptr) { + // If client want to read upper_bound, the include_upper_bound is true. So we + // should get the first ordinal at which key is larger than upper_bound. + // So we call _lookup_ordinal with include_upper_bound's negate + RETURN_IF_ERROR(_lookup_ordinal( + *_opts.upper_bound, !_opts.include_upper_bound, num_rows(), &_upper_rowid)); + } + if (_upper_rowid > 0 && _opts.lower_bound != nullptr) { + RETURN_IF_ERROR(_lookup_ordinal( + *_opts.lower_bound, _opts.include_lower_bound, _upper_rowid, &_lower_rowid)); + } + + return Status::OK(); +} + +// Set up environment for the following seek. 
+// Builds _seek_schema from the columns of the lower and upper bound keys
+// (each column id taken once, lower bound columns first), allocates the
+// one-row _seek_block used by the binary search, and creates the column
+// iterators that the search will read from.
+Status SegmentIterator::_prepare_seek() {
+    std::vector key_fields;
+    std::set column_set;
+    if (_opts.lower_bound != nullptr) {
+        for (auto cid : _opts.lower_bound->schema()->column_ids()) {
+            column_set.emplace(cid);
+            key_fields.push_back(*_opts.lower_bound->schema()->column(cid));
+        }
+    }
+    if (_opts.upper_bound != nullptr) {
+        for (auto cid : _opts.upper_bound->schema()->column_ids()) {
+            // skip columns already contributed by the lower bound
+            if (column_set.count(cid) == 0) {
+                key_fields.push_back(*_opts.upper_bound->schema()->column(cid));
+                column_set.emplace(cid);
+            }
+        }
+    }
+    _seek_schema.reset(new Schema(key_fields, key_fields.size()));
+    _seek_block.reset(new RowBlockV2(*_seek_schema, 1, &_arena));
+
+    // create used column iterator
+    for (auto cid : _seek_schema->column_ids()) {
+        if (_column_iterators[cid] == nullptr) {
+            RETURN_IF_ERROR(_create_column_iterator(cid, &_column_iterators[cid]));
+        }
+    }
+
+    return Status::OK();
+}
+
+// Creates the iterators for every column of the read schema (reusing the ones
+// already created for seeking) and positions them at _lower_rowid.
+// Nothing is created when the start row is at or past the end of the segment.
+// A failed seek is reported to the caller instead of being silently ignored.
+Status SegmentIterator::_init_column_iterators() {
+    _cur_rowid = _lower_rowid;
+    if (_cur_rowid >= num_rows()) {
+        return Status::OK();
+    }
+    for (auto cid : _schema.column_ids()) {
+        if (_column_iterators[cid] == nullptr) {
+            RETURN_IF_ERROR(_create_column_iterator(cid, &_column_iterators[cid]));
+        }
+
+        // NOTE(review): assumes ColumnIterator::seek_to_ordinal() returns Status,
+        // like next_batch() does -- confirm in column_reader.h.
+        RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_cur_rowid));
+    }
+    return Status::OK();
+}
+
+// Thin forwarder: the segment hands out the iterator for column `cid`.
+Status SegmentIterator::_create_column_iterator(uint32_t cid, ColumnIterator** iter) {
+    return _segment->new_column_iterator(cid, iter);
+}
+
+// Schema of lhs and rhs are different.
+// callers should assure that rhs' schema has all columns in lhs schema
+// Compares cell by cell over lhs' column ids and returns the first non-zero
+// compare_cell() result, or 0 when all of lhs' columns compare equal.
+template
+int compare_row_with_lhs_columns(const LhsRowType& lhs, const RhsRowType& rhs) {
+    for (auto cid : lhs.schema()->column_ids()) {
+        auto res = lhs.schema()->column(cid)->compare_cell(lhs.cell(cid), rhs.cell(cid));
+        if (res != 0) {
+            return res;
+        }
+    }
+    return 0;
+}
+
+// look up one key to get its ordinal at which can get data.
+// 'upper_bound' is defined the max ordinal the function will search.
+// We use upper_bound to reduce search times. +// If we find a valid ordinal, it will be set in rowid and with Status::OK() +// If we can not find a valid key in this segment, we will set rowid to upper_bound +// Otherwise return error. +// 1. get [start, end) ordinal through short key index +// 2. binary search to find exact ordinal that match the input condition +// Make is_include template to reduce branch +Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, + rowid_t upper_bound, rowid_t* rowid) { + std::string index_key; + encode_key_with_padding(&index_key, key, _segment->num_short_keys(), is_include); + + uint32_t start_block_id = 0; + auto start_iter = _segment->lower_bound(index_key); + if (start_iter.valid()) { + // Because previous block may contain this key, so we should set rowid to + // last block's first row. + start_block_id = start_iter.ordinal(); + if (start_block_id > 0) { + start_block_id--; + } + } else { + // When we don't find a valid index item, which means all short key is + // smaller than input key, this means that this key may exist in the last + // row block. so we set the rowid to first row of last row block. 
+ start_block_id = _segment->last_block(); + } + rowid_t start = start_block_id * _segment->num_rows_per_block(); + + rowid_t end = upper_bound; + auto end_iter = _segment->upper_bound(index_key); + if (end_iter.valid()) { + end = end_iter.ordinal() * _segment->num_rows_per_block(); + } + + // binary search to find the exact key + while (start < end) { + rowid_t mid = (start + end) / 2; + RETURN_IF_ERROR(_seek_and_peek(mid)); + int cmp = compare_row_with_lhs_columns(key, _seek_block->row(0)); + if (cmp > 0) { + start = mid + 1; + } else if (cmp == 0) { + if (is_include) { + // lower bound + end = mid; + } else { + // upper bound + start = mid + 1; + } + } else { + end = mid; + } + } + + *rowid = start; + return Status::OK(); +} + +// seek to the row and load that row to _key_cursor +Status SegmentIterator::_seek_and_peek(rowid_t rowid) { + for (auto cid : _seek_schema->column_ids()) { + _column_iterators[cid]->seek_to_ordinal(rowid); + } + size_t num_rows = 1; + _seek_block->resize(num_rows); + RETURN_IF_ERROR(_next_batch(_seek_block.get(), &num_rows)); + return Status::OK(); +} + +// Try to read data as much to block->num_rows(). The number of read rows +// will be set in rows_read when return OK. rows_read will small than +// block->num_rows() when reach the end of this segment +Status SegmentIterator::_next_batch(RowBlockV2* block, size_t* rows_read) { + bool has_read = false; + size_t first_read = 0; + for (int i = 0; i < block->schema()->column_ids().size(); ++i) { + auto cid = block->schema()->column_ids()[i]; + size_t num_rows = has_read ? 
first_read : block->num_rows(); + auto column_block = block->column_block(i); + RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&num_rows, &column_block)); + if (!has_read) { + has_read = true; + first_read = num_rows; + } else if (num_rows != first_read) { + return Status::InternalError( + Substitute("Read different rows in different columns" + ", column($0) read $1 vs column($2) read $3", + block->schema()->column_ids()[0], first_read, cid, num_rows)); + } + } + *rows_read = first_read; + return Status::OK(); +} + +Status SegmentIterator::next_batch(RowBlockV2* block) { + size_t rows_to_read = std::min((rowid_t)block->capacity(), _upper_rowid - _cur_rowid); + block->resize(rows_to_read); + if (rows_to_read == 0) { + return Status::OK(); + } + RETURN_IF_ERROR(_next_batch(block, &rows_to_read)); + _cur_rowid += rows_to_read; + return Status::OK(); +} + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h new file mode 100644 index 00000000000000..46a1e696b307a9 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#pragma once
+
+#include
+#include
+
+#include "common/status.h"
+#include "olap/rowset/segment_v2/common.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "olap/iterators.h"
+#include "olap/schema.h"
+#include "util/arena.h"
+
+namespace doris {
+
+class RowCursor;
+class RowBlockV2;
+class ShortKeyIndexIterator;
+
+namespace segment_v2 {
+
+class ColumnIterator;
+
+// Row-wise iterator over one Segment. init() narrows the rows to read with the
+// optional key bounds of the read options; next_batch() then returns the rows
+// of that range block by block.
+class SegmentIterator : public RowwiseIterator {
+public:
+    SegmentIterator(std::shared_ptr segment, const Schema& _schema);
+    ~SegmentIterator() override;
+    // Stores `opts`, computes the row range and creates the column iterators.
+    Status init(const StorageReadOptions& opts) override;
+    // Fills `row_block` with the next rows of the selected range.
+    Status next_batch(RowBlockV2* row_block) override;
+    const Schema& schema() const override { return _schema; }
+private:
+    // Computes _lower_rowid / _upper_rowid from the key bounds in _opts.
+    Status _init_short_key_range();
+    // Builds _seek_schema / _seek_block and the iterators used while seeking.
+    Status _prepare_seek();
+    // Creates the iterators of the read schema and seeks them to _lower_rowid.
+    Status _init_column_iterators();
+    Status _create_column_iterator(uint32_t cid, ColumnIterator** iter);
+
+    // Finds the row id for `key` using the short key index plus a binary
+    // search; the search is limited by `upper_bound`.
+    Status _lookup_ordinal(const RowCursor& key, bool is_include,
+                           rowid_t upper_bound, rowid_t* rowid);
+    // Loads the row at `rowid` into _seek_block.
+    Status _seek_and_peek(rowid_t rowid);
+    // Reads one batch for every column of `block`'s schema.
+    Status _next_batch(RowBlockV2* block, size_t* rows_read);
+
+    uint32_t segment_id() const { return _segment->id(); }
+    uint32_t num_rows() const { return _segment->num_rows(); }
+
+private:
+    std::shared_ptr _segment;
+    // TODO(zc): rethink if we need copy it
+    Schema _schema;
+
+    StorageReadOptions _opts;
+
+    // Only used when init is called, help to finish seek_and_peek.
+    // Data will be saved in this batch
+    std::unique_ptr _seek_schema;
+
+    // used to read data from columns when doing binary search to find
+    // ordinal for input bounds
+    std::unique_ptr _seek_block;
+    // helper to save row to compare with input bounds
+    // NOTE(review): appears unused in segment_iterator.cpp -- confirm before removing.
+    std::unique_ptr _key_cursor;
+
+    // One slot per column of _schema; a slot stays nullptr until the iterator
+    // for that column is created. Deleted in the destructor.
+    std::vector _column_iterators;
+
+    // Row range selected by the key bounds: rows in [_lower_rowid, _upper_rowid)
+    // are returned. All three ids are assigned during init().
+    rowid_t _lower_rowid;
+    rowid_t _upper_rowid;
+    // next row to be returned by next_batch()
+    rowid_t _cur_rowid;
+
+    // passed to the RowBlockV2 created for seeking
+    Arena _arena;
+};
+
+}
+}
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
new file mode 100644
index 00000000000000..c3975732c30e55
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "olap/rowset/segment_v2/segment_writer.h" + +#include "env/env.h" // Env +#include "olap/row_block.h" // RowBlock +#include "olap/row_cursor.h" // RowCursor +#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter +#include "olap/short_key_index.h" + +namespace doris { +namespace segment_v2 { + +const char* k_segment_magic = "D0R1"; +const uint32_t k_segment_magic_length = 4; + +SegmentWriter::SegmentWriter(std::string fname, uint32_t segment_id, + const std::shared_ptr& tablet_schema, + const SegmentWriterOptions& opts) + : _fname(std::move(fname)), + _segment_id(segment_id), + _tablet_schema(tablet_schema), + _opts(opts) { +} + +SegmentWriter::~SegmentWriter() { + for (auto writer : _column_writers) { + delete writer; + } +} + +Status SegmentWriter::init(uint32_t write_mbytes_per_sec) { + // create for write + RETURN_IF_ERROR(Env::Default()->new_writable_file(_fname, &_output_file)); + + uint32_t column_id = 0; + for (auto& column : _tablet_schema->columns()) { + ColumnMetaPB* column_meta = _footer.add_columns(); + // TODO(zc): Do we need this column_id?? + column_meta->set_column_id(column_id++); + column_meta->set_unique_id(column.unique_id()); + bool is_nullable = column.is_nullable(); + column_meta->set_is_nullable(is_nullable); + + // TODO(zc): we can add type_info into TabletColumn? 
+ const TypeInfo* type_info = get_type_info(column.type()); + DCHECK(type_info != nullptr); + + ColumnWriterOptions opts; + std::unique_ptr writer(new ColumnWriter(opts, type_info, is_nullable, _output_file.get())); + RETURN_IF_ERROR(writer->init()); + _column_writers.push_back(writer.release()); + } + _index_builder.reset(new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block)); + return Status::OK(); +} + +template +Status SegmentWriter::append_row(const RowType& row) { + for (size_t cid = 0; cid < _column_writers.size(); ++cid) { + auto cell = row.cell(cid); + RETURN_IF_ERROR(_column_writers[cid]->append(cell)); + } + + if ((_row_count % _opts.num_rows_per_block) == 0) { + std::string encoded_key; + encode_key(&encoded_key, row, _tablet_schema->num_short_key_columns()); + RETURN_IF_ERROR(_index_builder->add_item(encoded_key)); + _block_count++; + } + _row_count++; + return Status::OK(); +} + +template Status SegmentWriter::append_row(const RowCursor& row); + +uint64_t SegmentWriter::estimate_segment_size() { + return 0; +} + +Status SegmentWriter::finalize(uint32_t* segment_file_size) { + for (auto column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->finish()); + } + RETURN_IF_ERROR(_write_raw_data({k_segment_magic})); + RETURN_IF_ERROR(_write_data()); + RETURN_IF_ERROR(_write_ordinal_index()); + RETURN_IF_ERROR(_write_short_key_index()); + RETURN_IF_ERROR(_write_footer()); + return Status::OK(); +} + +// write column data to file one by one +Status SegmentWriter::_write_data() { + for (auto column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->write_data()); + } + return Status::OK(); +} + +// write ordinal index after data has been written +Status SegmentWriter::_write_ordinal_index() { + for (auto column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->write_ordinal_index()); + } + return Status::OK(); +} + +Status SegmentWriter::_write_short_key_index() { + std::vector slices; + // TODO(zc): we should get 
segment_size + RETURN_IF_ERROR(_index_builder->finalize(_row_count * 100, _row_count, &slices)); + + uint64_t offset = _output_file->size(); + RETURN_IF_ERROR(_write_raw_data(slices)); + uint32_t written_bytes = _output_file->size() - offset; + + _footer.mutable_short_key_index_page()->set_offset(offset); + _footer.mutable_short_key_index_page()->set_size(written_bytes); + return Status::OK(); +} + +Status SegmentWriter::_write_footer() { + _footer.set_num_rows(_row_count); + // collect all + for (int i = 0; i < _column_writers.size(); ++i) { + _column_writers[i]->write_meta(_footer.mutable_columns(i)); + } + + // write footer + std::string footer_buf; + if (!_footer.SerializeToString(&footer_buf)) { + return Status::InternalError("failed to serialize segment footer"); + } + + std::string footer_info_buf; + // put footer's size + put_fixed32_le(&footer_info_buf, footer_buf.size()); + // TODO(zc): compute checksum for footer + uint32_t checksum = 0; + put_fixed32_le(&footer_info_buf, checksum); + + // I think we don't need to put a tail magic. + + std::vector slices{footer_buf, footer_info_buf}; + // write offset and length + RETURN_IF_ERROR(_write_raw_data(slices)); + return Status::OK(); +} + +Status SegmentWriter::_write_raw_data(const std::vector& slices) { + RETURN_IF_ERROR(_output_file->appendv(&slices[0], slices.size())); + return Status::OK(); +} + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h new file mode 100644 index 00000000000000..a6a1bd8a26b20a --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include // unique_ptr
+#include
+#include
+
+#include "common/logging.h" // LOG
+#include "common/status.h" // Status
+#include "gen_cpp/segment_v2.pb.h"
+#include "olap/schema.h"
+
+namespace doris {
+
+class WritableFile;
+class RowBlock;
+class RowCursor;
+class ShortKeyIndexBuilder;
+
+namespace segment_v2 {
+
+class ColumnWriter;
+
+// Magic bytes of a segment file and their length; defined in segment_writer.cpp.
+extern const char* k_segment_magic;
+extern const uint32_t k_segment_magic_length;
+
+struct SegmentWriterOptions {
+    // A short key index item is added for the first row of every block of
+    // this many rows.
+    uint32_t num_rows_per_block = 1024;
+};
+
+// Writes one segment file: rows are appended column by column, and finalize()
+// writes the magic, column data, ordinal indexes, short key index and footer.
+class SegmentWriter {
+public:
+    explicit SegmentWriter(std::string file_name,
+                           uint32_t segment_id,
+                           const std::shared_ptr& tablet_schema,
+                           const SegmentWriterOptions& opts);
+
+    ~SegmentWriter();
+    // Creates the output file and one column writer per schema column.
+    // NOTE(review): write_mbytes_per_sec is not used by the implementation in
+    // segment_writer.cpp.
+    Status init(uint32_t write_mbytes_per_sec);
+
+    // Appends one row; explicitly instantiated for RowCursor in the .cpp.
+    template
+    Status append_row(const RowType& row);
+
+    uint64_t estimate_segment_size();
+
+    // Writes everything to the output file.
+    // NOTE(review): segment_file_size presumably receives the size of the
+    // written file -- confirm against segment_writer.cpp.
+    Status finalize(uint32_t* segment_file_size);
+
+private:
+    Status _write_data();
+    Status _write_ordinal_index();
+    Status _write_short_key_index();
+    Status _write_footer();
+    Status _write_raw_data(const std::vector& slices);
+
+private:
+    std::string _fname;
+    uint32_t _segment_id;
+    std::shared_ptr _tablet_schema;
+    // NOTE(review): not set by the constructor's initializer list in
+    // segment_writer.cpp.
+    size_t _num_short_keys;
+    SegmentWriterOptions _opts;
+
+    SegmentFooterPB _footer;
+    std::unique_ptr _index_builder;
+    std::unique_ptr _output_file;
+    // owned; deleted in the destructor
+    std::vector _column_writers;
+    // rows appended so far
+    uint64_t _row_count = 0;
+    // short key index items added so far (one per started block)
+    uint32_t _block_count = 0;
+};
+
+}
+}
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index dcc84e504e7be5..7afeefccdbd262 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -45,6 +45,51 @@
 using std::vector;
 
 namespace doris {
+// Moved here from schema_change.h.
+class RowBlockSorter {
+public:
+    explicit RowBlockSorter(RowBlockAllocator* allocator);
+    virtual ~RowBlockSorter();
+
+    bool sort(RowBlock** row_block);
+
+private:
+    // "less than" for two rows, based on compare_row()
+    static bool _row_cursor_comparator(const RowCursor* a, const RowCursor* b) {
+        return compare_row(*a, *b) < 0;
+    }
+
+    RowBlockAllocator* _row_block_allocator;
+    RowBlock* _swap_row_block;
+};
+
+// Moved here from schema_change.h.
+class RowBlockMerger {
+public:
+    explicit RowBlockMerger(TabletSharedPtr tablet);
+    virtual ~RowBlockMerger();
+
+    bool merge(
+            const std::vector& row_block_arr,
+            RowsetWriterSharedPtr rowset_writer,
+            uint64_t* merged_rows);
+
+private:
+    struct MergeElement {
+        // Inverted on purpose: an element is "less" when its row compares
+        // greater, so std::priority_queue keeps the smallest row on top.
+        bool operator<(const MergeElement& other) const {
+            return compare_row(*row_cursor, *other.row_cursor) > 0;
+        }
+
+        const RowBlock* row_block;
+        RowCursor* row_cursor;
+        uint32_t row_block_index;
+    };
+
+    bool _make_heap(const std::vector& row_block_arr);
+    bool _pop_heap();
+
+    TabletSharedPtr _tablet;
+    std::priority_queue _heap;
+};
+
+
 RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
                                  const TabletSharedPtr &base_tablet) {
     _schema_mapping.resize(tablet_schema.num_columns());
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 6081e6a6889e1b..7aa24f5d68a4c9 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -28,7 +28,6 @@
 #include "olap/rowset/rowset_writer.h"
 #include "olap/tablet.h"
 #include "olap/column_mapping.h"
-#include "olap/row.h"
 
 namespace doris {
 // defined in 'field.h'
@@ -74,23 +73,6 @@ class RowBlockChanger {
     DISALLOW_COPY_AND_ASSIGN(RowBlockChanger);
 };
 
-class RowBlockAllocator;
-class RowBlockSorter {
-public:
-    explicit RowBlockSorter(RowBlockAllocator* allocator);
-    virtual ~RowBlockSorter();
-
-    bool sort(RowBlock** row_block);
-
-private:
-    static bool _row_cursor_comparator(const RowCursor* a, const RowCursor* b) {
-        return compare_row(*a, *b) < 0;
-    }
-
-    RowBlockAllocator* _row_block_allocator;
-    RowBlock* _swap_row_block;
-};
-
 class RowBlockAllocator {
 public:
     RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation);
@@ -107,34 +89,6 @@ class RowBlockAllocator {
     size_t _memory_limitation;
 };
 
-class RowBlockMerger {
-public:
-    explicit RowBlockMerger(TabletSharedPtr tablet);
-    virtual ~RowBlockMerger();
-
-    bool merge(
-            const std::vector& row_block_arr,
-            RowsetWriterSharedPtr rowset_writer,
-            uint64_t* merged_rows);
-
-private:
-    struct MergeElement {
-        bool operator<(const MergeElement& other) const {
-            return compare_row(*row_cursor, *other.row_cursor) > 0;
-        }
-
-        const RowBlock* row_block;
-        RowCursor* row_cursor;
-        uint32_t row_block_index;
-    };
-
-    bool _make_heap(const std::vector& row_block_arr);
-    bool _pop_heap();
-
-    TabletSharedPtr _tablet;
-    std::priority_queue _heap;
-};
-
 class SchemaChange {
 public:
     SchemaChange() : _filtered_rows(0), _merged_rows(0) {}
diff --git a/be/src/olap/short_key_index.cpp b/be/src/olap/short_key_index.cpp
new file mode 100644
index 00000000000000..03d66dffbf2564
--- /dev/null
+++ b/be/src/olap/short_key_index.cpp
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/short_key_index.h" + +#include + +#include "util/coding.h" +#include "gutil/strings/substitute.h" + +using strings::Substitute; + +namespace doris { + +Status ShortKeyIndexBuilder::add_item(const Slice& key) { + put_varint32(&_offset_buf, _key_buf.size()); + _footer.set_num_items(_footer.num_items() + 1); + _key_buf.append(key.data, key.size); + return Status::OK(); +} + +Status ShortKeyIndexBuilder::finalize(uint32_t segment_bytes, + uint32_t num_segment_rows, + std::vector* slices) { + _footer.set_num_segment_rows(num_segment_rows); + _footer.set_segment_bytes(segment_bytes); + _footer.set_key_bytes(_key_buf.size()); + _footer.set_offset_bytes(_offset_buf.size()); + + // encode header + if (!_footer.SerializeToString(&_footer_buf)) { + return Status::InternalError("Failed to serialize index footer"); + } + + put_fixed32_le(&_footer_buf, _footer_buf.size()); + // TODO(zc): checksum + uint32_t checksum = 0; + put_fixed32_le(&_footer_buf, checksum); + + slices->emplace_back(_key_buf); + slices->emplace_back(_offset_buf); + slices->emplace_back(_footer_buf); + return Status::OK(); +} + +Status ShortKeyIndexDecoder::parse() { + Slice data = _data; + + // 1. 
parse footer, get checksum and footer length + if (data.size < 2 * sizeof(uint32_t)) { + return Status::Corruption( + Substitute("Short key is too short, need=$0 vs real=$1", + 2 * sizeof(uint32_t), data.size)); + } + size_t offset = data.size - 2 * sizeof(uint32_t); + uint32_t footer_length = decode_fixed32_le((uint8_t*)data.data + offset); + uint32_t checksum = decode_fixed32_le((uint8_t*)data.data + offset + 4); + // TODO(zc): do checksum + if (checksum != 0) { + return Status::Corruption( + Substitute("Checksum not match, need=$0 vs read=$1", 0, checksum)); + } + // move offset to parse footer + offset -= footer_length; + std::string footer_buf(data.data + offset, footer_length); + if (!_footer.ParseFromString(footer_buf)) { + return Status::Corruption("Fail to parse index footer from string"); + } + + // check if real data size match footer's content + if (offset != _footer.key_bytes() + _footer.offset_bytes()) { + return Status::Corruption( + Substitute("Index size not match, need=$0, real=$1", + _footer.key_bytes() + _footer.offset_bytes(), offset)); + } + + // set index buffer + _key_data = Slice(_data.data, _footer.key_bytes()); + + // parse offset information + Slice offset_slice(_data.data + _footer.key_bytes(), _footer.offset_bytes()); + // +1 for record total length + _offsets.resize(_footer.num_items() + 1); + _offsets[_footer.num_items()] = _footer.key_bytes(); + for (uint32_t i = 0; i < _footer.num_items(); ++i) { + uint32_t offset = 0; + if (!get_varint32(&offset_slice, &offset)) { + return Status::Corruption("Fail to get varint from index offset buffer"); + } + DCHECK(offset <= _footer.key_bytes()) + << "Offset is larger than total bytes, offset=" << offset + << ", key_bytes=" << _footer.key_bytes(); + _offsets[i] = offset; + } + + if (offset_slice.size != 0) { + return Status::Corruption("Still has data after parse all key offset"); + } + + return Status::OK(); +} + +} diff --git a/be/src/olap/short_key_index.h b/be/src/olap/short_key_index.h new 
file mode 100644 index 00000000000000..5bc0374898f6ae --- /dev/null +++ b/be/src/olap/short_key_index.h @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" +#include "gen_cpp/segment_v2.pb.h" +#include "util/faststring.h" +#include "util/slice.h" + +#include "util/debug_util.h" + +namespace doris { + +// In our system, we have a more complicated situation. +// First, our keys can be NULL. +// Second, when key columns are not complete we want to distinguish GT and GE. For example, +// there are two key columns a and b, we have only one condition a > 1. We can only encode +// a prefix key 1, which is less than 1|2. This will make us read more data than +// we actually need. So we want to add more markers.
+// a > 1: will be encoded into 1|\xFF
+// a >= 1: will be encoded into 1|\x00
+// a = 1 and b > 1: will be encoded into 1|\x02|1
+// a = 1 and b is null: will be encoded into 1|\x01
+
+// Used to represent minimal value for that field
+constexpr uint8_t KEY_MINIMAL_MARKER = 0x00;
+// Used to represent a null field, which is treated as smaller than all other values
+constexpr uint8_t KEY_NULL_FIRST_MARKER = 0x01;
+// Used to represent a normal field, which content is encoded after this marker
+constexpr uint8_t KEY_NORMAL_MARKER = 0x02;
+// Used to represent a null field when nulls are not ordered first
+constexpr uint8_t KEY_NULL_LAST_MARKER = 0xFE;
+// Used to represent maximal value for that field
+constexpr uint8_t KEY_MAXIMAL_MARKER = 0xFF;
+
+// Encode the first num_keys columns of a row into `buf`.
+// Every cell is written as a marker byte, followed by the encoded content for
+// non-null cells. Encoding stops at the first column that the row's schema
+// does not contain: a single padding marker is appended there,
+// KEY_MINIMAL_MARKER when padding_minimal is true and KEY_MAXIMAL_MARKER
+// otherwise. If the row has all num_keys columns, no padding marker is added.
+template
+void encode_key_with_padding(std::string* buf, const RowType& row,
+                             size_t num_keys, bool padding_minimal) {
+    for (auto cid = 0; cid < num_keys; cid++) {
+        auto field = row.schema()->column(cid);
+        if (field == nullptr) {
+            // the key ends here: close it with the requested padding marker
+            const uint8_t padding =
+                    padding_minimal ? KEY_MINIMAL_MARKER : KEY_MAXIMAL_MARKER;
+            buf->push_back(padding);
+            break;
+        }
+
+        auto cell = row.cell(cid);
+        if (!cell.is_null()) {
+            buf->push_back(KEY_NORMAL_MARKER);
+            field->encode_ascending(cell.cell_ptr(), buf);
+        } else {
+            // a null cell is represented by its marker only
+            const uint8_t null_marker =
+                    null_first ? KEY_NULL_FIRST_MARKER : KEY_NULL_LAST_MARKER;
+            buf->push_back(null_marker);
+        }
+    }
+}
+
+// Encode one row into binary according given num_keys.
+// Client call this function must assure that row contains the first
+// num_keys columns.
+template +void encode_key(std::string* buf, const RowType& row, size_t num_keys) { + for (auto cid = 0; cid < num_keys; cid++) { + auto cell = row.cell(cid); + if (cell.is_null()) { + if (null_first) { + buf->push_back(KEY_NULL_FIRST_MARKER); + } else { + buf->push_back(KEY_NULL_LAST_MARKER); + } + continue; + } + buf->push_back(KEY_NORMAL_MARKER); + row.schema()->column(cid)->encode_ascending(cell.cell_ptr(), buf); + } +} + +// Used to encode a segment short key indices to binary format. This version +// only accepts binary key, client should assure that input key is sorted, +// otherwise error could happens. This builder would arrange data in following +// format. +// index = encoded_keys + encoded_offsets + footer + footer_size + checksum +// encoded_keys = binary_key + [, ...] +// encoded_offsets = encoded_offset + [, ...] +// encoded_offset = variant32 +// footer = ShortKeyFooterPB +// footer_size = fixed32 +// checksum = fixed32 +// Usage: +// ShortKeyIndexBuilder builder(segment_id, num_rows_per_block); +// builder.add_item(key1); +// ... +// builder.add_item(keyN); +// builder.finalize(segment_size, num_rows, &slices); +// NOTE: This is used for BetaRowset and is not compatible with AlphaRowset's +// short key index format. +// TODO(zc): +// 1. If this can leverage binary page to save key and offset data +// 2. Extending this to save in a BTree like struct, which can index full key +// more than short key +class ShortKeyIndexBuilder { +public: + ShortKeyIndexBuilder(uint32_t segment_id, + uint32_t num_rows_per_block) { + _footer.set_segment_id(segment_id); + _footer.set_num_rows_per_block(num_rows_per_block); + } + + Status add_item(const Slice& key); + + Status finalize(uint32_t segment_size, uint32_t num_rows, std::vector* slices); + +private: + segment_v2::ShortKeyFooterPB _footer; + + faststring _key_buf; + faststring _offset_buf; + std::string _footer_buf; +}; + +class ShortKeyIndexDecoder; + +// An Iterator to iterate one short key index. 
+// Clients can use this class to iterate over all items in this index. +class ShortKeyIndexIterator { +public: + using iterator_category = std::random_access_iterator_tag; + using value_type = Slice; + using pointer = Slice*; + using reference = Slice&; + using difference_type = ssize_t; + + ShortKeyIndexIterator(const ShortKeyIndexDecoder* decoder, uint32_t ordinal = 0) + : _decoder(decoder), _ordinal(ordinal) { } + + ShortKeyIndexIterator& operator-=(ssize_t step) { + _ordinal -= step; + return *this; + } + + ShortKeyIndexIterator& operator+=(ssize_t step) { + _ordinal += step; + return *this; + } + + ShortKeyIndexIterator& operator++() { + _ordinal++; + return *this; + } + + bool operator!=(const ShortKeyIndexIterator& other) { + return _ordinal != other._ordinal || _decoder != other._decoder; + } + + bool operator==(const ShortKeyIndexIterator& other) { + return _ordinal == other._ordinal && _decoder == other._decoder; + } + + ssize_t operator-(const ShortKeyIndexIterator& other) const { + return _ordinal - other._ordinal; + } + + inline bool valid() const; + + Slice operator*() const; + + ssize_t ordinal() const { return _ordinal; } + +private: + const ShortKeyIndexDecoder* _decoder; + ssize_t _ordinal; +}; + +// Used to decode a short key index into its footer and encoded index data. +// Usage: +// MemIndex index; +// ShortKeyIndexDecoder decoder(slice) +// decoder.parse(); +// auto iter = decoder.lower_bound(key); +class ShortKeyIndexDecoder { +public: + // Clients must ensure that data stays available while this class + // is used. + ShortKeyIndexDecoder(const Slice& data) : _data(data) { } + + Status parse(); + + ShortKeyIndexIterator begin() const { return {this, 0}; } + ShortKeyIndexIterator end() const { return {this, num_items()}; } + + // Return an iterator positioned at the first item that is + // equal to or greater than the given key. + // NOTE: If one key is a prefix of another key, this function treats + // the longer key as greater than the shorter key.
+ ShortKeyIndexIterator lower_bound(const Slice& key) const { + return seek(key); + } + + // Return the iterator which locates the first item greater than the + // input key. + ShortKeyIndexIterator upper_bound(const Slice& key) const { + return seek(key); + } + + uint32_t num_items() const { return _footer.num_items(); } + + Slice key(ssize_t ordinal) const { + DCHECK(ordinal >= 0 && ordinal < num_items()); + return {_key_data.data + _offsets[ordinal], _offsets[ordinal + 1] - _offsets[ordinal]}; + } + +private: + template + ShortKeyIndexIterator seek(const Slice& key) const { + auto comparator = [this] (const Slice& lhs, const Slice& rhs) { + return lhs.compare(rhs) < 0; + }; + if (lower_bound) { + return std::lower_bound(begin(), end(), key, comparator); + } else { + return std::upper_bound(begin(), end(), key, comparator); + } + } + +private: + Slice _data; + + // All following fields are only valid after parse has been executed successfully + segment_v2::ShortKeyFooterPB _footer; + std::vector _offsets; + Slice _key_data; +}; + +inline Slice ShortKeyIndexIterator::operator*() const { + return _decoder->key(_ordinal); +} + +inline bool ShortKeyIndexIterator::valid() const { + return _ordinal >= 0 && _ordinal < _decoder->num_items(); +} + +} diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 99cd6aa307c94a..f3718028834a38 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -43,7 +43,6 @@ #include "olap/tablet.h" #include "olap/olap_meta.h" #include "olap/options.h" -#include "olap/rowset/segment_group.h" #include "olap/tablet_manager.h" #include "olap/txn_manager.h" #include "olap/task/engine_task.h" @@ -299,7 +298,6 @@ class StorageEngine { static StorageEngine* _s_instance; - std::unordered_map> _gc_files; std::unordered_map _unused_rowsets; Mutex _gc_mutex; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index fb34e1c381a8f1..fe26cca358c9cb 100644 --- a/be/src/olap/tablet.h +++ 
b/be/src/olap/tablet.h @@ -29,7 +29,6 @@ #include "gen_cpp/olap_file.pb.h" #include "olap/olap_define.h" #include "olap/tuple.h" -#include "olap/row_cursor.h" #include "olap/rowset_graph.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader.h" diff --git a/be/src/olap/types.h b/be/src/olap/types.h index d7597d36825601..0989d1a86599ce 100644 --- a/be/src/olap/types.h +++ b/be/src/olap/types.h @@ -104,24 +104,31 @@ struct CppTypeTraits { template<> struct CppTypeTraits { using CppType = bool; + using UnsignedCppType = bool; }; template<> struct CppTypeTraits { using CppType = int8_t; + using UnsignedCppType = uint8_t; }; template<> struct CppTypeTraits { using CppType = int16_t; + using UnsignedCppType = uint16_t; }; template<> struct CppTypeTraits { using CppType = int32_t; + using UnsignedCppType = uint32_t; }; template<> struct CppTypeTraits { using CppType = uint32_t; + using UnsignedCppType = uint32_t; }; template<> struct CppTypeTraits { using CppType = int64_t; + using UnsignedCppType = uint64_t; }; template<> struct CppTypeTraits { using CppType = int128_t; + using UnsignedCppType = unsigned int128_t; }; template<> struct CppTypeTraits { using CppType = float; @@ -131,12 +138,15 @@ template<> struct CppTypeTraits { }; template<> struct CppTypeTraits { using CppType = decimal12_t; + using UnsignedCppType = decimal12_t; }; template<> struct CppTypeTraits { using CppType = uint24_t; + using UnsignedCppType = uint24_t; }; template<> struct CppTypeTraits { using CppType = int64_t; + using UnsignedCppType = uint64_t; }; template<> struct CppTypeTraits { using CppType = Slice; diff --git a/be/src/olap/uint24.h b/be/src/olap/uint24.h index 7632b584c93d01..638ffae6e6a1ae 100644 --- a/be/src/olap/uint24.h +++ b/be/src/olap/uint24.h @@ -36,19 +36,26 @@ struct uint24_t { data[2] = value.data[2]; } - uint24_t(const int32_t& value) { + uint24_t(const uint32_t& value) { data[0] = static_cast(value); data[1] = static_cast(value >> 8); data[2] = 
static_cast(value >> 16); } + uint24_t& operator=(const uint32_t& value) { + data[0] = static_cast(value); + data[1] = static_cast(value >> 8); + data[2] = static_cast(value >> 16); + return *this; + } + uint24_t& operator+=(const uint24_t& value) { *this = static_cast(*this) + static_cast(value); return *this; } - operator int() const { - int value = static_cast(data[0]); + operator uint32_t() const { + uint32_t value = static_cast(data[0]); value += (static_cast(static_cast(data[1]))) << 8; value += (static_cast(static_cast(data[2]))) << 16; return value; diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index d2cc2b0152aec7..05cc4983a63f06 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -78,6 +78,7 @@ set(UTIL_FILES md5.cpp frontend_helper.cpp faststring.cc + slice.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/slice.cpp b/be/src/util/slice.cpp new file mode 100644 index 00000000000000..fc583a2d228bef --- /dev/null +++ b/be/src/util/slice.cpp @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/slice.h" + +#include "util/faststring.h" + +namespace doris { + +// NOTE(zc): we define this function here to make compile work. 
+Slice::Slice(const faststring& s) : // NOLINT(runtime/explicit) + data((char*)(s.data())), size(s.size()) { } + +} diff --git a/be/src/util/slice.h b/be/src/util/slice.h index 5c31072fe2e1e4..1eebe3a5f29fdd 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -29,6 +29,8 @@ namespace doris { +class faststring; + /// @brief A wrapper around externally allocated data. /// /// Slice is a simple structure containing a pointer into some external @@ -66,6 +68,8 @@ struct Slice { /// Create a slice that refers to the contents of the given string. Slice(const std::string& s) : // NOLINT(runtime/explicit) data(const_cast(s.data())), size(s.size()) { } + + Slice(const faststring& s); /// Create a slice that refers to a C-string s[0,strlen(s)-1]. Slice(const char* s) : // NOLINT(runtime/explicit) diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 0a11d868f7466d..a258e0f73e4618 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -53,6 +53,7 @@ ADD_BE_TEST(rowset/segment_v2/encoding_info_test) ADD_BE_TEST(rowset/segment_v2/ordinal_page_index_test) ADD_BE_TEST(rowset/segment_v2/rle_page_test) ADD_BE_TEST(rowset/segment_v2/binary_dict_page_test) +ADD_BE_TEST(rowset/segment_v2/segment_test) ADD_BE_TEST(tablet_meta_manager_test) ADD_BE_TEST(tablet_mgr_test) ADD_BE_TEST(rowset/rowset_meta_manager_test) @@ -61,3 +62,5 @@ ADD_BE_TEST(rowset/alpha_rowset_test) ADD_BE_TEST(olap_snapshot_converter_test) ADD_BE_TEST(txn_manager_test) ADD_BE_TEST(generic_iterators_test) +ADD_BE_TEST(key_coder_test) +ADD_BE_TEST(short_key_index_test) diff --git a/be/test/olap/key_coder_test.cpp b/be/test/olap/key_coder_test.cpp new file mode 100644 index 00000000000000..9c6ddd6c34076a --- /dev/null +++ b/be/test/olap/key_coder_test.cpp @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/key_coder.h" + +#include +#include +#include + +#include "util/debug_util.h" + +namespace doris { + +class KeyCoderTest : public testing::Test { +public: + KeyCoderTest() { } + virtual ~KeyCoderTest() { + } +}; + +template +void test_integer_encode() { + using CppType = typename CppTypeTraits::CppType; + + auto key_coder = get_key_coder(type); + + { + std::string buf; + CppType val = std::numeric_limits::min(); + key_coder->encode_ascending(&val, 1, &buf); + + std::string result; + for (int i = 0; i < sizeof(CppType); ++i) { + result.append("00"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + { + std::string buf; + CppType val = std::numeric_limits::max(); + key_coder->encode_ascending(&val, sizeof(CppType), &buf); + + std::string result; + for (int i = 0; i < sizeof(CppType); ++i) { + result.append("FF"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + 
for (auto i = 0; i < 100; ++i) { + CppType val1 = random(); + CppType val2 = random(); + + std::string buf1; + std::string buf2; + + key_coder->encode_ascending(&val1, sizeof(CppType), &buf1); + key_coder->encode_ascending(&val2, sizeof(CppType), &buf2); + + if (val1 < val2) { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) < 0); + } else if (val1 > val2) { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + } else { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) == 0); + } + } +} + +TEST(KeyCoderTest, test_int) { + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + + test_integer_encode(); +} + +TEST(KeyCoderTest, test_date) { + using CppType = uint24_t; + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_DATE); + + { + std::string buf; + CppType val = 0; + key_coder->encode_ascending(&val, 1, &buf); + + std::string result; + for (int i = 0; i < sizeof(uint24_t); ++i) { + result.append("00"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + { + std::string buf; + CppType val = 10000; + key_coder->encode_ascending(&val, sizeof(CppType), &buf); + + std::string result("002710"); + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + for (auto i = 0; i < 100; ++i) { + CppType val1 = random(); + CppType val2 = random(); + + std::string buf1; + std::string buf2; + + key_coder->encode_ascending(&val1, sizeof(CppType), &buf1); + key_coder->encode_ascending(&val2, sizeof(CppType), &buf2); + + if (val1 < val2) { + 
ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) < 0); + } else if (val1 > val2) { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + } else { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) == 0); + } + } +} + +TEST(KeyCoderTest, test_decimal) { + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_DECIMAL); + + decimal12_t val1(1, 100000000); + std::string buf1; + + key_coder->encode_ascending(&val1, sizeof(decimal12_t), &buf1); + + decimal12_t check_val; + Slice slice1(buf1); + key_coder->decode_ascending(&slice1, sizeof(decimal12_t), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(check_val, val1); + + { + decimal12_t val2(-1, -100000000); + std::string buf2; + key_coder->encode_ascending(&val2, sizeof(decimal12_t), &buf2); + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + } + { + decimal12_t val2(1, 100000001); + std::string buf2; + key_coder->encode_ascending(&val2, sizeof(decimal12_t), &buf2); + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) < 0); + } + { + decimal12_t val2(0, 0); + std::string buf2; + key_coder->encode_ascending(&val2, sizeof(decimal12_t), &buf2); + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + + std::string result("80"); + for (int i = 0; i < sizeof(int64_t) - 1; ++i) { + result.append("00"); + } + result.append("80"); + for (int i = 0; i < sizeof(int32_t) - 1; ++i) { + result.append("00"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf2.data(), buf2.size()).c_str()); + } +} + +TEST(KeyCoderTest, test_char) { + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_CHAR); + + char buf[] = "1234567890"; + Slice slice(buf, 10); + + { + std::string key; + key_coder->encode_ascending(&slice, 10, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 10, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("1234567890", check_slice.data); + } + + { + std::string key; + 
key_coder->encode_ascending(&slice, 5, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 5, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("12345", check_slice.data); + } +} + +TEST(KeyCoderTest, test_varchar) { + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_VARCHAR); + + char buf[] = "1234567890"; + Slice slice(buf, 10); + + { + std::string key; + key_coder->encode_ascending(&slice, 15, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 15, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("1234567890", check_slice.data); + } + + { + std::string key; + key_coder->encode_ascending(&slice, 5, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 5, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("12345", check_slice.data); + } +} + + +} // namespace doris + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp new file mode 100644 index 00000000000000..c5c5e71a2c3d00 --- /dev/null +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_writer.h" +#include "olap/rowset/segment_v2/segment_iterator.h" + +#include +#include + +#include "common/logging.h" +#include "olap/olap_common.h" +#include "olap/row_cursor.h" +#include "olap/tablet_schema.h" +#include "olap/row_block.h" +#include "olap/row_block2.h" +#include "olap/types.h" +#include "olap/tablet_schema_helper.h" +#include "util/file_utils.h" + +namespace doris { +namespace segment_v2 { + +class SegmentReaderWriterTest : public testing::Test { +public: + SegmentReaderWriterTest() { } + virtual ~SegmentReaderWriterTest() { + } +}; + +TEST_F(SegmentReaderWriterTest, normal) { + + size_t num_rows_per_block = 10; + + std::shared_ptr tablet_schema(new TabletSchema()); + tablet_schema->_num_columns = 4; + tablet_schema->_num_key_columns = 3; + tablet_schema->_num_short_key_columns = 2; + tablet_schema->_num_rows_per_row_block = num_rows_per_block; + tablet_schema->_cols.push_back(create_int_key(1)); + tablet_schema->_cols.push_back(create_int_key(2)); + tablet_schema->_cols.push_back(create_int_key(3)); + tablet_schema->_cols.push_back(create_int_value(4)); + + // segment write + std::string dname = "./ut_dir/segment_test"; + FileUtils::create_dir(dname); + + SegmentWriterOptions opts; + opts.num_rows_per_block = num_rows_per_block; + + std::string fname = dname + "/int_case"; + SegmentWriter writer(fname, 0, tablet_schema, opts); + auto st = writer.init(10); + ASSERT_TRUE(st.ok()); + + RowCursor row; + auto olap_st = row.init(*tablet_schema); + 
ASSERT_EQ(OLAP_SUCCESS, olap_st); + + // 0, 1, 2, 3 + // 10, 11, 12, 13 + // 20, 21, 22, 23 + for (int i = 0; i < 4096; ++i) { + for (int j = 0; j < 4; ++j) { + auto cell = row.cell(j); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = i * 10 + j; + } + writer.append_row(row); + } + + uint32_t file_size = 0; + st = writer.finalize(&file_size); + ASSERT_TRUE(st.ok()); + // reader + { + std::shared_ptr segment(new Segment(fname, 0, tablet_schema, num_rows_per_block)); + st = segment->open(); + LOG(INFO) << "segment open, msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(4096, segment->num_rows()); + Schema schema(*tablet_schema); + // scan all rows + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + StorageReadOptions read_opts; + st = iter->init(read_opts); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 1024, &arena); + + int left = 4096; + + int rowid = 0; + while (left > 0) { + int rows_read = left > 1024 ? 
1024 : left; + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(rows_read, block.num_rows()); + left -= rows_read; + + for (int j = 0; j < block.schema()->column_ids().size(); ++j) { + auto cid = block.schema()->column_ids()[j]; + auto column_block = block.column_block(j); + for (int i = 0; i < rows_read; ++i) { + int rid = rowid + i; + ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i)); + ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)); + } + } + rowid += rows_read; + } + } + // test seek, key + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + // lower bound + StorageReadOptions read_opts; + read_opts.lower_bound.reset(new RowCursor()); + RowCursor* lower_bound = read_opts.lower_bound.get(); + lower_bound->init(*tablet_schema, 2); + { + auto cell = lower_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 100; + } + { + auto cell = lower_bound->cell(1); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 100; + } + read_opts.include_lower_bound = false; + + // upper bound + read_opts.upper_bound.reset(new RowCursor()); + RowCursor* upper_bound = read_opts.upper_bound.get(); + upper_bound->init(*tablet_schema, 1); + { + auto cell = upper_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 200; + } + read_opts.include_upper_bound = true; + + st = iter->init(read_opts); + LOG(INFO) << "iterator init msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 100, &arena); + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(11, block.num_rows()); + auto column_block = block.column_block(0); + for (int i = 0; i < 11; ++i) { + ASSERT_EQ(100 + i * 10, *(int*)column_block.cell_ptr(i)); + } + } + // test seek, key + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + StorageReadOptions read_opts; + + // lower bound + 
read_opts.lower_bound.reset(new RowCursor()); + RowCursor* lower_bound = read_opts.lower_bound.get(); + lower_bound->init(*tablet_schema, 1); + { + auto cell = lower_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 40970; + } + read_opts.include_lower_bound = false; + + st = iter->init(read_opts); + LOG(INFO) << "iterator init msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 100, &arena); + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(0, block.num_rows()); + } + // test seek, key (-2, -1) + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + StorageReadOptions read_opts; + + // lower bound + read_opts.lower_bound.reset(new RowCursor()); + RowCursor* lower_bound = read_opts.lower_bound.get(); + lower_bound->init(*tablet_schema, 1); + { + auto cell = lower_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = -2; + } + read_opts.include_lower_bound = false; + + read_opts.upper_bound.reset(new RowCursor()); + RowCursor* upper_bound = read_opts.upper_bound.get(); + upper_bound->init(*tablet_schema, 1); + { + auto cell = upper_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = -1; + } + read_opts.include_upper_bound = false; + + st = iter->init(read_opts); + LOG(INFO) << "iterator init msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 100, &arena); + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(0, block.num_rows()); + } + } +} + +} +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/be/test/olap/short_key_index_test.cpp b/be/test/olap/short_key_index_test.cpp new file mode 100644 index 00000000000000..1b825e9320292c --- /dev/null +++ b/be/test/olap/short_key_index_test.cpp @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/short_key_index.h" + +#include + +#include "olap/tablet_schema_helper.h" +#include "olap/row_cursor.h" +#include "util/debug_util.h" + +namespace doris { + +class ShortKeyIndexTest : public testing::Test { +public: + ShortKeyIndexTest() { } + virtual ~ShortKeyIndexTest() { + } +}; + +TEST_F(ShortKeyIndexTest, buider) { + ShortKeyIndexBuilder builder(0, 1024); + + for (int i = 1000; i < 10000; i += 2) { + builder.add_item(std::to_string(i)); + } + std::vector slices; + auto st = builder.finalize(10000, 9000 * 1024, &slices); + ASSERT_TRUE(st.ok()); + + std::string buf; + for (auto& slice : slices) { + buf.append(slice.data, slice.size); + } + + ShortKeyIndexDecoder decoder(buf); + st = decoder.parse(); + ASSERT_TRUE(st.ok()); + + // find 1499 + { + auto iter = decoder.lower_bound("1499"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("1500", (*iter).to_string().c_str()); + } + // find 1500 lower bound + { + auto iter = decoder.lower_bound("1500"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("1500", (*iter).to_string().c_str()); + } + // find 1500 upper bound + { + auto iter = decoder.upper_bound("1500"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("1502", (*iter).to_string().c_str()); + } + // find prefix "87" + { + auto 
iter = decoder.lower_bound("87"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("8700", (*iter).to_string().c_str()); + } + // find prefix "87" + { + auto iter = decoder.upper_bound("87"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("8700", (*iter).to_string().c_str()); + } + + // find prefix "9999" + { + auto iter = decoder.upper_bound("9999"); + ASSERT_FALSE(iter.valid()); + } +} + + +TEST_F(ShortKeyIndexTest, enocde) { + TabletSchema tablet_schema; + tablet_schema._cols.push_back(create_int_key(0)); + tablet_schema._cols.push_back(create_int_key(1)); + tablet_schema._cols.push_back(create_int_key(2)); + tablet_schema._cols.push_back(create_int_value(3)); + tablet_schema._num_columns = 4; + tablet_schema._num_key_columns = 3; + tablet_schema._num_short_key_columns = 3; + + // test encoding with padding + { + RowCursor row; + row.init(tablet_schema, 2); + + { + // test padding + { + auto cell = row.cell(0); + cell.set_is_null(false); + *(int*)cell.mutable_cell_ptr() = 12345; + } + { + auto cell = row.cell(1); + cell.set_is_null(false); + *(int*)cell.mutable_cell_ptr() = 54321; + } + std::string buf; + encode_key_with_padding(&buf, row, 3, true); + // should be \x02\x80\x00\x30\x39\x02\x80\x00\xD4\x31\x00 + ASSERT_STREQ("0280003039028000D43100", hexdump(buf.c_str(), buf.size()).c_str()); + } + // test with null + { + { + auto cell = row.cell(0); + cell.set_is_null(false); + *(int*)cell.mutable_cell_ptr() = 54321; + } + { + auto cell = row.cell(1); + cell.set_is_null(true); + *(int*)cell.mutable_cell_ptr() = 54321; + } + + { + std::string buf; + encode_key_with_padding(&buf, row, 3, false); + // should be \x02\x80\x00\xD4\x31\x01\xff + ASSERT_STREQ("028000D43101FF", hexdump(buf.c_str(), buf.size()).c_str()); + } + // encode key + { + std::string buf; + encode_key(&buf, row, 2); + // should be \x02\x80\x00\xD4\x31\x01 + ASSERT_STREQ("028000D43101", hexdump(buf.c_str(), buf.size()).c_str()); + } + } + } +} + +} + +int main(int argc, char** argv) { + 
::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/be/test/olap/tablet_schema_helper.h b/be/test/olap/tablet_schema_helper.h new file mode 100644 index 00000000000000..01f43bc0eb7a6e --- /dev/null +++ b/be/test/olap/tablet_schema_helper.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include "olap/tablet_schema.h" + +namespace doris { + +TabletColumn create_int_key(int32_t id, bool is_nullable = true) { + TabletColumn column; + column._unique_id = id; + column._col_name = std::to_string(id); + column._type = OLAP_FIELD_TYPE_INT; + column._is_key = true; + column._is_nullable = is_nullable; + column._length = 4; + column._index_length = 4; + return column; +} + + +TabletColumn create_int_value( + int32_t id, + FieldAggregationMethod agg_method = OLAP_FIELD_AGGREGATION_SUM, + bool is_nullable = true) { + TabletColumn column; + column._unique_id = id; + column._col_name = std::to_string(id); + column._type = OLAP_FIELD_TYPE_INT; + column._is_key = false; + column._aggregation = agg_method; + column._is_nullable = is_nullable; + column._length = 4; + column._index_length = 4; + return column; +} + +} diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index c66c2d88e023be..edc7b23ff832d5 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -72,17 +72,21 @@ message ZoneMapPB { } message ColumnMetaPB { + // column id in table schema + optional uint32 column_id = 1; + // unique column id + optional uint32 unique_id = 2; // this field is FieldType's value - optional int32 type = 1; - optional EncodingTypePB encoding = 2; + optional int32 type = 3; + optional EncodingTypePB encoding = 4; // compress type for column - optional CompressionTypePB compression = 3; + optional CompressionTypePB compression = 5; // if this column can be nullable - optional bool is_nullable = 4; + optional bool is_nullable = 6; // if this column has checksum for each page - optional bool has_checksum = 5; + optional bool has_checksum = 7; // ordinal index page - optional PagePointerPB ordinal_index_page = 6; + optional PagePointerPB ordinal_index_page = 8; // // dictionary page for DICT_ENCODING // optional PagePointerPB dict_page = 2; @@ -117,3 +121,36 @@ message FileFooterPB { repeated 
MetadataPairPB file_meta_datas = 8; // meta data of file optional PagePointerPB key_index_page = 9; // short key index page } + +message ShortKeyFooterPB { + // How many index items in this index. + optional uint32 num_items = 1; + // The total bytes occupied by the index key + optional uint32 key_bytes = 2; + // The total bytes occupied by the key offsets + optional uint32 offset_bytes = 3; + // Segment id which this index belongs to + optional uint32 segment_id = 4; + // number of rows in each block + optional uint32 num_rows_per_block = 5; + // How many rows in this segment + optional uint32 num_segment_rows = 6; + // Total bytes for this segment + optional uint32 segment_bytes = 7; +} + +message SegmentFooterPB { + optional uint32 version = 1 [default = 1]; // file version + repeated ColumnMetaPB columns = 2; // tablet schema + optional uint64 num_rows = 3; // number of values + optional uint64 index_footprint = 4; // total index footprint of all columns + optional uint64 data_footprint = 5; // total data footprint of all columns + optional uint64 raw_data_footprint = 6; // raw data footprint + + optional CompressionTypePB compress_type = 7 [default = LZ4]; // default compression type for file columns + repeated MetadataPairPB file_meta_datas = 8; // meta data of file + + // Short key index's page + optional PagePointerPB short_key_index_page = 9; +} + diff --git a/run-ut.sh b/run-ut.sh index 5c3a4a4bc6b721..fdf7b32741de64 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -248,10 +248,13 @@ ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_plain_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_reader_writer_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/rle_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_dict_page_test +${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/segment_test ${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test ${DORIS_TEST_BINARY_DIR}/olap/storage_types_test ${DORIS_TEST_BINARY_DIR}/olap/generic_iterators_test
${DORIS_TEST_BINARY_DIR}/olap/aggregate_func_test +${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test +${DORIS_TEST_BINARY_DIR}/olap/key_coder_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test