forked from livekit/agents
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gemini_voice_agent.py
90 lines (70 loc) · 2.64 KB
/
gemini_voice_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import logging
from dotenv import load_dotenv
from livekit.agents import (
AutoSubscribe,
JobContext,
JobProcess,
WorkerOptions,
cli,
llm,
metrics,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import google, openai, silero
# Load settings from a local .env file (python-dotenv) at import time, before
# any plugin is constructed. Presumably this is where the GOOGLE_* variables
# listed in the prerequisites below come from -- confirm for your deployment.
load_dotenv()
# Module-wide logger; every log line in this example goes through it.
logger = logging.getLogger("voice-assistant")
def prewarm(proc: JobProcess):
    """Load the Silero VAD and stash it in the process userdata.

    The loaded VAD is stored under the ``"vad"`` key of ``proc.userdata``,
    which is where ``entrypoint`` reads it back from.

    Args:
        proc: The job process whose ``userdata`` mapping receives the VAD.
    """
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model
# An example Voice Agent using Google STT, Gemini 2.0 Flash, and Google TTS.
# Prerequisites:
# 1. livekit-plugins-openai[vertex] package installed
# 2. save your service account credentials and set the following environment variables:
# * GOOGLE_APPLICATION_CREDENTIALS to the path of the service account key file
# * GOOGLE_CLOUD_PROJECT to your Google Cloud project ID
# 3. the following services are enabled on your Google Cloud project:
# * Vertex AI
# * Cloud Speech-to-Text API
# * Cloud Text-to-Speech API
# Read more about authentication with Google: https://cloud.google.com/docs/authentication/application-default-credentials
async def entrypoint(ctx: JobContext):
    """Run one voice-assistant session in the job's room.

    Connects to the room subscribing to audio only, waits for the first
    participant, then starts a ``VoicePipelineAgent`` built from Google STT,
    Gemini 2.0 Flash (through the OpenAI plugin's Vertex adapter) and Google
    TTS. Collected metrics are logged as they arrive and aggregated; the
    aggregated usage summary is logged by a shutdown callback. Finally the
    agent speaks a fixed greeting.

    Args:
        ctx: Job context providing the room, the connection helpers, the
            shutdown-callback hook, and ``ctx.proc.userdata["vad"]`` (the VAD
            stored there by ``prewarm``).
    """
    # Seed the conversation with the system prompt that shapes the replies.
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
        ),
    )

    logger.info(f"connecting to room {ctx.room.name}")
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # wait for the first participant to connect
    participant = await ctx.wait_for_participant()
    logger.info(f"starting voice assistant for participant {participant.identity}")

    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=google.STT(),
        llm=openai.LLM.with_vertex(model="google/gemini-2.0-flash-exp"),
        tts=google.TTS(
            voice_name="en-US-Journey-D",
        ),
        chat_ctx=initial_ctx,
    )

    agent.start(ctx.room, participant)

    # Aggregates every metrics event so a single summary can be reported later.
    usage_collector = metrics.UsageCollector()

    @agent.on("metrics_collected")
    def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
        metrics.log_metrics(mtrcs)
        usage_collector.collect(mtrcs)

    async def log_usage():
        summary = usage_collector.get_summary()
        # f-string placeholders are plain "{...}"; a "$" in front (JavaScript
        # template-literal habit) would be emitted literally into the log line.
        logger.info(f"Usage: {summary}")

    ctx.add_shutdown_callback(log_usage)

    # The greeting is spoken with interruptions disabled.
    await agent.say(
        "Hi there, this is Gemini, how can I help you today?", allow_interruptions=False
    )
if __name__ == "__main__":
    # Script entry point: describe the worker (per-process prewarm hook plus
    # the per-job entrypoint) and hand it to the LiveKit CLI runner.
    worker_options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        prewarm_fnc=prewarm,
    )
    cli.run_app(worker_options)