Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize schema sync on read and refine mock TiDB #170

Merged
merged 5 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 65 additions & 37 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <Debug/MockTiDB.h>

#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDecimal.h>
Expand All @@ -7,10 +9,10 @@
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

#include <Functions/FunctionHelpers.h>

#include <Debug/MockTiDB.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>

namespace DB
{
Expand All @@ -28,8 +30,57 @@ Table::Table(const String & database_name_, const String & table_name_, TableInf

MockTiDB::MockTiDB() { databases["default"] = 0; }

void MockTiDB::dropDB(const String & database_name)
TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions)
{
String qualified_name = database_name + "." + table_name;
auto it_by_name = tables_by_name.find(qualified_name);
if (it_by_name == tables_by_name.end())
return nullptr;

auto & kvstore = context.getTMTContext().getKVStore();
auto & region_table = context.getTMTContext().getRegionTable();

auto table = it_by_name->second;
if (table->isPartitionTable())
{
for (const auto & partition : table->table_info.partition.definitions)
{
tables_by_id.erase(partition.id);
if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(partition.id))
kvstore->removeRegion(e.first, &region_table);
region_table.mockDropRegionsInTable(partition.id);
}
}
}
tables_by_id.erase(table->id());

tables_by_name.erase(it_by_name);

if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(table->id()))
kvstore->removeRegion(e.first, &region_table);
region_table.mockDropRegionsInTable(table->id());
}

return table;
}

void MockTiDB::dropDB(Context & context, const String & database_name, bool drop_regions)
{
std::lock_guard lock(tables_mutex);

std::vector<String> table_names;
std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) {
if (pair.second->table_info.db_name == database_name)
table_names.emplace_back(pair.second->table_info.name);
});

for (const auto & table_name : table_names)
dropTableInternal(context, database_name, table_name, drop_regions);

version++;

SchemaDiff diff;
Expand All @@ -44,38 +95,22 @@ void MockTiDB::dropDB(const String & database_name)
databases.erase(database_name);
}

void MockTiDB::dropTable(const String & database_name, const String & table_name, bool is_drop_db)
void MockTiDB::dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions)
{
std::lock_guard lock(tables_mutex);

String qualified_name = database_name + "." + table_name;
auto it_by_name = tables_by_name.find(qualified_name);
if (it_by_name == tables_by_name.end())
auto table = dropTableInternal(context, database_name, table_name, drop_regions);
if (!table)
return;

const auto & table = it_by_name->second;
if (table->isPartitionTable())
{
for (const auto & partition : table->table_info.partition.definitions)
{
tables_by_id.erase(partition.id);
}
}
tables_by_id.erase(table->id());

tables_by_name.erase(it_by_name);
version++;

if (!is_drop_db)
{
version++;

SchemaDiff diff;
diff.type = SchemaActionDropTable;
diff.schema_id = table->table_info.db_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}
SchemaDiff diff;
diff.type = SchemaActionDropTable;
diff.schema_id = table->table_info.db_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}

ColumnInfo getColumnInfoFromColumn(const NameAndTypePair & column, ColumnID id)
Expand Down Expand Up @@ -361,13 +396,6 @@ TablePtr MockTiDB::getTableByName(const String & database_name, const String & t
return getTableByNameInternal(database_name, table_name);
}

void MockTiDB::traverseTables(std::function<void(TablePtr)> f)
{
std::lock_guard lock(tables_mutex);

std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) { f(pair.second); });
}

TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const String & table_name)
{
String qualified_name = database_name + "." + table_name;
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class MockTiDB : public ext::singleton<MockTiDB>

TableID newPartition(const String & database_name, const String & table_name, const String & partition_name, Timestamp tso);

void dropTable(const String & database_name, const String & table_name, bool is_drop_db);
void dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions);

void dropDB(const String & database_name);
void dropDB(Context & context, const String & database_name, bool drop_regions);

void addColumnToTable(const String & database_name, const String & table_name, const NameAndTypePair & column);

Expand All @@ -86,8 +86,6 @@ class MockTiDB : public ext::singleton<MockTiDB>

TablePtr getTableByName(const String & database_name, const String & table_name);

void traverseTables(std::function<void(TablePtr)> f);

TiDB::TableInfoPtr getTableInfoByID(TableID table_id);

TiDB::DBInfoPtr getDBInfoByID(DatabaseID db_id);
Expand All @@ -101,6 +99,7 @@ class MockTiDB : public ext::singleton<MockTiDB>
Int64 getVersion() { return version; }

private:
TablePtr dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions);
TablePtr getTableByNameInternal(const String & database_name, const String & table_name);

private:
Expand Down
42 changes: 6 additions & 36 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,11 @@ void MockTiDBTable::dbgFuncDropTiDBDB(Context & context, const ASTs & args, DBGI
if (args.size() == 3)
drop_regions = typeid_cast<const ASTIdentifier &>(*args[1]).name == "true";

std::vector<String> table_names;
MockTiDB::instance().traverseTables([&](MockTiDB::TablePtr table) {
if (table->table_info.db_name == database_name)
table_names.push_back(table->table_info.name);
});
for (auto table_name : table_names)
dbgFuncDropTiDBTableImpl(context, database_name, table_name, drop_regions, true, output);
MockTiDB::instance().dropDB(database_name);
MockTiDB::instance().dropDB(context, database_name, drop_regions);

std::stringstream ss;
ss << "dropped db #" << database_name;
output(ss.str());
}

