Skip to content

Commit

Permalink
[FLASH-548/579] Engine DeltaMerge coprocessor integration (pingcap#287)
Browse files Browse the repository at this point in the history
* Throw exception with region status; Use reverseGetColumnInfo

* Use IManageableStorage instead of StorageMergeTree

* Fix args of mock_tidb_table && ensure engine is DeltaMerge

* For learner reads from TiDB/TiSpark, set num_streams from mvcc_query_info.concurrent

* Fix compile error

* Address review comments; add trace-level logging
  • Loading branch information
JaySon-Huang committed Oct 30, 2019
1 parent d724537 commit 6e22571
Show file tree
Hide file tree
Showing 23 changed files with 122 additions and 214 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ DatabaseID MockTiDB::newDataBase(const String & database_name)
}

TableID MockTiDB::newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, const String & engine_type)
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, String engine_type)
{
std::lock_guard lock(tables_mutex);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class MockTiDB : public ext::singleton<MockTiDB>

public:
TableID newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, const String &engine_type);
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, String engine_type);

DatabaseID newDataBase(const String & database_name);

Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
context, query,
[&](const String & database_name, const String & table_name) {
auto storage = context.getTable(database_name, table_name);
auto mmt = std::dynamic_pointer_cast<StorageMergeTree>(storage);
if (!mmt || mmt->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage //
|| !(managed_storage->engineType() == ::TiDB::StorageEngine::DM
|| managed_storage->engineType() == ::TiDB::StorageEngine::TMT))
throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);
return managed_storage->getTableInfo();
},
start_ts, tz_offset, tz_name, encode_type);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, D
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);

String engine_type("tmt");
if (args.size() == 4)
engine_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
if (args.size() == 5)
engine_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[4]).value);

auto tso = context.getTMTContext().getPDClient()->getTS();

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
return expr_name;
}

void DAGExpressionAnalyzer::makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage)
void DAGExpressionAnalyzer::makeExplicitSetForIndex(const tipb::Expr & expr, const ManageableStoragePtr & storage)
{
for (auto & child : expr.children())
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const std::vector<NameAndTypePair> & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
void makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage);
void makeExplicitSetForIndex(const tipb::Expr & expr, const ManageableStoragePtr & storage);
String applyFunction(const String & func_name, const Names & arg_names, ExpressionActionsPtr & actions);
Int32 getImplicitCastCount() { return implicit_cast_count; };
bool appendTimeZoneCastsAfterTS(ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst);
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
{
throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (!merge_tree)

const auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage)
{
throw Exception("Only MergeTree table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST);
throw Exception("Only Manageable table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (ts.columns_size() == 0)
Expand All @@ -59,7 +60,7 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
{
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = merge_tree->getTableInfo().columns[cid - 1].name;
String name = managed_storage->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
}
ss << "FROM " << storage->getDatabaseName() << "." << storage->getTableName() << " ";
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
region_ids.push_back(info.region_id);
throw RegionException(std::move(region_ids), RegionTable::RegionReadStatus::NOT_FOUND);
}
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64(), current_region->getRange()))
if (!checkKeyRanges(dag.getKeyRanges(), table_id, /* pk_is_uint64= */ storage->getPKType() == IManageableStorage::PKType::UINT64,
current_region->getRange()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
Expand Down Expand Up @@ -461,7 +462,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion();

/// Lambda for get storage, then align schema version under the read lock.
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<TMTStoragePtr, TableStructureReadLockPtr, Int64, bool> {
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<ManageableStoragePtr, TableStructureReadLockPtr, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto storage_ = context.getTMTContext().getStorages().get(table_id);
Expand All @@ -473,10 +474,12 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);
}

if (storage_->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Specifying schema_version for non-TMT storage: " + storage_->getName() + ", table: " + std::to_string(table_id)
+ " is not allowed",
if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DM)
{
throw Exception("Specifying schema_version for non-managed storage: " + storage_->getName() + ", table: " + storage_->getTableName()
+ ",id: " + DB::toString(table_id) + " is not allowed",
ErrorCodes::LOGICAL_ERROR);
}

/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);
Expand All @@ -500,7 +503,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
};

