Skip to content

Commit

Permalink
Merge pull request #371 from bolna-ai/fix/call-transfer-prompt
Browse files Browse the repository at this point in the history
Fix/call transfer prompt
  • Loading branch information
marmikcfc authored Aug 11, 2024
2 parents 2976300 + 8a5a35f commit 555f401
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 40 deletions.
87 changes: 52 additions & 35 deletions bolna/agent_manager/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import aiohttp

from bolna.constants import ACCIDENTAL_INTERRUPTION_PHRASES, FILLER_DICT, PRE_FUNCTION_CALL_MESSAGE
from bolna.constants import ACCIDENTAL_INTERRUPTION_PHRASES, DEFAULT_USER_ONLINE_MESSAGE, DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION, FILLER_DICT, PRE_FUNCTION_CALL_MESSAGE
from bolna.helpers.function_calling_helpers import trigger_api
from bolna.memory.cache.vector_cache import VectorCache
from .base_manager import BaseManager
Expand Down Expand Up @@ -198,19 +198,24 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
self.request_logs = []
self.hangup_task = None

self.conversation_config = None
if task_id == 0:

self.background_check_task = None
self.hangup_task = None
self.output_chunk_size = 16384 if self.sampling_rate == 24000 else 4096 #0.5 second chunk size for calls
# For nitro
self.nitro = True
conversation_config = task.get("task_config", {})
logger.info(f"Conversation config {conversation_config}")
self.conversation_config = task.get("task_config", {})
logger.info(f"Conversation config {self.conversation_config}")

self.call_transfer_number = conversation_config.get("call_transfer_number", None)
self.trigger_user_online_message_after = self.conversation_config.get("trigger_user_online_message_after", DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION)
self.check_if_user_online = self.conversation_config.get("check_if_user_online", True)
self.check_user_online_message = self.conversation_config.get("check_user_online_message", DEFAULT_USER_ONLINE_MESSAGE)

self.call_transfer_number = self.conversation_config.get("call_transfer_number", None)
logger.info(f"Will transfer call to {self.call_transfer_number}")
self.kwargs["process_interim_results"] = "true" if conversation_config.get("optimize_latency", False) is True else "false"
self.kwargs["process_interim_results"] = "true" if self.conversation_config.get("optimize_latency", False) is True else "false"
logger.info(f"Processing interim results {self.kwargs['process_interim_results'] }")
# Routes
self.routes = task['tools_config']['llm_agent'].get("routes", None)
Expand All @@ -229,46 +234,46 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
logger.info(f"Time to setup routes {time.time() - start_time}")


# for long pauses and rushing
if conversation_config is not None:
# for long pauses and rushing
if self.conversation_config is not None:
self.minimum_wait_duration = self.task_config["tools_config"]["transcriber"]["endpointing"]
logger.info(f"minimum wait duration {self.minimum_wait_duration}")
self.last_spoken_timestamp = time.time() * 1000
self.incremental_delay = conversation_config.get("incremental_delay", 100)
self.incremental_delay = self.conversation_config.get("incremental_delay", 100)
logger.info(f"incremental_delay - {self.incremental_delay}")
self.required_delay_before_speaking = max(self.minimum_wait_duration - self.incremental_delay, 0) #Everytime we get a message we increase it by 100 miliseconds
self.time_since_first_interim_result = -1

#Cut conversation
self.hang_conversation_after = conversation_config.get("hangup_after_silence", 10)
self.hang_conversation_after = self.conversation_config.get("hangup_after_silence", 10)
self.check_if_user_is_still_there = 5
logger.info(f"hangup_after_silence {self.hang_conversation_after}")
self.last_transmitted_timesatamp = 0
self.let_remaining_audio_pass_through = False #Will be used to let remaining audio pass through in case of utterenceEnd event and there's still audio left to be sent
self.use_llm_to_determine_hangup = conversation_config.get("hangup_after_LLMCall", False)
self.use_llm_to_determine_hangup = self.conversation_config.get("hangup_after_LLMCall", False)

self.check_for_completion_prompt = conversation_config.get("call_cancellation_prompt", None)
self.check_for_completion_prompt = self.conversation_config.get("call_cancellation_prompt", None)
if self.check_for_completion_prompt is not None:
completion_json_format = {"answer": "A simple Yes or No based on if you should cut the phone or not"}
self.check_for_completion_prompt = f"{self.check_for_completion_prompt}\nYour response should be in the following json format\n{completion_json_format}"
self.check_for_completion_llm = os.getenv("CHECK_FOR_COMPLETION_LLM")
self.time_since_last_spoken_human_word = 0

