Skip to content

Commit

Permalink
FLASH-788 upgrade kvproto and tipb in TiFlash (#368)
Browse files Browse the repository at this point in the history
* TiDB requires row__id when use -1 as column id in table scan executor

* check region read status in executeTS

* address comments

* address comments

* FLASH-788 upgrade kvproto and tipb in TiFlash

* update dag tests

* revert unnecessary change

* use tso fallback if current tso is not set

Co-authored-by: Liangliang Gu <marsishandsome@gmail.com>
  • Loading branch information
windtalker and marsishandsome authored Dec 27, 2019
1 parent 31d470f commit 33c10a5
Show file tree
Hide file tree
Showing 16 changed files with 79 additions and 48 deletions.
26 changes: 13 additions & 13 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ using DAGColumnInfo = std::pair<String, ColumnInfo>;
using DAGSchema = std::vector<DAGColumnInfo>;
using SchemaFetcher = std::function<TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name, const String & encode_type);
Int64 tz_offset, const String & tz_name, const String & encode_type);
tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
UInt64 region_conf_version, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges);
UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);

BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
Expand Down Expand Up @@ -75,7 +75,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
},
start_ts, tz_offset, tz_name, encode_type);
tz_offset, tz_name, encode_type);

RegionPtr region;
if (region_id == InvalidRegionID)
Expand All @@ -98,7 +98,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id);
key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key)));
tipb::SelectResponse dag_response
= executeDAGRequest(context, dag_request, region->id(), region->version(), region->confVer(), key_ranges);
= executeDAGRequest(context, dag_request, region->id(), region->version(), region->confVer(), start_ts, key_ranges);

return outputDAGResponse(context, schema, dag_response);
}
Expand Down Expand Up @@ -131,7 +131,7 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
[&](const String & database_name, const String & table_name) {
return MockTiDB::instance().getTableByName(database_name, table_name)->table_info;
},
start_ts, tz_offset, tz_name, encode_type);
tz_offset, tz_name, encode_type);
std::ignore = table_id;

RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id);
Expand All @@ -141,7 +141,7 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id);
key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key)));
tipb::SelectResponse dag_response
= executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer(), key_ranges);
= executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer(), start_ts, key_ranges);

return outputDAGResponse(context, schema, dag_response);
}
Expand Down Expand Up @@ -322,16 +322,15 @@ void hijackTiDBTypeForMockTest(ColumnInfo & ci)
}

std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name, const String & encode_type)
Int64 tz_offset, const String & tz_name, const String & encode_type)
{
DAGSchema schema;
tipb::DAGRequest dag_request;
dag_request.set_time_zone_name(tz_name);
dag_request.set_time_zone_offset(tz_offset);

dag_request.set_start_ts(start_ts);
if (encode_type == "arrow")
dag_request.set_encode_type(tipb::EncodeType::TypeArrow);
if (encode_type == "chunk")
dag_request.set_encode_type(tipb::EncodeType::TypeChunk);
else
dag_request.set_encode_type(tipb::EncodeType::TypeDefault);

Expand Down Expand Up @@ -622,12 +621,13 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context,
}

tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
UInt64 region_conf_version, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges)
UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges)
{
static Logger * log = &Logger::get("MockDAG");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
tipb::SelectResponse dag_response;
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, std::move(key_ranges), dag_response, true);
DAGDriver driver(
context, dag_request, region_id, region_version, region_conf_version, start_ts, std::move(key_ranges), dag_response, true);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
return dag_response;
Expand Down Expand Up @@ -657,7 +657,7 @@ BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const
throw Exception(dag_response.error().msg(), dag_response.error().code());

BlocksList blocks;
if (dag_response.encode_type() == tipb::EncodeType::TypeArrow)
if (dag_response.encode_type() == tipb::EncodeType::TypeChunk)
{
arrowChunkToBlocks(schema, dag_response, blocks);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_,
{
chunk_codec_stream = std::make_unique<DefaultChunkCodec>()->newCodecStream(result_field_types);
}
else if (encodeType == tipb::EncodeType::TypeArrow)
else if (encodeType == tipb::EncodeType::TypeChunk)
{
chunk_codec_stream = std::make_unique<ArrowChunkCodec>()->newCodecStream(result_field_types);
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
UInt64 region_conf_version_, UInt64 start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_)
: context(context_),
dag_request(dag_request_),
Expand All @@ -34,13 +34,13 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_,
dag_response(dag_response_),
internal(internal_),
log(&Logger::get("DAGDriver"))
{}
{
context.setSetting("read_tso", start_ts);
}

void DAGDriver::execute()
try
{
context.setSetting("read_tso", UInt64(dag_request.start_ts()));

DAGContext dag_context(dag_request.executors_size());
DAGQuerySource dag(context, dag_context, region_id, region_version, region_conf_version, key_ranges, dag_request);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class DAGDriver
{
public:
DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
UInt64 region_conf_version_, UInt64 start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_ = false);

void execute();
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re
}
}
encode_type = dag_request.encode_type();
if (encode_type == tipb::EncodeType::TypeArrow && hasUnsupportedTypeForArrowEncode(getResultFieldTypes()))
if (encode_type == tipb::EncodeType::TypeChunk && hasUnsupportedTypeForArrowEncode(getResultFieldTypes()))
{
encode_type = tipb::EncodeType::TypeDefault;
}
if (encode_type == tipb::EncodeType::TypeChunk && dag_request.has_chunk_memory_layout()
&& dag_request.chunk_memory_layout().has_endian() && dag_request.chunk_memory_layout().endian() == tipb::Endian::BigEndian)
// todo support BigEndian encode for chunk encode type
throw Exception("BigEndian encode for chunk encode type is not supported yet.", ErrorCodes::NOT_IMPLEMENTED);
}

