Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2714 batch update #2751

Merged
merged 5 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,52 @@ def excluded_case_ids(
raise ValidationError(f"cannot understand query {predicate}")
return [c._id for c in self.store.excluded_cases(source_id, predicate)]

def update_case(self, source_id: str, update: dict) -> Case:
def update_case(self, case_id: str, update: dict) -> Case:
"""Update the case document with the provided ID. Raises NotFoundError if
there is no case with that ID, or ValidationError if the case would not be
left in a valid state. If the update is successfully applied, returns the updated
form of the case."""
case = self.store.case_by_id(source_id)
diff = DocumentUpdate.from_dict(update)
updated_case = self.validate_updated_case(case_id, diff)
# tell the store to apply the update rather than replacing the whole document:
# should be more efficient given a competent DB
self.store.update_case(case_id, diff)
return updated_case

def batch_update(self, updates: List[dict]) -> int:
    """Update a collection of documents in one call.

    Each dictionary in ``updates`` is a description of an update, but it also
    carries the ``_id`` field to indicate which case to update. Every update
    is validated before anything is handed to the store, so a failure on any
    one case is raised before the store is asked to change anything.

    Returns the value reported by the store's ``batch_update``.

    Raises PreconditionUnsatisfiedError if no updates are supplied at all or
    if any update doesn't include an _id, NotFoundError if any update
    identifies a case that isn't present, or ValidationError if any update
    leaves a case in an inconsistent state.
    """
    if updates is None:
        # A request without a list of updates is a caller error; report it
        # as such instead of failing with a TypeError while iterating.
        raise PreconditionUnsatisfiedError("no updates supplied")

    update_map = {}
    for described_update in updates:
        # Work on a copy so the caller's dictionary keeps its _id.
        changes = dict(described_update)
        try:
            # Only the _id lookup is guarded: a KeyError raised while
            # building the DocumentUpdate must not be reported as a missing ID.
            case_id = changes.pop("_id")
        except KeyError:
            raise PreconditionUnsatisfiedError(
                "not every update includes an _id"
            ) from None
        # If the same _id appears twice, the later update replaces the earlier one.
        update_map[case_id] = DocumentUpdate.from_dict(changes)

    for case_id, update in update_map.items():
        self.validate_updated_case(case_id, update)
    return self.store.batch_update(update_map)

def validate_updated_case(self, id: str, update: DocumentUpdate):
"""Find out whether updating a case would result in it being invalid.
Raises NotFoundError if the case doesn't exist, or ValidationError if
the update results in an invalid case. Returns the updated, valid case
on success."""
case = self.store.case_by_id(id)
if case is None:
raise NotFoundError(f"No case with ID {source_id}")
raise NotFoundError(f"No case with ID {id}")
# build the updated version of the case to validate
diff = DocumentUpdate.from_dict(update)
updated_case = case.updated_document(diff)
updated_case = case.updated_document(update)
updated_case.validate()
self.check_case_preconditions(updated_case)
# tell the store to apply the update rather than replacing the whole document:
# should be more efficient given a competent DB
self.store.update_case(source_id, diff)
return updated_case

def create_case_if_valid(self, maybe_case: dict):
Expand Down
10 changes: 10 additions & 0 deletions data-serving/reusable-data-service/data_service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ def batch_status_change():
return jsonify({"message": e.args[0]}), e.http_code


@app.route("/api/cases/batchUpdate", methods=["POST"])
def batch_update():
    """POST handler that applies a batch of case updates.

    Reads the "cases" entry of the JSON request body, hands it to
    case_controller.batch_update and reports the returned count under
    "numModified". A WebApplicationError is converted into a JSON message
    sent with the error's own HTTP status code.
    """
    try:
        payload = request.get_json()
        modified = case_controller.batch_update(payload.get("cases"))
        body = {"numModified": modified}
        return jsonify(body), 200
    except WebApplicationError as error:
        failure = {"message": error.args[0]}
        return jsonify(failure), error.http_code


@app.route("/api/excludedCaseIds")
def excluded_case_ids():
try:
Expand Down
14 changes: 14 additions & 0 deletions data-serving/reusable-data-service/data_service/model/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,26 @@ def apply_update(self, update: DocumentUpdate):
self._internal_set_value(key, None)

def _internal_set_value(self, key, value):
    """Set the property addressed by the key path ``key`` to ``value``.

    Calls _internal_ensure_containers_exist for the key first, then resolves
    the owning container and property name and assigns the value. When the
    target field is declared as a datetime.date and the value is a string,
    the string is parsed as an ISO-format date before it is stored; a string
    that is not a valid ISO date makes date.fromisoformat raise ValueError.
    """
    self._internal_ensure_containers_exist(key)
    container, prop = self._internal_object_and_property_for_key_path(key)
    # patch up the type for updates created from a JSON API, where dates
    # arrive as strings; isinstance also covers str subclasses
    if container.field_type(prop) == datetime.date and isinstance(value, str):
        value = datetime.date.fromisoformat(value)
    setattr(container, prop, value)

def _internal_ensure_containers_exist(self, key):
    """Create any missing containers on the way to a dotted key path.

    Everything before the last "." in ``key`` names a chain of nested
    containers, starting from this object. Each one that is currently None
    is replaced by a new instance of the type reported by the parent's
    field_type, built with no arguments. A key without a "." has no
    containers, so nothing happens.
    """
    parent_path, separator, _ = key.rpartition(".")
    if not separator:
        return  # no nested containers
    current = self
    for name in parent_path.split("."):
        child = getattr(current, name)
        if child is None:
            # instantiate the declared field type and attach it to the parent
            child = current.field_type(name)()
            setattr(current, name, child)
        current = child

def _internal_object_and_property_for_key_path(self, key):
if (dot_index := key.rfind(".")) == -1:
container = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,22 @@ def excluded_cases(self, source_id: str, filter: Filter) -> List[Case]:
def update_case(self, id: str, update: DocumentUpdate):
    """Apply ``update`` to the stored case whose ID is ``id``.

    An update of length zero is a no-op. Otherwise the update is translated
    by mongodb_update_command and sent to the case collection with
    update_one, matching on the ID converted to an ObjectId.
    """
    if len(update) > 0:
        command = self.mongodb_update_command(update)
        collection = self.get_case_collection()
        # the string ID is converted to an ObjectId for the match
        collection.update_one({"_id": ObjectId(id)}, command)

def batch_update(self, updates: dict[str, DocumentUpdate]):
    """Apply several case updates with a single bulk write.

    ``updates`` maps case IDs (as strings) to the update for that case.
    Updates of length zero are skipped, matching the no-op guard in
    update_case. If nothing is left to write, 0 is returned without
    contacting the database, because bulk_write rejects an empty list of
    operations.

    Returns the modified_count reported by the bulk write.
    """
    commands = {
        ObjectId(case_id): self.mongodb_update_command(update)
        for case_id, update in updates.items()
        if len(update) > 0  # nothing to do for this case
    }
    if not commands:
        return 0
    operations = [
        pymongo.UpdateOne({"_id": object_id}, command)
        for object_id, command in commands.items()
    ]
    result = self.get_case_collection().bulk_write(operations)
    return result.modified_count

@staticmethod
def mongodb_update_command(update: DocumentUpdate):
objectify_id = (
lambda k, v: ObjectId(v)
if Case.field_type_for_key_path(k) == ObjectId
Expand All @@ -130,8 +145,7 @@ def update_case(self, id: str, update: DocumentUpdate):
command["$set"] = sets
if len(unsets) > 0:
command["$unset"] = unsets

self.get_case_collection().update_one({"_id": ObjectId(id)}, command)
return command

def matching_case_iterator(self, predicate: Filter):
"""Return an object that iterates over cases matching the predicate."""
Expand Down
56 changes: 56 additions & 0 deletions data-serving/reusable-data-service/tests/test_case_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def update_case(self, id: str, update: DocumentUpdate):
case = self.case_by_id(id)
case.apply_update(update)

def batch_update(self, updates: dict[str, DocumentUpdate]):
    """Apply every update through update_case, one case at a time, and
    return the number of updates that were supplied."""
    for case_id in updates:
        self.update_case(case_id, updates[case_id])
    return len(updates)

def update_case_status(
self, id: str, status: str, exclusion: CaseExclusionMetadata
):
Expand Down Expand Up @@ -529,3 +534,54 @@ def test_updating_case_to_valid_state_returns_updated_case(case_controller):

new_case = case_controller.update_case("1", {"confirmationDate": date(2021, 6, 24)})
assert new_case.confirmationDate == date(2021, 6, 24)


def test_batch_update_cases_returns_number_of_modified_cases(case_controller):
    """Batch-updating two of four cases reports two modifications, changes
    the status of those two cases and leaves a third one untouched."""
    for day in range(1, 5):
        new_case = {
            "confirmationDate": date(2021, 6, day),
            "caseReference": {"sourceId": "123ab4567890123ef4567890"},
        }
        case_controller.create_case(new_case)

    exclude_first = {
        "_id": "1",
        "caseReference": {"status": "EXCLUDED"},
        "caseExclusion": {"date": date(2022, 2, 2), "note": "Bad case no likey"},
    }
    verify_second = {"_id": "2", "caseReference": {"status": "VERIFIED"}}

    assert case_controller.batch_update([exclude_first, verify_second]) == 2

    expected_statuses = (("1", "EXCLUDED"), ("2", "VERIFIED"), ("3", "UNVERIFIED"))
    for case_id, status in expected_statuses:
        assert case_controller.get_case(case_id).caseReference.status == status


def test_batch_update_raises_if_id_not_supplied(case_controller):
    """An update that carries no _id is rejected with
    PreconditionUnsatisfiedError."""
    updates_without_id = [{"confirmationDate": date(2022, 5, 3)}]
    with pytest.raises(PreconditionUnsatisfiedError):
        case_controller.batch_update(updates_without_id)


def test_batch_update_raises_if_case_would_be_invalid(case_controller):
    """Clearing the confirmation date of an existing case through a batch
    update is rejected with ValidationError."""
    reference = {"sourceId": "123ab4567890123ef4567890", "status": "VERIFIED"}
    case_controller.create_case(
        {"confirmationDate": date(2021, 6, 23), "caseReference": reference}
    )
    invalidating_updates = [{"_id": "1", "confirmationDate": None}]
    with pytest.raises(ValidationError):
        case_controller.batch_update(invalidating_updates)


def test_batch_update_raises_if_case_not_found(case_controller):
    """A batch update naming a case that was never created raises
    NotFoundError."""
    updates_for_missing_case = [{"_id": "1", "confirmationDate": date(2022, 5, 13)}]
    with pytest.raises(NotFoundError):
        case_controller.batch_update(updates_for_missing_case)
31 changes: 31 additions & 0 deletions data-serving/reusable-data-service/tests/test_case_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,3 +468,34 @@ def test_update_object_id_on_case(client_with_patched_mongo):
put_response.get_json()["caseReference"]["sourceId"]
== "fedc1234567890123456789a"
)


def test_batch_update(client_with_patched_mongo):
    """Insert three excluded cases directly into the database, POST a new
    confirmationDate for each to /api/cases/batchUpdate, and expect a 200
    response reporting three modified cases."""
    mongo_client = pymongo.MongoClient("mongodb://localhost:27017/outbreak")
    seed_cases = []
    for i in range(1, 4):
        seed_cases.append(
            {
                "confirmationDate": datetime(2022, 5, i),
                "caseReference": {
                    "sourceId": bson.ObjectId("fedc12345678901234567890"),
                    "status": "EXCLUDED",
                },
                "caseExclusion": {
                    "date": datetime(2022, 6, i),
                    "note": f"Excluded upon this day, the {i}th of June",
                },
            }
        )
    inserted = mongo_client["outbreak"]["cases"].insert_many(seed_cases)

    # the API takes IDs as strings, so stringify the generated ObjectIds
    updates = [
        {"_id": str(case_id), "confirmationDate": f"2022-04-0{day}"}
        for day, case_id in enumerate(inserted.inserted_ids, start=1)
    ]

    response = client_with_patched_mongo.post(
        "/api/cases/batchUpdate", json={"cases": updates}
    )
    assert response.status_code == 200
    assert response.get_json()["numModified"] == 3