From 40051576b0b5e7b3ac7682173c7ecea3fcedad1a Mon Sep 17 00:00:00 2001
From: Marmik Pandya
Date: Sun, 11 Aug 2024 16:28:49 +0530
Subject: [PATCH 1/2] configurable message to check if user is online

---
 bolna/agent_manager/task_manager.py | 78 ++++++++++++++++++-----------
 bolna/constants.py                  |  5 +-
 bolna/models.py                     |  4 ++
 3 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/bolna/agent_manager/task_manager.py b/bolna/agent_manager/task_manager.py
index 6dfd1743..1cfe3796 100644
--- a/bolna/agent_manager/task_manager.py
+++ b/bolna/agent_manager/task_manager.py
@@ -12,7 +12,7 @@
 import aiohttp
 
-from bolna.constants import ACCIDENTAL_INTERRUPTION_PHRASES, FILLER_DICT, PRE_FUNCTION_CALL_MESSAGE
+from bolna.constants import ACCIDENTAL_INTERRUPTION_PHRASES, DEFAULT_USER_ONLINE_MESSAGE, DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION, FILLER_DICT, PRE_FUNCTION_CALL_MESSAGE
 from bolna.helpers.function_calling_helpers import trigger_api
 from bolna.memory.cache.vector_cache import VectorCache
 from .base_manager import BaseManager
@@ -198,6 +198,7 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
         self.request_logs = []
         self.hangup_task = None
+        self.conversation_config = None
 
         if task_id == 0:
             self.background_check_task = None
@@ -205,12 +206,18 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
             self.output_chunk_size = 16384 if self.sampling_rate == 24000 else 4096 #0.5 second chunk size for calls
             # For nitro
             self.nitro = True
-            conversation_config = task.get("task_config", {})
-            logger.info(f"Conversation config {conversation_config}")
+            self.conversation_config = task.get("task_config", {})
+            logger.info(f"Conversation config {self.conversation_config}")
 
-            self.call_transfer_number = conversation_config.get("call_transfer_number", None)
+            self.trigger_user_online_message_after = self.conversation_config.get("trigger_user_online_message_after", DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION)
+            self.check_if_user_online = self.conversation_config.get("check_if_user_online", True)
+            self.check_user_online_message = self.conversation_config.get("check_user_online_message", DEFAULT_USER_ONLINE_MESSAGE)
+
+
+
+            self.call_transfer_number = self.conversation_config.get("call_transfer_number", None)
             logger.info(f"Will transfer call to {self.call_transfer_number}")
-            self.kwargs["process_interim_results"] = "true" if conversation_config.get("optimize_latency", False) is True else "false"
+            self.kwargs["process_interim_results"] = "true" if self.conversation_config.get("optimize_latency", False) is True else "false"
             logger.info(f"Processing interim results {self.kwargs['process_interim_results'] }")
             # Routes
             self.routes = task['tools_config']['llm_agent'].get("routes", None)
@@ -229,25 +236,25 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
                 logger.info(f"Time to setup routes {time.time() - start_time}")
 
-        # for long pauses and rushing
-        if conversation_config is not None:
+        # for long pauses and rushing
+        if self.conversation_config is not None:
             self.minimum_wait_duration = self.task_config["tools_config"]["transcriber"]["endpointing"]
             logger.info(f"minimum wait duration {self.minimum_wait_duration}")
             self.last_spoken_timestamp = time.time() * 1000
-            self.incremental_delay = conversation_config.get("incremental_delay", 100)
+            self.incremental_delay = self.conversation_config.get("incremental_delay", 100)
             logger.info(f"incremental_delay - {self.incremental_delay}")
             self.required_delay_before_speaking = max(self.minimum_wait_duration - self.incremental_delay, 0) #Everytime we get a message we increase it by 100 miliseconds
             self.time_since_first_interim_result = -1
 
             #Cut conversation
-            self.hang_conversation_after = conversation_config.get("hangup_after_silence", 10)
+            self.hang_conversation_after = self.conversation_config.get("hangup_after_silence", 10)
             self.check_if_user_is_still_there = 5
             logger.info(f"hangup_after_silence {self.hang_conversation_after}")
             self.last_transmitted_timesatamp = 0
             self.let_remaining_audio_pass_through = False #Will be used to let remaining audio pass through in case of utterenceEnd event and there's still audio left to be sent
-            self.use_llm_to_determine_hangup = conversation_config.get("hangup_after_LLMCall", False)
+            self.use_llm_to_determine_hangup = self.conversation_config.get("hangup_after_LLMCall", False)
 
-            self.check_for_completion_prompt = conversation_config.get("call_cancellation_prompt", None)
+            self.check_for_completion_prompt = self.conversation_config.get("call_cancellation_prompt", None)
             if self.check_for_completion_prompt is not None:
                 completion_json_format = {"answer": "A simple Yes or No based on if you should cut the phone or not"}
                 self.check_for_completion_prompt = f"{self.check_for_completion_prompt}\nYour response should be in the following json format\n{completion_json_format}"
@@ -255,20 +262,20 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
             self.time_since_last_spoken_human_word = 0
 
             #Handling accidental interruption
-            self.number_of_words_for_interruption = conversation_config.get("number_of_words_for_interruption", 3)
+            self.number_of_words_for_interruption = self.conversation_config.get("number_of_words_for_interruption", 3)
             self.asked_if_user_is_still_there = False #Used to make sure that if user's phrase qualifies as acciedental interruption, we don't break the conversation loop
             self.first_message_passed = True if self.task_config["tools_config"]["output"]["provider"] == 'default' else False
             self.started_transmitting_audio = False
             self.accidental_interruption_phrases = set(ACCIDENTAL_INTERRUPTION_PHRASES)
             #self.interruption_backoff_period = 1000 #conversation_config.get("interruption_backoff_period", 300) #this is the amount of time output loop will sleep before sending next audio
-            self.use_llm_for_hanging_up = conversation_config.get("hangup_after_LLMCall", False)
+            self.use_llm_for_hanging_up = self.conversation_config.get("hangup_after_LLMCall", False)
             self.allow_extra_sleep = False #It'll help us to back off as soon as we hear interruption for a while
 
             #Backchanneling
-            self.should_backchannel = conversation_config.get("backchanneling", False)
+            self.should_backchannel = self.conversation_config.get("backchanneling", False)
             self.backchanneling_task = None
-            self.backchanneling_start_delay = conversation_config.get("backchanneling_start_delay", 5)
-            self.backchanneling_message_gap = conversation_config.get("backchanneling_message_gap", 2) #Amount of duration co routine will sleep
+            self.backchanneling_start_delay = self.conversation_config.get("backchanneling_start_delay", 5)
+            self.backchanneling_message_gap = self.conversation_config.get("backchanneling_message_gap", 2) #Amount of duration co routine will sleep
             if self.should_backchannel and not turn_based_conversation and task_id == 0:
                 logger.info(f"Should backchannel")
                 self.backchanneling_audios = f'{kwargs.get("backchanneling_audio_location", os.getenv("BACKCHANNELING_PRESETS_DIR"))}/{self.synthesizer_voice.lower()}'
@@ -289,14 +296,15 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
             self.transcriber_message = ''
 
             # Ambient noise
-            self.ambient_noise = conversation_config.get("ambient_noise", False)
+            self.ambient_noise = self.conversation_config.get("ambient_noise", False)
             self.ambient_noise_task = None
             if self.ambient_noise:
                 logger.info(f"Ambient noise is True {self.ambient_noise}")
-                self.soundtrack = f"{conversation_config.get('ambient_noise_track', 'convention_hall')}.wav"
+                self.soundtrack = f"{self.conversation_config.get('ambient_noise_track', 'convention_hall')}.wav"
+
 
         # Classifier for filler
-        self.use_fillers = conversation_config.get("use_fillers", False)
+        self.use_fillers = self.conversation_config.get("use_fillers", False)
         if self.use_fillers:
             self.filler_classifier = kwargs.get("classifier", None)
             if self.filler_classifier is None:
@@ -826,6 +834,8 @@ async def __filler_classification_task(self, message):
         await self._handle_llm_output(next_step, filler, should_bypass_synth, new_meta_info, is_filler = True)
 
     async def __execute_function_call(self, url, method, param, api_token, model_args, meta_info, next_step, called_fun, **resp):
+        self.check_if_user_online = False
+
         if called_fun == "transfer_call":
             logger.info(f"Transfer call function called param {param}")
             call_sid = self.tools["input"].get_call_sid()
@@ -858,7 +868,8 @@ async def __execute_function_call(self, url, method, param, api_token, model_arg
             convert_to_request_log(str(response), meta_info , None, "function_call", direction = "response", is_cached= False, run_id = self.run_id)
             convert_to_request_log(format_messages(model_args['messages'], True), meta_info, self.llm_config['model'], "llm", direction = "request", is_cached= False, run_id = self.run_id)
 
-        self.toggle_blank_filler_message = True
+        self.check_if_user_online = self.conversation_config.get("check_if_user_online", True)
+
         if called_fun != "transfer_call":
             await self.__do_llm_generation(model_args["messages"], meta_info, next_step, should_trigger_function_call = True)
@@ -1523,8 +1534,12 @@ async def __process_output_loop(self):
                     if "is_final_chunk_of_entire_response" in message['meta_info'] and message['meta_info']['is_final_chunk_of_entire_response']:
                         self.started_transmitting_audio = False
-                        logger.info("##### End of synthesizer stream and ")
-                        self.asked_if_user_is_still_there = False
+                        logger.info("##### End of synthesizer stream and ")
+
+                        #If this was the message checking whether the user is still online, don't reset asked_if_user_is_still_there
+                        if message['meta_info']['text'] != self.check_user_online_message:
+                            self.asked_if_user_is_still_there = False
+
                         num_chunks = 0
                         self.turn_id +=1
                         if not self.first_message_passed:
@@ -1567,21 +1582,24 @@ async def __check_for_completion(self):
                 logger.info(f"{time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence cutting the phone call and last transmitted timestampt ws {self.last_transmitted_timesatamp} and time since last spoken human word {self.time_since_last_spoken_human_word}")
                 await self.__process_end_of_conversation()
                 break
-            elif time_since_last_spoken_AI_word > 6 and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp :
+            elif time_since_last_spoken_AI_word > self.trigger_user_online_message_after and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp :
                 logger.info(f"Asking if the user is still there")
                 self.asked_if_user_is_still_there = True
-                if self.should_record:
-                    meta_info={'io': 'default', "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'wav'}
-                    await self._synthesize(create_ws_data_packet("Hey, are you still there?", meta_info= meta_info))
-                else:
-                    meta_info={'io': self.tools["output"].get_provider(), "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'pcm'}
-                    await self._synthesize(create_ws_data_packet("Hey, are you still there?", meta_info= meta_info))
+
+                if self.check_if_user_online:
+                    if self.should_record:
+                        meta_info={'io': 'default', "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'wav'}
+                        await self._synthesize(create_ws_data_packet(self.check_user_online_message, meta_info= meta_info))
+                    else:
+                        meta_info={'io': self.tools["output"].get_provider(), "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'pcm'}
+                        await self._synthesize(create_ws_data_packet(self.check_user_online_message, meta_info= meta_info))
 
                 #Just in case we need to clear messages sent before
                 await self.tools["output"].handle_interruption()
             else:
-                logger.info(f"Only {time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence not cutting the phone call")
+                logger.info(f"Only {time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence not cutting the phone call")
+
 
     async def __check_for_backchanneling(self):
         while True:
             if self.callee_speaking and time.time() - self.callee_speaking_start_time > self.backchanneling_start_delay:
diff --git a/bolna/constants.py b/bolna/constants.py
index ed6c3ff7..c3d3a14f 100644
--- a/bolna/constants.py
+++ b/bolna/constants.py
@@ -58,4 +58,7 @@
 CHECKING_THE_DOCUMENTS_FILLER = "Umm, just a moment, getting details..."
 
 PREDEFINED_FUNCTIONS = ['transfer_call']
-TRANSFERING_CALL_FILLER = "Sure, I'll transfer the call for you. Please wait a moment..."
\ No newline at end of file
+TRANSFERING_CALL_FILLER = "Sure, I'll transfer the call for you. Please wait a moment..."
+
+DEFAULT_USER_ONLINE_MESSAGE = "Hey, are you still there?"
+DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION = 6
diff --git a/bolna/models.py b/bolna/models.py
index b31fd03a..751fcc5c 100644
--- a/bolna/models.py
+++ b/bolna/models.py
@@ -213,6 +213,10 @@ class ConversationConfig(BaseModel):
     call_terminate: Optional[int] = 90
     use_fillers: Optional[bool] = False
     call_transfer_number: Optional[str] = ""
+    trigger_user_online_message_after: Optional[int] = 6
+    check_user_online_message: Optional[str] = "Hey, are you still there?"
+    check_if_user_online: Optional[bool] = True
+
 
     @validator('hangup_after_silence', pre=True, always=True)
     def set_hangup_after_silence(cls, v):

From 8a5a35f45f7021cf2037e096e5856114e32b329c Mon Sep 17 00:00:00 2001
From: Marmik Pandya
Date: Sun, 11 Aug 2024 22:04:46 +0530
Subject: [PATCH 2/2] handle case where llm sends text along with call transfer message

---
 bolna/agent_manager/task_manager.py | 17 ++++++++---------
 bolna/llms/openai_llm.py            | 13 +++++++++----
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/bolna/agent_manager/task_manager.py b/bolna/agent_manager/task_manager.py
index 1cfe3796..d8dd1d02 100644
--- a/bolna/agent_manager/task_manager.py
+++ b/bolna/agent_manager/task_manager.py
@@ -213,8 +213,6 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
             self.check_if_user_online = self.conversation_config.get("check_if_user_online", True)
             self.check_user_online_message = self.conversation_config.get("check_user_online_message", DEFAULT_USER_ONLINE_MESSAGE)
 
-
-
             self.call_transfer_number = self.conversation_config.get("call_transfer_number", None)
             logger.info(f"Will transfer call to {self.call_transfer_number}")
             self.kwargs["process_interim_results"] = "true" if self.conversation_config.get("optimize_latency", False) is True else "false"
@@ -837,7 +835,8 @@ async def __execute_function_call(self, url, method, param, api_token, model_arg
         self.check_if_user_online = False
 
         if called_fun == "transfer_call":
-            logger.info(f"Transfer call function called param {param}")
+            logger.info(f"Transfer call function called param {param}. First sleeping for 2 seconds to make sure we're done speaking the filler")
+            await asyncio.sleep(2) #Sleep for 2 seconds to ensure that the filler is spoken before transferring the call
             call_sid = self.tools["input"].get_call_sid()
             user_id, agent_id = self.assistant_id.split("/")
             self.history = copy.deepcopy(model_args["messages"])
@@ -1582,11 +1581,11 @@ async def __check_for_completion(self):
                 logger.info(f"{time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence cutting the phone call and last transmitted timestampt ws {self.last_transmitted_timesatamp} and time since last spoken human word {self.time_since_last_spoken_human_word}")
                 await self.__process_end_of_conversation()
                 break
-            elif time_since_last_spoken_AI_word > self.trigger_user_online_message_after and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp :
-                logger.info(f"Asking if the user is still there")
-                self.asked_if_user_is_still_there = True
-
+            elif time_since_last_spoken_AI_word > self.trigger_user_online_message_after and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp:
                 if self.check_if_user_online:
+                    logger.info(f"Asking if the user is still there")
+                    self.asked_if_user_is_still_there = True
+
                     if self.should_record:
                         meta_info={'io': 'default', "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'wav'}
                         await self._synthesize(create_ws_data_packet(self.check_user_online_message, meta_info= meta_info))
@@ -1594,8 +1593,8 @@ async def __check_for_completion(self):
                         meta_info={'io': self.tools["output"].get_provider(), "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'pcm'}
                         await self._synthesize(create_ws_data_packet(self.check_user_online_message, meta_info= meta_info))
 
-                #Just in case we need to clear messages sent before
-                await self.tools["output"].handle_interruption()
+                    #Just in case we need to clear messages sent before
+                    await self.tools["output"].handle_interruption()
             else:
                 logger.info(f"Only {time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence not cutting the phone call")
 
diff --git a/bolna/llms/openai_llm.py b/bolna/llms/openai_llm.py
index 3507836f..8da40d51 100644
--- a/bolna/llms/openai_llm.py
+++ b/bolna/llms/openai_llm.py
@@ -1,3 +1,4 @@
+import asyncio
 import os
 from dotenv import load_dotenv
 from openai import AsyncOpenAI, OpenAI
@@ -107,15 +108,19 @@ async def generate_stream(self, messages, synthesize=True, request_json=False, m
                         filler = PRE_FUNCTION_CALL_MESSAGE if called_fun != "transfer_call" else TRANSFERING_CALL_FILLER
                         yield filler , True, latency, False
                         self.gave_out_prefunction_call_message = True
+
                         if len(buffer) > 0:
-                            yield buffer, False, latency, False
+                            yield buffer, True, latency, False
                             buffer = ''
                         logger.info(f"Response from LLM {resp}")
+
+                    #TODO: Need to remember why this was put in and check whether it should be removed
                     if buffer != '':
                         yield buffer, False, latency, False
                         buffer = ''
                     if (text_chunk := chunk.choices[0].delta.function_call.arguments):
                         resp += text_chunk
+
                     elif text_chunk := chunk.choices[0].delta.content:
                         textual_response = True
                         answer += text_chunk
@@ -140,7 +145,7 @@ async def generate_stream(self, messages, synthesize=True, request_json=False, m
 
                 api_call_return = {
                     "url": url,
-                    "method":method.lower(),
+                    "method":None if method is None else method.lower(),
                     "param": param,
                     "api_token":api_token,
                     "model_args": model_args,
@@ -241,7 +246,7 @@ async def generate_assistant_stream(self, message, synthesize=True, request_json
                     i = [i for i in range(len(tools)) if called_fun == tools[i].function.name][0]
 
                     if not self.gave_out_prefunction_call_message and not textual_response:
-                        filler = PRE_FUNCTION_CALL_MESSAGE if called_fun != "transfer_call" else TRANSFERING_CALL_FILLER
+                        filler = PRE_FUNCTION_CALL_MESSAGE if called_fun != "transfer_call" else TRANSFERING_CALL_FILLER
                         yield filler, True, latency, False
                         self.gave_out_prefunction_call_message = True
                         if len(buffer) > 0:
@@ -284,7 +289,7 @@ async def generate_assistant_stream(self, message, synthesize=True, request_json
                 model_args['messages'] = message
                 api_call_return = {
                     "url": url,
-                    "method":method.lower(),
+                    "method":None if method is None else method.lower(),
                     "param": param,
                     "api_token":api_token,
                     "model_args": model_args,
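
A minimal sketch of how the task_config knobs introduced in PATCH 1 might be
supplied when creating an agent (key names come from the patch; the payload
shape around them and the example values are illustrative):

    task = {
        "task_config": {
            # Set to False to never send the check-in prompt
            "check_if_user_online": True,
            # Seconds of silence before the prompt is spoken; falls back to
            # DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION (6)
            "trigger_user_online_message_after": 8,
            # Overrides DEFAULT_USER_ONLINE_MESSAGE ("Hey, are you still there?")
            "check_user_online_message": "Hello, are you still on the line?",
        },
    }

TaskManager reads these via self.conversation_config = task.get("task_config", {}),
so any key left out falls back to the defaults above.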