#Handling accidental interruption
self.number_of_words_for_interruption = conversation_config.get("number_of_words_for_interruption", 3)
self.number_of_words_for_interruption = self.conversation_config.get("number_of_words_for_interruption", 3)
self.asked_if_user_is_still_there = False #Used to make sure that if user's phrase qualifies as acciedental interruption, we don't break the conversation loop
self.first_message_passed = True if self.task_config["tools_config"]["output"]["provider"] == 'default' else False
self.started_transmitting_audio = False
self.accidental_interruption_phrases = set(ACCIDENTAL_INTERRUPTION_PHRASES)
#self.interruption_backoff_period = 1000 #conversation_config.get("interruption_backoff_period", 300) #this is the amount of time output loop will sleep before sending next audio
self.use_llm_for_hanging_up = conversation_config.get("hangup_after_LLMCall", False)
self.use_llm_for_hanging_up = self.conversation_config.get("hangup_after_LLMCall", False)
self.allow_extra_sleep = False #It'll help us to back off as soon as we hear interruption for a while

#Backchanneling
self.should_backchannel = conversation_config.get("backchanneling", False)
self.should_backchannel = self.conversation_config.get("backchanneling", False)
self.backchanneling_task = None
self.backchanneling_start_delay = conversation_config.get("backchanneling_start_delay", 5)
self.backchanneling_message_gap = conversation_config.get("backchanneling_message_gap", 2) #Amount of duration co routine will sleep
self.backchanneling_start_delay = self.conversation_config.get("backchanneling_start_delay", 5)
self.backchanneling_message_gap = self.conversation_config.get("backchanneling_message_gap", 2) #Amount of duration co routine will sleep
if self.should_backchannel and not turn_based_conversation and task_id == 0:
logger.info(f"Should backchannel")
self.backchanneling_audios = f'{kwargs.get("backchanneling_audio_location", os.getenv("BACKCHANNELING_PRESETS_DIR"))}/{self.synthesizer_voice.lower()}'
Expand All @@ -289,14 +294,15 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con
self.transcriber_message = ''

# Ambient noise
self.ambient_noise = conversation_config.get("ambient_noise", False)
self.ambient_noise = self.conversation_config.get("ambient_noise", False)
self.ambient_noise_task = None
if self.ambient_noise:
logger.info(f"Ambient noise is True {self.ambient_noise}")
self.soundtrack = f"{conversation_config.get('ambient_noise_track', 'convention_hall')}.wav"
self.soundtrack = f"{self.conversation_config.get('ambient_noise_track', 'convention_hall')}.wav"


# Classifier for filler
self.use_fillers = conversation_config.get("use_fillers", False)
self.use_fillers = self.conversation_config.get("use_fillers", False)
if self.use_fillers:
self.filler_classifier = kwargs.get("classifier", None)
if self.filler_classifier is None:
Expand Down Expand Up @@ -826,8 +832,11 @@ async def __filler_classification_task(self, message):
await self._handle_llm_output(next_step, filler, should_bypass_synth, new_meta_info, is_filler = True)

async def __execute_function_call(self, url, method, param, api_token, model_args, meta_info, next_step, called_fun, **resp):
self.check_if_user_online = False

