Skip to content

Commit

Permalink
[Migrations] Support for async upgrade functions
Browse files Browse the repository at this point in the history
The config updater now detects if the upgrade/downgrade functions
are regular functions or coroutines and executes them the right
way, reducing the amount of boilerplate required.

We now initialize the global variables to access MongoDB directly
from the main script of the config updater.
  • Loading branch information
odesenfans authored and hoh committed May 9, 2022
1 parent 56d97a2 commit f2193ec
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 20 deletions.
Empty file added deployment/__init__.py
Empty file.
Empty file.
38 changes: 33 additions & 5 deletions deployment/migrations/config_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
"""

import argparse
import asyncio
import importlib.util
import logging
import os
import sys
from types import ModuleType

LOGGER = logging.getLogger()
from configmanager import Config

from aleph.config import get_defaults
from aleph.model import init_db_globals

LOGGER = logging.getLogger()

SERIALIZED_KEY_FILE = "serialized-node-secret.key"

Expand Down Expand Up @@ -76,17 +81,27 @@ def setup_logging(log_level: int) -> None:
)


def init_config(config_file: str) -> Config:
config = Config(schema=get_defaults())
config.yaml.load(config_file)
return config


def import_module_from_path(path: str) -> ModuleType:
spec = importlib.util.spec_from_file_location("migration_module", path)
migration_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(migration_module)
return migration_module


def main(args: argparse.Namespace):
async def main(args: argparse.Namespace):
log_level = logging.DEBUG if args.verbose else logging.INFO
setup_logging(log_level)

# Initialize some basic config and global variables
config = init_config(args.config)
init_db_globals(config=config)

migration_scripts_dir = os.path.join(os.path.dirname(__file__), "scripts")
migration_scripts = sorted(
f for f in os.listdir(migration_scripts_dir) if f.endswith(".py")
Expand All @@ -102,14 +117,27 @@ def main(args: argparse.Namespace):
LOGGER.info(f"%s: %s", migration_script, migration_module.__doc__)
LOGGER.info(f"Running %s for %s...", command, migration_script)
migration_func = getattr(migration_module, args.command)
migration_func(config_file=args.config, key_dir=args.key_dir, key_file=args.key_file)

LOGGER.info(f"Successfully ran %s. You can now start the Core Channel Node.", command)
kwargs = {
"config_file": args.config,
"key_dir": args.key_dir,
"key_file": args.key_file,
"config": config,
}

if asyncio.iscoroutinefunction(migration_func):
await migration_func(**kwargs)
else:
migration_func(**kwargs)

LOGGER.info(
f"Successfully ran %s. You can now start the Core Channel Node.", command
)


if __name__ == "__main__":
try:
main(cli_parse())
asyncio.run(main(cli_parse()))
except Exception as e:
LOGGER.error("%s", str(e))
sys.exit(1)
17 changes: 2 additions & 15 deletions deployment/migrations/scripts/0002-refresh-chain-pins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,17 @@
committed on a chain. It has to be run after the introduction of PermanentPin in version 0.2.0.
"""

import asyncio
import logging
from typing import Optional

from configmanager import Config

from aleph.config import get_defaults
from aleph.model import PermanentPin, init_db
from aleph.model import PermanentPin
from aleph.model.chains import Chain

logger = logging.getLogger()


async def async_upgrade(config_file: Optional[str], **kwargs):
config = Config(schema=get_defaults())
if config_file is not None:
config.yaml.load(config_file)

init_db(config=config, ensure_indexes=False)

async def upgrade(config: Config, **kwargs):
# We measure over 5000 permanent pins on new nodes that did process all chaindata.
# We therefore use this value to estimate if a node did process all chaindata already or not.
expected_permanent_pins = 5000
Expand All @@ -37,10 +28,6 @@ async def async_upgrade(config_file: Optional[str], **kwargs):
)


def upgrade(config_file: str, **kwargs):
asyncio.run(async_upgrade(config_file=config_file, **kwargs))


def downgrade(**kwargs):
# Nothing to do, processing the chain data multiple times only adds some load on the node.
pass

0 comments on commit f2193ec

Please sign in to comment.