Replicate the age bucket conversion behaviour in the export script #2670 #2681

Merged · 2 commits · Apr 26, 2022 · Changes from 1 commit
2 changes: 1 addition & 1 deletion data-serving/scripts/export-data/README.md
@@ -37,7 +37,7 @@ during the build phase:
 You will need write access to S3. Alternatively, you can set up localstack to
 have a mock S3 environment. To run a hypothetical export for Antarctica:
 
-    docker run -e 'COUNTRY=Antarctica' -e 'CONN=xx' country-export
+    docker run -e 'COUNTRY=Antarctica' -e 'CONN=xx' -e 'BUCKET=yyy' country-export
 
 ## Setting up exports
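For reviewers without S3 write access, a minimal localstack sketch might look like the following (the image, port, and bucket name `yyy` are illustrative assumptions, not part of this change):

    docker run -d -p 4566:4566 localstack/localstack         # mock AWS endpoint on the default edge port
    aws --endpoint-url=http://localhost:4566 s3 mb s3://yyy   # create the export bucket locally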
8 changes: 7 additions & 1 deletion data-serving/scripts/export-data/country_export.sh
@@ -7,10 +7,16 @@ require_env "${CONN:-}" "Specify MongoDB connection string in CONN"
 require_env "${BUCKET:-}" "Specify S3 bucket to output files in BUCKET"
 require_env "${COUNTRY:-}" "Specify which country code to export in COUNTRY"
 
+SCRATCH="$(mktemp -d)"
+BUCKETS="${SCRATCH}/buckets.json"
+trap 'rm -rf "$SCRATCH"' EXIT  # Cleanup before exit
+
 FORMAT="${FORMAT:-csv,tsv,json}"
 QUERY="{\"list\": true, \"location.country\": \"$COUNTRY\"}"
 
+mongoexport --uri="$CONN" --collection=agebuckets --type=json --jsonArray > "${BUCKETS}"
+
 mongoexport --query="$QUERY" --uri="$CONN" --collection=cases \
-    --fieldFile=fields.txt --type=csv | python3 transform.py -f "$FORMAT" "$COUNTRY"
+    --fieldFile=fields.txt --type=csv | python3 transform.py -f "$FORMAT" -b "${BUCKETS}" "$COUNTRY"
 
 # ignore shellcheck warning on word splitting, as it's actually needed here
 # shellcheck disable=SC2086
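One detail worth noting: transform.py reads the bucket file with a single json.load call, which expects one JSON document, whereas mongoexport's default --type=json output is one document per line. The agebuckets export therefore needs --jsonArray so the file parses as a single array. A hypothetical sanity check on the exported file:

    python3 -c 'import json, sys; print(len(json.load(open(sys.argv[1]))), "buckets")' buckets.json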
3 changes: 1 addition & 2 deletions data-serving/scripts/export-data/fields.txt
@@ -5,8 +5,7 @@ caseReference.sourceId
 caseReference.sourceUrl
 caseReference.uploadIds
 caseReference.verificationStatus
-demographics.ageRange.end
-demographics.ageRange.start
+demographics.ageBuckets
 demographics.ethnicity
 demographics.gender
 demographics.nationalities
22 changes: 22 additions & 0 deletions data-serving/scripts/export-data/test_age_buckets.json
@@ -0,0 +1,22 @@
+[
+    {
+        "_id": "001",
+        "start": 0,
+        "end": 0
+    },
+    {
+        "_id": "002",
+        "start": 1,
+        "end": 5
+    },
+    {
+        "_id": "003",
+        "start": 6,
+        "end": 9
+    },
+    {
+        "_id": "004",
+        "start": 10,
+        "end": 14
+    }
+]
50 changes: 47 additions & 3 deletions data-serving/scripts/export-data/test_transform.py
@@ -77,6 +77,19 @@
     "travelHistory.travel.methods": "Ship,Raft",
 }
 
+_BUCKETS = [
+    {
+        "_id": "001",
+        "start": 20,
+        "end": 24,
+    },
+    {
+        "_id": "002",
+        "start": 25,
+        "end": 29,
+    }
+]
+
 
 def _read_csv(fn):
     with open(fn) as f:
@@ -107,7 +120,7 @@ def test_convert_travel():
 def test_transform_output_match(fmt):
     expected = Path(f'test_transform_mongoexport_expected.{fmt}').read_text()
     with redirect_stdout(io.StringIO()) as f:
-        T.transform('test_transform_mongoexport.csv', '-', [fmt])
+        T.transform('test_transform_mongoexport.csv', '-', [fmt], "test_age_buckets.json")
     # use str.splitlines to ignore line endings
 
     expected_lines = expected.splitlines()
@@ -120,13 +133,44 @@ def test_transform_output_match(fmt):
 
 def test_transform_empty(tmp_path):
     output = f"{tmp_path}/empty"
-    T.transform('test_transform_mongoexport_header.csv', output, ['csv'])
+    T.transform('test_transform_mongoexport_header.csv', output, ['csv'], "test_age_buckets.json")
     assert not Path(f"{output}.csv.gz").exists()
 
 
 def test_transform_creates_output(tmp_path):
     formats = ['csv', 'tsv', 'json']
     output = f"{tmp_path}/output"
-    T.transform('test_transform_mongoexport.csv', output, formats)
+    T.transform('test_transform_mongoexport.csv', output, formats, "test_age_buckets.json")
     for fmt in formats:
         assert Path(f"{output}.{fmt}.gz").exists()
+
+
+def test_transform_buckets_age_ranges():
+    expected = Path('test_transform_mongoexport_bucketed_ages_expected.csv').read_text()
+    with redirect_stdout(io.StringIO()) as f:
+        T.transform('test_transform_mongoexport_bucketed_ages.csv', '-', ['csv'], 'test_age_buckets.json')
+
+    expected_lines = expected.splitlines()
+    actual_lines = f.getvalue().splitlines()
+
+    lines_to_compare = zip(expected_lines, actual_lines)
+    for line_pair in lines_to_compare:
+        assert line_pair[0] == line_pair[1]
+
+
+def test_age_bucket_conversion():
+    case_buckets_json = "[\"001\", \"002\"]"
+    (start, end) = T.age_range(case_buckets_json, _BUCKETS)
+    assert start == 20
+    assert end == 29
+
+
+def test_age_bucket_row_conversion():
+    row = {
+        "_id": "ObjectId(abc123)",
+        "travelHistory.traveledPrior30Days": "false",
+        "demographics.ageBuckets": "[\"001\"]"
+    }
+    converted_row = T.convert_row(row, _BUCKETS)
+    assert converted_row["demographics.ageRange.start"] == 20
+    assert converted_row["demographics.ageRange.end"] == 24
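The new tests reuse the module's existing pytest conventions (tmp_path, parametrized formats), so a targeted run might look like this hypothetical invocation:

    python3 -m pytest test_transform.py -k bucket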
3 changes: 3 additions & 0 deletions data-serving/scripts/export-data/test_transform_mongoexport_bucketed_ages.csv
@@ -0,0 +1,3 @@
+_id,caseReference.additionalSources,caseReference.sourceEntryId,caseReference.sourceId,caseReference.sourceUrl,caseReference.uploadIds,caseReference.verificationStatus,demographics.ageBuckets,demographics.ethnicity,demographics.gender,demographics.nationalities,demographics.occupation,events,location.administrativeAreaLevel1,location.administrativeAreaLevel2,location.administrativeAreaLevel3,location.country,location.geoResolution,location.geometry.latitude,location.geometry.longitude,location.name,location.place,location.query,notes,pathogens,preexistingConditions.hasPreexistingConditions,preexistingConditions.values,revisionMetadata.creationMetadata.curator,revisionMetadata.creationMetadata.date,revisionMetadata.creationMetadata.notes,revisionMetadata.editMetadata.curator,revisionMetadata.editMetadata.date,revisionMetadata.editMetadata.notes,revisionMetadata.revisionNumber,SGTF,symptoms.status,symptoms.values,transmission.linkedCaseIds,transmission.places,transmission.routes,travelHistory.travel.dateRange.end,travelHistory.travel.dateRange.start,travelHistory.travel.location.name,travelHistory.travel.methods,travelHistory.travel.purpose,travelHistory.traveledPrior30Days,vaccines.0.name,vaccines.0.batch,vaccines.0.date,vaccines.0.sideEffects,vaccines.1.name,vaccines.1.batch,vaccines.1.date,vaccines.1.sideEffects,vaccines.2.name,vaccines.2.batch,vaccines.2.date,vaccines.2.sideEffects,vaccines.3.name,vaccines.3.batch,vaccines.3.date,vaccines.3.sideEffects
+ObjectId(6817283abaa89324a90109aa),[],,787123878aa90909811aaff1,http://foo/bar.csv,"[""bb12399abbb19230900aa123""]",UNVERIFIED,"[""001"", ""002""]",,Male,[],,"[{""name"":""confirmed"",""dateRange"":{""start"":{""$date"":""2021-10-01T00:00:00.000Z""},""end"":{""$date"":""2021-01-01T00:00:00.000Z""}}}]",,,,Antarctica,Country,-79.402,0.323,Antarctica,,,,[],,,ingestion@example.com,2021-01-02T13:42:34.991Z,,,,,0,,,,,,,,,,,true,,,,,,,,,,,,,,,,
+ObjectId(798989a98998acc98989a1bb),[],,787123878aa90909811aaff1,http://foo/bar.csv,"[""bb12399abbb19230900aa123""]",UNVERIFIED,"[""003""]",,Female,[],,"[{""name"":""confirmed"",""dateRange"":{""start"":{""$date"":""2021-01-05T00:00:00.000Z""},""end"":{""$date"":""2021-01-05T00:00:00.000Z""}}}]",,,,Antarctica,Country,-79.402,0.323,Antarctica,,,,[],,,ingestion@example.com,2021-01-02T13:42:34.991Z,,,,,0,,,,,,,,,,,true,,,,,,,,,,,,,,,,
3 changes: 3 additions & 0 deletions data-serving/scripts/export-data/test_transform_mongoexport_bucketed_ages_expected.csv
@@ -0,0 +1,3 @@
+_id,caseReference.additionalSources,caseReference.sourceEntryId,caseReference.sourceId,caseReference.sourceUrl,caseReference.uploadIds,caseReference.verificationStatus,demographics.ageRange.end,demographics.ageRange.start,demographics.ethnicity,demographics.gender,demographics.nationalities,demographics.occupation,events.confirmed.date,events.confirmed.value,events.firstClinicalConsultation.date,events.hospitalAdmission.date,events.hospitalAdmission.value,events.icuAdmission.date,events.icuAdmission.value,events.onsetSymptoms.date,events.outcome.date,events.outcome.value,events.selfIsolation.date,location.administrativeAreaLevel1,location.administrativeAreaLevel2,location.administrativeAreaLevel3,location.country,location.geometry.latitude,location.geometry.longitude,location.geoResolution,location.name,location.place,notes,pathogens,preexistingConditions.hasPreexistingConditions,preexistingConditions.values,revisionMetadata.creationMetadata.date,revisionMetadata.creationMetadata.notes,revisionMetadata.editMetadata.date,revisionMetadata.editMetadata.notes,revisionMetadata.revisionNumber,SGTF,symptoms.status,symptoms.values,transmission.linkedCaseIds,transmission.places,transmission.routes,travelHistory.travel.dateRange.end,travelHistory.travel.dateRange.start,travelHistory.travel.location.administrativeAreaLevel1,travelHistory.travel.location.administrativeAreaLevel2,travelHistory.travel.location.administrativeAreaLevel3,travelHistory.travel.location.country,travelHistory.travel.location.geometry.coordinates,travelHistory.travel.location.geoResolution,travelHistory.travel.location.name,travelHistory.travel.location.place,travelHistory.travel.methods,travelHistory.travel.purpose,travelHistory.traveledPrior30Days,vaccines.0.batch,vaccines.0.date,vaccines.0.name,vaccines.0.sideEffects,vaccines.1.batch,vaccines.1.date,vaccines.1.name,vaccines.1.sideEffects,vaccines.2.batch,vaccines.2.date,vaccines.2.name,vaccines.2.sideEffects,vaccines.3.batch,vaccines.3.date,vaccines.3.name,vaccines.3.sideEffects,variantOfConcern
+ObjectId(6817283abaa89324a90109aa),,,787123878aa90909811aaff1,http://foo/bar.csv,bb12399abbb19230900aa123,UNVERIFIED,5,0,,Male,,,2021-01-01,,,,,,,,,,,,,,Antarctica,-79.402,0.323,Country,Antarctica,,,,,,2021-01-02T13:42:34.991Z,,,,0,NA,,,,,,,,,,,,,,,,,true,,,,,,,,,,,,,,,,,,
+ObjectId(798989a98998acc98989a1bb),,,787123878aa90909811aaff1,http://foo/bar.csv,bb12399abbb19230900aa123,UNVERIFIED,9,6,,Female,,,2021-01-05,,,,,,,,,,,,,,Antarctica,-79.402,0.323,Country,Antarctica,,,,,,2021-01-02T13:42:34.991Z,,,,0,NA,,,,,,,,,,,,,,,,,true,,,,,,,,,,,,,,,,,,
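These fixtures exercise the conversion end to end: the input rows carry bucket IDs ["001", "002"] (ages 0-0 and 1-5 in test_age_buckets.json) and ["003"] (6-9), which collapse to ageRange 0-5 and 6-9 in the expected output. A hypothetical replay of the same min/max reduction:

    python3 -c 'import json; bs = json.load(open("test_age_buckets.json")); m = [b for b in bs if b["_id"] in ["001", "002"]]; print(min(b["start"] for b in m), max(b["end"] for b in m))'   # prints: 0 5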
59 changes: 42 additions & 17 deletions data-serving/scripts/export-data/transform.py
@@ -184,6 +184,8 @@ def get_headers_and_fields(fileobject) -> list[str]:
         logging.exception("Error in reading mongoexport header")
         sys.exit(1)
     cols_to_add = [
+        "demographics.ageRange.start",
+        "demographics.ageRange.end",
         "events.confirmed.value",
         "events.confirmed.date",
         "events.firstClinicalConsultation.date",
@@ -196,21 +198,33 @@ def get_headers_and_fields(fileobject) -> list[str]:
         "events.outcome.value",
         "events.selfIsolation.date",
     ]
+    cols_to_remove = [
+        "demographics.ageBuckets",
+    ]
     fields = set(headers).union(set(cols_to_add))
     fields = fields.union(set(__TRAVEL + __VARIANT))
+    fields = fields.difference(cols_to_remove)
     fields = sorted(list(fields - set(__OMIT)), key=str.casefold)
     return headers, fields
 
 
-def convert_row(row: dict[str, Any]) -> Optional[dict[str, Any]]:
+def age_range(case_buckets: str, buckets: list[dict[str, Any]]) -> tuple[int, int]:
+    bucket_ids = json.loads(case_buckets)
+    matching_buckets = [b for b in buckets if b["_id"] in bucket_ids]
+    min_age = min([b["start"] for b in matching_buckets])
+    max_age = max([b["end"] for b in matching_buckets])
+    return (min_age, max_age)
+
+
+def convert_row(row: dict[str, Any], buckets: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
     if "ObjectId" not in row["_id"]:
         return None
-    if type(row["notes"]) == str:
+    if type(row.get("notes")) == str:
         row["notes"] = row["notes"].replace("\n", ", ")
     for arr_field in __ARRAYS:
-        if row[arr_field]:
+        if row.get(arr_field):
             row[arr_field] = convert_string_list(row[arr_field])
-    if row["caseReference.additionalSources"]:
+    if row.get("caseReference.additionalSources"):
         row["caseReference.additionalSources"] = convert_addl_sources(
             row["caseReference.additionalSources"]
         )
@@ -222,6 +236,9 @@ def convert_row(row: dict[str, Any]) -> Optional[dict[str, Any]]:
     if row["travelHistory.traveledPrior30Days"] == "true":
         if "travelHistory.travel" in row:
             row.update(convert_travel(row["travelHistory.travel"]))
+    if row.get("demographics.ageBuckets", None):
+        (row["demographics.ageRange.start"], row["demographics.ageRange.end"]) = age_range(row["demographics.ageBuckets"], buckets)
+        del row["demographics.ageBuckets"]
     return row
 
 
@@ -288,19 +305,21 @@ def open_writers(formats: list[str], fields: list[str], output: str):
             files[fmt].close()
 
 
-def transform(input: Optional[str], output: str, formats: list[str]):
+def transform(input: Optional[str], output: str, formats: list[str], bucketpath: str):
     with (open(input) if input else sys.stdin) as inputfile:
-        headers, fields = get_headers_and_fields(inputfile)
-        reader = csv.DictReader(inputfile, fieldnames=headers)
-        hasrows = False
-        with open_writers(formats, fields, output) as writers:
-            for i, row in enumerate(map(convert_row, reader)):
-                hasrows = True
-                writerow(formats, writers, row, i)
-        if output != "-" and not hasrows:  # cleanup empty files
-            cleanup_files = [Path(f"{output}.{fmt}.gz") for fmt in formats]
-            for file in cleanup_files:
-                file.unlink(missing_ok=True)
+        with open(bucketpath) as bucketfile:
+            buckets = json.load(bucketfile)
+            headers, fields = get_headers_and_fields(inputfile)
+            reader = csv.DictReader(inputfile, fieldnames=headers)
+            hasrows = False
+            with open_writers(formats, fields, output) as writers:
+                for i, row in enumerate(map(lambda row: convert_row(row, buckets), reader)):
+                    hasrows = True
+                    writerow(formats, writers, row, i)
+            if output != "-" and not hasrows:  # cleanup empty files
+                cleanup_files = [Path(f"{output}.{fmt}.gz") for fmt in formats]
+                for file in cleanup_files:
+                    file.unlink(missing_ok=True)
 
 
 if __name__ == "__main__":
@@ -318,5 +337,11 @@ def transform(input: Optional[str], output: str, formats: list[str]):
         "--input",
         help="Input file to transform instead of stdin"
     )
+    parser.add_argument(
+        "-b",
+        "--buckets",
+        help="JSON collection of age buckets to determine case age ranges",
+        required=True
+    )
     args = parser.parse_args()
-    transform(args.input, args.output, args.format.split(","))
+    transform(args.input, args.output, args.format.split(","), args.buckets)
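As a closing usage sketch (not part of the diff itself, and assuming the elided -f/--format and positional output argument definitions match their use in country_export.sh), the transform can be exercised locally against the committed fixtures; "-" as the output writes to stdout, which is what the tests rely on:

    # Hypothetical local run: bucket IDs in demographics.ageBuckets are
    # rewritten to ageRange.start/ageRange.end columns on stdout
    python3 transform.py -f csv -b test_age_buckets.json \
        -i test_transform_mongoexport_bucketed_ages.csv -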