diff --git a/.changeset/metal-zebras-kick.md b/.changeset/metal-zebras-kick.md new file mode 100644 index 000000000..478942ab4 --- /dev/null +++ b/.changeset/metal-zebras-kick.md @@ -0,0 +1,5 @@ +--- +"livekit-agents": patch +--- + +Add ServerMessage.termination handler diff --git a/livekit-agents/livekit/agents/ipc/proc_pool.py b/livekit-agents/livekit/agents/ipc/proc_pool.py index 12bff6385..6bcfd6c10 100644 --- a/livekit-agents/livekit/agents/ipc/proc_pool.py +++ b/livekit-agents/livekit/agents/ipc/proc_pool.py @@ -48,6 +48,16 @@ def __init__( def processes(self) -> list[SupervisedProc]: return self._processes + def get_by_job_id(self, job_id: str) -> SupervisedProc | None: + return next( + ( + x + for x in self._processes + if x.running_job and x.running_job.job.id == job_id + ), + None, + ) + def start(self) -> None: if self._started: return diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index f0aeafdc6..da3f90e37 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -485,6 +485,13 @@ async def _recv_task(): self._handle_availability(msg.availability) elif which == "assignment": self._handle_assignment(msg.assignment) + elif which == "termination": + user_task = self._loop.create_task( + self._handle_termination(msg.termination), + name="agent_job_termination", + ) + self._tasks.add(user_task) + user_task.add_done_callback(self._tasks.discard) tasks = [ asyncio.create_task(_load_task()), @@ -627,3 +634,10 @@ def _handle_assignment(self, assignment: agent.JobAssignment): "received assignment for an unknown job", extra={"job": assignment.job, "agent_name": self._opts.agent_name}, ) + + async def _handle_termination(self, msg: agent.JobTermination): + proc = self._proc_pool.get_by_job_id(msg.job_id) + if not proc: + # safe to ignore + return + await proc.aclose()