diff --git a/dbms/src/Debug/MockSchemaGetter.h b/dbms/src/Debug/MockSchemaGetter.h index f02699866ce..11c5d97f036 100644 --- a/dbms/src/Debug/MockSchemaGetter.h +++ b/dbms/src/Debug/MockSchemaGetter.h @@ -17,16 +17,25 @@ #include #include +#include + namespace DB { - struct MockSchemaGetter { TiDB::DBInfoPtr getDatabase(DatabaseID db_id) { return MockTiDB::instance().getDBInfoByID(db_id); } Int64 getVersion() { return MockTiDB::instance().getVersion(); } - SchemaDiff getSchemaDiff(Int64 version) { return MockTiDB::instance().getSchemaDiff(version); } + std::optional getSchemaDiff(Int64 version) + { + return MockTiDB::instance().getSchemaDiff(version); + } + + bool checkSchemaDiffExists(Int64 version) + { + return MockTiDB::instance().checkSchemaDiffExists(version); + } TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id) { return MockTiDB::instance().getTableInfoByID(table_id); } diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 7b3bdb0948f..99d9625461b 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -668,9 +668,14 @@ std::pair MockTiDB::getDBIDByName(const String & database_name return std::make_pair(false, -1); } -SchemaDiff MockTiDB::getSchemaDiff(Int64 version_) +std::optional MockTiDB::getSchemaDiff(Int64 version_) { return version_diff[version_]; } +bool MockTiDB::checkSchemaDiffExists(Int64 version) +{ + return version_diff.find(version) != version_diff.end(); +} + } // namespace DB diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index 36d2af90859..261e547b13a 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -127,7 +127,9 @@ class MockTiDB : public ext::Singleton std::pair getDBIDByName(const String & database_name); - SchemaDiff getSchemaDiff(Int64 version); + bool checkSchemaDiffExists(Int64 version); + + std::optional getSchemaDiff(Int64 version); std::unordered_map getDatabases() { return databases; } diff --git a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp index 3223c815989..7de79dd5c6d 100644 --- a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp +++ b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp @@ -880,7 +880,7 @@ BatchReadIndexRes ReadIndexWorkerManager::batchReadIndex( } } { // if meet timeout, which means part of regions can not get response from leader, try to poll rest tasks - TEST_LOG_FMT("rest {}, poll rest tasks onece", tasks.size()); + TEST_LOG_FMT("rest {}, poll rest tasks once", tasks.size()); while (!tasks.empty()) { diff --git a/dbms/src/TiDB/Schema/SchemaGetter.cpp b/dbms/src/TiDB/Schema/SchemaGetter.cpp index 7f52f9301b1..6e333d6ba87 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.cpp +++ b/dbms/src/TiDB/Schema/SchemaGetter.cpp @@ -19,7 +19,6 @@ namespace DB { - namespace ErrorCodes { extern const int SCHEMA_SYNC_ERROR; @@ -188,18 +187,26 @@ Int64 SchemaGetter::getVersion() return std::stoll(ver); } +bool SchemaGetter::checkSchemaDiffExists(Int64 ver) +{ + String key = getSchemaDiffKey(ver); + String data = TxnStructure::get(snap, key); + return !data.empty(); +} + String SchemaGetter::getSchemaDiffKey(Int64 ver) { return std::string(schemaDiffPrefix) + ":" + std::to_string(ver); } -SchemaDiff SchemaGetter::getSchemaDiff(Int64 ver) +std::optional SchemaGetter::getSchemaDiff(Int64 ver) { String key = getSchemaDiffKey(ver); String data = TxnStructure::get(snap, key); if (data.empty()) { - throw TiFlashException("cannot find schema diff for version: " + std::to_string(ver), Errors::Table::SyncError); + LOG_FMT_WARNING(log, "The schema diff for version {}, key {} is empty.", ver, key); + return std::nullopt; } SchemaDiff diff; diff.deserialize(data); diff --git a/dbms/src/TiDB/Schema/SchemaGetter.h b/dbms/src/TiDB/Schema/SchemaGetter.h index 02d2f7a7c88..fe0ecd59af0 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.h +++ b/dbms/src/TiDB/Schema/SchemaGetter.h @@ -26,6 +26,8 @@ #include +#include + namespace DB { // The enum results are completely the same as the DDL Action listed in the "parser/model/ddl.go" of TiDB codebase, which must be keeping in sync. @@ -138,7 +140,9 @@ struct SchemaGetter Int64 getVersion(); - SchemaDiff getSchemaDiff(Int64 ver); + bool checkSchemaDiffExists(Int64 ver); + + std::optional getSchemaDiff(Int64 ver); static String getSchemaDiffKey(Int64 ver); diff --git a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h index 4fdba195acb..a23aeab139f 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h +++ b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h @@ -106,21 +106,31 @@ struct TiDBSchemaSyncer : public SchemaSyncer Stopwatch watch; SCOPE_EXIT({ GET_METRIC(tiflash_schema_apply_duration_seconds).Observe(watch.elapsedSeconds()); }); - LOG_FMT_INFO(log, "start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version); + LOG_FMT_INFO(log, "Start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version); // Show whether the schema mutex is held for a long time or not. GET_METRIC(tiflash_schema_applying).Set(1.0); SCOPE_EXIT({ GET_METRIC(tiflash_schema_applying).Set(0.0); }); GET_METRIC(tiflash_schema_apply_count, type_diff).Increment(); - if (!tryLoadSchemaDiffs(getter, version, context)) + // After the feature concurrent DDL, TiDB does `update schema version` before `set schema diff`, and they are done in separate transactions. + // So TiFlash may see a schema version X but no schema diff X, meaning that the transaction of schema diff X has not been committed or has + // been aborted. + // However, TiDB makes sure that if we get a schema version X, then the schema diff X-1 must exist. Otherwise the transaction of schema diff + // X-1 is aborted and we can safely ignore it. + // Since TiDB can not make sure the schema diff of the latest schema version X is not empty, under this situation we should set the `cur_version` + // to X-1 and try to fetch the schema diff X next time. + Int64 version_after_load_diff = 0; + if (version_after_load_diff = tryLoadSchemaDiffs(getter, version, context); version_after_load_diff == -1) { GET_METRIC(tiflash_schema_apply_count, type_full).Increment(); loadAllSchema(getter, version, context); + // After loadAllSchema, we need update `version_after_load_diff` by last diff value exist or not + version_after_load_diff = getter.checkSchemaDiffExists(version) ? version : version - 1; } - cur_version = version; + cur_version = version_after_load_diff; GET_METRIC(tiflash_schema_version).Set(cur_version); - LOG_FMT_INFO(log, "end sync schema, version has been updated to {}", cur_version); + LOG_FMT_INFO(log, "End sync schema, version has been updated to {}{}", cur_version, cur_version == version ? "" : "(latest diff is empty)"); return true; } @@ -144,30 +154,60 @@ struct TiDBSchemaSyncer : public SchemaSyncer return it->second; } - bool tryLoadSchemaDiffs(Getter & getter, Int64 version, Context & context) + // Return Values + // - if latest schema diff is not empty, return the (latest_version) + // - if latest schema diff is empty, return the (latest_version - 1) + // - if error happend, return (-1) + Int64 tryLoadSchemaDiffs(Getter & getter, Int64 latest_version, Context & context) { - if (isTooOldSchema(cur_version, version)) + if (isTooOldSchema(cur_version, latest_version)) { - return false; + return -1; } - LOG_FMT_DEBUG(log, "try load schema diffs."); + LOG_FMT_DEBUG(log, "Try load schema diffs."); - SchemaBuilder builder(getter, context, databases, version); + SchemaBuilder builder(getter, context, databases, latest_version); Int64 used_version = cur_version; - std::vector diffs; - while (used_version < version) + // First get all schema diff from `cur_version` to `latest_version`. Only apply the schema diff(s) if we fetch all + // schema diff without any exception. + std::vector> diffs; + while (used_version < latest_version) { used_version++; diffs.push_back(getter.getSchemaDiff(used_version)); } - LOG_FMT_DEBUG(log, "end load schema diffs with total {} entries.", diffs.size()); + LOG_FMT_DEBUG(log, "End load schema diffs with total {} entries.", diffs.size()); + try { - for (const auto & diff : diffs) + for (size_t diff_index = 0; diff_index < diffs.size(); ++diff_index) { - builder.applyDiff(diff); + const auto & schema_diff = diffs[diff_index]; + + if (!schema_diff) + { + // If `schema diff` from `latest_version` got empty `schema diff` + // Then we won't apply to `latest_version`, but we will apply to `latest_version - 1` + // If `schema diff` from [`cur_version`, `latest_version - 1`] got empty `schema diff` + // Then we should just skip it. + // + // example: + // - `cur_version` is 1, `latest_version` is 10 + // - The schema diff of schema version [2,4,6] is empty, Then we just skip it. + // - The schema diff of schema version 10 is empty, Then we should just apply version into 9 + if (diff_index != diffs.size() - 1) + { + LOG_FMT_WARNING(log, "Skip the schema diff from version {}. ", cur_version + diff_index + 1); + continue; + } + + // if diff_index == diffs.size() - 1, return used_version - 1; + return used_version - 1; + } + + builder.applyDiff(*schema_diff); } } catch (TiFlashException & e) @@ -177,7 +217,7 @@ struct TiDBSchemaSyncer : public SchemaSyncer GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); } LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString()); - return false; + return -1; } catch (Exception & e) { @@ -187,21 +227,22 @@ struct TiDBSchemaSyncer : public SchemaSyncer } GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString()); - return false; + return -1; } catch (Poco::Exception & e) { GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.displayText()); - return false; + return -1; } catch (std::exception & e) { GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.what()); - return false; + return -1; } - return true; + + return used_version; } void loadAllSchema(Getter & getter, Int64 version, Context & context)