/// Try get storage and lock once.
TMTStoragePtr storage_;
ManageableStoragePtr storage_;
TableStructureReadLockPtr lock;
Int64 storage_schema_version;
auto log_schema_version = [&](const String & result) {
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class InterpreterDAG : public IInterpreter
size_t max_streams = 1;

/// Table from where to read data, if not subquery.
TMTStoragePtr storage;
ManageableStoragePtr storage;
TableStructureReadLockPtr table_lock;

std::unique_ptr<DAGExpressionAnalyzer> analyzer;
Expand Down
50 changes: 28 additions & 22 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,13 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin
namespace
{

void throwRetryRegion(const MvccQueryInfo::RegionsQueryInfo & regions_info)
void throwRetryRegion(const MvccQueryInfo::RegionsQueryInfo & regions_info, RegionTable::RegionReadStatus status)
{
std::vector<RegionID> region_ids;
region_ids.reserve(regions_info.size());
for (const auto & info : regions_info)
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
throw RegionException(std::move(region_ids), status);
}

inline void doLearnerRead(const TiDB::TableID table_id, //
Expand Down Expand Up @@ -287,7 +287,7 @@ inline void doLearnerRead(const TiDB::TableID table_id, //
if (region == nullptr)
{
LOG_WARNING(log, "[region " << info.region_id << "] is not found in KVStore, try again");
throwRetryRegion(regions_info);
throwRetryRegion(regions_info, RegionTable::RegionReadStatus::NOT_FOUND);
}
kvstore_region.emplace(info.region_id, std::move(region));
}
Expand Down Expand Up @@ -403,27 +403,34 @@ BlockInputStreams StorageDeltaMerge::read( //
{
/// Learner read.
doLearnerRead(tidb_table_info.id, mvcc_query_info.regions_query_info, tmt, log);

if (likely(!mvcc_query_info.regions_query_info.empty()))
{
/// For learner read from TiDB/TiSpark, we set num_streams by `mvcc_query_info.concurrent`
num_streams = std::max(1U, static_cast<UInt32>(mvcc_query_info.concurrent));
} // else learner read from ch-client, keep num_streams
}

HandleRanges ranges = getQueryRanges(mvcc_query_info.regions_query_info);

#ifndef NDEBUG
if (log->trace())
{
std::stringstream ss;
for (const auto &region: mvcc_query_info.regions_query_info)
{
const auto & range = region.range_in_table;
ss << region.region_id << "[" << range.first.toString() << "," << range.second.toString() << "),";
std::stringstream ss;
for (const auto &region: mvcc_query_info.regions_query_info)
{
const auto &range = region.range_in_table;
ss << region.region_id << "[" << range.first.toString() << "," << range.second.toString() << "),";
}
LOG_TRACE(log, "reading ranges: orig: " << ss.str());
}
{
std::stringstream ss;
for (const auto &range : ranges)
ss << range.toString() << ",";
LOG_TRACE(log, "reading ranges: " << ss.str());
}
LOG_TRACE(log, "reading ranges: orig: " << ss.str());
}
{
std::stringstream ss;
for (const auto &range : ranges)
ss << range.toString() << ",";
LOG_TRACE(log, "reading ranges: " << ss.str());
}
#endif

return store->read(
context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, max_block_size);
Expand Down Expand Up @@ -617,10 +624,9 @@ void updateDeltaMergeTableCreateStatement( //
{
if (hidden_columns.has(column_define.name))
continue;
TiDB::ColumnInfo column_info = getColumnInfoByDataType(column_define.type);
column_info.id = column_define.id;
column_info.name = column_define.name;
column_info.origin_default_value = column_define.default_value;
Field default_field;
TiDB::ColumnInfo column_info = reverseGetColumnInfo(NameAndTypePair{column_define.name, column_define.type}, column_define.id, default_field);
// TODO column_info.origin_default_value = column_define.default_value;
table_info_from_store.columns.emplace_back(std::move(column_info));
}
}
Expand All @@ -630,9 +636,9 @@ void updateDeltaMergeTableCreateStatement( //
IDatabase::ASTModifier storage_modifier = [&](IAST & ast) {
std::shared_ptr<ASTLiteral> literal;
if (table_info_from_tidb)
literal = std::make_shared<ASTLiteral>(Field(table_info_from_tidb->get().serialize(true)));
literal = std::make_shared<ASTLiteral>(Field(table_info_from_tidb->get().serialize()));
else
literal = std::make_shared<ASTLiteral>(Field(table_info_from_store.serialize(true)));
literal = std::make_shared<ASTLiteral>(Field(table_info_from_store.serialize()));
auto & storage_ast = typeid_cast<ASTStorage &>(ast);
auto & args = typeid_cast<ASTExpressionList &>(*storage_ast.engine->arguments);
if (args.children.size() == 1)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/StorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ class StorageMergeTree : public ext::shared_ptr_helper<StorageMergeTree>, public

String getDataPath() const override { return full_path; }

bool pkIsUInt64() const { return getTMTPKType(*data.primary_key_data_types[0]) == TMTPKType::UINT64; }
SortDescription getPrimarySortDescription() const override { return data.getPrimarySortDescription(); }

private:
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ String createTableStmt(const DBInfo & db_info, const TableInfo & table_info)
else if (table_info.engine_type == TiDB::StorageEngine::DEBUGGING_MEMORY)
{
writeString(") Engine = Debugging('", stmt_buf);
writeString(table_info.serialize(true), stmt_buf);
writeEscapedString(table_info.serialize(), stmt_buf);
writeString("')", stmt_buf);
}
else
Expand Down
Loading

0 comments on commit 6e22571

Please sign in to comment.