Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw error if the cop request is not based on full region scan #247

Merged
merged 2 commits into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Timestamp start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(context, query,
auto [table_id, schema, dag_request] = compileQuery(
context, query,
[&](const String & database_name, const String & table_name) {
auto storage = context.getTable(database_name, table_name);
auto mmt = std::dynamic_pointer_cast<StorageMergeTree>(storage);
Expand Down Expand Up @@ -96,7 +97,8 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
if (start_ts == 0)
start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(context, query,
auto [table_id, schema, dag_request] = compileQuery(
context, query,
[&](const String & database_name, const String & table_name) {
return MockTiDB::instance().getTableByName(database_name, table_name)->table_info;
},
Expand Down Expand Up @@ -528,7 +530,7 @@ tipb::SelectResponse executeDAGRequest(
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
context.setSetting("dag_planner", "optree");
tipb::SelectResponse dag_response;
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response, true);
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, {}, dag_response, true);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
return dag_response;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,26 @@ extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, tipb::SelectResponse & dag_response_, bool internal_)
UInt64 region_conf_version_, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_)
: context(context_),
dag_request(dag_request_),
region_id(region_id_),
region_version(region_version_),
region_conf_version(region_conf_version_),
key_ranges(std::move(key_ranges_)),
dag_response(dag_response_),
internal(internal_),
log(&Logger::get("DAGDriver"))
{}

void DAGDriver::execute() try
void DAGDriver::execute()
try
{
context.setSetting("read_tso", UInt64(dag_request.start_ts()));

DAGContext dag_context(dag_request.executors_size());
DAGQuerySource dag(context, dag_context, region_id, region_version, region_conf_version, dag_request);
DAGQuerySource dag(context, dag_context, region_id, region_version, region_conf_version, key_ranges, dag_request);
BlockIO streams;

String planner = context.getSettings().dag_planner;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <DataStreams/BlockIO.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/Transaction/Types.h>
#include <tipb/select.pb.h>

Expand All @@ -15,7 +16,8 @@ class DAGDriver
{
public:
DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, tipb::SelectResponse & dag_response_, bool internal_ = false);
UInt64 region_conf_version_, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_ = false);

void execute();

Expand All @@ -27,6 +29,7 @@ class DAGDriver
RegionID region_id;
UInt64 region_version;
UInt64 region_conf_version;
std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> key_ranges;

tipb::SelectResponse & dag_response;

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ static void assignOrThrowException(Int32 & index, Int32 value, const String & na
}

DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, const tipb::DAGRequest & dag_request_)
UInt64 region_conf_version_, const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges_,
const tipb::DAGRequest & dag_request_)
: context(context_),
dag_context(dag_context_),
region_id(region_id_),
region_version(region_version_),
region_conf_version(region_conf_version_),
key_ranges(key_ranges_),
dag_request(dag_request_)
{
for (int i = 0; i < dag_request.executors_size(); i++)
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/IQuerySource.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/Transaction/Types.h>

namespace DB
Expand All @@ -28,7 +29,7 @@ class DAGQuerySource : public IQuerySource
static const String LIMIT_NAME;

DAGQuerySource(Context & context_, DAGContext & dag_context_, RegionID region_id_, UInt64 region_version_, UInt64 region_conf_version_,
const tipb::DAGRequest & dag_request_);
const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges_, const tipb::DAGRequest & dag_request_);

std::tuple<std::string, ASTPtr> parse(size_t max_query_size) override;
String str(size_t max_query_size) override;
Expand All @@ -39,6 +40,7 @@ class DAGQuerySource : public IQuerySource
RegionID getRegionID() const { return region_id; }
UInt64 getRegionVersion() const { return region_version; }
UInt64 getRegionConfVersion() const { return region_conf_version; }
const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & getKeyRanges() const { return key_ranges; }

bool hasSelection() const { return sel_index != -1; };
bool hasAggregation() const { return agg_index != -1; };
Expand Down Expand Up @@ -98,6 +100,7 @@ class DAGQuerySource : public IQuerySource
const RegionID region_id;
const UInt64 region_version;
const UInt64 region_conf_version;
const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges;

const tipb::DAGRequest & dag_request;

