Skip to content

Commit

Permalink
ddl: Fix zero safepoint make schema gc run repeatly (#8361)
Browse files Browse the repository at this point in the history
close #8356
  • Loading branch information
JaySon-Huang authored Nov 13, 2023
1 parent 949f1ac commit 1addc37
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 17 deletions.
50 changes: 36 additions & 14 deletions dbms/src/TiDB/Schema/SchemaSyncService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include <TiDB/Schema/TiDBSchemaManager.h>
#include <common/logger_useful.h>

#include <optional>

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -54,9 +56,10 @@ SchemaSyncService::SchemaSyncService(DB::Context & context_)
void SchemaSyncService::addKeyspaceGCTasks()
{
const auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces();
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);

UInt64 num_add_tasks = 0;
// Add new sync schema task for new keyspace.
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);
for (auto const iter : keyspaces)
{
auto keyspace = iter.first;
Expand All @@ -75,14 +78,14 @@ void SchemaSyncService::addKeyspaceGCTasks()
/// They must be performed synchronously,
/// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively.
// GC safe point must be obtained ahead of syncing schema.
auto gc_safe_point
= PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace);
stage = "Sync schemas";
done_anything = syncSchemas(keyspace);
if (done_anything)
GET_METRIC(tiflash_schema_trigger_count, type_timer).Increment();

stage = "GC";
auto gc_safe_point
= PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace);
done_anything = gc(gc_safe_point, keyspace);

return done_anything;
Expand All @@ -91,34 +94,40 @@ void SchemaSyncService::addKeyspaceGCTasks()
{
LOG_ERROR(
ks_log,
"{} failed by {} \n stack : {}",
"{}, keyspace={} failed by {} \n stack : {}",
stage,
keyspace,
e.displayText(),
e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(ks_log, "{} failed by {}", stage, e.displayText());
LOG_ERROR(ks_log, "{}, keyspace={} failed by {}", stage, keyspace, e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(ks_log, "{} failed by {}", stage, e.what());
LOG_ERROR(ks_log, "{}, keyspace={} failed by {}", stage, keyspace, e.what());
}
return false;
},
false,
context.getSettingsRef().ddl_sync_interval_seconds * 1000);

keyspace_handle_map.emplace(keyspace, task_handle);
num_add_tasks += 1;
}

auto log_level = num_add_tasks > 0 ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(log, log_level, "add sync schema task for keyspaces done, num_add_tasks={}", num_add_tasks);
}

void SchemaSyncService::removeKeyspaceGCTasks()
{
const auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces();
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);

UInt64 num_remove_tasks = 0;
// Remove stale sync schema task.
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);
for (auto keyspace_handle_iter = keyspace_handle_map.begin(); keyspace_handle_iter != keyspace_handle_map.end();
/*empty*/)
{
Expand All @@ -128,24 +137,28 @@ void SchemaSyncService::removeKeyspaceGCTasks()
++keyspace_handle_iter;
continue;
}

auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace));
LOG_INFO(keyspace_log, "remove sync schema task");
background_pool.removeTask(keyspace_handle_iter->second);
keyspace_handle_iter = keyspace_handle_map.erase(keyspace_handle_iter);

context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace);
PDClientHelper::remove_ks_gc_sp(keyspace);

keyspace_gc_context.erase(keyspace);
keyspace_gc_context.erase(keyspace); // clear the last gc safepoint
}

auto log_level = num_remove_tasks > 0 ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(log, log_level, "remove sync schema task for keyspaces done, num_remove_tasks={}", num_remove_tasks);
}

SchemaSyncService::~SchemaSyncService()
void SchemaSyncService::shutdown()
{
if (handle)
{
// stop the root handle first
background_pool.removeTask(handle);
handle = nullptr;
}

for (auto const & iter : keyspace_handle_map)
Expand All @@ -156,6 +169,11 @@ SchemaSyncService::~SchemaSyncService()
LOG_INFO(log, "SchemaSyncService stopped");
}

SchemaSyncService::~SchemaSyncService()
{
shutdown();
}

bool SchemaSyncService::syncSchemas(KeyspaceID keyspace_id)
{
return context.getTMTContext().getSchemaSyncerManager()->syncSchemas(context, keyspace_id);
Expand All @@ -168,12 +186,12 @@ inline std::tuple<bool, Timestamp> isSafeForGC(const DatabaseOrTablePtr & ptr, T
return {tombstone_ts != 0 && tombstone_ts < gc_safepoint, tombstone_ts};
}

Timestamp SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const
std::optional<Timestamp> SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const
{
std::shared_lock lock(keyspace_map_mutex);
auto iter = keyspace_gc_context.find(keyspace_id);
if (iter == keyspace_gc_context.end())
return 0;
return std::nullopt;
return iter->second.last_gc_safepoint;
}

Expand All @@ -185,8 +203,12 @@ void SchemaSyncService::updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp

bool SchemaSyncService::gc(Timestamp gc_safepoint, KeyspaceID keyspace_id)
{
const Timestamp last_gc_safepoint = lastGcSafePoint(keyspace_id);
if (last_gc_safepoint != 0 && gc_safepoint == last_gc_safepoint)
const std::optional<Timestamp> last_gc_safepoint = lastGcSafePoint(keyspace_id);
// for new deploy cluster, there is an interval that gc_safepoint return 0, skip it
if (gc_safepoint == 0)
return false;
// the gc safepoint is not changed since last schema gc run, skip it
if (last_gc_safepoint.has_value() && gc_safepoint == *last_gc_safepoint)
return false;

auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace_id));
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/TiDB/Schema/SchemaSyncService.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ class SchemaSyncService

bool gc(Timestamp gc_safepoint, KeyspaceID keyspace_id);

void shutdown();

private:
bool syncSchemas(KeyspaceID keyspace_id);

void addKeyspaceGCTasks();
void removeKeyspaceGCTasks();

Timestamp lastGcSafePoint(KeyspaceID keyspace_id) const;
std::optional<Timestamp> lastGcSafePoint(KeyspaceID keyspace_id) const;
void updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp gc_safepoint);

private:
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ try
auto table_ids = MockTiDB::instance().newTables(db_name, tables, pd_client->getTS(), "dt");

refreshSchema();

for (auto table_id : table_ids)
{
refreshTableSchema(table_id);
Expand All @@ -286,15 +285,26 @@ try
MockTiDB::instance().dropTable(global_ctx, db_name, "t1", true);

refreshSchema();
for (auto table_id : table_ids)
{
refreshTableSchema(table_id);
}

global_ctx.initializeSchemaSyncService();
auto sync_service = global_ctx.getSchemaSyncService();
// run gc with safepoint == 0, will be skip
ASSERT_FALSE(sync_service->gc(0, NullspaceID));
ASSERT_TRUE(sync_service->gc(10000000, NullspaceID));
// run gc with the same safepoint, will be skip
ASSERT_FALSE(sync_service->gc(10000000, NullspaceID));
// run gc for another keyspace
// run gc for another keyspace with same safepoint, will be executed
ASSERT_TRUE(sync_service->gc(10000000, 1024));
// run gc with changed safepoint
ASSERT_TRUE(sync_service->gc(20000000, 1024));
// run gc with the same safepoint
ASSERT_FALSE(sync_service->gc(20000000, 1024));

sync_service->shutdown();
}
CATCH

Expand Down

0 comments on commit 1addc37

Please sign in to comment.