✨ Maintenance/add consistency scripts (#2531)
sanderegg authored Sep 9, 2021
1 parent 6aa8d03 commit 2b49ad5
Showing 3 changed files with 403 additions and 0 deletions.
350 changes: 350 additions & 0 deletions scripts/maintenance/check_consistency_data.py
@@ -0,0 +1,350 @@
#! /usr/bin/env python3

"""Script to check consistency of the file storage backend in oSparc.
1. From an osparc database, go over all projects, get the project IDs and Node IDs
2. From the same database, now get all the files listed like projectID/nodeID from 1.
3. We get a list of files that are needed for the current projects
4. connect to the S3 backend, check that these files exist
5. generate a report with: project uuid, owner, files missing in S3
"""
import asyncio
import csv
import logging
import os
import re
import subprocess
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple

import aiopg
import typer
from dateutil import parser
from tenacity import after_log, retry, stop_after_attempt, wait_random

log = logging.getLogger(__name__)


@asynccontextmanager
async def managed_docker_compose(
postgres_volume_name: str, postgres_username: str, postgres_password: str
):
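    """Spins up a local postgres (+ adminer) stack with docker-compose, re-using the
    given postgres data volume, waits until postgres answers a `SELECT 1`, and tears
    the stack down again on exit."""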
typer.echo("starting up database in localhost")
compose_file = Path.cwd() / "consistency" / "docker-compose.yml"
try:
subprocess.run(
f"docker-compose --file {compose_file} up --detach",
shell=True,
check=True,
cwd=compose_file.parent,
env={**os.environ, **{"POSTGRES_DATA_VOLUME": postgres_volume_name}},
)
typer.echo(
f"database started: adminer available on http://127.0.0.1:18080/?pgsql=postgres&username={postgres_username}&db=simcoredb&ns=public"
)

@retry(
wait=wait_random(1, 3),
stop=stop_after_attempt(10),
after=after_log(log, logging.WARN),
)
async def postgres_responsive():
async with aiopg.create_pool(
f"dbname=simcoredb user={postgres_username} password={postgres_password} host=127.0.0.1"
) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")

await postgres_responsive()
yield
finally:
subprocess.run(
f"docker-compose --file {compose_file} down",
shell=True,
check=True,
cwd=compose_file.parent,
)


async def _get_projects_nodes(pool) -> Dict[str, Any]:
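    """Returns, for every project owned by a non-GUEST user that has a non-empty workbench,
    a dict {project_uuid: {"nodes": [node_ids], "owner": id, "name": ..., "email": ...}}."""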
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"SELECT uuid, workbench, prj_owner, users.name, users.email"
' FROM "projects"'
" INNER JOIN users"
" ON projects.prj_owner = users.id"
" WHERE users.role != 'GUEST'"
)
typer.secho(
f"found {cursor.rowcount} project rows, now getting project with valid node ids..."
)
project_db_rows = await cursor.fetchall()
project_nodes = {
project_uuid: {
"nodes": list(workbench.keys()),
"owner": prj_owner,
"name": user_name,
"email": user_email,
}
for project_uuid, workbench, prj_owner, user_name, user_email in project_db_rows
if len(workbench) > 0
}
typer.echo(
f"processed {cursor.rowcount} project rows, found {len(project_nodes)} valid projects."
)
return project_nodes


async def _get_files_from_project_nodes(
pool, project_uuid: str, node_ids: List[str]
) -> Set[Tuple[str, int, datetime]]:
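    """Fetches the file_meta_data entries (location_id '0') whose file_uuid matches
    one of the given project_uuid/node_id prefixes.

    Returns a set of (file_uuid, file_size, last_modified) tuples."""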
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
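            # str() of a python list renders like ['a%', 'b%'], which doubles as valid
            # postgres array-literal syntax for the LIKE any (array[...]) clause below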
array = str([f"{project_uuid}/{n}%" for n in node_ids])
await cursor.execute(
"SELECT file_uuid, file_size, last_modified"
' FROM "file_meta_data"'
f" WHERE file_meta_data.file_uuid LIKE any (array{array}) AND location_id = '0'"
)

# here we got all the files for that project uuid/node_ids combination
file_rows = await cursor.fetchall()
return {
(file_uuid, file_size, parser.parse(last_modified or "2000-01-01"))
for file_uuid, file_size, last_modified in file_rows
}


POWER_LABELS = {0: "B", 1: "KiB", 2: "MiB", 3: "GiB"}
LABELS_POWER = {v: k for k, v in POWER_LABELS.items()}


def convert_s3_label_to_bytes(s3_size: str) -> int:
"""convert 12MiB to 12 * 1024**2"""
match = re.match(r"([0-9.]+)(\w+)", s3_size)
if match:
return int(float(match.groups()[0]) * 1024 ** LABELS_POWER[match.groups()[1]])
return -1


async def limited_gather(*tasks, max_concurrency: int):
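    """asyncio.gather with an optional concurrency limit: if max_concurrency > 0, each task
    is wrapped in a semaphore; exceptions are returned rather than raised (return_exceptions=True)."""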
wrapped_tasks = tasks
if max_concurrency > 0:
semaphore = asyncio.Semaphore(max_concurrency)

async def sem_task(task):
async with semaphore:
return await task

wrapped_tasks = [sem_task(t) for t in tasks]

return await asyncio.gather(*wrapped_tasks, return_exceptions=True)


async def _get_files_from_s3_backend(
s3_endpoint: str,
s3_access: str,
s3_secret: str,
s3_bucket: str,
project_uuid: str,
progress,
) -> Set[Tuple[str, int, datetime]]:
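    """Lists all S3 objects under s3_bucket/project_uuid/ by running the minio client (mc)
    in a throw-away docker container and parsing its `ls --recursive` output.

    Returns a set of (file_uuid, file_size, last_modified) tuples, mirroring the DB query."""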
s3_file_entries = set()
try:
# TODO: this could probably run faster if we maintain the client, and run successive commands in there
command = (
f"docker run --rm "
f"--env MC_HOST_mys3='https://{s3_access}:{s3_secret}@{s3_endpoint}' "
"minio/mc "
f"ls --recursive mys3/{s3_bucket}/{project_uuid}/"
)
process = await asyncio.create_subprocess_shell(
command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout, _ = await process.communicate()
decoded_stdout = stdout.decode()
if decoded_stdout != b"":
# formatted as:
# [2021-09-07 04:35:49 UTC] 1.5GiB 05e821d1-2b4b-455f-86b6-9e197545c1ad/work.tgz
# DATE_creation? size node_id/file_path.ext
list_of_files = decoded_stdout.split("\n")
for file in list_of_files:
match = re.findall(r"\[(.+)\] (\S+) (.+)", file)
if match:
last_modified, size, node_id_file = match[0]
s3_file_entries.add(
(
f"{project_uuid}/{node_id_file}",
convert_s3_label_to_bytes(size),
parser.parse(last_modified),
)
)

except subprocess.CalledProcessError:
pass

progress.update(1)
return s3_file_entries


def write_file(file_path: Path, data, fieldnames):
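    """Writes the given rows to a CSV file, prefixed with the header row `fieldnames`."""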
with file_path.open("w", newline="") as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(fieldnames)
csv_writer.writerows(data)


async def main_async(
postgres_volume_name: str,
postgres_username: str,
postgres_password: str,
s3_endpoint: str,
s3_access: str,
s3_secret: str,
s3_bucket: str,
):

# ---------------------- GET FILE ENTRIES FROM DB ---------------------------------------------------------------------
async with managed_docker_compose(
postgres_volume_name, postgres_username, postgres_password
):
# now the database is up, let's get all the projects owned by a non-GUEST account
async with aiopg.create_pool(
f"dbname=simcoredb user={postgres_username} password={postgres_password} host=127.0.0.1"
) as pool:
project_nodes = await _get_projects_nodes(pool)
# Rationale: the project database does not contain all the files in a node (logs, state files are missing)
# Therefore, we will list here all the files that are registered in the file_meta_data table using the same projectid/nodeid
all_sets_of_file_entries = await asyncio.gather(
*[
_get_files_from_project_nodes(pool, project_uuid, prj_data["nodes"])
for project_uuid, prj_data in project_nodes.items()
]
)
db_file_entries: Set[Tuple[str, int, datetime]] = set().union(
*all_sets_of_file_entries
)
db_file_entries_path = Path.cwd() / "db_file_entries.csv"
write_file(
db_file_entries_path, db_file_entries, ["file_uuid", "size", "last modified"]
)
typer.secho(
f"processed {len(project_nodes)} projects, found {len(db_file_entries)} file entries, saved in {db_file_entries_path}",
fg=typer.colors.YELLOW,
)

# ---------------------- GET FILE ENTRIES FROM S3 ---------------------------------------------------------------------
# let's proceed with S3 backend: files are saved in BUCKET_NAME/projectID/nodeID/fileA.ext
    # Rationale: similarly, we list all the files of each project here; listing them recursively is faster.
typer.echo(
f"now connecting with S3 backend and getting files for {len(project_nodes)} projects..."
)
    # pull the image first: prevents _get_files_from_s3_backend from pulling it and polluting the outputs
subprocess.run("docker pull minio/mc", shell=True, check=True)
with typer.progressbar(length=len(project_nodes)) as progress:
all_sets_in_s3 = await limited_gather(
*[
_get_files_from_s3_backend(
s3_endpoint, s3_access, s3_secret, s3_bucket, project_uuid, progress
)
for project_uuid in project_nodes
],
max_concurrency=20,
)
s3_file_entries = set().union(*all_sets_in_s3)
s3_file_entries_path = Path.cwd() / "s3_file_entries.csv"
write_file(
s3_file_entries_path,
s3_file_entries,
fieldnames=["file_uuid", "size", "last_modified"],
)
typer.echo(
f"processed {len(project_nodes)} projects, found {len(s3_file_entries)} file entries, saved in {s3_file_entries_path}"
)

# ---------------------- COMPARISON ---------------------------------------------------------------------
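    # the comparison is done on file_uuid only; sizes and timestamps are kept for the report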
db_file_uuids = {db_file_uuid for db_file_uuid, _, _ in db_file_entries}
s3_file_uuids = {s3_file_uuid for s3_file_uuid, _, _ in s3_file_entries}
common_files = db_file_uuids.intersection(s3_file_uuids)
    s3_missing_files_uuids = db_file_uuids.difference(s3_file_uuids)
db_missing_files_uuids = s3_file_uuids.difference(db_file_uuids)
typer.secho(
f"{len(common_files)} files are the same in both system", fg=typer.colors.BLUE
)
typer.secho(
f"{len(s3_missing_files_uuids)} files are missing in S3", fg=typer.colors.RED
)
typer.secho(
f"{len(db_missing_files_uuids)} files are missing in DB", fg=typer.colors.RED
)

# ------------------ WRITING REPORT --------------------------------------------
s3_missing_files_path = Path.cwd() / "s3_missing_files.csv"
db_missing_files_path = Path.cwd() / "db_missing_files.csv"
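    # file_uuid -> (size, last_modified), used to enrich the report rows below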
db_file_map: Dict[str, Tuple[int, datetime]] = {
e[0]: e[1:] for e in db_file_entries
}

def order_by_owner(
list_of_files_uuids: Set[str],
) -> Dict[Tuple[str, str, str], List[Tuple[str, int, datetime]]]:
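        """Groups the given file_uuids by the owning project's (owner id, name, email)."""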
files_by_owner = defaultdict(list)
for file_uuid in list_of_files_uuids:
# project_id/node_id/file
prj_uuid = file_uuid.split("/")[0]
prj_data = project_nodes[prj_uuid]
files_by_owner[
(
prj_data["owner"],
prj_data["name"],
prj_data["email"],
)
].append(file_uuid)
return files_by_owner

def write_to_file(path: Path, files_by_owner):
with path.open("wt") as fp:
fp.write(f"owner,name,email,file,size,last_modified\n")
for (owner, name, email), files in files_by_owner.items():
for file in files:
size, modified = db_file_map.get(file, ("?", "?"))
fp.write(f"{owner},{name},{email},{file}, {size}, {modified}\n")

write_to_file(s3_missing_files_path, order_by_owner(s3_missing_files_uuids))
write_to_file(db_missing_files_path, order_by_owner(db_missing_files_uuids))


def main(
postgres_volume_name: str,
postgres_username: str,
postgres_password: str,
s3_endpoint: str,
s3_access: str,
s3_secret: str,
s3_bucket: str,
):
asyncio.run(
main_async(
postgres_volume_name,
postgres_username,
postgres_password,
s3_endpoint,
s3_access,
s3_secret,
s3_bucket,
)
)


if __name__ == "__main__":
typer.run(main)