Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix database cleanup on reindex #126

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class PostgresDatabaseConfig:
port: int = 5432
schema_name: str = 'public'
password: str = ''
immune_tables: Optional[List[str]] = None
immune_tables: List[str] = Field(default_factory=list)

@property
def connection_string(self) -> str:
Expand All @@ -86,8 +86,9 @@ def connection_string(self) -> str:

@validator('immune_tables')
def valid_immune_tables(cls, v):
if v and 'dipdup_state' in v:
raise ConfigurationError('`dipdup_state` table can\'t be immune')
for table in v:
if table.startswith('dipdup'):
raise ConfigurationError('Tables with `dipdup` prefix can\'t be immune')
return v


Expand Down
44 changes: 25 additions & 19 deletions src/dipdup/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any, Dict, Optional

from tortoise import Tortoise
from tortoise.transactions import in_transaction
from tortoise.transactions import get_connection

from dipdup.config import ContractConfig, DipDupConfig, IndexConfig, IndexTemplateConfig, PostgresDatabaseConfig
from dipdup.datasources.datasource import Datasource
Expand Down Expand Up @@ -46,24 +46,30 @@ async def restart(self) -> None:

async def reindex(self) -> None:
"""Drop all tables or whole database and restart with the same CLI arguments"""
if isinstance(self.config.database, PostgresDatabaseConfig):
exclude_expression = ''
if self.config.database.immune_tables:
immune_tables = [f"'{t}'" for t in self.config.database.immune_tables]
exclude_expression = f' AND tablename NOT IN ({",".join(immune_tables)})'

async with in_transaction() as conn:
await conn.execute_script(
f'''
DO $$ DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = current_schema(){exclude_expression}) LOOP
EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
END LOOP;
END $$;
'''
)

async def _recreate_schema(conn, name: str) -> None:
await conn.execute_script(f'DROP SCHEMA IF EXISTS {name} CASCADE')
await conn.execute_script(f'CREATE SCHEMA {name}')

async def _move_table(conn, name: str, schema: str, new_schema: str) -> None:
await conn.execute_script(f'ALTER TABLE {schema}.{name} SET SCHEMA {new_schema}')

database_config = self.config.database
if isinstance(database_config, PostgresDatabaseConfig):
conn = get_connection(None)
immune_schema_name = f'{database_config.schema_name}_immune'

if database_config.immune_tables:
await _recreate_schema(conn, immune_schema_name)

for table in database_config.immune_tables:
await _move_table(conn, table, database_config.schema_name, immune_schema_name)

await _recreate_schema(conn, database_config.schema_name)

for table in database_config.immune_tables:
await _move_table(conn, table, immune_schema_name, database_config.schema_name)

else:
await Tortoise._drop_databases()
await self.restart()
Expand Down