Skip to content

Commit

Permalink
[flash-360] support cyclic rename (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Aug 6, 2019
1 parent 0f3ea5f commit 2f8ee1b
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 53 deletions.
172 changes: 151 additions & 21 deletions dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TableInfo & table_i
return alter_commands;
}

void SchemaBuilder::applyAlterTableImpl(TiDB::TableInfoPtr table_info, const String & db_name, StorageMergeTree * storage)
void SchemaBuilder::applyAlterTableImpl(TableInfoPtr table_info, const String & db_name, StorageMergeTree * storage)
{
table_info->schema_version = target_version;
auto orig_table_info = storage->getTableInfo();
Expand Down Expand Up @@ -275,9 +275,9 @@ void SchemaBuilder::applyRenameTable(DBInfoPtr db_info, DatabaseID old_db_id, Ta

void SchemaBuilder::applyRenameTableImpl(const String & old_db, const String & new_db, const String & old_table, const String & new_table)
{
LOG_INFO(log, "The " + old_db + "." + old_table + " will be renamed to " + new_db + "." + new_table);
if (old_db == new_db && old_table == new_table)
{
LOG_INFO(log, "The " + old_db + "." + old_table + " has been renamed, nothing needs to do");
return;
}

Expand Down Expand Up @@ -328,10 +328,13 @@ void SchemaBuilder::applyCreateSchemaImpl(TiDB::DBInfoPtr db_info)
void SchemaBuilder::applyDropSchema(DatabaseID schema_id)
{
auto database_name = databases[schema_id];
if (database_name == "")
if (unlikely(database_name == ""))
{
LOG_INFO(
log, "Syncer wants to drop database: " + std::to_string(schema_id) + " . But database is not found, may has been dropped.");
return;
}
LOG_INFO(log, "Try to drop database: " + database_name);
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = database_name;
drop_query->if_exists = true;
Expand Down Expand Up @@ -477,45 +480,172 @@ void SchemaBuilder::applyDropTable(TiDB::DBInfoPtr dbInfo, Int64 table_id)
applyDropTableImpl(database_name, table_info.name);
}

void SchemaBuilder::updateDB(TiDB::DBInfoPtr db_info)
// Drop Invalid Tables in Every DB
void SchemaBuilder::dropInvalidTables(std::vector<std::pair<TableInfoPtr, DBInfoPtr>> table_dbs)
{
auto database_name = databases[db_info->id];
if (database_name == "")
{
applyCreateSchemaImpl(db_info);
}
auto tables = getter.listTables(db_info->id);
auto & tmt_context = context.getTMTContext();

std::set<TableID> table_ids;

for (auto table : tables)
table_ids.insert(table->id);
for (auto table_db : table_dbs)
table_ids.insert(table_db.first->id);

auto & tmt_context = context.getTMTContext();
auto storage_map = tmt_context.getStorages().getAllStorage();
for (auto it = storage_map.begin(); it != storage_map.end(); it++)
{
auto storage = it->second;
if (storage->getDatabaseName() == db_info->name && table_ids.count(storage->getTableInfo().id) == 0)
if (table_ids.count(storage->getTableInfo().id) == 0)
{
// Drop Table
applyDropTableImpl(db_info->name, storage->getTableName());
LOG_DEBUG(log, "Table " + db_info->name + "." + storage->getTableName() + " is dropped during schema all schemas");
const String db_name = storage->getDatabaseName();
applyDropTableImpl(db_name, storage->getTableName());
LOG_DEBUG(log, "Table " + db_name + "." + storage->getTableName() + " is dropped during schema all schemas");
}
}
}

using TableName = std::pair<String, String>;
using TableNamePair = std::pair<TableName, TableName>;
using TableNameMap = std::map<TableName, TableName>;
using TableNameSet = std::set<TableName>;
constexpr char TmpTableNamePrefix[] = "_tiflash_tmp_";

inline TableName generateTmpTable(const TableName & name) { return TableName(name.first, String(TmpTableNamePrefix) + name.second); }

TableNamePair resolveRename(
SchemaBuilder * builder, TableNameMap & map, TableNameMap::iterator it, TableNameSet & visited)
{
TableName target_name = it->second;
TableName origin_name = it->first;
visited.insert(it->first);
auto next_it = map.find(target_name);
if (next_it == map.end())
{
builder->applyRenameTableImpl(origin_name.first, target_name.first, origin_name.second, target_name.second);
map.erase(it);
return TableNamePair();
}
else if (visited.find(target_name) != visited.end())
{
// There is a cycle.
auto tmp_name = generateTmpTable(target_name);
builder->applyRenameTableImpl(target_name.first, tmp_name.first, target_name.second, tmp_name.second);
builder->applyRenameTableImpl(origin_name.first, target_name.first, origin_name.second, target_name.second);
map.erase(it);
return TableNamePair(target_name, tmp_name);
}
else
{
auto pair = resolveRename(builder, map, next_it, visited);
if (pair.first == origin_name)
{
origin_name = pair.second;
}
builder->applyRenameTableImpl(origin_name.first, target_name.first, origin_name.second, target_name.second);
map.erase(it);
return pair;
}
}

void SchemaBuilder::alterAndRenameTables(std::vector<std::pair<TableInfoPtr, DBInfoPtr>> table_dbs)
{
// Rename Table First.
auto & tmt_context = context.getTMTContext();
auto storage_map = tmt_context.getStorages().getAllStorage();
TableNameMap rename_map;
for (auto table_db : table_dbs)
{
auto storage = static_cast<StorageMergeTree *>(tmt_context.getStorages().get(table_db.first->id).get());
if (storage != nullptr)
{
const String old_db = storage->getDatabaseName();
const String old_table = storage->getTableName();
const String new_db = table_db.second->name;
const String new_table = table_db.first->name;
if (old_db != new_db || old_table != new_table)
{
rename_map[TableName(old_db, old_table)] = TableName(new_db, new_table);
}
}
}

while (!rename_map.empty())
{
auto it = rename_map.begin();
TableNameSet visited;
resolveRename(this, rename_map, it, visited);
}

// Then Alter Table
for (auto table_db : table_dbs)
{
auto storage = static_cast<StorageMergeTree *>(tmt_context.getStorages().get(table_db.first->id).get());
if (storage != nullptr)
{
const String db_name = storage->getDatabaseName();
applyAlterTableImpl(table_db.first, db_name, storage);
}
}
}

for (auto table : tables)
void SchemaBuilder::createTables(std::vector<std::pair<TableInfoPtr, DBInfoPtr>> table_dbs)
{
auto & tmt_context = context.getTMTContext();
for (auto table_db : table_dbs)
{
auto storage = static_cast<StorageMergeTree *>(tmt_context.getStorages().get(table->id).get());
auto storage = static_cast<StorageMergeTree *>(tmt_context.getStorages().get(table_db.first->id).get());
if (storage == nullptr)
{
applyCreateTable(db_info, table->id);
applyCreateTableImpl(*table_db.second, *table_db.first);
}
else
}
}

void SchemaBuilder::syncAllSchema()
{
LOG_DEBUG(log, "try load all schemas.");

std::vector<DBInfoPtr> all_schema = getter.listDBs();

for (auto db_info : all_schema)
{
LOG_DEBUG(log, "Load schema : " + db_info->name);
}

std::set<TiDB::DatabaseID> db_ids;
for (auto db : all_schema)
{
db_ids.insert(db->id);
}

// Drop invalid databases;
for (auto it = databases.begin(); it != databases.end(); it++)
{
if (db_ids.count(it->first) == 0)
{
applyAlterTableImpl(table, db_info->name, storage);
applyDropSchema(it->first);
}
}

// Collect All Table Info and Create DBs.
std::vector<std::pair<TableInfoPtr, DBInfoPtr>> all_tables;
for (auto db : all_schema)
{
auto database_name = databases[db->id];
if (database_name == "")
{
applyCreateSchemaImpl(db);
}
std::vector<TableInfoPtr> tables = getter.listTables(db->id);
for (auto table : tables)
{
all_tables.push_back(std::make_pair(table, db));
}
}

dropInvalidTables(all_tables);
alterAndRenameTables(all_tables);
createTables(all_tables);
}

// end namespace
Expand Down
13 changes: 10 additions & 3 deletions dbms/src/Storages/Transaction/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ struct SchemaBuilder

void applyDiff(const SchemaDiff & diff);

void updateDB(TiDB::DBInfoPtr db_info);

void applyDropSchema(DatabaseID schema_id);

void syncAllSchema();

void applyRenameTableImpl(const String & old_db, const String & new_db, const String & old_table, const String & new_table);

private:
bool applyCreateSchema(DatabaseID schema_id);

Expand All @@ -54,7 +56,12 @@ struct SchemaBuilder

void applyRenameTable(TiDB::DBInfoPtr db_info, TiDB::DatabaseID old_db_id, TiDB::TableID table_id);

void applyRenameTableImpl(const String & old_db, const String & new_db, const String & old_table, const String & new_table);

void createTables(std::vector<std::pair<TiDB::TableInfoPtr, TiDB::DBInfoPtr>> table_dbs);

void alterAndRenameTables(std::vector<std::pair<TiDB::TableInfoPtr, TiDB::DBInfoPtr>> table_dbs);

void dropInvalidTables(std::vector<std::pair<TiDB::TableInfoPtr, TiDB::DBInfoPtr>> table_dbs);
};

} // namespace DB
44 changes: 15 additions & 29 deletions dbms/src/Storages/Transaction/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ struct TiDBSchemaSyncer : public SchemaSyncer
{
return false;
}
LOG_INFO(log, "start to sync schemas. current version is: " + std::to_string(cur_version) + " and try to sync schema version to: " + std::to_string(version));
LOG_INFO(log,
"start to sync schemas. current version is: " + std::to_string(cur_version)
+ " and try to sync schema version to: " + std::to_string(version));
if (!tryLoadSchemaDiffs(getter, version, context))
{
loadAllSchema(getter, version, context);
Expand Down Expand Up @@ -68,41 +70,25 @@ struct TiDBSchemaSyncer : public SchemaSyncer
diffs.push_back(getter.getSchemaDiff(used_version));
}
LOG_DEBUG(log, "end load schema diffs.");
for (const auto & diff : diffs)
try
{
builder.applyDiff(diff);
for (const auto & diff : diffs)
{
builder.applyDiff(diff);
}
}
catch (Exception & e)
{
LOG_ERROR(log, "apply diff meets exception : " + e.displayText());
return false;
}
return true;
}

bool loadAllSchema(SchemaGetter & getter, Int64 version, Context & context)
void loadAllSchema(SchemaGetter & getter, Int64 version, Context & context)
{
LOG_DEBUG(log, "try load all schemas.");

std::vector<TiDB::DBInfoPtr> all_schema = getter.listDBs();

for (auto db_info : all_schema)
{
LOG_DEBUG(log, "Load schema : " + db_info->name);
}

SchemaBuilder builder(getter, context, databases, version);

std::set<TiDB::DatabaseID> db_ids;
for (auto db : all_schema)
{
builder.updateDB(db);
db_ids.insert(db->id);
}
// Drop databases;
for (auto it = databases.begin(); it != databases.end(); it++)
{
if (db_ids.count(it->first) == 0)
{
builder.applyDropSchema(it->first);
}
}
return true;
builder.syncAllSchema();
}
};

Expand Down

0 comments on commit 2f8ee1b

Please sign in to comment.