Expand Down
59 changes: 59 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Flash/Coprocessor/InterpreterDAG.h>

#include <Core/TMTPKType.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/ConcatBlockInputStream.h>
Expand All @@ -18,11 +19,13 @@
#include <Storages/MutableSupport.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/CHTableHandle.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiKVRange.h>
#include <Storages/Transaction/TypeMapping.h>
#include <Storages/Transaction/Types.h>

Expand All @@ -42,6 +45,59 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_), dag(dag_), log(&Logger::get("InterpreterDAG"))
{}

template <typename HandleType>
bool isAllValueCoveredByRanges(std::vector<HandleRange<HandleType>> & ranges)
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
{
if (ranges.empty())
return false;
std::sort(ranges.begin(), ranges.end(),
[](const HandleRange<HandleType> & a, const HandleRange<HandleType> & b) { return a.first < b.first; });

HandleRange<HandleType> merged_range;
merged_range.first = ranges[0].first;
merged_range.second = ranges[0].second;

for (size_t i = 1; i < ranges.size(); i++)
{
if (merged_range.second >= ranges[i].first)
merged_range.second = merged_range.second >= ranges[i].second ? merged_range.second : ranges[i].second;
else
break;
}

return merged_range.first == TiKVHandle::Handle<HandleType>::normal_min && merged_range.second == TiKVHandle::Handle<HandleType>::max;
}

bool checkKeyRanges(const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges, TableID table_id, bool pk_is_uint64)
{
if (key_ranges.empty())
return true;

std::vector<HandleRange<Int64>> scan_ranges;
for (auto & range : key_ranges)
{
TiKVRange::Handle start = TiKVRange::getRangeHandle<true>(range.first, table_id);
TiKVRange::Handle end = TiKVRange::getRangeHandle<false>(range.second, table_id);
scan_ranges.emplace_back(std::make_pair(start, end));
}

if (pk_is_uint64)
{
std::vector<HandleRange<UInt64>> update_ranges;
for (auto & range : scan_ranges)
{
const auto [n, new_range] = CHTableHandle::splitForUInt64TableHandle(range);

for (int i = 0; i < n; i++)
{
update_ranges.emplace_back(new_range[i]);
}
}
return isAllValueCoveredByRanges<UInt64>(update_ranges);
}
else
return isAllValueCoveredByRanges<Int64>(scan_ranges);
}
// the flow is the same as executeFetchcolumns
void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
Expand Down Expand Up @@ -144,6 +200,9 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
max_streams *= settings.max_streams_to_max_threads_ratio;
}

if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);

//todo support index in
SelectQueryInfo query_info;
query_info.query = dag.getAST();
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,29 @@ CoprocessorHandler::CoprocessorHandler(
: cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler"))
{}

grpc::Status CoprocessorHandler::execute() try
grpc::Status CoprocessorHandler::execute()
try
{
switch (cop_request->tp())
{
case COP_REQ_TYPE_DAG:
{
std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> key_ranges;
for (auto & range : cop_request->ranges())
{
std::string start_key(range.start());
DecodedTiKVKey start(std::move(start_key));
std::string end_key(range.end());
DecodedTiKVKey end(std::move(end_key));
key_ranges.emplace_back(std::make_pair(std::move(start), std::move(end)));
}
tipb::DAGRequest dag_request;
dag_request.ParseFromString(cop_request->data());
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
tipb::SelectResponse dag_response;
DAGDriver driver(cop_context.db_context, dag_request, cop_context.kv_context.region_id(),
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(), dag_response);
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(), std::move(key_ranges),
dag_response);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
cop_response->set_data(dag_response.SerializeAsString());
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/StorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ext/shared_ptr_helper.h>

#include <Common/SimpleIncrement.h>
#include <Core/TMTPKType.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
Expand Down Expand Up @@ -93,6 +94,8 @@ class StorageMergeTree : public ext::shared_ptr_helper<StorageMergeTree>, public

String getDataPath() const override { return full_path; }

bool pkIsUInt64() const { return getTMTPKType(*data.primary_key_data_types[0]) == TMTPKType::UINT64; }

private:
String path;
String database_name;
Expand Down