Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix unstable test "fullstack-test/mpp/rollup_tpcds.test" (#9628) #9639

Open
wants to merge 3 commits into
base: release-8.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 105 additions & 64 deletions dbms/src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
#include <Poco/FileStream.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageLog.h>
#include <common/logger_useful.h>

#include <boost/range/join.hpp>
#include <memory>
#include <string_view>


namespace DB
Expand All @@ -67,9 +69,13 @@ extern const char exception_between_create_database_meta_and_directory[];
}


InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_)
InterpreterCreateQuery::InterpreterCreateQuery(
const ASTPtr & query_ptr_,
Context & context_,
std::string_view log_suffix_)
: query_ptr(query_ptr_)
, context(context_)
, log_suffix(log_suffix_)
{}


Expand Down Expand Up @@ -447,7 +453,6 @@ ColumnsDescription InterpreterCreateQuery::setColumns(
return res;
}


void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.storage)
Expand Down Expand Up @@ -488,6 +493,89 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
}


/**
* Try to acquire a DDLGuard to execute the "CREATE TABLE" actions.
*
* Return the gurad if this thread become the owner to execute "CREATE TABLE".
* If the thread does not is the owner to execute "CREATE TABLE".
* - If the table already exists, and the request specifies IF NOT EXISTS,
* then we allow concurrent CREATE queries (which do nothing).
* - Otherwise, concurrent queries for creating a table, if the table does not exist,
* wait for `timeout_seconds` at max to check whether the table creation is completly
* created. If the table has been created within timeout, then do nothing and return.
* If timeout happen at last, throw an exception.
*/
std::unique_ptr<DDLGuard> tryGetDDLGuard(
Context & context,
const String & database_name,
const String & table_name,
bool create_if_not_exists,
size_t timeout_seconds,
std::string_view log_suffix)
{
constexpr int wait_useconds = 50'000;
const size_t max_retries = timeout_seconds * 1'000'000 / wait_useconds;
try
{
auto guard = context.getDDLGuardIfTableDoesntExist(
database_name,
table_name,
"Table " + database_name + "." + table_name + " is creating or attaching right now");

if (!guard)
{
if (create_if_not_exists)
return {}; // return a null guard
else
throw Exception(
"Table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_ALREADY_EXISTS);
}
return guard;
}
catch (Exception & e)
{
// Concurrent queries for creating the same table may run into this branch.
// We have to wait for the table created completely, then return to use the table.
// Thus, we choose to do a retry here to wait the table created completed.
if (e.code() == ErrorCodes::TABLE_ALREADY_EXISTS || e.code() == ErrorCodes::DDL_GUARD_IS_ACTIVE)
{
auto log = Logger::get(log_suffix);
LOG_WARNING(log, "Concurrent create table happens, error_code={} error_msg={}", e.code(), e.message());
for (size_t i = 0; i < max_retries; ++i)
{
// Once we can get the table from `context`, consider the table create has been "completed"
// and return a null guard
if (context.isTableExist(database_name, table_name))
return {};

// sleep a while and retry
LOG_WARNING(
log,
"Waiting for the completion of concurrent table creation action"
", sleep for {} ms and try again",
wait_useconds / 1000);
usleep(wait_useconds);
}

// timeout, throw an exception
LOG_ERROR(
log,
"still failed to wait for the completion of concurrent table creation in InterpreterCreateQuery, "
"max_retries={} stack_info={}",
max_retries,
e.getStackTrace().toString());
e.rethrow();
}
else
{
e.addMessage(std::string(log_suffix));
e.rethrow();
}
}
return {}; // not reachable
}

BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
String path = context.getPath();
Expand Down Expand Up @@ -534,8 +622,6 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// Set the table engine if it was not specified explicitly.
setEngine(create);

StoragePtr res;

{
std::unique_ptr<DDLGuard> guard;

Expand All @@ -547,72 +633,25 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
database = context.getDatabase(database_name);
data_path = database->getDataPath();

/** If the table already exists, and the request specifies IF NOT EXISTS,
* then we allow concurrent CREATE queries (which do nothing).
* Otherwise, concurrent queries for creating a table, if the table does not exist,
* can throw an exception, even if IF NOT EXISTS is specified.
*/
try
guard = tryGetDDLGuard(
context,
database_name,
table_name,
create.if_not_exists,
/*timeout_seconds=*/5,
log_suffix);
if (!guard)
{
guard = context.getDDLGuardIfTableDoesntExist(
database_name,
table_name,
"Table " + database_name + "." + table_name + " is creating or attaching right now");

if (!guard)
{
if (create.if_not_exists)
return {};
else
throw Exception(
"Table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
catch (Exception & e)
{
// Due to even if it throws this two error code, it can't ensure the table is completely created
// So we have to wait for the table created completely, then return to use the table.
// Thus, we choose to do a retry here to wait the table created completed.
if (e.code() == ErrorCodes::TABLE_ALREADY_EXISTS || e.code() == ErrorCodes::DDL_GUARD_IS_ACTIVE)
{
auto log = Logger::get(fmt::format("InterpreterCreateQuery {} {}", database_name, table_name));
LOG_WARNING(
log,
"createTable failed with error code is {}, error info is {}, stack_info is {}",
e.code(),
e.displayText(),
e.getStackTrace().toString());
const size_t max_retry = 50;
const int wait_useconds = 20000;
for (size_t i = 0; i < max_retry; i++) // retry
{
if (context.isTableExist(database_name, table_name))
return {};

// sleep a while and retry
LOG_ERROR(
log,
"createTable failed but table not exist now, \nWe will sleep for {} ms and try again.",
wait_useconds / 1000);
usleep(wait_useconds); // sleep 20ms
}
LOG_ERROR(
log,
"still failed to createTable in InterpreterCreateQuery for retry {} times",
max_retry);
e.rethrow();
}
else
{
e.rethrow();
}
// Not the owner to create IStorage instance, and the table is created
// completely, let's return
return {};
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};

res = StorageFactory::instance().get(
// Guard is acquired, let's create the IStorage instance
StoragePtr res = StorageFactory::instance().get(
create,
data_path,
table_name,
Expand Down Expand Up @@ -642,6 +681,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)

if (!create.is_temporary)
database->attachTable(table_name, res);

// the table has been created completely
}

/// If the query is a CREATE SELECT, insert the data into the table.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterCreateQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using StoragePtr = std::shared_ptr<IStorage>;
class InterpreterCreateQuery : public IInterpreter
{
public:
InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_);
InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_, std::string_view log_suffix_ = "");

BlockIO execute() override;

Expand Down Expand Up @@ -68,6 +68,7 @@ class InterpreterCreateQuery : public IInterpreter

ASTPtr query_ptr;
Context & context;
std::string_view log_suffix;

/// Using while loading database.
ThreadPool * thread_pool = nullptr;
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,10 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateDatabaseByInfo(const TiDB::DB

ASTPtr ast = parseCreateStatement(statement);

InterpreterCreateQuery interpreter(ast, context);
InterpreterCreateQuery interpreter(
ast,
context,
fmt::format("keyspace={} database_id={}", keyspace_id, db_info->id));
interpreter.setInternal(true);
interpreter.setForceRestoreData(false);
interpreter.execute();
Expand Down Expand Up @@ -1186,7 +1189,15 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
ast_create_query->if_not_exists = true;
ast_create_query->database = database_mapped_name;

InterpreterCreateQuery interpreter(ast, context);
InterpreterCreateQuery interpreter(
ast,
context,
fmt::format(
"keyspace={} database_id={} table_id={} action={}",
keyspace_id,
database_id,
table_info->id,
action));
interpreter.setInternal(true);
interpreter.setForceRestoreData(false);
interpreter.execute();
Expand Down