diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.cpp b/dbms/src/TiDB/Schema/SchemaSyncService.cpp index 3b1f5d51606..247f1a4a196 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.cpp +++ b/dbms/src/TiDB/Schema/SchemaSyncService.cpp @@ -27,6 +27,8 @@ #include #include +#include + namespace DB { namespace ErrorCodes @@ -54,9 +56,10 @@ SchemaSyncService::SchemaSyncService(DB::Context & context_) void SchemaSyncService::addKeyspaceGCTasks() { const auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces(); - std::unique_lock lock(keyspace_map_mutex); + UInt64 num_add_tasks = 0; // Add new sync schema task for new keyspace. + std::unique_lock lock(keyspace_map_mutex); for (auto const iter : keyspaces) { auto keyspace = iter.first; @@ -75,14 +78,14 @@ void SchemaSyncService::addKeyspaceGCTasks() /// They must be performed synchronously, /// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively. // GC safe point must be obtained ahead of syncing schema. 
- auto gc_safe_point - = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace); stage = "Sync schemas"; done_anything = syncSchemas(keyspace); if (done_anything) GET_METRIC(tiflash_schema_trigger_count, type_timer).Increment(); stage = "GC"; + auto gc_safe_point + = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace); done_anything = gc(gc_safe_point, keyspace); return done_anything; @@ -91,18 +94,19 @@ void SchemaSyncService::addKeyspaceGCTasks() { LOG_ERROR( ks_log, - "{} failed by {} \n stack : {}", + "{}, keyspace={} failed by {} \n stack : {}", stage, + keyspace, e.displayText(), e.getStackTrace().toString()); } catch (const Poco::Exception & e) { - LOG_ERROR(ks_log, "{} failed by {}", stage, e.displayText()); + LOG_ERROR(ks_log, "{}, keyspace={} failed by {}", stage, keyspace, e.displayText()); } catch (const std::exception & e) { - LOG_ERROR(ks_log, "{} failed by {}", stage, e.what()); + LOG_ERROR(ks_log, "{}, keyspace={} failed by {}", stage, keyspace, e.what()); } return false; }, @@ -110,15 +114,20 @@ void SchemaSyncService::addKeyspaceGCTasks() context.getSettingsRef().ddl_sync_interval_seconds * 1000); keyspace_handle_map.emplace(keyspace, task_handle); + num_add_tasks += 1; } + + auto log_level = num_add_tasks > 0 ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG; + LOG_IMPL(log, log_level, "add sync schema task for keyspaces done, num_add_tasks={}", num_add_tasks); } void SchemaSyncService::removeKeyspaceGCTasks() { const auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces(); - std::unique_lock lock(keyspace_map_mutex); + UInt64 num_remove_tasks = 0; // Remove stale sync schema task. 
+ std::unique_lock lock(keyspace_map_mutex); for (auto keyspace_handle_iter = keyspace_handle_map.begin(); keyspace_handle_iter != keyspace_handle_map.end(); /*empty*/) { @@ -128,6 +137,7 @@ void SchemaSyncService::removeKeyspaceGCTasks() ++keyspace_handle_iter; continue; } + auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace)); LOG_INFO(keyspace_log, "remove sync schema task"); background_pool.removeTask(keyspace_handle_iter->second); @@ -135,17 +145,20 @@ void SchemaSyncService::removeKeyspaceGCTasks() context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace); PDClientHelper::remove_ks_gc_sp(keyspace); - - keyspace_gc_context.erase(keyspace); + keyspace_gc_context.erase(keyspace); // clear the last gc safepoint } + + auto log_level = num_remove_tasks > 0 ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG; + LOG_IMPL(log, log_level, "remove sync schema task for keyspaces done, num_remove_tasks={}", num_remove_tasks); } -SchemaSyncService::~SchemaSyncService() +void SchemaSyncService::shutdown() { if (handle) { // stop the root handle first background_pool.removeTask(handle); + handle = nullptr; } for (auto const & iter : keyspace_handle_map) @@ -156,6 +169,11 @@ SchemaSyncService::~SchemaSyncService() LOG_INFO(log, "SchemaSyncService stopped"); } +SchemaSyncService::~SchemaSyncService() +{ + shutdown(); +} + bool SchemaSyncService::syncSchemas(KeyspaceID keyspace_id) { return context.getTMTContext().getSchemaSyncerManager()->syncSchemas(context, keyspace_id); @@ -168,12 +186,12 @@ inline std::tuple isSafeForGC(const DatabaseOrTablePtr & ptr, T return {tombstone_ts != 0 && tombstone_ts < gc_safepoint, tombstone_ts}; } -Timestamp SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const +std::optional<Timestamp> SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const { std::shared_lock lock(keyspace_map_mutex); auto iter = keyspace_gc_context.find(keyspace_id); if (iter == keyspace_gc_context.end()) - return 0;
+ return std::nullopt; return iter->second.last_gc_safepoint; } @@ -185,8 +203,12 @@ void SchemaSyncService::updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp bool SchemaSyncService::gc(Timestamp gc_safepoint, KeyspaceID keyspace_id) { - const Timestamp last_gc_safepoint = lastGcSafePoint(keyspace_id); - if (last_gc_safepoint != 0 && gc_safepoint == last_gc_safepoint) + const std::optional<Timestamp> last_gc_safepoint = lastGcSafePoint(keyspace_id); + // for a newly deployed cluster, there is an interval during which gc_safepoint is 0, skip it + if (gc_safepoint == 0) + return false; + // the gc safepoint has not changed since the last schema gc run, skip it + if (last_gc_safepoint.has_value() && gc_safepoint == *last_gc_safepoint) return false; auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace_id)); diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.h b/dbms/src/TiDB/Schema/SchemaSyncService.h index 8551c1cd4d4..d7b31b92807 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.h +++ b/dbms/src/TiDB/Schema/SchemaSyncService.h @@ -45,13 +45,15 @@ class SchemaSyncService bool gc(Timestamp gc_safepoint, KeyspaceID keyspace_id); + void shutdown(); + private: bool syncSchemas(KeyspaceID keyspace_id); void addKeyspaceGCTasks(); void removeKeyspaceGCTasks(); - Timestamp lastGcSafePoint(KeyspaceID keyspace_id) const; + std::optional<Timestamp> lastGcSafePoint(KeyspaceID keyspace_id) const; void updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp gc_safepoint); private: diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index cd11a4127d1..e6414084c43 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -274,7 +274,6 @@ try auto table_ids = MockTiDB::instance().newTables(db_name, tables, pd_client->getTS(), "dt"); refreshSchema(); - for (auto table_id : table_ids) { refreshTableSchema(table_id); @@ -286,15 +285,26 @@ try
MockTiDB::instance().dropTable(global_ctx, db_name, "t1", true); refreshSchema(); + for (auto table_id : table_ids) + { + refreshTableSchema(table_id); + } global_ctx.initializeSchemaSyncService(); auto sync_service = global_ctx.getSchemaSyncService(); + // run gc with safepoint == 0, will be skipped + ASSERT_FALSE(sync_service->gc(0, NullspaceID)); ASSERT_TRUE(sync_service->gc(10000000, NullspaceID)); // run gc with the same safepoint, will be skip ASSERT_FALSE(sync_service->gc(10000000, NullspaceID)); - // run gc for another keyspace + // run gc for another keyspace with same safepoint, will be executed + ASSERT_TRUE(sync_service->gc(10000000, 1024)); + // run gc with changed safepoint ASSERT_TRUE(sync_service->gc(20000000, 1024)); + // run gc with the same safepoint ASSERT_FALSE(sync_service->gc(20000000, 1024)); + + sync_service->shutdown(); } CATCH