Skip to content

Commit

Permalink
Add ServerMessage.termination handler (livekit#635)
Browse files Browse the repository at this point in the history
Co-authored-by: Théo Monnom <theo.8bits@gmail.com>
  • Loading branch information
nbsp and theomonnom authored Aug 23, 2024
1 parent 808a536 commit 2cd35e1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/metal-zebras-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

Add ServerMessage.termination handler
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/ipc/proc_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ def __init__(
def processes(self) -> list[SupervisedProc]:
return self._processes

def get_by_job_id(self, job_id: str) -> SupervisedProc | None:
return next(
(
x
for x in self._processes
if x.running_job and x.running_job.job.id == job_id
),
None,
)

def start(self) -> None:
if self._started:
return
Expand Down
14 changes: 14 additions & 0 deletions livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,13 @@ async def _recv_task():
self._handle_availability(msg.availability)
elif which == "assignment":
self._handle_assignment(msg.assignment)
elif which == "termination":
user_task = self._loop.create_task(
self._handle_termination(msg.termination),
name="agent_job_termination",
)
self._tasks.add(user_task)
user_task.add_done_callback(self._tasks.discard)

tasks = [
asyncio.create_task(_load_task()),
Expand Down Expand Up @@ -627,3 +634,10 @@ def _handle_assignment(self, assignment: agent.JobAssignment):
"received assignment for an unknown job",
extra={"job": assignment.job, "agent_name": self._opts.agent_name},
)

async def _handle_termination(self, msg: agent.JobTermination):
proc = self._proc_pool.get_by_job_id(msg.job_id)
if not proc:
# safe to ignore
return
await proc.aclose()

0 comments on commit 2cd35e1

Please sign in to comment.