client-c: Add retry for getting TSO from PD (#8571) #8742

Merged
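In short, this PR stops calling pd_client->getTS() directly at schema-sync call sites and routes the call through a new retrying helper, PDClientHelper::getTSO, so transient PD gRPC errors are retried with backoff instead of failing immediately. A minimal before/after sketch of the call-site change (both forms are taken from the diff below; the 10s budget is the new get_tso_maxtime constant):

// Before: a single PD RPC; a transient gRPC error propagates at once.
auto tombstone = tmt_context.getPDClient()->getTS();

// After: retried with backoff for up to PDClientHelper::get_tso_maxtime (10s).
auto tombstone = PDClientHelper::getTSO(tmt_context.getPDClient(), PDClientHelper::get_tso_maxtime);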
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
@@ -88,6 +88,7 @@ namespace DB
M(force_set_dtfile_exist_when_acquire_id) \
M(force_no_local_region_for_mpp_task) \
M(force_remote_read_for_batch_cop) \
M(force_pd_grpc_error) \
M(force_context_path) \
M(force_slow_page_storage_snapshot_release) \
M(force_pick_all_blobs_to_full_gc) \
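The new force_pd_grpc_error failpoint is consumed by PDClientHelper::getTSO (see PDTiKVClient.h below) to simulate PD gRPC failures. A hedged sketch of how a test might exercise it — the gtest harness, the pd_client handle, and the exact FailPointHelper/SCOPE_EXIT spelling are assumptions, not part of this PR:

// Hypothetical unit-test sketch (assumes TiFlash's FailPointHelper, gtest, and a pd_client handle).
FailPointHelper::enableFailPoint(FailPoints::force_pd_grpc_error);
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_pd_grpc_error); });
// Every getTS() attempt now throws a GRPCErrorCode pingcap::Exception, so getTSO
// backs off until the 10s budget is spent and then rethrows as DB::Exception.
ASSERT_THROW(PDClientHelper::getTSO(pd_client, PDClientHelper::get_tso_maxtime), DB::Exception);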
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/Region.h
@@ -259,7 +259,6 @@ class Region : public std::enable_shared_from_this<Region>
// Private methods no need to lock mutex, normally

void doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode);
void doCheckTable(const DecodedTiKVKey & key) const;
void doRemove(ColumnFamilyType type, const TiKVKey & key);

std::optional<RegionDataReadInfo> readDataByWriteIt(
68 changes: 60 additions & 8 deletions dbms/src/Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.cpp
@@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <IO/Endian.h>
#include <Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.h>
#include <Storages/KVStore/TiKVHelpers/TiKVKeyspaceIDImpl.h>
#include <pingcap/Exception.h>

#include <magic_enum.hpp>

namespace DB
{
@@ -31,26 +35,74 @@ KeyspaceSnapshot::KeyspaceSnapshot(KeyspaceID keyspace_id_, pingcap::kv::Cluster

std::string KeyspaceSnapshot::Get(const std::string & key)
{
auto encoded_key = encodeKey(key);
return snap.Get(encoded_key);
try
{
auto encoded_key = encodeKey(key);
return snap.Get(encoded_key);
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

kvrpcpb::MvccInfo KeyspaceSnapshot::mvccGet(const std::string & key)
{
auto encoded_key = encodeKey(key);
return snap.mvccGet(encoded_key);
try
{
auto encoded_key = encodeKey(key);
return snap.mvccGet(encoded_key);
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

std::string KeyspaceSnapshot::Get(pingcap::kv::Backoffer & bo, const std::string & key)
{
auto encoded_key = encodeKey(key);
return snap.Get(bo, encoded_key);
try
{
auto encoded_key = encodeKey(key);
return snap.Get(bo, encoded_key);
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

KeyspaceScanner KeyspaceSnapshot::Scan(const std::string & begin, const std::string & end)
{
auto inner = snap.Scan(encodeKey(begin), encodeKey(end));
return KeyspaceScanner(inner, /* need_cut_ */ !prefix.empty());
try
{
auto inner = snap.Scan(encodeKey(begin), encodeKey(end));
return KeyspaceScanner(inner, /* need_cut_ */ !prefix.empty());
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

std::string KeyspaceSnapshot::encodeKey(const std::string & key)
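Get, mvccGet, the Backoffer overload of Get, and Scan above all repeat the same catch-and-rethrow block. The PR keeps the four copies; purely as an illustration (not part of the change, names hypothetical), the conversion could be factored into one helper:

// Hypothetical helper: run `f` and convert pingcap::Exception into DB::Exception
// so the caller gets a TiFlash stack trace, mirroring the blocks above.
template <typename F>
auto wrapPingcapCall(F && f) -> decltype(f())
{
    try
    {
        return f();
    }
    catch (pingcap::Exception & e)
    {
        throw DB::Exception(
            ErrorCodes::LOGICAL_ERROR,
            "pingcap::Exception code={} msg={}",
            magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
            e.message());
    }
}

// Usage sketch: return wrapPingcapCall([&] { return snap.Get(encodeKey(key)); });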
75 changes: 61 additions & 14 deletions dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h
@@ -20,20 +20,30 @@
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <Common/Logger.h>
#include <Core/Types.h>
#include <pingcap/kv/RegionClient.h>
#include <pingcap/pd/IClient.h>
#pragma GCC diagnostic pop

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Core/Types.h>
#include <Storages/KVStore/Types.h>
#include <common/logger_useful.h>
#include <fiu.h>

#include <atomic>
#include <magic_enum.hpp>

using TimePoint = std::atomic<std::chrono::time_point<std::chrono::steady_clock>>;


namespace DB
{
namespace FailPoints
{
extern const char force_pd_grpc_error[];
} // namespace FailPoints

struct KeyspaceGCInfo
{
DB::Timestamp ks_gc_sp{};
@@ -58,14 +68,50 @@ struct KeyspaceGCInfo
}
};

namespace DB
{

struct PDClientHelper
{
static constexpr int get_safepoint_maxtime = 120000; // 120s. waiting pd recover.

// 10 seconds timeout for getting TSO
// https://github.com/pingcap/tidb/blob/069631e2ecfedc000ffb92c67207bea81380f020/pkg/store/mockstore/unistore/pd/client.go#L256-L276
static constexpr int get_tso_maxtime = 10'000;

static bool enable_safepoint_v2;

static UInt64 getTSO(const pingcap::pd::ClientPtr & pd_client, size_t timeout_ms)
{
pingcap::kv::Backoffer bo(timeout_ms);
while (true)
{
try
{
fiu_do_on(FailPoints::force_pd_grpc_error, {
throw pingcap::Exception("force_pd_grpc_error", pingcap::ErrorCodes::GRPCErrorCode);
});

return pd_client->getTS();
}
catch (pingcap::Exception & e)
{
try
{
bo.backoff(pingcap::kv::boPDRPC, e);
}
catch (pingcap::Exception & e)
{
// The backoff has exceeded its deadline
// Wrap the exception in a DB::Exception to get the stack trace
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}
}
}

static Timestamp getGCSafePointWithRetry(
const pingcap::pd::ClientPtr & pd_client,
KeyspaceID keyspace_id,
@@ -87,7 +133,8 @@ struct PDClientHelper
auto now = std::chrono::steady_clock::now();
const auto duration
= std::chrono::duration_cast<std::chrono::seconds>(now - safe_point_last_update_time.load());
const auto min_interval = std::max(Int64(1), safe_point_update_interval_seconds); // at least one second
const auto min_interval
= std::max(static_cast<Int64>(1), safe_point_update_interval_seconds); // at least one second
if (duration.count() < min_interval)
return cached_gc_safe_point;
}
@@ -121,7 +168,7 @@
// In case we cost too much to update safe point from PD.
auto now = std::chrono::steady_clock::now();

auto ks_gc_info = get_ks_gc_sp(keyspace_id);
auto ks_gc_info = getKeyspaceGCSafepoint(keyspace_id);
const auto duration
= std::chrono::duration_cast<std::chrono::seconds>(now - ks_gc_info.ks_gc_sp_update_time.load());
const auto min_interval
@@ -138,7 +185,7 @@
try
{
auto ks_gc_sp = pd_client->getGCSafePointV2(keyspace_id);
update_ks_gc_sp_map(keyspace_id, ks_gc_sp);
updateKeyspaceGCSafepointMap(keyspace_id, ks_gc_sp);
return ks_gc_sp;
}
catch (pingcap::Exception & e)
@@ -148,22 +195,22 @@
}
}

static void update_ks_gc_sp_map(KeyspaceID keyspace_id, Timestamp ks_gc_sp)
static void updateKeyspaceGCSafepointMap(KeyspaceID keyspace_id, Timestamp ks_gc_sp)
{
std::unique_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
KeyspaceGCInfo newKeyspaceGCInfo;
newKeyspaceGCInfo.ks_gc_sp = ks_gc_sp;
newKeyspaceGCInfo.ks_gc_sp_update_time = std::chrono::steady_clock::now();
ks_gc_sp_map[keyspace_id] = newKeyspaceGCInfo;
KeyspaceGCInfo new_keyspace_gc_info;
new_keyspace_gc_info.ks_gc_sp = ks_gc_sp;
new_keyspace_gc_info.ks_gc_sp_update_time = std::chrono::steady_clock::now();
ks_gc_sp_map[keyspace_id] = new_keyspace_gc_info;
}

static KeyspaceGCInfo get_ks_gc_sp(KeyspaceID keyspace_id)
static KeyspaceGCInfo getKeyspaceGCSafepoint(KeyspaceID keyspace_id)
{
std::shared_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
return ks_gc_sp_map[keyspace_id];
}

static void remove_ks_gc_sp(KeyspaceID keyspace_id)
static void removeKeyspaceGCSafepoint(KeyspaceID keyspace_id)
{
std::unique_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
ks_gc_sp_map.erase(keyspace_id);
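A practical consequence of this header change for callers: where schema sync used to see a raw pingcap::Exception as soon as a PD RPC failed, it now sees a DB::Exception (with stack trace) only after the backoff budget is exhausted. A hedged caller-side sketch, not part of the PR:

// Illustrative call site; `log` and the surrounding error handling are hypothetical.
try
{
    auto ts = PDClientHelper::getTSO(pd_client, PDClientHelper::get_tso_maxtime);
    // ... use ts as a tombstone / schema-getter start timestamp ...
}
catch (DB::Exception & e)
{
    // Before this PR a pingcap::Exception would surface here instead.
    LOG_ERROR(log, "Failed to get TSO from PD after retries: {}", e.message());
    throw;
}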
6 changes: 3 additions & 3 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
@@ -1022,7 +1022,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropDatabaseByName(const String & d
// 2. Use the same GC safe point as TiDB.
// In such way our database (and its belonging tables) will be GC-ed later than TiDB, which is safe and correct.
auto & tmt_context = context.getTMTContext();
auto tombstone = tmt_context.getPDClient()->getTS();
auto tombstone = PDClientHelper::getTSO(tmt_context.getPDClient(), PDClientHelper::get_tso_maxtime);
db->alterTombstone(context, tombstone, /*new_db_info*/ nullptr); // keep the old db_info

LOG_INFO(log, "Tombstone database end, db_name={} tombstone={}", db_name, tombstone);
@@ -1147,7 +1147,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
UInt64 tombstone_ts = 0;
if (is_tombstone)
{
tombstone_ts = context.getTMTContext().getPDClient()->getTS();
tombstone_ts = PDClientHelper::getTSO(context.getTMTContext().getPDClient(), PDClientHelper::get_tso_maxtime);
}

String stmt = createTableStmt(keyspace_id, database_id, *table_info, name_mapper, tombstone_ts, log);
@@ -1236,7 +1236,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(
table_id,
action);

const UInt64 tombstone_ts = tmt_context.getPDClient()->getTS();
const UInt64 tombstone_ts = PDClientHelper::getTSO(tmt_context.getPDClient(), PDClientHelper::get_tso_maxtime);
// TODO:try to optimize alterCommands
AlterCommands commands;
{
10 changes: 4 additions & 6 deletions dbms/src/TiDB/Schema/SchemaGetter.cpp
@@ -276,9 +276,8 @@ TiDB::DBInfoPtr SchemaGetter::getDatabase(DatabaseID db_id)
if (json.empty())
return nullptr;

LOG_DEBUG(log, "Get DB Info from TiKV, database_id={} {}", db_id, json);
auto db_info = std::make_shared<TiDB::DBInfo>(json, keyspace_id);
return db_info;
LOG_DEBUG(log, "Get DatabaseInfo from TiKV, database_id={} {}", db_id, json);
return std::make_shared<TiDB::DBInfo>(json, keyspace_id);
}

template <bool mvcc_get>
@@ -310,7 +309,7 @@ std::pair<TiDB::TableInfoPtr, bool> SchemaGetter::getTableInfoImpl(DatabaseID db
return {nullptr, get_by_mvcc};
}
}
LOG_DEBUG(log, "Get Table Info from TiKV, table_id={} {}", table_id, table_info_json);
LOG_DEBUG(log, "Get TableInfo from TiKV, table_id={} {}", table_id, table_info_json);
return {std::make_shared<TiDB::TableInfo>(table_info_json, keyspace_id), get_by_mvcc};
}
template std::pair<TiDB::TableInfoPtr, bool> SchemaGetter::getTableInfoImpl<false>(DatabaseID db_id, TableID table_id);
@@ -339,7 +338,7 @@ std::vector<TiDB::TableInfoPtr> SchemaGetter::listTables(DatabaseID db_id)
auto db_key = getDBKey(db_id);
if (!checkDBExists(db_key))
{
LOG_ERROR(log, "DB {} Not Exists!", db_id);
LOG_ERROR(log, "The database does not exist, database_id={}", db_id);
return {};
}

@@ -362,5 +361,4 @@ std::vector<TiDB::TableInfoPtr> SchemaGetter::listTables(DatabaseID db_id)
return res;
}

// end of namespace.
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/TiDB/Schema/SchemaSyncService.cpp
@@ -144,7 +144,7 @@ void SchemaSyncService::removeKeyspaceGCTasks()
keyspace_handle_iter = keyspace_handle_map.erase(keyspace_handle_iter);

context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace);
PDClientHelper::remove_ks_gc_sp(keyspace);
PDClientHelper::removeKeyspaceGCSafepoint(keyspace);
keyspace_gc_context.erase(keyspace); // clear the last gc safepoint
num_remove_tasks += 1;
}
16 changes: 16 additions & 0 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp
@@ -14,6 +14,7 @@

#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Storages/KVStore/TiKVHelpers/PDTiKVClient.h>
#include <TiDB/Schema/SchemaBuilder.h>
#include <TiDB/Schema/TiDBSchemaSyncer.h>
#include <common/logger_useful.h>
@@ -23,6 +24,21 @@

namespace DB
{
template <bool mock_getter, bool mock_mapper>
typename TiDBSchemaSyncer<mock_getter, mock_mapper>::Getter TiDBSchemaSyncer<mock_getter, mock_mapper>::
createSchemaGetter(KeyspaceID keyspace_id)
{
if constexpr (mock_getter)
{
return Getter();
}
else
{
auto tso = PDClientHelper::getTSO(cluster->pd_client, PDClientHelper::get_tso_maxtime);
return Getter(cluster.get(), tso, keyspace_id);
}
}

template <bool mock_getter, bool mock_mapper>
bool TiDBSchemaSyncer<mock_getter, mock_mapper>::syncSchemas(Context & context)
{
13 changes: 1 addition & 12 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
@@ -51,18 +51,7 @@ class TiDBSchemaSyncer : public SchemaSyncer
DatabaseInfoCache databases;
TableIDMap table_id_map;

Getter createSchemaGetter(KeyspaceID keyspace_id)
{
if constexpr (mock_getter)
{
return Getter();
}
else
{
auto tso = cluster->pd_client->getTS();
return Getter(cluster.get(), tso, keyspace_id);
}
}
Getter createSchemaGetter(KeyspaceID keyspace_id);

public:
TiDBSchemaSyncer(KVClusterPtr cluster_, KeyspaceID keyspace_id_)