2901 fix country export #2903

Merged · 6 commits · Oct 13, 2022
2 changes: 1 addition & 1 deletion .github/workflows/export-deploy.yml
@@ -2,7 +2,7 @@ name: Exporter code deploy

on:
  push:
-    branches: ['*-stable']
+    branches: [main]
    paths:
      - '.github/workflows/export-deploy.yml'
      - 'data-serving/scripts/export-data/**'
4 changes: 2 additions & 2 deletions data-serving/scripts/export-data/country_export.sh
@@ -14,13 +14,13 @@ trap 'rm -rf "$SCRATCH"' EXIT # Cleanup before exit
FORMAT="${FORMAT:-csv,tsv,json}"
QUERY="{\"list\": true, \"location.country\": \"$COUNTRY\"}"

-mongoexport --uri="$CONN" --collection=agebuckets --type=json > "${BUCKETS}"
+mongoexport --uri="$CONN" --collection=ageBuckets --type=json --jsonArray -o "${BUCKETS}"
mongoexport --query="$QUERY" --uri="$CONN" --collection=cases \
    --fieldFile=fields.txt --type=csv | python3 transform.py -f "$FORMAT" -b "${BUCKETS}" "$COUNTRY"

# ignore shellcheck warning on word splitting, as it's actually needed here
# shellcheck disable=SC2086
for fmt in ${FORMAT//,/ }
do
-    test -f "${COUNTRY}.${fmt}.gz" && aws s3 cp "${COUNTRY}.${fmt}.gz" "s3://${BUCKET}/${fmt}/"
+    test -f "${COUNTRY}.${fmt}.gz" && aws s3 cp "${COUNTRY}.${fmt}.gz" "s3://${BUCKET}/${fmt}/"
done
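
Worth noting: transform.py loads the buckets file with json.load(), which expects a single JSON value, while mongoexport's default --type=json output is one document per line (JSON Lines). The new --jsonArray flag makes the exported file parseable. A minimal sketch of the consumer side (the path is illustrative):

```python
import json

# Default mongoexport output is JSON Lines -- one document per line --
# which json.load() cannot parse as a single value:
#   {"_id": {"$oid": "..."}, "start": 0, "end": 0}
#   {"_id": {"$oid": "..."}, "start": 1, "end": 5}
#
# With --jsonArray the file is a single JSON array, so this works:
with open("age_buckets.json") as fp:  # illustrative path
    buckets = json.load(fp)
print(len(buckets), "buckets loaded")
```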
2 changes: 2 additions & 0 deletions data-serving/scripts/export-data/fields.txt
@@ -4,6 +4,8 @@ caseReference.sourceId
caseReference.sourceUrl
caseReference.uploadIds
caseReference.verificationStatus
demographics.ageRange.start
demographics.ageRange.end
demographics.ageBuckets
demographics.ethnicity
demographics.gender
23 changes: 1 addition & 22 deletions data-serving/scripts/export-data/test_age_buckets.json
@@ -1,22 +1 @@
-[
-  {
-    "_id": "001",
-    "start": 0,
-    "end": 0
-  },
-  {
-    "_id": "002",
-    "start": 1,
-    "end": 5
-  },
-  {
-    "_id": "003",
-    "start": 6,
-    "end": 9
-  },
-  {
-    "_id": "004",
-    "start": 10,
-    "end": 14
-  }
-]
+[{"_id":{"$oid":"6267c0c29708d9612e027301"},"start":0,"end":0},{"_id":{"$oid":"6267c0c29708d9612e027302"},"start":1,"end":5},{"_id":{"$oid":"6267c0c29708d9612e027303"},"start":6,"end":10},{"_id":{"$oid":"6267c0c39708d9612e027304"},"start":11,"end":15},{"_id":{"$oid":"6267c0c39708d9612e027305"},"start":16,"end":20},{"_id":{"$oid":"6267c0c39708d9612e027306"},"start":21,"end":25},{"_id":{"$oid":"6267c0c39708d9612e027307"},"start":26,"end":30},{"_id":{"$oid":"6267c0c39708d9612e027308"},"start":31,"end":35},{"_id":{"$oid":"6267c0c39708d9612e027309"},"start":36,"end":40},{"_id":{"$oid":"6267c0c39708d9612e02730a"},"start":41,"end":45},{"_id":{"$oid":"6267c0c39708d9612e02730b"},"start":46,"end":50},{"_id":{"$oid":"6267c0c39708d9612e02730c"},"start":51,"end":55},{"_id":{"$oid":"6267c0c39708d9612e02730d"},"start":56,"end":60},{"_id":{"$oid":"6267c0c39708d9612e02730e"},"start":61,"end":65},{"_id":{"$oid":"6267c0c49708d9612e02730f"},"start":66,"end":70},{"_id":{"$oid":"6267c0c49708d9612e027310"},"start":71,"end":75},{"_id":{"$oid":"6267c0c49708d9612e027311"},"start":76,"end":80},{"_id":{"$oid":"6267c0c49708d9612e027312"},"start":81,"end":85},{"_id":{"$oid":"6267c0c49708d9612e027313"},"start":86,"end":90},{"_id":{"$oid":"6267c0c49708d9612e027314"},"start":91,"end":95},{"_id":{"$oid":"6267c0c49708d9612e027315"},"start":96,"end":100},{"_id":{"$oid":"6267c0c49708d9612e027316"},"start":101,"end":105},{"_id":{"$oid":"6267c0c49708d9612e027317"},"start":106,"end":110},{"_id":{"$oid":"6267c0c49708d9612e027318"},"start":111,"end":115},{"_id":{"$oid":"6267c0c49708d9612e027319"},"start":116,"end":120}]
7 changes: 7 additions & 0 deletions data-serving/scripts/export-data/test_transform.py
@@ -174,3 +174,10 @@ def test_age_bucket_row_conversion():
    converted_row = T.convert_row(row, _BUCKETS)
    assert converted_row["demographics.ageRange.start"] == 20
    assert converted_row["demographics.ageRange.end"] == 24


@pytest.mark.parametrize("source,expected", [((22, 22), (21, 25)), ((58, 62), (56, 65)), ((130, 150), None)])
def test_get_age_bucket_as_range(source, expected):
    with Path(__file__).with_name("test_age_buckets.json").open() as fp:
        AGE_BUCKETS = json.load(fp)
    assert T.get_age_bucket_as_range(AGE_BUCKETS, *source) == expected
53 changes: 52 additions & 1 deletion data-serving/scripts/export-data/transform.py
@@ -18,6 +18,7 @@

from logger import setup_logger

MINIMUM_AGE_WINDOW = 5

VALID_FORMATS = ["csv", "tsv", "json"]

@@ -209,6 +210,22 @@ def get_headers_and_fields(fileobject) -> list[str]:
    fields = sorted(list(fields - set(__OMIT)), key=str.casefold)
    return headers, fields

def get_age_bucket_as_range(buckets: list[dict[str, Any]], age_start: int, age_end: int) -> Optional[tuple[int, int]]:
    "Return the enclosing age-bucket range for an ageRange start/end pair, or None if either age is outside all buckets"

    def which_bucket(age: int) -> int:
        for i, bucket in enumerate(buckets):
            if bucket["start"] <= age <= bucket["end"]:
                return i
        return -1

    index_bucket_start = which_bucket(int(age_start))
    index_bucket_end = which_bucket(int(age_end))
    if index_bucket_start < 0 or index_bucket_end < 0:
        return None
    bounds = [buckets[index_bucket_start]["start"], buckets[index_bucket_end]["start"],
              buckets[index_bucket_start]["end"], buckets[index_bucket_end]["end"]]
    return (min(bounds), max(bounds))

def age_range(case_buckets: str, buckets: [dict[str, Any]]) -> (int, int):
    bucket_ids = json.loads(case_buckets)
@@ -219,6 +236,26 @@ def age_range(case_buckets: str, buckets: [dict[str, Any]]) -> (int, int):


def convert_row(row: dict[str, Any], buckets: [dict[str, Any]]) -> Optional[dict[str, Any]]:
"""
Converts row for export using age buckets

To handle the transition from ageRange to ageBuckets, transform()
support both, with a preference to ageBuckets if that field exists.

There are three scenarios that are handled:

1. demographics.ageBuckets is present. This is the preferred option and
the code sets the minimum and maximum of the buckets present as the age
range

2. demographics.ageRange is present and the age window (difference
between the maximum and minimum is at least MINIMUM_AGE_WINDOW. In this
case, this is passed unchanged for export.

3. demographics.ageRange has a age window below MINIMUM_AGE_WINDOW. In
this case the code matches it to the nearest age bucket(s).

"""
if "ObjectId" not in row["_id"]:
return None
for arr_field in __ARRAYS:
Expand All @@ -236,8 +273,22 @@ def convert_row(row: dict[str, Any], buckets: [dict[str, Any]]) -> Optional[dict
if row["travelHistory.traveledPrior30Days"] == "true":
if "travelHistory.travel" in row:
row.update(convert_travel(row["travelHistory.travel"]))
if (
row.get("demographics.ageRange.start") and
(
int(row["demographics.ageRange.end"]) -
int(row["demographics.ageRange.start"]) + 1 < MINIMUM_AGE_WINDOW
) and
(bucketed_age_range := get_age_bucket_as_range(
buckets,
row["demographics.ageRange.start"],
row["demographics.ageRange.end"]
))
):
row["demographics.ageRange.start"], row["demographics.ageRange.end"] = bucketed_age_range
# prefer ageBuckets if available
if row.get("demographics.ageBuckets", None):
(row["demographics.ageRange.start"], row["demographics.ageRange.end"]) = age_range(row["demographics.ageBuckets"], buckets)
row["demographics.ageRange.start"], row["demographics.ageRange.end"] = age_range(row["demographics.ageBuckets"], buckets)
del row["demographics.ageBuckets"]
return row

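A worked example of the bucketing logic above; the bucket bounds follow the 21-25 and 26-30 buckets in test_age_buckets.json, and the values are illustrative:

```python
from transform import get_age_bucket_as_range  # module changed in this PR

buckets = [{"start": 21, "end": 25}, {"start": 26, "end": 30}]  # trimmed fixture

# Scenario 3: a 23-24 range covers fewer than MINIMUM_AGE_WINDOW ages,
# so it widens to the bucket that contains it:
assert get_age_bucket_as_range(buckets, 23, 24) == (21, 25)

# Endpoints that land in different buckets widen to the combined bounds:
assert get_age_bucket_as_range(buckets, 24, 27) == (21, 30)

# Ages outside every bucket yield None; convert_row() then leaves the
# original ageRange untouched:
assert get_age_bucket_as_range(buckets, 150, 160) is None
```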
5 changes: 3 additions & 2 deletions data-serving/scripts/prune-uploads/hooks/aggregate.py
@@ -26,7 +26,7 @@ def run(sources: list[dict[str, Any]], env: str, dry_run: bool = False):
        )
    except Exception as e:
        logging.exception("Error occurred when fetching job definitions")
-        return
+        raise

    if (jobdef := f"{PREFIX}-{env}") in jobdefs:
        logging.info(f"Submitting aggregation job: {jobdef} ...")
@@ -38,5 +38,6 @@
logging.info(f"Successfully submitted job for {jobdef}")
except Exception as e:
logging.exception(f"Error occurred while trying to submit {jobdef}")
raise
else:
logging.info(f"Could not find job definition: {jobdef}")
raise Exception(f"Could not find job definition: {jobdef}")
3 changes: 2 additions & 1 deletion data-serving/scripts/prune-uploads/hooks/country_export.py
@@ -82,7 +82,7 @@ def run(sources: list[dict[str, Any]], env: str, dry_run: bool = False):
    jobdefs = set.union(*(get_exporters(s, env) for s in sources))
    all_exporters = list_exporters(env)
    if unknown_exporters := jobdefs - all_exporters:
-        logging.warning(f"Ignoring unknown exporters {unknown_exporters}")
+        raise Exception(f"Missing exporters {unknown_exporters}")
    for jobdef in jobdefs & all_exporters:
        try:
            logging.info(f"Submitting job for {jobdef} ...")
@@ -92,3 +92,4 @@
            )
        except Exception as e:
            logging.exception(f"Error occurred while trying to submit {jobdef}")
+            raise
27 changes: 19 additions & 8 deletions data-serving/scripts/prune-uploads/prune_uploads.py
@@ -23,16 +23,20 @@


HOOKS = ["country_export", "aggregate"]
PRUNE_UPLOADS_WEBHOOK_URL = os.getenv("PRUNE_UPLOADS_WEBHOOK_URL")


def _ids(xs):
    return [str(x["_id"]) for x in xs]


-def notify(string, webhook_url):
-    if string:
-        response = requests.post(webhook_url, json={"text": string})
-        return response.status_code == 200
+def notify(string: str) -> bool:
+    if not PRUNE_UPLOADS_WEBHOOK_URL:
+        raise ValueError("Missing environment variable PRUNE_UPLOADS_WEBHOOK_URL")
+    if not string:
+        raise ValueError("notify() requires non-empty string")
+    response = requests.post(PRUNE_UPLOADS_WEBHOOK_URL, json={"text": string})
+    return response.status_code == 200


def accept_reject_msg(accept, reject, success=True, prefix=' '):
@@ -322,15 +326,22 @@ def get_selected_hooks(run_hooks):
    if args.dry_run:
        logging.info("\n".join(m))

-    if webhook_url := os.environ.get("PRUNE_UPLOADS_WEBHOOK_URL"):
-        notify("\n".join(m), webhook_url)
+    notify("\n".join(m))

    selected_hooks = get_selected_hooks(args.run_hooks)
    if not ingested_sources:
        logging.info("No sources were ingested, skipping hooks.")
        sys.exit(0)

if "country_export" in selected_hooks:
hooks.country_export.run(ingested_sources, env, args.dry_run)
try:
hooks.country_export.run(ingested_sources, env, args.dry_run)
except Exception as e:
logging.error(e)
notify(str(e))
if "aggregate" in selected_hooks:
hooks.aggregate.run(ingested_sources, env, args.dry_run)
try:
hooks.aggregate.run(ingested_sources, env, args.dry_run)
except Exception as e:
logging.error(e)
notify(str(e))
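
Since the webhook URL now lives at module scope, it is read once at import time; a minimal usage sketch, assuming prune_uploads guards its CLI entry point and the URL is a placeholder:

```python
import os

# Must be set before prune_uploads is imported, because the module
# captures the value with os.getenv() at import time (placeholder URL):
os.environ.setdefault("PRUNE_UPLOADS_WEBHOOK_URL", "https://hooks.example.com/T000/B000")

import prune_uploads

# notify() now raises ValueError on a missing URL or empty message and
# returns whether the webhook accepted the post:
ok = prune_uploads.notify("prune-uploads: ingestion complete")
```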
26 changes: 26 additions & 0 deletions (new migration file)
@@ -0,0 +1,26 @@
// index used by mongoexport in export-data scripts

const indexes = [
    {
        name: 'byCountryIfListed',
        key: {
            'location.country': 1,
            list: 1,
        },
    },
];
module.exports = {
    async up(db, client) {
        await db.command({
            createIndexes: 'cases',
            indexes: indexes,
        });
    },

    async down(db, client) {
        await db.command({
            dropIndexes: 'cases',
            index: ['byCountryIfListed'],
        });
    },
};
};