Skip to content

Commit

Permalink
Further hardening and fixes of asyncio and events
Browse files Browse the repository at this point in the history
- convert before event handlers to run concurrently
- fix task group broke (unused) return_exceptions=True case of dispatch_event
- rename sub-exception vars for clarity
- update relevant test
  • Loading branch information
linkous8 committed Apr 24, 2024
1 parent 8c4d712 commit 3c14eed
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 42 deletions.
2 changes: 1 addition & 1 deletion servo/connectors/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2334,7 +2334,7 @@ async def readiness_monitor() -> None:

description = state.to_description()
except ExceptionGroup as eg:
if any(isinstance(se, servo.EventError) for se in eg.exceptions):
if any(isinstance(sub_e, servo.EventError) for sub_e in eg.exceptions):
raise
else:
raise servo.AdjustmentFailedError(str(eg.message)) from eg
Expand Down
80 changes: 41 additions & 39 deletions servo/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ def decorator(fn: EventCallable) -> EventCallable:
if preposition == Preposition.before:
# 'before' event takes same args as 'on' event, but returns None
ref_signature = ref_signature.replace(return_annotation="None")

servo.utilities.inspect.assert_equal_callable_descriptors(
servo.utilities.inspect.CallableDescriptor(
signature=ref_signature,
Expand Down Expand Up @@ -891,6 +892,7 @@ async def run_event_handlers(
value=error,
)

# TODO refactor to use ExceptionGroups with events retrievable from exceptions
if return_exceptions:
results.append(error.__event_result__)
else:
Expand Down Expand Up @@ -1006,36 +1008,34 @@ async def run(self) -> List[EventResult]:

# Invoke the before event handlers
if self._prepositions & Preposition.before:
for connector in self._connectors:
try:
results = await connector.run_event_handlers(
self.event,
Preposition.before,
*self._args,
return_exceptions=False,
**self._kwargs,
)
try:
async with asyncio.TaskGroup() as tg:
for connector in self._connectors:
tg.create_task(
connector.run_event_handlers(
self.event,
Preposition.before,
*self._args,
return_exceptions=False,
**self._kwargs,
)
)

except servo.errors.EventCancelledError as error:
except ExceptionGroup as eg:
cancelled_errs = [
sub_e
for sub_e in eg.exceptions
if isinstance(sub_e, servo.errors.EventCancelledError)
]
if cancelled_errs:
# Return an empty result set
canceller_names = (ce.connector.name for ce in cancelled_errs)
servo.logger.warning(
f'event cancelled by before event handler on connector "{connector.name}": {error}'
f'event cancelled by before event handler on connector "{", ".join(canceller_names)}": {eg.exceptions}'
)
return []
except ExceptionGroup as eg:
if any(
(
isinstance(se, servo.errors.EventCancelledError)
for se in eg.exceptions
)
):
# Return an empty result set
servo.logger.warning(
f'event cancelled by before event handler on connector "{connector.name}": {eg}'
)
return []
else:
raise
else:
raise

# Invoke the on event handlers and gather results
if self._prepositions & Preposition.on:
Expand All @@ -1052,21 +1052,23 @@ async def run(self) -> List[EventResult]:
if results:
break
else:
async with asyncio.TaskGroup() as tg:
ev_tasks = [
tg.create_task(
c.run_event_handlers(
self.event,
Preposition.on,
return_exceptions=self._return_exceptions,
*self._args,
**self._kwargs,
)
)
for c in self._connectors
]
tasks = (
c.run_event_handlers(
self.event,
Preposition.on,
return_exceptions=self._return_exceptions,
*self._args,
**self._kwargs,
)
for c in self._connectors
)
if self._return_exceptions:
results = await asyncio.gather(*tasks)
else:
async with asyncio.TaskGroup() as tg:
tg_tasks = [tg.create_task(t) for t in tasks]
results = (tt.result() for tt in tg_tasks)

results = (et.result() for et in ev_tasks)
results = list(filter(lambda r: r is not None, results))
results = functools.reduce(lambda x, y: x + y, results, [])

Expand Down
4 changes: 2 additions & 2 deletions tests/servo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ async def test_cancellation_of_event_from_before_handler(mocker, servo: Servo):

# Mock the before handler to throw a cancel exception
mock = mocker.patch.object(before_handler, "handler")
mock.side_effect = EventCancelledError("it burns when I pee")
mock.side_effect = EventCancelledError("it burns when I pee", connector=connector)
results = await servo.dispatch_event("promote")

# Check that on and after callbacks were never called
Expand All @@ -284,7 +284,7 @@ async def test_cancellation_of_event_from_before_handler(mocker, servo: Servo):
assert messages[0].record["level"].name == "WARNING"
assert (
messages[0].record["message"]
== 'event cancelled by before event handler on connector "first_test_servo": it burns when I pee'
== "event cancelled by before event handler on connector \"first_test_servo\": (EventCancelledError('it burns when I pee'),)"
)


Expand Down

0 comments on commit 3c14eed

Please sign in to comment.