Skip to content

Commit

Permalink
Fix migration issues in ClickHouse clusters with multiple replicas an…
Browse files Browse the repository at this point in the history
…d shards

This commit introduces a retry mechanism to handle synchronization errors in migrations caused by outdated metadata in ClickHouse replicas. Additionally, it ensures DROP TABLE operations use SYNC. These changes address one of the critical issues in the Snuba migration system, improving stability and reliability when working with ClickHouse clusters.
  • Loading branch information
pyhp2017 committed Feb 3, 2025
1 parent 3aaac63 commit d9d7e3c
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions snuba/migrations/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ def format_sql(self) -> str:
return self.__statement


class RetryOnSyncError:
def execute(self) -> None:
for i in range(30, -1, -1): # wait at most ~30 seconds
try:
super().execute() # No error!
break
except Exception as e:
if (
i
and "Metadata on replica is not up to date with common metadata in Zookeeper"
in str(e)
):
time.sleep(1)
else:
raise


class CreateTable(SqlOperation):
"""
The create table operation takes a table name, column list and table engine.
Expand Down Expand Up @@ -212,7 +229,7 @@ def __init__(
self.table_name = table_name

def format_sql(self) -> str:
return f"DROP TABLE IF EXISTS {self.table_name};"
return f"DROP TABLE IF EXISTS {self.table_name} SYNC;"


class TruncateTable(SqlOperation):
Expand Down Expand Up @@ -282,7 +299,7 @@ def format_sql(self) -> str:
return f"ALTER TABLE {self.__table_name} REMOVE TTL;"


class AddColumn(SqlOperation):
class AddColumn(RetryOnSyncError, SqlOperation):
"""
Adds a column to a table.
Expand Down Expand Up @@ -321,7 +338,7 @@ def __repr__(self) -> str:
return f"AddColumn(storage_set={repr(self.storage_set)}, table_name={repr(self.table_name)}, column={repr(self.column)}, after={repr(self.__after)}, target={repr(self.target)})"


class DropColumn(SqlOperation):
class DropColumn(RetryOnSyncError, SqlOperation):
"""
Drops a column from a table.
Expand Down Expand Up @@ -352,7 +369,7 @@ def __repr__(self) -> str:
return f"DropColumn(storage_set={repr(self.storage_set)}, table_name={repr(self.table_name)}, column_name={repr(self.column_name)}, target={repr(self.target)})"


class ModifyColumn(SqlOperation):
class ModifyColumn(RetryOnSyncError, SqlOperation):
"""
Modify a column in a table.
Expand Down Expand Up @@ -507,7 +524,7 @@ def format_sql(self) -> str:
return f"ALTER TABLE {self.__table_name} {', '.join(statements)};"


class DropIndex(SqlOperation):
class DropIndex(RetryOnSyncError, SqlOperation):
"""
Drops an index.
"""
Expand Down Expand Up @@ -540,7 +557,7 @@ def _block_on_mutations(
super()._block_on_mutations(conn, poll_seconds, timeout_seconds)


class DropIndices(SqlOperation):
class DropIndices(RetryOnSyncError, SqlOperation):
"""
Drops many indices.
Only works with the MergeTree family of tables.
Expand Down

0 comments on commit d9d7e3c

Please sign in to comment.