Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-761 refine cop request log #348

Merged
merged 15 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,6 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest
{
static Logger * log = &Logger::get("MockDAG");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
context.setSetting("dag_planner", "optree");
tipb::SelectResponse dag_response;
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, std::move(key_ranges), dag_response, true);
driver.execute();
Expand Down
19 changes: 1 addition & 18 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,8 @@ try

DAGContext dag_context(dag_request.executors_size());
DAGQuerySource dag(context, dag_context, region_id, region_version, region_conf_version, key_ranges, dag_request);
BlockIO streams;

String planner = context.getSettings().dag_planner;
if (planner == "sql")
{
DAGStringConverter converter(context, dag_request);
String query = converter.buildSqlString();
if (!query.empty())
streams = executeQuery(query, context, internal, QueryProcessingStage::Complete);
}
else if (planner == "optree")
{
streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete);
}
else
{
throw Exception("Unknown DAG planner type " + planner, ErrorCodes::LOGICAL_ERROR);
}

BlockIO streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);
Expand Down
62 changes: 53 additions & 9 deletions dbms/src/Flash/Coprocessor/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

#include <Core/QueryProcessingStage.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TypeMapping.h>
#include <Storages/Transaction/Types.h>

namespace DB
Expand Down Expand Up @@ -55,12 +57,19 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
for (const tipb::ColumnInfo & ci : ts.columns())
{
ColumnID cid = ci.column_id();
if (cid <= 0 || cid > (ColumnID)columns_from_ts.size())
if (cid == -1)
{
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
// Column ID -1 returns the handle column
auto pk_handle_col = storage->getTableInfo().getPKHandleColumn();
pk_handle_col.has_value();
auto pair = storage->getColumns().getPhysical(
pk_handle_col.has_value() ? pk_handle_col->get().name : MutableSupport::tidb_pk_column_name);
columns_from_ts.push_back(pair);
continue;
}
String name = merge_tree->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
auto name = storage->getTableInfo().getColumnName(cid);
auto pair = storage->getColumns().getPhysical(name);
columns_from_ts.push_back(pair);
}
ss << "FROM " << storage->getDatabaseName() << "." << storage->getTableName() << " ";
}
Expand All @@ -86,6 +95,43 @@ void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::string

// Appends a SQL LIMIT clause (with a trailing space) for the given Limit executor.
void DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss)
{
    ss << "LIMIT " << limit.limit() << " ";
}

// Renders an Aggregation executor: emits a GROUP BY clause for the group-by
// expressions and records each aggregate function as an output column in
// columns_from_agg. Sets afterAgg so later stages read the post-agg schema.
void DAGStringConverter::buildAggString(const tipb::Aggregation & agg, std::stringstream & ss)
{
    if (agg.group_by_size() > 0)
    {
        ss << "GROUP BY ";
        // NOTE(review): unlike the other build*String helpers, no trailing
        // space follows the last group-by item — confirm the caller inserts
        // a separator before the next clause.
        bool need_separator = false;
        for (auto & group_expr : agg.group_by())
        {
            if (need_separator)
                ss << ", ";
            need_separator = true;
            ss << exprToString(group_expr, getCurrentColumns());
        }
    }
    for (auto & agg_expr : agg.agg_func())
        columns_from_agg.emplace_back(exprToString(agg_expr, getCurrentColumns()), getDataTypeByFieldType(agg_expr.field_type()));
    afterAgg = true;
}
// Renders a TopN executor as "ORDER BY <expr> ASC|DESC, ... LIMIT <n> ".
void DAGStringConverter::buildTopNString(const tipb::TopN & topN, std::stringstream & ss)
{
    ss << "ORDER BY ";
    bool need_separator = false;
    for (auto & order_item : topN.order_by())
    {
        if (need_separator)
            ss << ", ";
        need_separator = true;
        ss << exprToString(order_item.expr(), getCurrentColumns()) << " ";
        ss << (order_item.desc() ? "DESC" : "ASC");
    }
    ss << " LIMIT " << topN.limit() << " ";
}

// TODO: return the error message
void DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss)
{
Expand All @@ -101,11 +147,9 @@ void DAGStringConverter::buildString(const tipb::Executor & executor, std::strin
case tipb::ExecType::TypeAggregation:
// stream agg is not supported, treated as normal agg
case tipb::ExecType::TypeStreamAgg:
//todo support agg
throw Exception("Aggregation is not supported", ErrorCodes::NOT_IMPLEMENTED);
return buildAggString(executor.aggregation(), ss);
case tipb::ExecType::TypeTopN:
// todo support top n
throw Exception("TopN is not supported", ErrorCodes::NOT_IMPLEMENTED);
return buildTopNString(executor.topn(), ss);
case tipb::ExecType::TypeLimit:
return buildLimitString(executor.limit(), ss);
}
Expand Down Expand Up @@ -145,7 +189,7 @@ String DAGStringConverter::buildSqlString()
{
project << ", ";
}
project << getCurrentOutputColumns()[index];
project << getCurrentColumns()[index].name;
}
project << " ";
}
Expand Down
15 changes: 2 additions & 13 deletions dbms/src/Flash/Coprocessor/DAGStringConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,19 @@ class DAGStringConverter
return columns_from_ts;
}

