Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAG planner fix and mock dag request #169

Merged
merged 14 commits into from
Aug 9, 2019
106 changes: 65 additions & 41 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <DataStreams/StringStreamBlockInputStream.h>
#include <Debug/DBGInvoker.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Debug/dbgFuncMockTiDBData.h>
#include <Debug/dbgFuncMockTiDBTable.h>
#include <Debug/dbgFuncRegion.h>
Expand All @@ -29,43 +30,46 @@ void dbgFuncSleep(Context &, const ASTs & args, DBGInvoker::Printer output)

DBGInvoker::DBGInvoker()
{
regFunc("echo", dbgFuncEcho);
regSchemalessFunc("echo", dbgFuncEcho);
// TODO: remove this, use sleep in bash script
regFunc("sleep", dbgFuncSleep);

regFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable);
regFunc("mock_tidb_db", MockTiDBTable::dbgFuncMockTiDBDB);
regFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition);
regFunc("rename_table_for_partition", MockTiDBTable::dbgFuncRenameTableForPartition);
regFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable);
regFunc("drop_tidb_db", MockTiDBTable::dbgFuncDropTiDBDB);
regFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable);
regFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable);
regFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable);
regFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);

regFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regFunc("raft_insert_row", dbgFuncRaftInsertRow);
regFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull);
regFunc("raft_insert_rows", dbgFuncRaftInsertRows);
regFunc("raft_update_rows", dbgFuncRaftUpdateRows);
regFunc("raft_delete_rows", dbgFuncRaftDelRows);
regFunc("raft_delete_row", dbgFuncRaftDeleteRow);

regFunc("put_region", dbgFuncPutRegion);
regFunc("region_snapshot", dbgFuncRegionSnapshot);
regFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData);

regFunc("try_flush", dbgFuncTryFlush);
regFunc("try_flush_region", dbgFuncTryFlushRegion);

regFunc("dump_all_region", dbgFuncDumpAllRegion);

regFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regFunc("refresh_schemas", dbgFuncRefreshSchemas);
regFunc("reset_schemas", dbgFuncResetSchemas);
regSchemalessFunc("sleep", dbgFuncSleep);

regSchemalessFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable);
regSchemalessFunc("mock_tidb_db", MockTiDBTable::dbgFuncMockTiDBDB);
regSchemalessFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition);
regSchemalessFunc("rename_table_for_partition", MockTiDBTable::dbgFuncRenameTableForPartition);
regSchemalessFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable);
regSchemalessFunc("drop_tidb_db", MockTiDBTable::dbgFuncDropTiDBDB);
regSchemalessFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable);
regSchemalessFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable);
regSchemalessFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable);
regSchemalessFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regSchemalessFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);

regSchemalessFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regSchemalessFunc("raft_insert_row", dbgFuncRaftInsertRow);
regSchemalessFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull);
regSchemalessFunc("raft_insert_rows", dbgFuncRaftInsertRows);
regSchemalessFunc("raft_update_rows", dbgFuncRaftUpdateRows);
regSchemalessFunc("raft_delete_rows", dbgFuncRaftDelRows);
regSchemalessFunc("raft_delete_row", dbgFuncRaftDeleteRow);

regSchemalessFunc("put_region", dbgFuncPutRegion);
regSchemalessFunc("region_snapshot", dbgFuncRegionSnapshot);
regSchemalessFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData);

regSchemalessFunc("try_flush", dbgFuncTryFlush);
regSchemalessFunc("try_flush_region", dbgFuncTryFlushRegion);

regSchemalessFunc("dump_all_region", dbgFuncDumpAllRegion);

regSchemalessFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regSchemalessFunc("refresh_schemas", dbgFuncRefreshSchemas);
regSchemalessFunc("reset_schemas", dbgFuncResetSchemas);

regSchemafulFunc("dag", dbgFuncDAG);
regSchemafulFunc("mock_dag", dbgFuncMockDAG);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
Expand Down Expand Up @@ -97,10 +101,25 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or
name = ori_name.substr(prefix_not_print_res.size(), ori_name.size() - prefix_not_print_res.size());
}