void MockTiDBTable::dbgFuncDropTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
Expand All @@ -140,12 +137,7 @@ void MockTiDBTable::dbgFuncDropTiDBTable(Context & context, const ASTs & args, D
bool drop_regions = true;
if (args.size() == 3)
drop_regions = typeid_cast<const ASTIdentifier &>(*args[1]).name == "true";
dbgFuncDropTiDBTableImpl(context, database_name, table_name, drop_regions, false, output);
}

void MockTiDBTable::dbgFuncDropTiDBTableImpl(
Context & context, String database_name, String table_name, bool drop_regions, bool is_drop_db, DBGInvoker::Printer output)
{
MockTiDB::TablePtr table = nullptr;
TableID table_id = InvalidTableID;
try
Expand All @@ -161,29 +153,7 @@ void MockTiDBTable::dbgFuncDropTiDBTableImpl(
return;
}

TMTContext & tmt = context.getTMTContext();
auto & kvstore = tmt.getKVStore();
auto & region_table = tmt.getRegionTable();

if (table->isPartitionTable() && drop_regions)
{
auto partition_ids = table->getPartitionIDs();
std::for_each(partition_ids.begin(), partition_ids.end(), [&](TableID partition_id) {
for (auto & e : region_table.getRegionsByTable(partition_id))
kvstore->removeRegion(e.first, &region_table);

region_table.mockDropRegionsInTable(partition_id);
});
}

if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(table_id))
kvstore->removeRegion(e.first, &region_table);
region_table.mockDropRegionsInTable(table_id);
}

MockTiDB::instance().dropTable(database_name, table_name, is_drop_db);
MockTiDB::instance().dropTable(context, database_name, table_name, drop_regions);

std::stringstream ss;
ss << "dropped table #" << table_id;
Expand Down
30 changes: 22 additions & 8 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,13 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)
}


void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version)
void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 query_schema_version)
{
String qualified_name = database_name + "." + table_name;

/// Get current schema version in schema syncer for a chance to shortcut.
auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion();

/// Lambda for get storage, then align schema version under the read lock.
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<StoragePtr, TableStructureReadLockPtr, Int64, bool> {
/// Get storage in case it's dropped then re-created.
Expand All @@ -214,10 +217,15 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d

/// Check schema version.
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
if (storage_schema_version > schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version))
// Not allow storage schema version greater than query schema version in any case.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
// Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version.
if ((schema_synced && storage_schema_version <= query_schema_version)
|| (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version)))
return std::make_tuple(storage_, lock, storage_schema_version, true);

return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
Expand All @@ -227,12 +235,18 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
StoragePtr storage_;
TableStructureReadLockPtr lock;
Int64 storage_schema_version;
auto log_schema_version = [&](const String & result) {
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema " << result
<< " Schema version [storage, global, query]: "
<< "[" << storage_schema_version << ", " << global_schema_version << ", " << query_schema_version
<< "].");
};
bool ok;
{
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK, no syncing required.");
log_schema_version("OK, no syncing required.");
storage = storage_;
table_lock = lock;
return;
Expand All @@ -241,7 +255,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d

/// If first try failed, sync schema and try again.
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", not OK, syncing schemas.");
log_schema_version("not OK, syncing schemas.");
auto start_time = Clock::now();
context.getTMTContext().getSchemaSyncer()->syncSchemas(context);
auto schema_sync_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
Expand All @@ -250,7 +264,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK after syncing.");
log_schema_version("OK after syncing.");
storage = storage_;
table_lock = lock;
return;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class KVStore final : private boost::noncopyable
void updateRegionTableBySnapshot(RegionTable & region_table);

private:
friend class MockTiDB;
friend struct MockTiDBTable;
void removeRegion(const RegionID region_id, RegionTable * region_table);

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class RegionTable : private boost::noncopyable
void flushRegion(TableID table_id, RegionID partition_id, size_t & cache_size, const bool try_persist = true);

// For debug
friend class MockTiDB;
friend struct MockTiDBTable;

void mockDropRegionsInTable(TableID table_id);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/SchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class SchemaSyncer
public:
virtual ~SchemaSyncer() = default;

/**
* Get current version of CH schema.
*/
virtual Int64 getCurrentVersion() = 0;

/**
* Synchronize all schemas between TiDB and CH.
* @param context
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Transaction/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ struct TiDBSchemaSyncer : public SchemaSyncer
cur_version = 0;
}

Int64 getCurrentVersion() override
{
std::lock_guard<std::mutex> lock(schema_mutex);
return cur_version;
}

bool syncSchemas(Context & context) override
{
std::lock_guard<std::mutex> lock(schema_mutex);
Expand Down