Skip to content

Commit

Permalink
Twilio Python Sdk Example
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor11l committed Dec 10, 2024
1 parent 4487ffa commit 636401e
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 0 deletions.
83 changes: 83 additions & 0 deletions examples/conversational-ai/twilio/python-sdk/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
import json
import traceback
from dotenv import load_dotenv
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse
from twilio.twiml.voice_response import VoiceResponse, Connect
from elevenlabs import ElevenLabs
from elevenlabs.conversational_ai.conversation import Conversation
from twilio_audio_interface import TwilioAudioInterface
from starlette.websockets import WebSocketDisconnect

load_dotenv()

ELEVEN_LABS_AGENT_ID = os.getenv("AGENT_ID")
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")

app = FastAPI()


@app.get("/")
async def root():
return {"message": "Twilio-ElevenLabs Integration Server"}


@app.post("/twilio/inbound_call")
async def handle_incoming_call(request: Request):
form_data = await request.form()
call_sid = form_data.get("CallSid", "Unknown")
from_number = form_data.get("From", "Unknown")
print(f"Incoming call: CallSid={call_sid}, From={from_number}")

response = VoiceResponse()
connect = Connect()
connect.stream(url=f"wss://{request.url.hostname}/media-stream-eleven")
response.append(connect)
return HTMLResponse(content=str(response), media_type="application/xml")


@app.websocket("/media-stream-eleven")
async def handle_media_stream(websocket: WebSocket):
await websocket.accept()
print("WebSocket connection opened")

audio_interface = TwilioAudioInterface(websocket)
eleven_labs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)

try:
conversation = Conversation(
client=eleven_labs_client,
agent_id=ELEVEN_LABS_AGENT_ID,
requires_auth=True, # Security > Enable authentication
audio_interface=audio_interface,
callback_agent_response=lambda text: print(f"Agent: {text}"),
callback_user_transcript=lambda text: print(f"User: {text}"),
)

conversation.start_session()
print("Conversation started")

async for message in websocket.iter_text():
if not message:
continue
await audio_interface.handle_twilio_message(json.loads(message))

except WebSocketDisconnect:
print("WebSocket disconnected")
except Exception:
print("Error occurred in WebSocket handler:")
traceback.print_exc()
finally:
try:
conversation.end_session()
conversation.wait_for_session_end()
print("Conversation ended")
except Exception:
print("Error ending conversation session:")
traceback.print_exc()


if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import asyncio
import base64
import json
from fastapi import WebSocket
from elevenlabs.conversational_ai.conversation import AudioInterface
from starlette.websockets import WebSocketDisconnect, WebSocketState


class TwilioAudioInterface(AudioInterface):
def __init__(self, websocket: WebSocket):
self.websocket = websocket
self.input_callback = None
self.stream_sid = None
self.loop = asyncio.get_event_loop()

def start(self, input_callback):
self.input_callback = input_callback

def stop(self):
self.input_callback = None
self.stream_sid = None

def output(self, audio: bytes):
"""
This method should return quickly and not block the calling thread.
"""
asyncio.run_coroutine_threadsafe(self.send_audio_to_twilio(audio), self.loop)

def interrupt(self):
asyncio.run_coroutine_threadsafe(self.send_clear_message_to_twilio(), self.loop)

async def send_audio_to_twilio(self, audio: bytes):
if self.stream_sid:
audio_payload = base64.b64encode(audio).decode("utf-8")
audio_delta = {
"event": "media",
"streamSid": self.stream_sid,
"media": {"payload": audio_payload},
}
try:
if self.websocket.application_state == WebSocketState.CONNECTED:
await self.websocket.send_text(json.dumps(audio_delta))
except (WebSocketDisconnect, RuntimeError):
pass

async def send_clear_message_to_twilio(self):
if self.stream_sid:
clear_message = {"event": "clear", "streamSid": self.stream_sid}
try:
if self.websocket.application_state == WebSocketState.CONNECTED:
await self.websocket.send_text(json.dumps(clear_message))
except (WebSocketDisconnect, RuntimeError):
pass

async def handle_twilio_message(self, data):
event_type = data.get("event")
if event_type == "start":
self.stream_sid = data["start"]["streamSid"]
elif event_type == "media" and self.input_callback:
audio_data = base64.b64decode(data["media"]["payload"])
self.input_callback(audio_data)

0 comments on commit 636401e

Please sign in to comment.