Skip to content

Commit

Permalink
Refine schema sync on read logic and add test (#120)
Browse files Browse the repository at this point in the history
* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless code, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Remove sync schema test

* Remove a lot of useless code

* Refine schema sync on read, and add drop on read test
  • Loading branch information
zanmato1984 authored Jul 22, 2019
1 parent 1a6a1c4 commit 33fb39d
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 44 deletions.
30 changes: 14 additions & 16 deletions dbms/src/Debug/MockSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,24 +328,22 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table)
/// Table existing, detect schema changes and apply.
const TableInfo & orig_table_info = storage->getTableInfo();
AlterCommands alter_commands = detectSchemaChanges(table_info, orig_table_info);

std::stringstream ss;
ss << "Detected schema changes: ";
for (const auto & command : alter_commands)
{
// TODO: Other command types.
if (command.type == AlterCommand::ADD_COLUMN)
ss << "ADD COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
else if (command.type == AlterCommand::DROP_COLUMN)
ss << "DROP COLUMN " << command.column_name << ", ";
else if (command.type == AlterCommand::MODIFY_COLUMN)
ss << "MODIFY COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
}

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str());

if (!alter_commands.empty())
{
std::stringstream ss;
ss << "Detected schema changes: ";
for (const auto & command : alter_commands)
{
// TODO: Other command types.
if (command.type == AlterCommand::ADD_COLUMN)
ss << "ADD COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
else if (command.type == AlterCommand::DROP_COLUMN)
ss << "DROP COLUMN " << command.column_name << ", ";
else if (command.type == AlterCommand::MODIFY_COLUMN)
ss << "MODIFY COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str());

// Call storage alter to apply schema changes.
storage->alterForTMT(alter_commands, table_info, table->table_info.db_name, context);

Expand Down
75 changes: 49 additions & 26 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)
{
/// Read from table function.
storage = context.getQueryContext().executeTableFunction(table_expression);
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
}
else
{
Expand All @@ -159,12 +160,17 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)

getDatabaseAndTableNames(database_name, table_name);

storage = context.getTable(database_name, table_name);
if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)
{
storage = context.getTable(database_name, table_name);
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
}
else
{
getAndLockStorageWithSchemaVersion(database_name, table_name, settings.schema_version);
}
}

if (storage)
table_lock = alignStorageSchemaAndLock(settings.schema_version);

query_analyzer = std::make_unique<ExpressionAnalyzer>(
query_ptr, context, storage, source_columns, required_result_column_names, subquery_depth, !only_analyze);

Expand All @@ -187,47 +193,64 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)
}


TableStructureReadLockPtr InterpreterSelectQuery::alignStorageSchemaAndLock(Int64 schema_version)
void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version)
{
/// Regular read lock for non-TMT or schema version unspecified.
const auto merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION || !merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
return storage->lockStructure(false, __PRETTY_FUNCTION__);
String qualified_name = database_name + "." + table_name;

/// Lambda for get storage, then align schema version under the read lock.
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<StoragePtr, TableStructureReadLockPtr, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto storage_ = schema_synced ? context.getTable(database_name, table_name) : context.tryGetTable(database_name, table_name);
if (!storage_)
return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);

const auto merge_tree = dynamic_cast<const StorageMergeTree *>(storage_.get());
if (!merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Specifying schema_version for non-TMT storage: " + storage->getName() + ", table: " + qualified_name + " is not allowed", ErrorCodes::LOGICAL_ERROR);

/// Lambda for schema version check under the read lock.
auto checkSchemaVersionAndLock = [&](bool schema_synced) -> std::tuple<TableStructureReadLockPtr, Int64> {
auto lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);

/// Check schema version.
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
if (storage_schema_version > schema_version)
throw Exception("Storage schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version))
return std::make_tuple(lock, storage_schema_version);
return std::make_tuple(storage_, lock, storage_schema_version, true);

return std::make_tuple(nullptr, storage_schema_version);
return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
};

/// Try check and lock once.
/// Try get storage and lock once.
StoragePtr storage_;
TableStructureReadLockPtr lock;
Int64 storage_schema_version;
bool ok;
{
auto [lock, storage_schema_version] = checkSchemaVersionAndLock(false);
if (lock)
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK, no syncing required.");
return lock;
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK, no syncing required.");
storage = storage_;
table_lock = lock;
return;
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check not OK.");
}

/// If first try failed, sync schema and check again.
/// If first try failed, sync schema and try again.
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", not OK, syncing schemas.");
context.getTMTContext().getSchemaSyncer()->syncSchemas(context);

auto [lock, storage_schema_version] = checkSchemaVersionAndLock(true);
if (lock)
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK after syncing.");
return lock;
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK after syncing.");
storage = storage_;
table_lock = lock;
return;
}

throw Exception("Shouldn't reach here", ErrorCodes::UNKNOWN_EXCEPTION);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/Transaction/Types.h>


namespace Poco { class Logger; }
Expand Down Expand Up @@ -111,7 +112,7 @@ class InterpreterSelectQuery : public IInterpreter

void init(const Names & required_result_column_names);

TableStructureReadLockPtr alignStorageSchemaAndLock(Int64 schema_version);
void getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version);

void executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Settings

#define APPLY_FOR_SETTINGS(M) \
M(SettingString, regions, "", "the region need to be read.") \
M(SettingBool, resolve_locks, false, "tmt read tso.") \
M(SettingBool, resolve_locks, false, "tmt resolve locks.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
Expand Down
26 changes: 26 additions & 0 deletions tests/mutable-test/txn_schema/drop_on_read.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
=> DBGInvoke __enable_schema_sync_service('false')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)
=> DBGInvoke __mock_schema_syncer('true')

=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __drop_tidb_table(default, test, 'false')
=> select * from default.test
=> select * from default.test " --schema_version "100
Received exception from server (version {#WORD}):
Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist..

=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)')
=> select * from default.test
Received exception from server (version {#WORD}):
Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist..
=> select * from default.test " --schema_version "100

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
=> DBGInvoke __enable_schema_sync_service('true')

0 comments on commit 33fb39d

Please sign in to comment.