if called_fun == "transfer_call":
logger.info(f"Transfer call function called param {param}")
logger.info(f"Transfer call function called param {param}. First sleeping for 2 seconds to make sure we're done speaking the filler")
await asyncio.sleep(2) #Sleep for 1 second to ensure that the filler is spoken before transfering call
call_sid = self.tools["input"].get_call_sid()
user_id, agent_id = self.assistant_id.split("/")
self.history = copy.deepcopy(model_args["messages"])
Expand Down Expand Up @@ -858,7 +867,8 @@ async def __execute_function_call(self, url, method, param, api_token, model_arg
convert_to_request_log(str(response), meta_info , None, "function_call", direction = "response", is_cached= False, run_id = self.run_id)

convert_to_request_log(format_messages(model_args['messages'], True), meta_info, self.llm_config['model'], "llm", direction = "request", is_cached= False, run_id = self.run_id)
self.toggle_blank_filler_message = True
self.check_if_user_online = self.conversation_config.get("check_if_user_online", True)

if called_fun != "transfer_call":
await self.__do_llm_generation(model_args["messages"], meta_info, next_step, should_trigger_function_call = True)

Expand Down Expand Up @@ -1523,8 +1533,12 @@ async def __process_output_loop(self):

if "is_final_chunk_of_entire_response" in message['meta_info'] and message['meta_info']['is_final_chunk_of_entire_response']:
self.started_transmitting_audio = False
logger.info("##### End of synthesizer stream and ")
self.asked_if_user_is_still_there = False
logger.info("##### End of synthesizer stream and ")

#If we're sending the message to check if user is still here, don't set asked_if_user_is_still_there to True
if message['meta_info']['text'] != self.check_user_online_message:
self.asked_if_user_is_still_there = False

num_chunks = 0
self.turn_id +=1
if not self.first_message_passed:
Expand Down Expand Up @@ -1567,21 +1581,24 @@ async def __check_for_completion(self):
logger.info(f"{time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence cutting the phone call and last transmitted timestampt ws {self.last_transmitted_timesatamp} and time since last spoken human word {self.time_since_last_spoken_human_word}")
await self.__process_end_of_conversation()
break
elif time_since_last_spoken_AI_word > 6 and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp :
logger.info(f"Asking if the user is still there")
self.asked_if_user_is_still_there = True
if self.should_record:
meta_info={'io': 'default', "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'wav'}
await self._synthesize(create_ws_data_packet("Hey, are you still there?", meta_info= meta_info))
else:
meta_info={'io': self.tools["output"].get_provider(), "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'pcm'}
await self._synthesize(create_ws_data_packet("Hey, are you still there?", meta_info= meta_info))
elif time_since_last_spoken_AI_word > self.trigger_user_online_message_after and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp:
if self.check_if_user_online:
logger.info(f"Asking if the user is still there")
self.asked_if_user_is_still_there = True

if self.should_record:
meta_info={'io': 'default', "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'wav'}
await self._synthesize(create_ws_data_packet(self.check_user_online_message, meta_info= meta_info))
else:
meta_info={'io': self.tools["output"].get_provider(), "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'pcm'}
await self._synthesize(create_ws_data_packet(self.check_user_online_message, meta_info= meta_info))

#Just in case we need to clear messages sent before
await self.tools["output"].handle_interruption()
#Just in case we need to clear messages sent before
await self.tools["output"].handle_interruption()
else:
logger.info(f"Only {time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence not cutting the phone call")
logger.info(f"Only {time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence not cutting the phone call")


async def __check_for_backchanneling(self):
while True:
if self.callee_speaking and time.time() - self.callee_speaking_start_time > self.backchanneling_start_delay:
Expand Down
5 changes: 4 additions & 1 deletion bolna/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@

# Filler spoken while the agent is fetching details from documents.
CHECKING_THE_DOCUMENTS_FILLER = "Umm, just a moment, getting details..."
# Function names the framework handles natively instead of via an external API call.
PREDEFINED_FUNCTIONS = ['transfer_call']
# Filler spoken right before a call transfer is initiated.
TRANSFERING_CALL_FILLER = "Sure, I'll transfer the call for you. Please wait a moment..."

# Default prompt used to check whether the caller is still on the line, and the
# number of seconds of silence after the agent last spoke before it is sent.
DEFAULT_USER_ONLINE_MESSAGE = "Hey, are you still there?"
DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION = 6
13 changes: 9 additions & 4 deletions bolna/llms/openai_llm.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
from dotenv import load_dotenv
from openai import AsyncOpenAI, OpenAI
Expand Down Expand Up @@ -107,15 +108,19 @@ async def generate_stream(self, messages, synthesize=True, request_json=False, m
filler = PRE_FUNCTION_CALL_MESSAGE if called_fun != "transfer_call" else TRANSFERING_CALL_FILLER
yield filler , True, latency, False
self.gave_out_prefunction_call_message = True

if len(buffer) > 0:
yield buffer, False, latency, False
yield buffer, True, latency, False
buffer = ''
logger.info(f"Response from LLM {resp}")

#TODO: Need to remmeber why was this put up and see if this should be removed?
if buffer != '':
yield buffer, False, latency, False
buffer = ''
if (text_chunk := chunk.choices[0].delta.function_call.arguments):
resp += text_chunk

elif text_chunk := chunk.choices[0].delta.content:
textual_response = True
answer += text_chunk
Expand All @@ -140,7 +145,7 @@ async def generate_stream(self, messages, synthesize=True, request_json=False, m
api_token = func_dict['api_token']
api_call_return = {
"url": url,
"method":method.lower(),
"method":None if method is None else method.lower(),
"param": param,
"api_token":api_token,
"model_args": model_args,
Expand Down Expand Up @@ -241,7 +246,7 @@ async def generate_assistant_stream(self, message, synthesize=True, request_json
i = [i for i in range(len(tools)) if called_fun == tools[i].function.name][0]

if not self.gave_out_prefunction_call_message and not textual_response:
filler = PRE_FUNCTION_CALL_MESSAGE if called_fun != "transfer_call" else TRANSFERING_CALL_FILLER
filler = PRE_FUNCTION_CALL_MESSAGE if called_fun != "transfer_call" else TRANSFERING_CALL_FILLER
yield filler, True, latency, False
self.gave_out_prefunction_call_message = True
if len(buffer) > 0:
Expand Down Expand Up @@ -284,7 +289,7 @@ async def generate_assistant_stream(self, message, synthesize=True, request_json
model_args['messages'] = message
api_call_return = {
"url": url,
"method":method.lower(),
"method":None if method is None else method.lower(),
"param": param,
"api_token":api_token,
"model_args": model_args,
Expand Down
4 changes: 4 additions & 0 deletions bolna/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ class ConversationConfig(BaseModel):
call_terminate: Optional[int] = 90
use_fillers: Optional[bool] = False
call_transfer_number: Optional[str] = ""
trigger_user_online_message_after:Optional[int] = 6
check_user_online_message:Optional[str] = "Hey, are you still there"
check_if_user_online:Optional[bool] = True


@validator('hangup_after_silence', pre=True, always=True)
def set_hangup_after_silence(cls, v):
Expand Down

0 comments on commit 555f401

Please sign in to comment.