From c89960882146897049a393db0de8903bea0d54fc Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 24 Dec 2024 10:34:21 -0800 Subject: [PATCH] improve interruption handling, avoid agent from getting stuck (#1290) --- .changeset/silent-oranges-warn.md | 5 +++++ .../livekit/agents/pipeline/pipeline_agent.py | 21 ++++++++++++++----- 2 files changed, 21 insertions(+), 5 deletions(-) create mode 100644 .changeset/silent-oranges-warn.md diff --git a/.changeset/silent-oranges-warn.md b/.changeset/silent-oranges-warn.md new file mode 100644 index 000000000..e7bcd0189 --- /dev/null +++ b/.changeset/silent-oranges-warn.md @@ -0,0 +1,5 @@ +--- +"livekit-agents": patch +--- + +improve interruption handling, avoid agent from getting stuck diff --git a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py index 7b5c28e79..b2a223bd0 100644 --- a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py +++ b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py @@ -714,6 +714,9 @@ async def _synthesize_answer_task( ) ) + # we want to add this question even if it's empty. during false positive interruptions, + # adding an empty user message gives the LLM context so it could continue from where + # it had been interrupted. copied_ctx.messages.append( ChatMessage.create(text=handle.user_question, role="user") ) @@ -1035,7 +1038,7 @@ async def _llm_stream_to_str_generator( def _validate_reply_if_possible(self) -> None: """Check if the new agent speech should be played""" - if self._playing_speech is not None: + if self._playing_speech and not self._playing_speech.interrupted: should_ignore_input = False if not self._playing_speech.allow_interruptions: should_ignore_input = True @@ -1049,19 +1052,24 @@ def _validate_reply_if_possible(self) -> None: "interrupt threshold is not met", extra={"speech_id": self._playing_speech.id}, ) + if should_ignore_input: self._transcribed_text = "" return if self._pending_agent_reply is None: - if self._opts.preemptive_synthesis or not self._transcribed_text: + if self._opts.preemptive_synthesis: return + # as long as we don't have a pending reply, we need to synthesize it + # in order to keep the conversation flowing. + # transcript could be empty at this moment, if the user interrupted the agent + # but did not generate any transcribed text. self._synthesize_agent_reply() assert self._pending_agent_reply is not None - # in some bad timing, we could end up with two pushed agent replies inside the speech queue. + # due to timing, we could end up with two pushed agent replies inside the speech queue. # so make sure we directly interrupt every reply when validating a new one for speech in self._speech_q: if not speech.is_reply: @@ -1072,7 +1080,10 @@ def _validate_reply_if_possible(self) -> None: logger.debug( "validated agent reply", - extra={"speech_id": self._pending_agent_reply.id}, + extra={ + "speech_id": self._pending_agent_reply.id, + "text": self._transcribed_text, + }, ) if self._last_speech_time is not None: @@ -1101,7 +1112,7 @@ def _interrupt_if_possible(self) -> None: def _should_interrupt(self) -> bool: if self._playing_speech is None: - return True + return False if ( not self._playing_speech.allow_interruptions