Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-548/579] Engine DeltaMerge coprocessor integration #287

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ DatabaseID MockTiDB::newDataBase(const String & database_name)
}

TableID MockTiDB::newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, const String & engine_type)
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, String engine_type)
{
std::lock_guard lock(tables_mutex);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class MockTiDB : public ext::singleton<MockTiDB>

public:
TableID newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, const String &engine_type);
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, String engine_type);

DatabaseID newDataBase(const String & database_name);

Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
context, query,
[&](const String & database_name, const String & table_name) {
auto storage = context.getTable(database_name, table_name);
auto mmt = std::dynamic_pointer_cast<StorageMergeTree>(storage);
if (!mmt || mmt->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage //
|| !(managed_storage->engineType() == ::TiDB::StorageEngine::DM
|| managed_storage->engineType() == ::TiDB::StorageEngine::TMT))
throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);
return managed_storage->getTableInfo();
},
start_ts, tz_offset, tz_name);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, D
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);

String engine_type("tmt");
if (args.size() == 4)
engine_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
if (args.size() == 5)
engine_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[4]).value);

auto tso = context.getTMTContext().getPDClient()->getTS();

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
return expr_name;
}

void DAGExpressionAnalyzer::makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage)
void DAGExpressionAnalyzer::makeExplicitSetForIndex(const tipb::Expr & expr, const ManageableStoragePtr & storage)
{
for (auto & child : expr.children())
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const std::vector<NameAndTypePair> & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
void makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage);
void makeExplicitSetForIndex(const tipb::Expr & expr, const ManageableStoragePtr & storage);
String applyFunction(const String & func_name, Names & arg_names, ExpressionActionsPtr & actions);
Int32 getImplicitCastCount() { return implicit_cast_count; };
bool appendTimeZoneCastsAfterTS(ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst);
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
{
throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (!merge_tree)

const auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage)
{
throw Exception("Only MergeTree table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST);
throw Exception("Only Manageable table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (ts.columns_size() == 0)
Expand All @@ -59,7 +60,7 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
{
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = merge_tree->getTableInfo().columns[cid - 1].name;
String name = managed_storage->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
}
ss << "FROM " << storage->getDatabaseName() << "." << storage->getTableName() << " ";
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
region_ids.push_back(info.region_id);
throw RegionException(std::move(region_ids), RegionTable::RegionReadStatus::NOT_FOUND);
}
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64(), current_region->getRange()))
if (!checkKeyRanges(dag.getKeyRanges(), table_id, /* pk_is_uint64= */ storage->getPKType() == IManageableStorage::PKType::UINT64,
current_region->getRange()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
Expand Down Expand Up @@ -440,7 +441,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion();

/// Lambda for get storage, then align schema version under the read lock.
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<TMTStoragePtr, TableStructureReadLockPtr, Int64, bool> {
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<ManageableStoragePtr, TableStructureReadLockPtr, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto storage_ = context.getTMTContext().getStorages().get(table_id);
Expand All @@ -452,10 +453,12 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);
}

if (storage_->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Specifying schema_version for non-TMT storage: " + storage_->getName() + ", table: " + std::to_string(table_id)
+ " is not allowed",
if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DM)
{
throw Exception("Specifying schema_version for non-managed storage: " + storage_->getName() + ", table: " + storage_->getTableName()
+ ",id: " + DB::toString(table_id) + " is not allowed",
ErrorCodes::LOGICAL_ERROR);
}

/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);
Expand All @@ -479,7 +482,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
};

