Skip to content

Commit

Permalink
client-c: Add retry for getting TSO from PD (#8571)
Browse files Browse the repository at this point in the history
close #8323, close #8504
  • Loading branch information
JaySon-Huang authored Jan 31, 2024
1 parent cb1362c commit e1a8fe3
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 45 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ namespace DB
M(force_set_dtfile_exist_when_acquire_id) \
M(force_no_local_region_for_mpp_task) \
M(force_remote_read_for_batch_cop) \
M(force_pd_grpc_error) \
M(force_context_path) \
M(force_slow_page_storage_snapshot_release) \
M(force_pick_all_blobs_to_full_gc) \
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ class Region : public std::enable_shared_from_this<Region>
// Private methods no need to lock mutex, normally

size_t doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode);
void doCheckTable(const DecodedTiKVKey & key) const;
void doRemove(ColumnFamilyType type, const TiKVKey & key);

std::optional<RegionDataReadInfo> readDataByWriteIt(
Expand Down
68 changes: 60 additions & 8 deletions dbms/src/Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <IO/Endian.h>
#include <Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.h>
#include <Storages/KVStore/TiKVHelpers/TiKVKeyspaceIDImpl.h>
#include <pingcap/Exception.h>

#include <magic_enum.hpp>

namespace DB
{
Expand All @@ -31,26 +35,74 @@ KeyspaceSnapshot::KeyspaceSnapshot(KeyspaceID keyspace_id_, pingcap::kv::Cluster

// Looks up `key` in the wrapped snapshot `snap`; the key is first transformed by encodeKey().
// A pingcap::Exception raised on the way is rethrown as a DB::Exception.
// NOTE(review): this listing is a diff rendering without +/- markers. The two statements
// right after the opening brace look like the removed pre-change body, and the try/catch
// below them like its replacement — confirm against the real file.
std::string KeyspaceSnapshot::Get(const std::string & key)
{
auto encoded_key = encodeKey(key);
return snap.Get(encoded_key);
try
{
auto encoded_key = encodeKey(key);
return snap.Get(encoded_key);
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
// The message keeps the pingcap error code (as its enum name) and the original message text.
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

// Returns the kvrpcpb::MvccInfo that `snap.mvccGet` reports for `key`, after the key
// has been transformed by encodeKey().
// A pingcap::Exception raised on the way is rethrown as a DB::Exception.
// NOTE(review): this listing is a diff rendering without +/- markers. The two statements
// right after the opening brace look like the removed pre-change body, and the try/catch
// below them like its replacement — confirm against the real file.
kvrpcpb::MvccInfo KeyspaceSnapshot::mvccGet(const std::string & key)
{
auto encoded_key = encodeKey(key);
return snap.mvccGet(encoded_key);
try
{
auto encoded_key = encodeKey(key);
return snap.mvccGet(encoded_key);
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
// The message keeps the pingcap error code (as its enum name) and the original message text.
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

// Overload of Get that forwards the caller-supplied Backoffer `bo` to `snap.Get`
// together with the key transformed by encodeKey().
// A pingcap::Exception raised on the way is rethrown as a DB::Exception.
// NOTE(review): this listing is a diff rendering without +/- markers. The two statements
// right after the opening brace look like the removed pre-change body, and the try/catch
// below them like its replacement — confirm against the real file.
std::string KeyspaceSnapshot::Get(pingcap::kv::Backoffer & bo, const std::string & key)
{
auto encoded_key = encodeKey(key);
return snap.Get(bo, encoded_key);
try
{
auto encoded_key = encodeKey(key);
return snap.Get(bo, encoded_key);
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
// The message keeps the pingcap error code (as its enum name) and the original message text.
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

// Starts a scan on `snap` between `begin` and `end` (both transformed by encodeKey())
// and wraps the resulting scanner in a KeyspaceScanner.
// A pingcap::Exception raised on the way is rethrown as a DB::Exception.
// NOTE(review): this listing is a diff rendering without +/- markers. The two statements
// right after the opening brace look like the removed pre-change body, and the try/catch
// below them like its replacement — confirm against the real file.
KeyspaceScanner KeyspaceSnapshot::Scan(const std::string & begin, const std::string & end)
{
auto inner = snap.Scan(encodeKey(begin), encodeKey(end));
return KeyspaceScanner(inner, /* need_cut_ */ !prefix.empty());
try
{
auto inner = snap.Scan(encodeKey(begin), encodeKey(end));
// need_cut_ is enabled only when this snapshot has a non-empty `prefix`.
return KeyspaceScanner(inner, /* need_cut_ */ !prefix.empty());
}
catch (pingcap::Exception & e)
{
// turn into DB::Exception with stack trace
// The message keeps the pingcap error code (as its enum name) and the original message text.
throw DB::Exception(
ErrorCodes::LOGICAL_ERROR,
"pingcap::Exception code={} msg={}",
magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(e.code())),
e.message());
}
}

std::string KeyspaceSnapshot::encodeKey(const std::string & key)
Expand Down
75 changes: 61 additions & 14 deletions dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,30 @@
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <Common/Logger.h>
#include <Core/Types.h>
#include <pingcap/kv/RegionClient.h>
#include <pingcap/pd/IClient.h>
#pragma GCC diagnostic pop

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Core/Types.h>
#include <Storages/KVStore/Types.h>
#include <common/logger_useful.h>
#include <fiu.h>

#include <atomic>
#include <magic_enum.hpp>

using TimePoint = std::atomic<std::chrono::time_point<std::chrono::steady_clock>>;


namespace DB
{
namespace FailPoints
{
extern const char force_pd_grpc_error[];
} // namespace FailPoints

struct KeyspaceGCInfo
{
DB::Timestamp ks_gc_sp{};
Expand All @@ -58,14 +68,50 @@ struct KeyspaceGCInfo
}
};

namespace DB
{

struct PDClientHelper
{
static constexpr int get_safepoint_maxtime = 120000; // 120s. waiting pd recover.

// 10 seconds timeout for getting TSO
// https://github.com/pingcap/tidb/blob/069631e2ecfedc000ffb92c67207bea81380f020/pkg/store/mockstore/unistore/pd/client.go#L256-L276
static constexpr int get_tso_maxtime = 10'000;

static bool enable_safepoint_v2;

// Fetches a timestamp from PD via `pd_client->getTS()`, retrying whenever the call
// throws a pingcap::Exception.
//
// pd_client:  PD client whose getTS() is invoked.
// timeout_ms: passed to the pingcap::kv::Backoffer constructor; it bounds how long
//             the retry loop keeps going (presumably the total sleep budget in
//             milliseconds — confirm against Backoffer).
//
// Returns the value of the first getTS() call that succeeds.
// Throws DB::Exception (LOGICAL_ERROR) when Backoffer::backoff itself throws a
// pingcap::Exception; the message carries that exception's error-code name and text.
static UInt64 getTSO(const pingcap::pd::ClientPtr & pd_client, size_t timeout_ms)
{
    pingcap::kv::Backoffer bo(timeout_ms);
    while (true)
    {
        try
        {
            // Test hook: when the failpoint is enabled, act as if PD answered with a gRPC error.
            fiu_do_on(FailPoints::force_pd_grpc_error, {
                throw pingcap::Exception("force_pd_grpc_error", pingcap::ErrorCodes::GRPCErrorCode);
            });

            return pd_client->getTS();
        }
        catch (pingcap::Exception & rpc_error)
        {
            try
            {
                // If backoff() returns normally, the loop makes another attempt.
                bo.backoff(pingcap::kv::boPDRPC, rpc_error);
            }
            catch (const pingcap::Exception & backoff_error)
            {
                // The backoff meets deadline exceeded
                // Wrap the exception by DB::Exception to get the stack trace.
                // A distinct name is used here so the handler does not shadow `rpc_error`.
                throw DB::Exception(
                    ErrorCodes::LOGICAL_ERROR,
                    "pingcap::Exception code={} msg={}",
                    magic_enum::enum_name(static_cast<pingcap::ErrorCodes>(backoff_error.code())),
                    backoff_error.message());
            }
        }
    }
}

static Timestamp getGCSafePointWithRetry(
const pingcap::pd::ClientPtr & pd_client,
KeyspaceID keyspace_id,
Expand All @@ -87,7 +133,8 @@ struct PDClientHelper
auto now = std::chrono::steady_clock::now();
const auto duration
= std::chrono::duration_cast<std::chrono::seconds>(now - safe_point_last_update_time.load());
const auto min_interval = std::max(Int64(1), safe_point_update_interval_seconds); // at least one second
const auto min_interval
= std::max(static_cast<Int64>(1), safe_point_update_interval_seconds); // at least one second
if (duration.count() < min_interval)
return cached_gc_safe_point;
}
Expand Down Expand Up @@ -121,7 +168,7 @@ struct PDClientHelper
// In case we cost too much to update safe point from PD.
auto now = std::chrono::steady_clock::now();

auto ks_gc_info = get_ks_gc_sp(keyspace_id);
auto ks_gc_info = getKeyspaceGCSafepoint(keyspace_id);
const auto duration
= std::chrono::duration_cast<std::chrono::seconds>(now - ks_gc_info.ks_gc_sp_update_time.load());
const auto min_interval
Expand All @@ -138,7 +185,7 @@ struct PDClientHelper
try
{
auto ks_gc_sp = pd_client->getGCSafePointV2(keyspace_id);
update_ks_gc_sp_map(keyspace_id, ks_gc_sp);
updateKeyspaceGCSafepointMap(keyspace_id, ks_gc_sp);
return ks_gc_sp;
}
catch (pingcap::Exception & e)
Expand All @@ -148,22 +195,22 @@ struct PDClientHelper
}
}

// Records `ks_gc_sp` together with the current steady_clock time as the entry for
// `keyspace_id` in ks_gc_sp_map, while holding ks_gc_sp_mutex exclusively.
// NOTE(review): this listing is a diff rendering without +/- markers. Both the old
// snake_case signature/locals and the new camelCase signature/snake_case locals are
// shown back to back — confirm which lines exist in the real file.
static void update_ks_gc_sp_map(KeyspaceID keyspace_id, Timestamp ks_gc_sp)
static void updateKeyspaceGCSafepointMap(KeyspaceID keyspace_id, Timestamp ks_gc_sp)
{
std::unique_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
KeyspaceGCInfo newKeyspaceGCInfo;
newKeyspaceGCInfo.ks_gc_sp = ks_gc_sp;
newKeyspaceGCInfo.ks_gc_sp_update_time = std::chrono::steady_clock::now();
ks_gc_sp_map[keyspace_id] = newKeyspaceGCInfo;
KeyspaceGCInfo new_keyspace_gc_info;
new_keyspace_gc_info.ks_gc_sp = ks_gc_sp;
new_keyspace_gc_info.ks_gc_sp_update_time = std::chrono::steady_clock::now();
ks_gc_sp_map[keyspace_id] = new_keyspace_gc_info;
}

// Returns a copy of the KeyspaceGCInfo stored for `keyspace_id` in ks_gc_sp_map,
// read while holding ks_gc_sp_mutex in shared mode.
// NOTE(review): this listing is a diff rendering without +/- markers; the old and the
// renamed signature are shown back to back — confirm which one exists in the real file.
// NOTE(review): if ks_gc_sp_map is a standard associative container, operator[] inserts a
// default entry when the key is absent, which would be a write performed under a shared
// (reader) lock — confirm the map type and consider a find()-based lookup.
static KeyspaceGCInfo get_ks_gc_sp(KeyspaceID keyspace_id)
static KeyspaceGCInfo getKeyspaceGCSafepoint(KeyspaceID keyspace_id)
{
std::shared_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
return ks_gc_sp_map[keyspace_id];
}

static void remove_ks_gc_sp(KeyspaceID keyspace_id)
static void removeKeyspaceGCSafepoint(KeyspaceID keyspace_id)
{
std::unique_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
ks_gc_sp_map.erase(keyspace_id);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropDatabaseByName(const String & d
// 2. Use the same GC safe point as TiDB.
// In such way our database (and its belonging tables) will be GC-ed later than TiDB, which is safe and correct.
auto & tmt_context = context.getTMTContext();
auto tombstone = tmt_context.getPDClient()->getTS();
auto tombstone = PDClientHelper::getTSO(tmt_context.getPDClient(), PDClientHelper::get_tso_maxtime);
db->alterTombstone(context, tombstone, /*new_db_info*/ nullptr); // keep the old db_info

LOG_INFO(log, "Tombstone database end, db_name={} tombstone={}", db_name, tombstone);
Expand Down Expand Up @@ -1142,7 +1142,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
UInt64 tombstone_ts = 0;
if (is_tombstone)
{
tombstone_ts = context.getTMTContext().getPDClient()->getTS();
tombstone_ts = PDClientHelper::getTSO(context.getTMTContext().getPDClient(), PDClientHelper::get_tso_maxtime);
}

String stmt = createTableStmt(keyspace_id, database_id, *table_info, name_mapper, tombstone_ts, log);
Expand Down Expand Up @@ -1230,7 +1230,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(
table_id,
action);

const UInt64 tombstone_ts = tmt_context.getPDClient()->getTS();
const UInt64 tombstone_ts = PDClientHelper::getTSO(tmt_context.getPDClient(), PDClientHelper::get_tso_maxtime);
// TODO:try to optimize alterCommands
AlterCommands commands;
{
Expand Down
10 changes: 4 additions & 6 deletions dbms/src/TiDB/Schema/SchemaGetter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,8 @@ TiDB::DBInfoPtr SchemaGetter::getDatabase(DatabaseID db_id)
if (json.empty())
return nullptr;

LOG_DEBUG(log, "Get DB Info from TiKV, database_id={} {}", db_id, json);
auto db_info = std::make_shared<TiDB::DBInfo>(json, keyspace_id);
return db_info;
LOG_DEBUG(log, "Get DatabaseInfo from TiKV, database_id={} {}", db_id, json);
return std::make_shared<TiDB::DBInfo>(json, keyspace_id);
}

template <bool mvcc_get>
Expand Down Expand Up @@ -310,7 +309,7 @@ std::pair<TiDB::TableInfoPtr, bool> SchemaGetter::getTableInfoImpl(DatabaseID db
return {nullptr, get_by_mvcc};
}
}
LOG_DEBUG(log, "Get Table Info from TiKV, table_id={} {}", table_id, table_info_json);
LOG_DEBUG(log, "Get TableInfo from TiKV, table_id={} {}", table_id, table_info_json);
return {std::make_shared<TiDB::TableInfo>(table_info_json, keyspace_id), get_by_mvcc};
}
template std::pair<TiDB::TableInfoPtr, bool> SchemaGetter::getTableInfoImpl<false>(DatabaseID db_id, TableID table_id);
Expand Down Expand Up @@ -339,7 +338,7 @@ std::vector<TiDB::TableInfoPtr> SchemaGetter::listTables(DatabaseID db_id)
auto db_key = getDBKey(db_id);
if (!checkDBExists(db_key))
{
LOG_ERROR(log, "DB {} Not Exists!", db_id);
LOG_ERROR(log, "The database does not exist, database_id={}", db_id);
return {};
}

Expand All @@ -362,5 +361,4 @@ std::vector<TiDB::TableInfoPtr> SchemaGetter::listTables(DatabaseID db_id)
return res;
}

// end of namespace.
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/TiDB/Schema/SchemaSyncService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void SchemaSyncService::removeKeyspaceGCTasks()
keyspace_handle_iter = keyspace_handle_map.erase(keyspace_handle_iter);

context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace);
PDClientHelper::remove_ks_gc_sp(keyspace);
PDClientHelper::removeKeyspaceGCSafepoint(keyspace);
keyspace_gc_context.erase(keyspace); // clear the last gc safepoint
num_remove_tasks += 1;
}
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Storages/KVStore/TiKVHelpers/PDTiKVClient.h>
#include <TiDB/Schema/SchemaBuilder.h>
#include <TiDB/Schema/TiDBSchemaSyncer.h>
#include <common/logger_useful.h>
Expand All @@ -23,6 +24,21 @@

namespace DB
{
// Creates the schema Getter for `keyspace_id`.
// With `mock_getter` set, a default-constructed Getter is returned and PD is not contacted.
// Otherwise a timestamp is obtained through PDClientHelper::getTSO (using
// PDClientHelper::get_tso_maxtime as its timeout argument) and handed to the Getter
// together with the cluster pointer and the keyspace id.
template <bool mock_getter, bool mock_mapper>
typename TiDBSchemaSyncer<mock_getter, mock_mapper>::Getter TiDBSchemaSyncer<mock_getter, mock_mapper>::
    createSchemaGetter(KeyspaceID keyspace_id)
{
    // Both branches stay inside `if constexpr` so the branch that is not taken
    // is discarded at instantiation time.
    if constexpr (!mock_getter)
    {
        auto ts = PDClientHelper::getTSO(cluster->pd_client, PDClientHelper::get_tso_maxtime);
        return Getter(cluster.get(), ts, keyspace_id);
    }
    else
    {
        return Getter();
    }
}

template <bool mock_getter, bool mock_mapper>
bool TiDBSchemaSyncer<mock_getter, mock_mapper>::syncSchemas(Context & context)
{
Expand Down
13 changes: 1 addition & 12 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,7 @@ class TiDBSchemaSyncer : public SchemaSyncer
DatabaseInfoCache databases;
TableIDMap table_id_map;

// Inline definition: returns a default-constructed Getter when `mock_getter` is set,
// otherwise builds one from the cluster pointer, a timestamp read directly from
// `cluster->pd_client->getTS()`, and `keyspace_id`.
// NOTE(review): this listing is a diff rendering without +/- markers. This inline body
// looks like the removed side of the change, with the bare declaration that follows it
// as the replacement — confirm against the real file.
Getter createSchemaGetter(KeyspaceID keyspace_id)
{
if constexpr (mock_getter)
{
return Getter();
}
else
{
auto tso = cluster->pd_client->getTS();
return Getter(cluster.get(), tso, keyspace_id);
}
}
Getter createSchemaGetter(KeyspaceID keyspace_id);

public:
TiDBSchemaSyncer(KVClusterPtr cluster_, KeyspaceID keyspace_id_)
Expand Down

0 comments on commit e1a8fe3

Please sign in to comment.