auto it = funcs.find(name);
if (it == funcs.end())
throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS);
BlockInputStreamPtr res;
auto it_schemaless = schemaless_funcs.find(name);
if (it_schemaless != schemaless_funcs.end())
res = invokeSchemaless(context, name, it_schemaless->second, args);
else
{
auto it_schemaful = schemaful_funcs.find(name);
if (it_schemaful != schemaful_funcs.end())
res = invokeSchemaful(context, name, it_schemaful->second, args);
if (it_schemaful == schemaful_funcs.end())
throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS);
}

return print_res ? res : std::shared_ptr<StringStreamBlockInputStream>();
}

BlockInputStreamPtr DBGInvoker::invokeSchemaless(
Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args)
{
std::stringstream col_name;
col_name << name << "(";
for (size_t i = 0; i < args.size(); ++i)
Expand All @@ -113,9 +132,14 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or
std::shared_ptr<StringStreamBlockInputStream> res = std::make_shared<StringStreamBlockInputStream>(col_name.str());
Printer printer = [&](const std::string & s) { res->append(s); };

(it->second)(context, args, printer);
func(context, args, printer);

return print_res ? res : std::shared_ptr<StringStreamBlockInputStream>();
return res;
}

BlockInputStreamPtr DBGInvoker::invokeSchemaful(Context & context, const std::string &, const SchemafulDBGFunc & func, const ASTs & args)
{
return func(context, args);
}

} // namespace DB
11 changes: 8 additions & 3 deletions dbms/src/Debug/DBGInvoker.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ class DBGInvoker
{
public:
using Printer = std::function<void(const std::string &)>;
using DBGFunc = std::function<void(Context & context, const ASTs & args, Printer printer)>;
using SchemalessDBGFunc = std::function<void(Context & context, const ASTs & args, Printer printer)>;
using SchemafulDBGFunc = std::function<BlockInputStreamPtr(Context & context, const ASTs & args)>;

DBGInvoker();

void regFunc(const std::string & name, DBGFunc func) { funcs[name] = func; }
void regSchemalessFunc(const std::string & name, SchemalessDBGFunc func) { schemaless_funcs[name] = func; }
void regSchemafulFunc(const std::string & name, SchemafulDBGFunc func) { schemaful_funcs[name] = func; }

BlockInputStreamPtr invoke(Context & context, const std::string & ori_name, const ASTs & args);
BlockInputStreamPtr invokeSchemaless(Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args);
BlockInputStreamPtr invokeSchemaful(Context & context, const std::string & name, const SchemafulDBGFunc & func, const ASTs & args);

private:
std::unordered_map<std::string, DBGFunc> funcs;
std::unordered_map<std::string, SchemalessDBGFunc> schemaless_funcs;
std::unordered_map<std::string, SchemafulDBGFunc> schemaful_funcs;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
table_info.id = table_id_allocator++;
table_info.name = table_name;

int i = 0;
int i = 1;
for (auto & column : columns.getAllPhysical())
{
table_info.columns.emplace_back(getColumnInfoFromColumn(column, i++));
Expand Down
211 changes: 211 additions & 0 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#include <Common/typeid_cast.h>
#include <DataStreams/BlocksListBlockInputStream.h>
#include <Debug/MockTiDB.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TypeMapping.h>
#include <tipb/select.pb.h>

namespace DB
{

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
} // namespace ErrorCodes

using DAGField = std::pair<String, tipb::FieldType>;
using DAGSchema = std::vector<DAGField>;
using SchemaFetcher = std::function<TiDB::TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts);
tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);

BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
{
if (args.size() < 1 || args.size() > 2)
throw Exception("Args not matched, should be: query[, region-id]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = InvalidRegionID;
if (args.size() == 2)
region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Timestamp start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(context, query,
[&](const String & database_name, const String & table_name) {
auto storage = context.getTable(database_name, table_name);
auto mmt = std::dynamic_pointer_cast<StorageMergeTree>(storage);
if (!mmt || mmt->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
},
start_ts);

RegionPtr region;
if (region_id == InvalidRegionID)
{
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id);
if (regions.empty())
throw Exception("No region for table", ErrorCodes::BAD_ARGUMENTS);
region = context.getTMTContext().getRegionTable().getRegionsByTable(table_id).front().second;
}
else
{
region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, region_id);
if (!region)
throw Exception("No such region", ErrorCodes::BAD_ARGUMENTS);
}
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer());

return outputDAGResponse(context, schema, dag_response);
}

BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
{
if (args.size() < 2 || args.size() > 3)
throw Exception("Args not matched, should be: query, region-id[, start-ts]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
if (args.size() == 3)
start_ts = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (start_ts == 0)
start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(context, query,
[&](const String & database_name, const String & table_name) {
return MockTiDB::instance().getTableByName(database_name, table_name)->table_info;
},
start_ts);
std::ignore = table_id;

RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id);
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer());

return outputDAGResponse(context, schema, dag_response);
}

std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts)
{
DAGSchema schema;
tipb::DAGRequest dag_request;

dag_request.set_start_ts(start_ts);

ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "from DAG compiler", 0);
ASTSelectQuery & ast_query = typeid_cast<ASTSelectQuery &>(*ast);

String database_name, table_name;
auto query_database = ast_query.database();
auto query_table = ast_query.table();
if (query_database)
database_name = typeid_cast<ASTIdentifier &>(*query_database).name;
if (query_table)
table_name = typeid_cast<ASTIdentifier &>(*query_table).name;
if (!query_table)
{
database_name = "system";
table_name = "one";
}
else if (!query_database)
{
database_name = context.getCurrentDatabase();
}
auto table_info = schema_fetcher(database_name, table_name);

tipb::Executor * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeTableScan);
tipb::TableScan * ts = executor->mutable_tbl_scan();
ts->set_table_id(table_info.id);
size_t i = 0;
for (const auto & column_info : table_info.columns)
{
tipb::ColumnInfo * ci = ts->add_columns();
ci->set_column_id(column_info.id);
ci->set_tp(column_info.tp);
ci->set_flag(column_info.flag);

tipb::FieldType field_type;
field_type.set_tp(column_info.tp);
field_type.set_flag(column_info.flag);
field_type.set_flen(column_info.flen);
field_type.set_decimal(column_info.decimal);
schema.emplace_back(std::make_pair(column_info.name, std::move(field_type)));

dag_request.add_output_offsets(i);

i++;
}

// TODO: Other operator compile.

return std::make_tuple(table_info.id, std::move(schema), std::move(dag_request));
}

tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version)
{
tipb::SelectResponse dag_response;
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response, true);
driver.execute();
return dag_response;
}

BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response)
{
BlocksList blocks;
for (const auto & chunk : dag_response.chunks())
{
std::vector<std::vector<DB::Field>> rows;
std::vector<DB::Field> curr_row;
const std::string & data = chunk.rows_data();
size_t cursor = 0;
while (cursor < data.size())
{
curr_row.push_back(DB::DecodeDatum(cursor, data));
if (curr_row.size() == schema.size())
{
rows.emplace_back(std::move(curr_row));
curr_row.clear();
}
}

ColumnsWithTypeAndName columns;
for (auto & field : schema)
{
const auto & name = field.first;
auto data_type = getDataTypeByFieldType(field.second);
ColumnWithTypeAndName col(data_type, name);
col.column->assumeMutable()->reserve(rows.size());
columns.emplace_back(std::move(col));
}
for (const auto & row : rows)
{
for (size_t i = 0; i < row.size(); i++)
{
columns[i].column->assumeMutable()->insert(row[i]);
}
}

blocks.emplace_back(Block(columns));
}

return std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
}

} // namespace DB
Loading