Merge pull request #2975 from globaldothealth/ingestion_deltas
Ingestion deltas for non-UUID sources
abhidg committed Mar 21, 2023
2 parents 26e5cf0 + 2b63db2 commit dda3640
Showing 28 changed files with 1,519 additions and 311 deletions.
35 changes: 29 additions & 6 deletions data-serving/scripts/prune-uploads/README.md
@@ -1,4 +1,6 @@
# Prune uploads

Script: `prune_uploads.py`

This script should be run periodically; it sets every upload older
than the last acceptable upload to *list = false* (for non-UUID sources). For
@@ -64,11 +66,6 @@ Remove the -n to actually prune uploads.

* **-n**, **--dry-run**: Dry run, do not change the database

* **-d**, **--allow-decrease**: Allow cases to decrease in ingestion for non-UUID
uploads. By default, non-UUID uploads are only accepted if they increase the number
of cases. This option has no effect on UUID source uploads, which are always
ingested.

* **-r**, **--run-hooks**=*hook1*[,*hook2*]: Runs hooks after prune finishes. Specify
*all* to run all hooks configured to run.
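
The flags above can be combined in one invocation. A minimal `argparse` sketch of this CLI (the actual script's parser may declare these options differently; only the flag names and meanings come from this README):

```python
import argparse

# Hedged sketch of the documented options; the real prune_uploads.py
# parser may differ in wording and defaults.
parser = argparse.ArgumentParser(description="prune uploads")
parser.add_argument("-n", "--dry-run", action="store_true",
                    help="dry run, do not change the database")
parser.add_argument("-r", "--run-hooks",
                    help="comma-separated hooks to run after prune, or 'all'")

args = parser.parse_args(["-n", "-r", "all"])
print(args.dry_run, args.run_hooks)  # True all
```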

@@ -105,3 +102,29 @@ an example: US would map to `exporter_united_states`). These job definitions
export country-specific data to an S3 bucket. This hook submits jobs to the
`export-queue` job queue, triggering the country export process for the sources
it just ingested.
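
The country-to-job-definition naming scheme can be illustrated with a small, hypothetical helper (the function name and the exact normalisation are assumptions; only the `exporter_united_states` example comes from this README):

```python
# Hypothetical helper: derive a job definition name from a country name,
# following the `exporter_united_states` pattern described above.
def job_definition_for(country_name: str) -> str:
    return "exporter_" + country_name.strip().lower().replace(" ", "_")

print(job_definition_for("United States"))  # exporter_united_states
```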

# Prune database

Script: `prune_db.py`

***ENSURE INGESTION PROCESSES ARE SWITCHED OFF BEFORE RUNNING THIS SCRIPT***

This script is designed to be run manually to prune the database of all cases
that are unprocessed, or have been marked for removal (i.e. where `list=False`
or `list` does not exist as a field in the record). It is
intended to be run infrequently when a database clean-up is required, such
as after a prolonged period of failed ingestions (this can lead to uncontrolled
growth in the number of records).

Note that it is not strictly necessary to run this script, as any successful
ingestion will prune the source/country-specific records anyway; however, by
that point the ingestion process itself may be running too slowly to complete
within a reasonable timeframe, whereas this script cleans up across ALL
sources.
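
The removal criteria can be read as: a case is pruned when `list` is `False` or the field is missing. A plain-Python sketch of the same predicate (the script itself expresses this as a MongoDB `$or` query):

```python
# Mirrors the MongoDB criteria
#   {"$or": [{"list": False}, {"list": {"$exists": False}}]}
# in plain Python: prune when `list` is False or absent.
def should_prune(case: dict) -> bool:
    return case.get("list", False) is False

cases = [{"list": True}, {"list": False}, {"_id": 1}]
print([should_prune(c) for c in cases])  # [False, True, True]
```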

To dry-run, set up the Python virtual environment and ensure that the database
connection (`CONN`) has been specified, as above, then run `python3 prune_db.py`
with no arguments. Always do this first: it reports how many records are in the
database, how many match the deletion criteria, and how long these queries took
to run. To commit changes, run the script with the `--live` command line
argument: `python3 prune_db.py --live`.
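
Putting the above together, a typical session might look like this (the connection string is a placeholder for your own deployment):

```shell
# Dry run first: prints counts and timings, changes nothing
CONN="mongodb://localhost:27017/" python3 prune_db.py

# Only after reviewing the dry-run output, commit the deletions
CONN="mongodb://localhost:27017/" python3 prune_db.py --live
```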
74 changes: 74 additions & 0 deletions data-serving/scripts/prune-uploads/prune_db.py
@@ -0,0 +1,74 @@
import os
import sys
import logging
import argparse
from time import time

import pymongo

from logger import setup_logger


def prune_db():
    # parse CLI arguments
    parser = argparse.ArgumentParser(
        description="Deletes cases marked list=False."
    )
    parser.add_argument("--live", help="Live action", action="store_true")
    args = parser.parse_args()

    # make database connection
    try:
        if (CONN := os.environ.get("CONN")) is None:
            logging.error("Specify MongoDB connection string in CONN")
            sys.exit(1)
        client = pymongo.MongoClient(CONN)
        db = client[os.environ.get("DB", "covid19")]
        logging.info("Database connection ok")
    except pymongo.errors.ConnectionFailure:
        logging.error("Database connection fail")
        sys.exit(1)

    # removal criteria: list=False, or the list field is missing entirely
    criteria = {"$or": [{"list": False}, {"list": {"$exists": False}}]}

    # report attempted database query
    logging.info(f"Removal criteria: {criteria}")
    logging.info(f"Database: {CONN}")
    logging.info(f"Live flag: {args.live}")

    # get removal statistics
    ts = time()
    count_total = db.cases.estimated_document_count()
    logging.info(f"Got case count: {count_total} ({(time() - ts):.3f} secs)")

    ts = time()
    count_listFalse = db.cases.count_documents(criteria)
    logging.info(f"Got removal count: {count_listFalse} ({(time() - ts):.3f} secs)")
    # guard against an empty collection when computing the percentage
    prc = 100 * count_listFalse / count_total if count_total else 0.0

    # perform records removal
    if args.live:
        logging.info("(LIVE) Sending delete query...")
        logging.info(f"Removing {count_listFalse} of {count_total} records ({prc:.1f}%)")
        try:
            ts = time()
            response = db.cases.delete_many(criteria)
            logging.info("(LIVE) Query completed (acknowledged: "
                         f"{response.acknowledged}, "
                         f"deleted_count={response.deleted_count}) ({(time() - ts):.3f} secs)")
            ts = time()
            updated_count_total = db.cases.count_documents({})
            logging.info(f"(LIVE) Updated case count: {updated_count_total} ({(time() - ts):.3f} secs)")
        except pymongo.errors.PyMongoError as e:
            logging.error(f"(LIVE) Database ERROR: {e}")
    else:
        logging.info("(DRY-RUN) No changes will be made")
        logging.info(f"Would remove {count_listFalse} of {count_total} records ({prc:.1f}%)")
        logging.info("(DRY-RUN) No changes made")


if __name__ == "__main__":
    setup_logger()
    logging.info("Starting database pruning...")
    prune_db()
    logging.info("Work complete")