const Names & getCurrentOutputColumns()
{
if (afterAgg)
{
return output_from_agg;
}
return output_from_ts;
}

protected:
void buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
void buildSelString(const tipb::Selection & sel, std::stringstream & ss);
void buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
void buildAggString(const tipb::Aggregation & agg, std::stringstream & ss);
void buildTopNString(const tipb::TopN & topN, std::stringstream & ss);
void buildString(const tipb::Executor & executor, std::stringstream & ss);

protected:
Context & context;
const tipb::DAGRequest & dag_request;
// used by columnRef, which starts with 1, and refs column index in the original ts/agg output
std::vector<NameAndTypePair> columns_from_ts;
std::vector<NameAndTypePair> columns_from_agg;
// used by output_offset, which starts with 0, and refs the index in the selected output of ts/agg operator
Names output_from_ts;
Names output_from_agg;
bool afterAgg;
};

Expand Down
36 changes: 23 additions & 13 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,24 +122,34 @@ String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair>
if (isInOrGlobalInOperator(func_name))
{
// for in, we could not represent the function expr using func_name(param1, param2, ...)
throw Exception("Function " + func_name + " not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
ss << func_name << "(";
bool first = true;
for (const tipb::Expr & child : expr.children())
{
String s = exprToString(child, input_col);
if (first)
ss << exprToString(expr.children(0), input_col) << " " << func_name << " (";
bool first = true;
for (int i = 1; i < expr.children_size(); i++)
{
first = false;
String s = exprToString(expr.children(i), input_col);
if (first)
first = false;
else
ss << ", ";
ss << s;
}
else
ss << ")";
}
else
{
ss << func_name << "(";
bool first = true;
for (const tipb::Expr & child : expr.children())
{
ss << ", ";
String s = exprToString(child, input_col);
if (first)
first = false;
else
ss << ", ";
ss << s;
}
ss << s;
ss << ")";
}
ss << ") ";
return ss.str();
}

Expand Down
22 changes: 19 additions & 3 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/Aggregator.h>
#include <Parsers/ASTSelectQuery.h>
Expand Down Expand Up @@ -140,7 +141,7 @@ bool checkKeyRanges(const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>>
return isAllValueCoveredByRanges<Int64>(handle_ranges, region_handle_ranges);
}

RegionException::RegionReadStatus InterpreterDAG::getRegionReadStatus(RegionPtr current_region)
RegionException::RegionReadStatus InterpreterDAG::getRegionReadStatus(const RegionPtr & current_region)
{
if (!current_region)
return RegionException::NOT_FOUND;
Expand Down Expand Up @@ -270,6 +271,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
LOG_WARNING(log, __PRETTY_FUNCTION__ << " Meet region exception for region " << info.region_id);
throw RegionException(std::move(region_ids), region_read_status);
}
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64(), current_region->getRange()))
Expand Down Expand Up @@ -374,7 +376,7 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
// add cast if type is not match
analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest(), keep_session_timezone_info);
//todo use output_offset to reconstruct the final project columns
for (auto element : analyzer->getCurrentInputColumns())
for (auto & element : analyzer->getCurrentInputColumns())
{
final_project.emplace_back(element.name, "");
}
Expand Down Expand Up @@ -654,7 +656,7 @@ void InterpreterDAG::executeFinalProject(Pipeline & pipeline)
{
auto columns = pipeline.firstStream()->getHeader();
NamesAndTypesList input_column;
for (auto column : columns.getColumnsWithTypeAndName())
for (auto & column : columns.getColumnsWithTypeAndName())
{
input_column.emplace_back(column.name, column.type);
}
Expand Down Expand Up @@ -686,6 +688,20 @@ BlockIO InterpreterDAG::execute()

LOG_DEBUG(
log, __PRETTY_FUNCTION__ << " Convert DAG request to BlockIO, adding " << analyzer->getImplicitCastCount() << " implicit cast");
if (log->debug())
{
try
{
DAGStringConverter converter(context, dag.getDAGRequest());
auto sql_text = converter.buildSqlString();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " SQL in DAG request is " << sql_text);
}
catch (...)
{
// catch all the exceptions so the convert error will not affect the query execution
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Failed to convert DAG request to sql text");
}
}
return res;
}
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class InterpreterDAG : public IInterpreter
AnalysisResult analyzeExpressions();
void recordProfileStreams(Pipeline & pipeline, Int32 index);
bool addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline);
RegionException::RegionReadStatus getRegionReadStatus(RegionPtr current_region);
RegionException::RegionReadStatus getRegionReadStatus(const RegionPtr & current_region);