std::tuple<std::string, ASTPtr> DAGQuerySource::parse(size_t max_query_size)
Expand Down
52 changes: 38 additions & 14 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
{tipb::ScalarFuncSig::Log10, "log10"},

{tipb::ScalarFuncSig::Rand, "rand"},
//{tipb::ScalarFuncSig::RandWithSeed, "cast"},
//{tipb::ScalarFuncSig::RandWithSeedFirstGen, "cast"},

{tipb::ScalarFuncSig::Pow, "pow"},
//{tipb::ScalarFuncSig::Conv, "cast"},
Expand All @@ -584,6 +584,7 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
{tipb::ScalarFuncSig::TruncateInt, "trunc"},
{tipb::ScalarFuncSig::TruncateReal, "trunc"},
//{tipb::ScalarFuncSig::TruncateDecimal, "cast"},
{tipb::ScalarFuncSig::TruncateUint, "trunc"},

{tipb::ScalarFuncSig::LogicalAnd, "and"},
{tipb::ScalarFuncSig::LogicalOr, "or"},
Expand Down Expand Up @@ -672,6 +673,10 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::SHA2, "cast"},
//{tipb::ScalarFuncSig::Uncompress, "cast"},
//{tipb::ScalarFuncSig::UncompressedLength, "cast"},
//{tipb::ScalarFuncSig::AesDecryptIV, "cast"},
//{tipb::ScalarFuncSig::AesEncryptIV, "cast"},
//{tipb::ScalarFuncSig::Encode, "cast"},
//{tipb::ScalarFuncSig::Decode, "cast"},

//{tipb::ScalarFuncSig::Database, "cast"},
//{tipb::ScalarFuncSig::FoundRows, "cast"},
Expand Down Expand Up @@ -705,8 +710,8 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::UUID, "cast"},

{tipb::ScalarFuncSig::LikeSig, "like3Args"},
//{tipb::ScalarFuncSig::RegexpBinarySig, "cast"},
//{tipb::ScalarFuncSig::RegexpSig, "cast"},
//{tipb::ScalarFuncSig::RegexpUTF8Sig, "cast"},

//{tipb::ScalarFuncSig::JsonExtractSig, "cast"},
//{tipb::ScalarFuncSig::JsonUnquoteSig, "cast"},
Expand Down Expand Up @@ -746,6 +751,24 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::StringDurationTimeDiff, "cast"},
//{tipb::ScalarFuncSig::StringStringTimeDiff, "cast"},
//{tipb::ScalarFuncSig::TimeTimeTimeDiff, "cast"},
//{tipb::ScalarFuncSig::SubDateStringReal, "cast"},
//{tipb::ScalarFuncSig::SubDateIntReal, "cast"},
//{tipb::ScalarFuncSig::SubDateIntDecimal, "cast"},
//{tipb::ScalarFuncSig::SubDateDatetimeReal, "cast"},
//{tipb::ScalarFuncSig::SubDateDatetimeDecimal, "cast"},
//{tipb::ScalarFuncSig::SubDateDurationString, "cast"},
//{tipb::ScalarFuncSig::SubDateDurationInt, "cast"},
//{tipb::ScalarFuncSig::SubDateDatetimeReal, "cast"},
//{tipb::ScalarFuncSig::SubDateDatetimeDecimal, "cast"},
//{tipb::ScalarFuncSig::AddDateStringReal, "cast"},
//{tipb::ScalarFuncSig::AddDateIntReal, "cast"},
//{tipb::ScalarFuncSig::AddDateIntDecimal, "cast"},
//{tipb::ScalarFuncSig::AddDateDatetimeReal, "cast"},
//{tipb::ScalarFuncSig::AddDateDatetimeDecimal, "cast"},
//{tipb::ScalarFuncSig::AddDateDurationString, "cast"},
//{tipb::ScalarFuncSig::AddDateDurationInt, "cast"},
//{tipb::ScalarFuncSig::AddDateDurationInt, "cast"},
//{tipb::ScalarFuncSig::AddDateDurationDecimal, "cast"},

