From e1a8fe30a8cbf948022614698812f2646c3ed68e Mon Sep 17 00:00:00 2001 From: JaySon Date: Wed, 31 Jan 2024 22:22:55 +0800 Subject: [PATCH] client-c: Add retry for getting TSO from PD (#8571) close pingcap/tiflash#8323, close pingcap/tiflash#8504 --- dbms/src/Common/FailPoint.cpp | 1 + dbms/src/Storages/KVStore/Region.h | 1 - .../KVStore/TiKVHelpers/KeyspaceSnapshot.cpp | 68 +++++++++++++++-- .../KVStore/TiKVHelpers/PDTiKVClient.h | 75 +++++++++++++++---- dbms/src/TiDB/Schema/SchemaBuilder.cpp | 6 +- dbms/src/TiDB/Schema/SchemaGetter.cpp | 10 +-- dbms/src/TiDB/Schema/SchemaSyncService.cpp | 2 +- dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp | 16 ++++ dbms/src/TiDB/Schema/TiDBSchemaSyncer.h | 13 +--- 9 files changed, 147 insertions(+), 45 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 0939d8f0a3e..b0381e58ed3 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -88,6 +88,7 @@ namespace DB M(force_set_dtfile_exist_when_acquire_id) \ M(force_no_local_region_for_mpp_task) \ M(force_remote_read_for_batch_cop) \ + M(force_pd_grpc_error) \ M(force_context_path) \ M(force_slow_page_storage_snapshot_release) \ M(force_pick_all_blobs_to_full_gc) \ diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index 968eba6dd27..52d068af6df 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -265,7 +265,6 @@ class Region : public std::enable_shared_from_this // Private methods no need to lock mutex, normally size_t doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode); - void doCheckTable(const DecodedTiKVKey & key) const; void doRemove(ColumnFamilyType type, const TiKVKey & key); std::optional readDataByWriteIt( diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.cpp index 32d71964cdd..128b25079f2 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/KeyspaceSnapshot.cpp @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include +#include + +#include namespace DB { @@ -31,26 +35,74 @@ KeyspaceSnapshot::KeyspaceSnapshot(KeyspaceID keyspace_id_, pingcap::kv::Cluster std::string KeyspaceSnapshot::Get(const std::string & key) { - auto encoded_key = encodeKey(key); - return snap.Get(encoded_key); + try + { + auto encoded_key = encodeKey(key); + return snap.Get(encoded_key); + } + catch (pingcap::Exception & e) + { + // turn into DB::Exception with stack trace + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "pingcap::Exception code={} msg={}", + magic_enum::enum_name(static_cast(e.code())), + e.message()); + } } kvrpcpb::MvccInfo KeyspaceSnapshot::mvccGet(const std::string & key) { - auto encoded_key = encodeKey(key); - return snap.mvccGet(encoded_key); + try + { + auto encoded_key = encodeKey(key); + return snap.mvccGet(encoded_key); + } + catch (pingcap::Exception & e) + { + // turn into DB::Exception with stack trace + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "pingcap::Exception code={} msg={}", + magic_enum::enum_name(static_cast(e.code())), + e.message()); + } } std::string KeyspaceSnapshot::Get(pingcap::kv::Backoffer & bo, const std::string & key) { - auto encoded_key = encodeKey(key); - return snap.Get(bo, encoded_key); + try + { + auto encoded_key = encodeKey(key); + return snap.Get(bo, encoded_key); + } + catch (pingcap::Exception & e) + { + // turn into DB::Exception with stack trace + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "pingcap::Exception code={} msg={}", + magic_enum::enum_name(static_cast(e.code())), + e.message()); + } } KeyspaceScanner KeyspaceSnapshot::Scan(const std::string & begin, const std::string & end) { - auto inner = snap.Scan(encodeKey(begin), encodeKey(end)); - return KeyspaceScanner(inner, /* need_cut_ */ !prefix.empty()); + try + { + auto inner = snap.Scan(encodeKey(begin), encodeKey(end)); + return KeyspaceScanner(inner, /* need_cut_ */ !prefix.empty()); + } + catch (pingcap::Exception & e) + { + // turn into DB::Exception with stack trace + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "pingcap::Exception code={} msg={}", + magic_enum::enum_name(static_cast(e.code())), + e.message()); + } } std::string KeyspaceSnapshot::encodeKey(const std::string & key) diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h b/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h index f4135d9a3b4..7cfcfc86d15 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h @@ -20,20 +20,30 @@ #ifdef __clang__ #pragma GCC diagnostic ignored "-Wdeprecated-declarations" #endif -#include -#include #include #include #pragma GCC diagnostic pop +#include +#include #include #include #include +#include #include +#include using TimePoint = std::atomic>; + +namespace DB +{ +namespace FailPoints +{ +extern const char force_pd_grpc_error[]; +} // namespace FailPoints + struct KeyspaceGCInfo { DB::Timestamp ks_gc_sp{}; @@ -58,14 +68,50 @@ struct KeyspaceGCInfo } }; -namespace DB -{ + struct PDClientHelper { static constexpr int get_safepoint_maxtime = 120000; // 120s. waiting pd recover. + // 10 seconds timeout for getting TSO + // https://github.com/pingcap/tidb/blob/069631e2ecfedc000ffb92c67207bea81380f020/pkg/store/mockstore/unistore/pd/client.go#L256-L276 + static constexpr int get_tso_maxtime = 10'000; + static bool enable_safepoint_v2; + static UInt64 getTSO(const pingcap::pd::ClientPtr & pd_client, size_t timeout_ms) + { + pingcap::kv::Backoffer bo(timeout_ms); + while (true) + { + try + { + fiu_do_on(FailPoints::force_pd_grpc_error, { + throw pingcap::Exception("force_pd_grpc_error", pingcap::ErrorCodes::GRPCErrorCode); + }); + + return pd_client->getTS(); + } + catch (pingcap::Exception & e) + { + try + { + bo.backoff(pingcap::kv::boPDRPC, e); + } + catch (pingcap::Exception & e) + { + // The backoff meets deadline exceeded + // Wrap the exception by DB::Exception to get the stacktrack + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "pingcap::Exception code={} msg={}", + magic_enum::enum_name(static_cast(e.code())), + e.message()); + } + } + } + } + static Timestamp getGCSafePointWithRetry( const pingcap::pd::ClientPtr & pd_client, KeyspaceID keyspace_id, @@ -87,7 +133,8 @@ struct PDClientHelper auto now = std::chrono::steady_clock::now(); const auto duration = std::chrono::duration_cast(now - safe_point_last_update_time.load()); - const auto min_interval = std::max(Int64(1), safe_point_update_interval_seconds); // at least one second + const auto min_interval + = std::max(static_cast(1), safe_point_update_interval_seconds); // at least one second if (duration.count() < min_interval) return cached_gc_safe_point; } @@ -121,7 +168,7 @@ struct PDClientHelper // In case we cost too much to update safe point from PD. auto now = std::chrono::steady_clock::now(); - auto ks_gc_info = get_ks_gc_sp(keyspace_id); + auto ks_gc_info = getKeyspaceGCSafepoint(keyspace_id); const auto duration = std::chrono::duration_cast(now - ks_gc_info.ks_gc_sp_update_time.load()); const auto min_interval @@ -138,7 +185,7 @@ struct PDClientHelper try { auto ks_gc_sp = pd_client->getGCSafePointV2(keyspace_id); - update_ks_gc_sp_map(keyspace_id, ks_gc_sp); + updateKeyspaceGCSafepointMap(keyspace_id, ks_gc_sp); return ks_gc_sp; } catch (pingcap::Exception & e) @@ -148,22 +195,22 @@ struct PDClientHelper } } - static void update_ks_gc_sp_map(KeyspaceID keyspace_id, Timestamp ks_gc_sp) + static void updateKeyspaceGCSafepointMap(KeyspaceID keyspace_id, Timestamp ks_gc_sp) { std::unique_lock lock(ks_gc_sp_mutex); - KeyspaceGCInfo newKeyspaceGCInfo; - newKeyspaceGCInfo.ks_gc_sp = ks_gc_sp; - newKeyspaceGCInfo.ks_gc_sp_update_time = std::chrono::steady_clock::now(); - ks_gc_sp_map[keyspace_id] = newKeyspaceGCInfo; + KeyspaceGCInfo new_keyspace_gc_info; + new_keyspace_gc_info.ks_gc_sp = ks_gc_sp; + new_keyspace_gc_info.ks_gc_sp_update_time = std::chrono::steady_clock::now(); + ks_gc_sp_map[keyspace_id] = new_keyspace_gc_info; } - static KeyspaceGCInfo get_ks_gc_sp(KeyspaceID keyspace_id) + static KeyspaceGCInfo getKeyspaceGCSafepoint(KeyspaceID keyspace_id) { std::shared_lock lock(ks_gc_sp_mutex); return ks_gc_sp_map[keyspace_id]; } - static void remove_ks_gc_sp(KeyspaceID keyspace_id) + static void removeKeyspaceGCSafepoint(KeyspaceID keyspace_id) { std::unique_lock lock(ks_gc_sp_mutex); ks_gc_sp_map.erase(keyspace_id); diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.cpp b/dbms/src/TiDB/Schema/SchemaBuilder.cpp index e79dcb64af8..f238e48164d 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.cpp +++ b/dbms/src/TiDB/Schema/SchemaBuilder.cpp @@ -1017,7 +1017,7 @@ void SchemaBuilder::applyDropDatabaseByName(const String & d // 2. Use the same GC safe point as TiDB. // In such way our database (and its belonging tables) will be GC-ed later than TiDB, which is safe and correct. auto & tmt_context = context.getTMTContext(); - auto tombstone = tmt_context.getPDClient()->getTS(); + auto tombstone = PDClientHelper::getTSO(tmt_context.getPDClient(), PDClientHelper::get_tso_maxtime); db->alterTombstone(context, tombstone, /*new_db_info*/ nullptr); // keep the old db_info LOG_INFO(log, "Tombstone database end, db_name={} tombstone={}", db_name, tombstone); @@ -1142,7 +1142,7 @@ void SchemaBuilder::applyCreateStorageInstance( UInt64 tombstone_ts = 0; if (is_tombstone) { - tombstone_ts = context.getTMTContext().getPDClient()->getTS(); + tombstone_ts = PDClientHelper::getTSO(context.getTMTContext().getPDClient(), PDClientHelper::get_tso_maxtime); } String stmt = createTableStmt(keyspace_id, database_id, *table_info, name_mapper, tombstone_ts, log); @@ -1230,7 +1230,7 @@ void SchemaBuilder::applyDropPhysicalTable( table_id, action); - const UInt64 tombstone_ts = tmt_context.getPDClient()->getTS(); + const UInt64 tombstone_ts = PDClientHelper::getTSO(tmt_context.getPDClient(), PDClientHelper::get_tso_maxtime); // TODO:try to optimize alterCommands AlterCommands commands; { diff --git a/dbms/src/TiDB/Schema/SchemaGetter.cpp b/dbms/src/TiDB/Schema/SchemaGetter.cpp index 41e41386398..4841059ddcb 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.cpp +++ b/dbms/src/TiDB/Schema/SchemaGetter.cpp @@ -276,9 +276,8 @@ TiDB::DBInfoPtr SchemaGetter::getDatabase(DatabaseID db_id) if (json.empty()) return nullptr; - LOG_DEBUG(log, "Get DB Info from TiKV, database_id={} {}", db_id, json); - auto db_info = std::make_shared(json, keyspace_id); - return db_info; + LOG_DEBUG(log, "Get DatabaseInfo from TiKV, database_id={} {}", db_id, json); + return std::make_shared(json, keyspace_id); } template @@ -310,7 +309,7 @@ std::pair SchemaGetter::getTableInfoImpl(DatabaseID db return {nullptr, get_by_mvcc}; } } - LOG_DEBUG(log, "Get Table Info from TiKV, table_id={} {}", table_id, table_info_json); + LOG_DEBUG(log, "Get TableInfo from TiKV, table_id={} {}", table_id, table_info_json); return {std::make_shared(table_info_json, keyspace_id), get_by_mvcc}; } template std::pair SchemaGetter::getTableInfoImpl(DatabaseID db_id, TableID table_id); @@ -339,7 +338,7 @@ std::vector SchemaGetter::listTables(DatabaseID db_id) auto db_key = getDBKey(db_id); if (!checkDBExists(db_key)) { - LOG_ERROR(log, "DB {} Not Exists!", db_id); + LOG_ERROR(log, "The database does not exist, database_id={}", db_id); return {}; } @@ -362,5 +361,4 @@ std::vector SchemaGetter::listTables(DatabaseID db_id) return res; } -// end of namespace. } // namespace DB diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.cpp b/dbms/src/TiDB/Schema/SchemaSyncService.cpp index 9a54d13079c..87029e2226f 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.cpp +++ b/dbms/src/TiDB/Schema/SchemaSyncService.cpp @@ -144,7 +144,7 @@ void SchemaSyncService::removeKeyspaceGCTasks() keyspace_handle_iter = keyspace_handle_map.erase(keyspace_handle_iter); context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace); - PDClientHelper::remove_ks_gc_sp(keyspace); + PDClientHelper::removeKeyspaceGCSafepoint(keyspace); keyspace_gc_context.erase(keyspace); // clear the last gc safepoint num_remove_tasks += 1; } diff --git a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp index de18d38b4cf..000086aaf87 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp +++ b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,21 @@ namespace DB { +template +typename TiDBSchemaSyncer::Getter TiDBSchemaSyncer:: + createSchemaGetter(KeyspaceID keyspace_id) +{ + if constexpr (mock_getter) + { + return Getter(); + } + else + { + auto tso = PDClientHelper::getTSO(cluster->pd_client, PDClientHelper::get_tso_maxtime); + return Getter(cluster.get(), tso, keyspace_id); + } +} + template bool TiDBSchemaSyncer::syncSchemas(Context & context) { diff --git a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h index 99a23a55e29..aa146615e3c 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h +++ b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h @@ -51,18 +51,7 @@ class TiDBSchemaSyncer : public SchemaSyncer DatabaseInfoCache databases; TableIDMap table_id_map; - Getter createSchemaGetter(KeyspaceID keyspace_id) - { - if constexpr (mock_getter) - { - return Getter(); - } - else - { - auto tso = cluster->pd_client->getTS(); - return Getter(cluster.get(), tso, keyspace_id); - } - } + Getter createSchemaGetter(KeyspaceID keyspace_id); public: TiDBSchemaSyncer(KVClusterPtr cluster_, KeyspaceID keyspace_id_)