/// Try get storage and lock once.
TMTStoragePtr storage_;
ManageableStoragePtr storage_;
TableStructureReadLockPtr lock;
Int64 storage_schema_version;
auto log_schema_version = [&](const String & result) {
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class InterpreterDAG : public IInterpreter
size_t max_streams = 1;

/// Table from where to read data, if not subquery.
TMTStoragePtr storage;
ManageableStoragePtr storage;
TableStructureReadLockPtr table_lock;

std::unique_ptr<DAGExpressionAnalyzer> analyzer;
Expand Down
20 changes: 11 additions & 9 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,13 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin
namespace
{

void throwRetryRegion(const MvccQueryInfo::RegionsQueryInfo & regions_info)
void throwRetryRegion(const MvccQueryInfo::RegionsQueryInfo & regions_info, RegionTable::RegionReadStatus status)
{
std::vector<RegionID> region_ids;
region_ids.reserve(regions_info.size());
for (const auto & info : regions_info)
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
throw RegionException(std::move(region_ids), status);
}

inline void doLearnerRead(const TiDB::TableID table_id, //
Expand Down Expand Up @@ -287,7 +287,7 @@ inline void doLearnerRead(const TiDB::TableID table_id, //
if (region == nullptr)
{
LOG_WARNING(log, "[region " << info.region_id << "] is not found in KVStore, try again");
throwRetryRegion(regions_info);
throwRetryRegion(regions_info, RegionTable::RegionReadStatus::NOT_FOUND);
}
kvstore_region.emplace(info.region_id, std::move(region));
}
Expand Down Expand Up @@ -403,6 +403,9 @@ BlockInputStreams StorageDeltaMerge::read( //
{
/// Learner read.
doLearnerRead(tidb_table_info.id, mvcc_query_info.regions_query_info, tmt, log);

/// For learner read from TiDB/TiSpark, we set num_streams by `mvcc_query_info.concurrent`
num_streams = std::max(1U, static_cast<UInt32>(mvcc_query_info.concurrent));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like queries from ch client will also run this path. Maybe we should check the availability ofmvcc_query_info to determine whether it is a learner read from TiDB/TiSpark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mvcc_query_info is never null, but mvcc_query_info.regions_query_info is empty if it is not come from TiDB/TiSpark.
Done.

}

HandleRanges ranges = getQueryRanges(mvcc_query_info.regions_query_info);
Expand Down Expand Up @@ -617,10 +620,9 @@ void updateDeltaMergeTableCreateStatement( //
{
if (hidden_columns.has(column_define.name))
continue;
TiDB::ColumnInfo column_info = getColumnInfoByDataType(column_define.type);
column_info.id = column_define.id;
column_info.name = column_define.name;
column_info.origin_default_value = column_define.default_value;
Field default_field;
TiDB::ColumnInfo column_info = reverseGetColumnInfo(NameAndTypePair{column_define.name, column_define.type}, column_define.id, default_field);
// TODO column_info.origin_default_value = column_define.default_value;
table_info_from_store.columns.emplace_back(std::move(column_info));
}
}
Expand All @@ -630,9 +632,9 @@ void updateDeltaMergeTableCreateStatement( //
IDatabase::ASTModifier storage_modifier = [&](IAST & ast) {
std::shared_ptr<ASTLiteral> literal;
if (table_info_from_tidb)
literal = std::make_shared<ASTLiteral>(Field(table_info_from_tidb->get().serialize(true)));
literal = std::make_shared<ASTLiteral>(Field(table_info_from_tidb->get().serialize()));
else
literal = std::make_shared<ASTLiteral>(Field(table_info_from_store.serialize(true)));
literal = std::make_shared<ASTLiteral>(Field(table_info_from_store.serialize()));
auto & storage_ast = typeid_cast<ASTStorage &>(ast);
auto & args = typeid_cast<ASTExpressionList &>(*storage_ast.engine->arguments);
if (args.children.size() == 1)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/StorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ class StorageMergeTree : public ext::shared_ptr_helper<StorageMergeTree>, public

String getDataPath() const override { return full_path; }

bool pkIsUInt64() const { return getTMTPKType(*data.primary_key_data_types[0]) == TMTPKType::UINT64; }
SortDescription getPrimarySortDescription() const override { return data.getPrimarySortDescription(); }

private:
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ String createTableStmt(const DBInfo & db_info, const TableInfo & table_info)
else if (table_info.engine_type == TiDB::StorageEngine::DEBUGGING_MEMORY)
{
writeString(") Engine = Debugging('", stmt_buf);
writeString(table_info.serialize(true), stmt_buf);
writeEscapedString(table_info.serialize(), stmt_buf);
writeString("')", stmt_buf);
}
else
Expand Down
140 changes: 1 addition & 139 deletions dbms/src/Storages/Transaction/TypeMapping.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#include <type_traits>

#include <Common/FieldVisitors.h>
#include <Core/NamesAndTypes.h>
#include <Common/typeid_cast.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDecimal.h>
Expand All @@ -16,7 +16,6 @@
#include <Functions/FunctionHelpers.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>
#include <Functions/FunctionHelpers.h>

namespace DB
{
Expand Down Expand Up @@ -335,141 +334,4 @@ ColumnInfo reverseGetColumnInfo(const NameAndTypePair & column, ColumnID id, con
return column_info;
}

namespace
{

template <typename T>
bool getDecimalInfo(const IDataType * type, ColumnInfo & column_info)
{
using TypeDec = DataTypeDecimal<T>;
if (auto decimal_type = checkAndGetDataType<TypeDec>(type); decimal_type != nullptr)
{
column_info.flen = decimal_type->getPrec();
column_info.decimal = decimal_type->getScale();
column_info.tp = TiDB::TypeNewDecimal;
return true;
}
return false;
}

} // namespace

ColumnInfo getColumnInfoByDataType(const DataTypePtr & type)
{
ColumnInfo col;
DataTypePtr not_null_type;
if (const auto * type_nullable = typeid_cast<const DataTypeNullable *>(type.get()))
{
not_null_type = type_nullable->getNestedType();
}
else
{
col.setNotNullFlag();
not_null_type = type;
}

// Use enum TypeIndex
switch (not_null_type->getTypeId())
{
case TypeIndex::Nothing:
col.tp = TiDB::TypeNull;
break;

// UnSigned
case TypeIndex::UInt8:
col.setUnsignedFlag();
col.tp = TiDB::TypeTiny;
break;
case TypeIndex::UInt16:
col.setUnsignedFlag();
col.tp = TiDB::TypeShort;
break;
case TypeIndex::UInt32:
col.setUnsignedFlag();
col.tp = TiDB::TypeLong;
break;
case TypeIndex::UInt64:
col.setUnsignedFlag();
col.tp = TiDB::TypeLongLong;
break;

// Signed
case TypeIndex::Int8:
col.tp = TiDB::TypeTiny;
break;
case TypeIndex::Int16:
col.tp = TiDB::TypeShort;
break;
case TypeIndex::Int32:
col.tp = TiDB::TypeLong;
break;
case TypeIndex::Int64:
col.tp = TiDB::TypeLongLong;
break;

// Floating point types
case TypeIndex::Float32:
col.tp = TiDB::TypeFloat;
break;
case TypeIndex::Float64:
col.tp = TiDB::TypeDouble;
break;

case TypeIndex::Date:
case TypeIndex::MyDate:
col.tp = TiDB::TypeDate;
break;
case TypeIndex::DateTime:
case TypeIndex::MyDateTime:
col.tp = TiDB::TypeDatetime;
break;

case TypeIndex::String:
col.tp = TiDB::TypeString;
break;
case TypeIndex::FixedString:
col.tp = TiDB::TypeString;
break;

// Decimal
case TypeIndex::Decimal32:
getDecimalInfo<Decimal32>(type.get(), col);
break;
case TypeIndex::Decimal64:
getDecimalInfo<Decimal64>(type.get(), col);
break;
case TypeIndex::Decimal128:
getDecimalInfo<Decimal128>(type.get(), col);
break;
case TypeIndex::Decimal256:
getDecimalInfo<Decimal256>(type.get(), col);
break;

// Unknown numeric in TiDB
case TypeIndex::UInt128:
break;
case TypeIndex::Int128:
break;
case TypeIndex::Int256:
break;

// Unkown
case TypeIndex::MyTimeStamp:
case TypeIndex::MyTime:
case TypeIndex::Enum8:
case TypeIndex::Enum16:
case TypeIndex::UUID:
case TypeIndex::Array:
case TypeIndex::Tuple:
case TypeIndex::Set:
case TypeIndex::Interval:
case TypeIndex::Nullable:
case TypeIndex::Function:
case TypeIndex::AggregateFunction:
case TypeIndex::LowCardinality:
throw Exception("Unknown TiDB type from " + type->getName(), ErrorCodes::NOT_IMPLEMENTED);
}
return col;
}

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/TypeMapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/expression.pb.h>
#include <Core/NamesAndTypes.h>

#pragma GCC diagnostic pop

namespace DB
Expand All @@ -23,7 +25,6 @@ TiDB::CodecFlag getCodecFlagByFieldType(const tipb::FieldType & field_type);
// Note that not every TiFlash type has a corresponding TiDB type,
// caller should make sure the source type is valid, otherwise exception will be thrown.
ColumnInfo reverseGetColumnInfo(const NameAndTypePair & column, ColumnID id, const Field & default_value);
ColumnInfo getColumnInfoByDataType(const DataTypePtr &type);

} // namespace DB

Loading