//{tipb::ScalarFuncSig::Date, "cast"},
//{tipb::ScalarFuncSig::Hour, "cast"},
Expand Down Expand Up @@ -863,7 +886,7 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::Bin, "cast"},
//{tipb::ScalarFuncSig::ASCII, "cast"},
//{tipb::ScalarFuncSig::Char, "cast"},
{tipb::ScalarFuncSig::CharLength, "lengthUTF8"},
{tipb::ScalarFuncSig::CharLengthUTF8, "lengthUTF8"},
//{tipb::ScalarFuncSig::Concat, "cast"},
//{tipb::ScalarFuncSig::ConcatWS, "cast"},
//{tipb::ScalarFuncSig::Convert, "cast"},
Expand All @@ -881,23 +904,23 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::FromBase64, "cast"},
//{tipb::ScalarFuncSig::HexIntArg, "cast"},
//{tipb::ScalarFuncSig::HexStrArg, "cast"},
//{tipb::ScalarFuncSig::InsertUTF8, "cast"},
//{tipb::ScalarFuncSig::Insert, "cast"},
//{tipb::ScalarFuncSig::InsertBinary, "cast"},
//{tipb::ScalarFuncSig::InstrUTF8, "cast"},
//{tipb::ScalarFuncSig::Instr, "cast"},
//{tipb::ScalarFuncSig::InstrBinary, "cast"},

{tipb::ScalarFuncSig::LTrim, "ltrim"},
//{tipb::ScalarFuncSig::LeftUTF8, "cast"},
//{tipb::ScalarFuncSig::Left, "cast"},
//{tipb::ScalarFuncSig::LeftBinary, "cast"},
{tipb::ScalarFuncSig::Length, "length"},
//{tipb::ScalarFuncSig::Locate2ArgsUTF8, "cast"},
//{tipb::ScalarFuncSig::Locate3ArgsUTF8, "cast"},
//{tipb::ScalarFuncSig::Locate2Args, "cast"},
//{tipb::ScalarFuncSig::Locate3Args, "cast"},
//{tipb::ScalarFuncSig::LocateBinary2Args, "cast"},
//{tipb::ScalarFuncSig::LocateBinary3Args, "cast"},

{tipb::ScalarFuncSig::Lower, "lower"},
//{tipb::ScalarFuncSig::LpadUTF8, "cast"},
//{tipb::ScalarFuncSig::Lpad, "cast"},
//{tipb::ScalarFuncSig::LpadBinary, "cast"},
//{tipb::ScalarFuncSig::MakeSet, "cast"},
//{tipb::ScalarFuncSig::OctInt, "cast"},
//{tipb::ScalarFuncSig::OctString, "cast"},
Expand All @@ -906,18 +929,18 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
{tipb::ScalarFuncSig::RTrim, "rtrim"},
//{tipb::ScalarFuncSig::Repeat, "cast"},
//{tipb::ScalarFuncSig::Replace, "cast"},
//{tipb::ScalarFuncSig::ReverseUTF8, "cast"},
//{tipb::ScalarFuncSig::Reverse, "cast"},
//{tipb::ScalarFuncSig::ReverseBinary, "cast"},
//{tipb::ScalarFuncSig::RightUTF8, "cast"},
//{tipb::ScalarFuncSig::Right, "cast"},
//{tipb::ScalarFuncSig::RightBinary, "cast"},
//{tipb::ScalarFuncSig::RpadUTF8, "cast"},
//{tipb::ScalarFuncSig::Rpad, "cast"},
//{tipb::ScalarFuncSig::RpadBinary, "cast"},
//{tipb::ScalarFuncSig::Space, "cast"},
//{tipb::ScalarFuncSig::Strcmp, "cast"},
//{tipb::ScalarFuncSig::Substring2ArgsUTF8, "cast"},
//{tipb::ScalarFuncSig::Substring3ArgsUTF8, "cast"},
//{tipb::ScalarFuncSig::Substring2Args, "cast"},
//{tipb::ScalarFuncSig::Substring3Args, "cast"},
//{tipb::ScalarFuncSig::SubstringBinary2Args, "cast"},
//{tipb::ScalarFuncSig::SubstringBinary3Args, "cast"},
//{tipb::ScalarFuncSig::SubstringIndex, "cast"},

//{tipb::ScalarFuncSig::ToBase64, "cast"},
Expand All @@ -926,6 +949,7 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::Trim3Args, "cast"},
//{tipb::ScalarFuncSig::UnHex, "cast"},
{tipb::ScalarFuncSig::Upper, "upper"},
//{tipb::ScalarFuncSig::CharLength, "upper"},
});

tipb::FieldType columnInfoToFieldType(const TiDB::ColumnInfo & ci)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ extern const int COP_BAD_DAG_REQUEST;
InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_),
dag(dag_),
keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeArrow),
keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeChunk),
log(&Logger::get("InterpreterDAG"))
{}

Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ try
tipb::DAGRequest dag_request;
dag_request.ParseFromString(cop_request->data());
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
if (dag_request.has_is_rpn_expr() && dag_request.is_rpn_expr())
throw Exception("DAG request with rpn expression is not supported in TiFlash", ErrorCodes::NOT_IMPLEMENTED);
tipb::SelectResponse dag_response;
DAGDriver driver(cop_context.db_context, dag_request, cop_context.kv_context.region_id(),
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(), std::move(key_ranges),
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(),
cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), std::move(key_ranges),
dag_response);
driver.execute();
cop_response->set_data(dag_response.SerializeAsString());
Expand Down
Loading

0 comments on commit 33c10a5

Please sign in to comment.