Skip to content

Commit

Permalink
perf: Make async the Event Grid events
Browse files Browse the repository at this point in the history
  • Loading branch information
clemlesne committed Feb 12, 2024
1 parent 863c13b commit dc65da6
Showing 1 changed file with 57 additions and 39 deletions.
96 changes: 57 additions & 39 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,16 @@


# General imports
from typing import Any, Callable, Coroutine, Generator, List, Optional, Tuple, Type
from typing import (
Any,
Callable,
Coroutine,
Generator,
List,
Optional,
Tuple,
Type,
)
from azure.communication.callautomation import (
CallAutomationClient,
CallConnectionClient,
Expand All @@ -27,7 +36,7 @@
from azure.eventgrid import EventGridEvent, SystemEventNames
from azure.identity import DefaultAzureCredential
from enum import Enum
from fastapi import FastAPI, status, Request, HTTPException, BackgroundTasks
from fastapi import FastAPI, status, Request, HTTPException, BackgroundTasks, Response
from fastapi.responses import JSONResponse, HTMLResponse
from helpers.config_models.database import ModeEnum as DatabaseMode
from helpers.logging import build_logger
Expand Down Expand Up @@ -220,46 +229,55 @@ async def call_initiate_get(phone_number: str) -> None:
description="Handle incoming call from a Azure Event Grid event originating from Azure Communication Services.",
)
async def call_inbound_post(request: Request):
for event_dict in await request.json():
event = EventGridEvent.from_dict(event_dict)
event_type = event.event_type

_logger.debug(f"Call inbound event {event_type} with data {event.data}")

if event_type == SystemEventNames.EventGridSubscriptionValidationEventName:
validation_code = event.data["validationCode"]
_logger.info(f"Validating Event Grid subscription ({validation_code})")
return JSONResponse(
content={"validationResponse": event.data["validationCode"]},
status_code=status.HTTP_200_OK,
)
responses = await asyncio.gather(
*[call_inbound_worker(event_dict) for event_dict in await request.json()]
)
for response in responses:
if response:
return response
return Response(status_code=status.HTTP_204_NO_CONTENT)

elif event_type == SystemEventNames.AcsIncomingCallEventName:
if event.data["from"]["kind"] == "phoneNumber":
phone_number = event.data["from"]["phoneNumber"]["value"]
else:
phone_number = event.data["from"]["rawId"]

_logger.debug(f"Incoming call handler caller ID: {phone_number}")
call_context = event.data["incomingCallContext"]
async def call_inbound_worker(event_dict: dict[str, Any]) -> Optional[JSONResponse]:
event = EventGridEvent.from_dict(event_dict)
event_type = event.event_type

try:
answer_call_result = call_automation_client.answer_call(
callback_url=await callback_url(phone_number),
cognitive_services_endpoint=CONFIG.cognitive_service.endpoint,
incoming_call_context=call_context,
)
_logger.info(
f"Answered call with {phone_number} ({answer_call_result.call_connection_id})"
)
except HttpResponseError as e:
if (
"lifetime validation of the signed http request failed"
in e.message.lower()
):
_logger.debug("Old call event received, ignoring")
else:
raise e
_logger.debug(f"Call inbound event {event_type} with data {event.data}")

if event_type == SystemEventNames.EventGridSubscriptionValidationEventName:
validation_code = event.data["validationCode"]
_logger.info(f"Validating Event Grid subscription ({validation_code})")
return JSONResponse(
content={"validationResponse": event.data["validationCode"]},
status_code=status.HTTP_200_OK,
)

elif event_type == SystemEventNames.AcsIncomingCallEventName:
if event.data["from"]["kind"] == "phoneNumber":
phone_number = event.data["from"]["phoneNumber"]["value"]
else:
phone_number = event.data["from"]["rawId"]

_logger.debug(f"Incoming call handler caller ID: {phone_number}")
call_context = event.data["incomingCallContext"]

try:
answer_call_result = call_automation_client.answer_call(
callback_url=await callback_url(phone_number),
cognitive_services_endpoint=CONFIG.cognitive_service.endpoint,
incoming_call_context=call_context,
)
_logger.info(
f"Answered call with {phone_number} ({answer_call_result.call_connection_id})"
)
except HttpResponseError as e:
if (
"lifetime validation of the signed http request failed"
in e.message.lower()
):
_logger.debug("Old call event received, ignoring")
else:
raise e


@api.post(
Expand Down

0 comments on commit dc65da6

Please sign in to comment.