private:
Context & context;
Expand Down
13 changes: 6 additions & 7 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ CoprocessorHandler::CoprocessorHandler(
: cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler"))
{}

grpc::Status CoprocessorHandler::execute() try
grpc::Status CoprocessorHandler::execute()
try
{
switch (cop_request->tp())
{
Expand Down Expand Up @@ -59,9 +60,8 @@ grpc::Status CoprocessorHandler::execute() try
}
catch (const LockException & e)
{
LOG_ERROR(log,
__PRETTY_FUNCTION__ << ": LockException: region " << cop_request->context().region_id() << "\n"
<< e.getStackTrace().toString());
LOG_WARNING(
log, __PRETTY_FUNCTION__ << ": LockException: region " << cop_request->context().region_id() << ", message: " << e.message());
cop_response->Clear();
kvrpcpb::LockInfo * lock_info = cop_response->mutable_locked();
lock_info->set_key(e.lock_infos[0]->key);
Expand All @@ -73,9 +73,8 @@ catch (const LockException & e)
}
catch (const RegionException & e)
{
LOG_ERROR(log,
__PRETTY_FUNCTION__ << ": RegionException: region " << cop_request->context().region_id() << "\n"
<< e.getStackTrace().toString());
LOG_WARNING(
log, __PRETTY_FUNCTION__ << ": RegionException: region " << cop_request->context().region_id() << ", message: " << e.message());
cop_response->Clear();
errorpb::Error * region_err;
switch (e.status)
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ std::tuple<Context, grpc::Status> FlashService::createDBContext(const grpc::Serv
{
context.setSetting("dag_records_per_chunk", dag_records_per_chunk_str);
}
std::string planner = getClientMetaVarWithDefault(grpc_context, "dag_planner", "optree");
context.setSetting("dag_planner", planner);
std::string expr_field_type_check = getClientMetaVarWithDefault(grpc_context, "dag_expr_field_type_strict_check", "1");
context.setSetting("dag_expr_field_type_strict_check", expr_field_type_check);

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ struct Settings
M(SettingBool, resolve_locks, false, "tmt resolve locks.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \
M(SettingString, dag_planner, "optree", "planner for DAG query, sql builds the SQL string, optree builds the internal operator(stream) tree.") \
M(SettingBool, dag_expr_field_type_strict_check, true, "when set to true, every expr in the dag request must provide field type, otherwise only the result expr will be checked.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, batch_commands_threads, 0, "Number of threads to use for handling batch commands concurrently. 0 means - same as 'max_threads'.") \
Expand Down
4 changes: 2 additions & 2 deletions tests/mutable-test/txn_dag/arrow_encode.test
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
=> DBGInvoke __raft_insert_row(default, test, 4, 55, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, null, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test',11.11,100,null,9572888,1)
=> DBGInvoke __raft_insert_row(default, test, 4, 56, -128, 255, -32768, null, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test',22.22,100,0,9572888,2)

=> DBGInvoke dag('select * from default.test',4,'arrow') " --dag_planner="optree
=> DBGInvoke dag('select * from default.test',4,'arrow')
┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┬─────col_11─┬──────────────col_12─┬─col_13────────────┬─col_14───┬─col_15─┬─col_16─┬──col_17─┬─col_18─┐
│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 12.12 │ 100 │ 1 │ 9572888 │ a │
│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode │ 123.23 │ 100 │ 0 │ \N │ b │
Expand All @@ -29,7 +29,7 @@
│ -128 │ 255 │ -32768 │ \N │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 22.22 │ 100 │ 0 │ 9572888 │ b │
└───────┴───────┴────────┴───────┴─────────────┴────────────┴──────────────────────┴──────────────────────┴───────────┴────────────────┴────────────┴─────────────────────┴───────────────────┴──────────┴────────┴────────┴─────────┴────────┘

=> DBGInvoke mock_dag('select * from default.test',4,0,'arrow') " --dag_planner="optree
=> DBGInvoke mock_dag('select * from default.test',4,0,'arrow')
┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┬─────col_11─┬──────────────col_12─┬─col_13────────────┬─col_14───┬─col_15─┬─col_16─┬──col_17─┬─col_18─┐
│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 12.12 │ 100 │ 1 │ 9572888 │ a │
│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode │ 123.23 │ 100 │ 0 │ \N │ b │
Expand Down
Loading