diff --git a/.gitignore b/.gitignore index f6bdcb66..33986b54 100644 --- a/.gitignore +++ b/.gitignore @@ -50,8 +50,6 @@ coverage.xml .hypothesis/ .pytest_cache/ cover/ -local_setup/RAG/ -local_setup/None/ # Translations *.mo diff --git a/README.md b/README.md index d70fb3ec..95b7b411 100644 --- a/README.md +++ b/README.md @@ -140,7 +140,7 @@ Once you have the above docker setup and running, you can create agents and init ``` -1. The response of the previous API will return a uuid as the `agent_id`. Use this `agent_id` to initiate a call via the telephony server running on `8001` port (for Twilio) or `8002` port (for Plivo) at `http://localhost:8001/call` +2. The response of the previous API will return a uuid as the `agent_id`. Use this `agent_id` to initiate a call via the telephony server running on `8001` port (for Twilio) or `8002` port (for Plivo) at `http://localhost:8001/call`
Call Payload
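For context, a minimal sketch of the call payload referenced above, assuming the telephony server accepts the `agent_id` returned by the agent-creation API together with a destination number. The `recipient_phone_number` field name is an assumption for illustration; verify it against the telephony server's request model before use.

```shell
# Initiate an outbound call through the Twilio telephony server on port 8001
# (use port 8002 for Plivo). The agent_id is the uuid returned when the agent was created.
curl --location 'http://localhost:8001/call' \
  --header 'Content-Type: application/json' \
  --data '{
    "agent_id": "123e4567-e89b-12d3-a456-426655440000",
    "recipient_phone_number": "+19876543210"
  }'
```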
diff --git a/bolna/agent_manager/assistant_manager.py b/bolna/agent_manager/assistant_manager.py index 05e34365..76ee8a98 100644 --- a/bolna/agent_manager/assistant_manager.py +++ b/bolna/agent_manager/assistant_manager.py @@ -59,4 +59,4 @@ async def run(self, local=False, run_id=None): logger.info(f"task_output {task_output}") if task["task_type"] == "extraction": input_parameters["extraction_details"] = task_output["extracted_data"] - logger.info("Done with execution of the agent") \ No newline at end of file + logger.info("Done with execution of the agent") diff --git a/bolna/agent_manager/task_manager.py b/bolna/agent_manager/task_manager.py index 4e1643a5..688d5440 100644 --- a/bolna/agent_manager/task_manager.py +++ b/bolna/agent_manager/task_manager.py @@ -10,7 +10,6 @@ import copy from datetime import datetime -import openai import aiohttp from bolna.constants import ACCIDENTAL_INTERRUPTION_PHRASES, FILLER_DICT, PRE_FUNCTION_CALL_MESSAGE @@ -20,19 +19,14 @@ from bolna.agent_types import * from bolna.providers import * from bolna.prompts import * -from bolna.helpers.utils import calculate_audio_duration, create_ws_data_packet, get_file_names_in_directory, get_raw_audio_bytes, get_route_info, is_valid_md5, \ - get_required_input_types, format_messages, get_prompt_responses, resample, run_in_seperate_thread, save_audio_file_to_s3, update_prompt_with_context, get_md5_hash, clean_json_string, wav_bytes_to_pcm, convert_to_request_log, yield_chunks_from_memory +from bolna.helpers.utils import calculate_audio_duration, create_ws_data_packet, get_file_names_in_directory, get_raw_audio_bytes, is_valid_md5, \ + get_required_input_types, format_messages, get_prompt_responses, resample, save_audio_file_to_s3, update_prompt_with_context, get_md5_hash, clean_json_string, wav_bytes_to_pcm, convert_to_request_log, yield_chunks_from_memory from bolna.helpers.logger_config import configure_logger from semantic_router import Route from semantic_router.layer import RouteLayer from semantic_router.encoders import FastEmbedEncoder -from concurrent.futures import ThreadPoolExecutor asyncio.get_event_loop().set_debug(True) - -# this is exp.. 
can we change -# asyncio.get_event_loop().set_default_executor(ThreadPoolExecutor(20)) - logger = configure_logger(__name__) @@ -53,19 +47,12 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con self.average_transcriber_latency = 0.0 self.task_config = task + logger.info(f"API TOOLS IN TOOLS CONFIG {task['tools_config'].get('api_tools')}") if task['tools_config'].get('api_tools', None) is not None: logger.info(f"API TOOLS is present {task['tools_config']['api_tools']}") self.kwargs['api_tools'] = task['tools_config']['api_tools'] - if task['tools_config']["llm_agent"]['extra_config'].get('assistant_id', None) is not None: - self.kwargs['assistant_id'] = task['tools_config']["llm_agent"]['extra_config']['assistant_id'] - logger.info(f"Assistant id for agent is {self.kwargs['assistant_id']}") if self.__has_extra_config(): - pass - #self.kwargs['assistant_id'] = task['tools_config']["llm_agent"]['extra_config']['assistant_id'] - #logger.info(f"Assistant id for agent is {self.kwargs['assistant_id']}") - - if self.__is_openai_assistant(): self.kwargs['assistant_id'] = task['tools_config']["llm_agent"]['extra_config']['assistant_id'] logger.info(f"Assistant id for agent is {self.kwargs['assistant_id']}") @@ -178,31 +165,12 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con #self.stream = not turn_based_conversation #Currently we are allowing only realtime conversation based usecases. Hence it'll always be true unless connected through dashboard self.is_local = False self.llm_config = None - self.llm_config_map = {} - self.llm_agent_map = {} - if self.__is_multiagent(): - logger.info(f"Gotta write the code for this shit as well") - for agent, config in self.task_config["tools_config"]["llm_agent"]['extra_config']['agent_map'].items(): - self.llm_config_map[agent] = config.copy() - self.llm_config_map[agent]['buffer_size'] = self.task_config["tools_config"]["synthesizer"]['buffer_size'] - if 'assistant_id' in config: - self.llm_config_map[agent]['agent_type'] = "openai_assistant" - elif 'vectorstore_id' in config: - self.llm_config_map[agent]['agent_type'] = "knowledgebase_agent" - - elif not self.__is_openai_assistant(): - logger.info(f"NOT OPEN AI ASSISTANT") - if self.task_config["tools_config"]["llm_agent"] is not None: - agent_type = self.task_config["tools_config"]["llm_agent"].get("agent_type", None) - llm_config = self.task_config["tools_config"]["llm_agent"] if not agent_type else self.task_config["tools_config"]["llm_agent"]['extra_config'] - self.llm_agent_config = llm_config.copy() - logger.info(f"SETTING FOLLOW UP TASK DETAILS") - self.llm_config = { - "model": llm_config['model'], - "max_tokens": llm_config['max_tokens'], - "provider": llm_config['provider'], - } - + if self.task_config["tools_config"]["llm_agent"] is not None: + self.llm_config = { + "model": self.task_config["tools_config"]["llm_agent"]["model"], + "max_tokens": self.task_config["tools_config"]["llm_agent"]["max_tokens"], + "provider": self.task_config["tools_config"]["llm_agent"]["provider"] + } # Output stuff self.output_task = None @@ -216,16 +184,19 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con self.curr_sequence_id = 0 self.sequence_ids = {-1} #-1 is used for data that needs to be passed and is developed by task manager like backchannleing etc. 
- + # setting transcriber + self.__setup_transcriber() + # setting synthesizer + self.__setup_synthesizer(self.llm_config) + # setting llm + llm = self.__setup_llm(self.llm_config) + #Setup tasks + self.__setup_tasks(llm) + #setup request logs self.request_logs = [] self.hangup_task = None - # basically take care about the filler which is sent - self.time_blank_filler_message = task["task_config"]["time_blank_filler_message"] - self.toggle_blank_filler_message = task["task_config"]["toggle_blank_filler_message"] - self.blank_filler_message = task["task_config"]["blank_filler_message"] - if task_id == 0: self.background_check_task = None @@ -240,38 +211,24 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con logger.info(f"Will transfer call to {self.call_transfer_number}") self.kwargs["process_interim_results"] = "true" if conversation_config.get("optimize_latency", False) is True else "false" logger.info(f"Processing interim results {self.kwargs['process_interim_results'] }") - - # Routes aka guardrails - - self.guardrails = task['tools_config']['llm_agent'].get("routes", (task['tools_config']['llm_agent']['extra_config'].get("guardrails", None))) + # Routes + self.routes = task['tools_config']['llm_agent'].get("routes", None) self.route_layer = None - if self.guardrails: + if self.routes: start_time = time.time() - if self.__is_multiagent(): - guardrails_meta = self.kwargs.get('routes', None) - guardrails_meta = guardrails_meta['guardrails'] - else: - guardrails_meta = self.kwargs.get('routes', None) - + routes_meta = self.kwargs.get('routes', None) if self.kwargs['routes']: - self.route_encoder = guardrails_meta["route_encoder"] - self.vector_caches = guardrails_meta["vector_caches"] - self.route_responses_dict = guardrails_meta["route_responses_dict"] - self.route_layer = guardrails_meta["route_layer"] + self.route_encoder = routes_meta["route_encoder"] + self.vector_caches = routes_meta["vector_caches"] + self.route_responses_dict = routes_meta["route_responses_dict"] + self.route_layer = routes_meta["route_layer"] logger.info(f"Time to setup routes from warrmed up cache {time.time() - start_time}") else: - # This is blocking and hence we should be setting up routes earlier and passing it - self.__setup_routes(self.guardrails) + self.__setup_routes(self.routes) logger.info(f"Time to setup routes {time.time() - start_time}") - - if self.__is_multiagent(): - guardrails_meta = self.kwargs.pop('routes', None) - self.agent_routing = guardrails_meta['agent_routing_config']['route_layer'] - self.default_agent = task['tools_config']['llm_agent']['extra_config']['default_agent'] - logger.info(f"Inisialised with default agent {self.default_agent}, agent_routing {self.agent_routing}") - # for long pauses and rushing + # for long pauses and rushing if conversation_config is not None: self.minimum_wait_duration = self.task_config["tools_config"]["transcriber"]["endpointing"] logger.info(f"minimum wait duration {self.minimum_wait_duration}") @@ -345,54 +302,12 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con logger.info("Not using fillers to decrease latency") else: self.filler_preset_directory = f"{os.getenv('FILLERS_PRESETS_DIR')}/{self.synthesizer_voice.lower()}" - # setting transcriber - self.__setup_transcriber() - # setting synthesizer - self.__setup_synthesizer(self.llm_config) - - # setting llm - if self.llm_config is not None: - logger.info(f"LLM CONFIG IS NONE {self.task_config['task_type']}") - llm = 
self.__setup_llm(self.llm_config) - #Setup tasks - agent_params = { - 'llm': llm, - 'agent_type': self.llm_agent_config.get("agent_type","simple_llm_agent") - } - self.__setup_tasks(**agent_params) - - elif self.__is_multiagent(): - # Setup task for multiagent conversation - for agent in self.task_config["tools_config"]["llm_agent"]['extra_config']['agent_map']: - if 'routes' in self.llm_config_map[agent]: - del self.llm_config_map[agent]['routes'] #Remove routes from here as it'll create conflict ahead - llm = self.__setup_llm(self.llm_config_map[agent]) - agent_type = self.llm_config_map[agent].get("agent_type","simple_llm_agent") - logger.info(f"Getting response for {llm} and agent type {agent_type} and {agent}") - agent_params = { - 'llm': llm, - 'agent_type': agent_type - } - if agent_type == "openai_assistant": - agent_params['assistant_config'] = self.llm_config_map[agent] - llm_agent = self.__setup_tasks(**agent_params) - self.llm_agent_map[agent] = llm_agent - elif self.__is_openai_assistant(): - # if self.task_config['tools_config']["llm_agent"].get("agent_type", None) is None: - # assistant_config = {"assistant_id": self.task_config['tools_config']["llm_agent"]['assistant_id']} - self.__setup_tasks(agent_type = "openai_assistant", assistant_config= task['tools_config']["llm_agent"]['extra_config']) - - def __is_openai_assistant(self): - if self.task_config["task_type"] == "webhook": - return False - agent_type = self.task_config['tools_config']["llm_agent"].get("agent_type", self.task_config['tools_config']["llm_agent"].get("agent_flow_type")) - return agent_type == "openai_assistant" - def __is_multiagent(self): + def __has_extra_config(self): if self.task_config["task_type"] == "webhook": return False - agent_type = self.task_config['tools_config']["llm_agent"].get("agent_type", None) - return agent_type == "multiagent" + extra_config = self.task_config['tools_config']["llm_agent"].get("extra_config", None) + return False if extra_config is None else True def __setup_routes(self, routes): embedding_model = routes.get("embedding_model", os.getenv("ROUTE_EMBEDDING_MODEL")) @@ -515,7 +430,7 @@ def __setup_transcriber(self): except Exception as e: logger.error(f"Something went wrong with starting transcriber {e}") - def __setup_synthesizer(self, llm_config = None): + def __setup_synthesizer(self, llm_config): logger.info(f"Synthesizer config: {self.task_config['tools_config']['synthesizer']}") if self._is_conversation_task(): self.kwargs["use_turbo"] = self.task_config["tools_config"]["transcriber"]["language"] == "en" @@ -534,48 +449,33 @@ def __setup_synthesizer(self, llm_config = None): self.task_config["tools_config"]["synthesizer"]["stream"] = True if self.enforce_streaming else False #Hardcode stream to be False as we don't want to get blocked by a __listen_synthesizer co-routine self.tools["synthesizer"] = synthesizer_class(**self.task_config["tools_config"]["synthesizer"], **provider_config, **self.kwargs, caching = caching) - if self.task_config["tools_config"]["llm_agent"] is not None and llm_config is not None: + if self.task_config["tools_config"]["llm_agent"] is not None: llm_config["buffer_size"] = self.task_config["tools_config"]["synthesizer"].get('buffer_size') def __setup_llm(self, llm_config): if self.task_config["tools_config"]["llm_agent"] is not None: - logger.info(f'### PROVIDER {llm_config["provider"] }') - if llm_config["provider"] in SUPPORTED_LLM_PROVIDERS.keys(): - llm_class = SUPPORTED_LLM_PROVIDERS.get(llm_config["provider"]) + logger.info(f'### 
PROVIDER {self.task_config["tools_config"]["llm_agent"]["provider"] }') + if self.task_config["tools_config"]["llm_agent"]["provider"] in SUPPORTED_LLM_PROVIDERS.keys(): + llm_class = SUPPORTED_LLM_PROVIDERS.get(self.task_config["tools_config"]["llm_agent"]["provider"]) logger.info(f"LLM CONFIG {llm_config}") llm = llm_class(**llm_config, **self.kwargs) return llm else: - raise Exception(f'LLM {self.llm_agent_config["provider"]} not supported') - - def __get_agent_object(self, llm, agent_type, assistant_config = None ): - if agent_type == "simple_llm_agent": - llm_agent = StreamingContextualAgent(llm) - elif agent_type == "openai_assistant": - logger.info(f"setting up backend as openai_assistants {assistant_config}") - llm_agent = OpenAIAssistantAgent(**assistant_config) - else: - raise(f"{agent_type} Agent type is not created yet") - return llm_agent - - def __setup_tasks(self, llm = None, agent_type = None, assistant_config= None): - if self.task_config["task_type"] == "conversation" and not self.__is_multiagent(): - self.tools["llm_agent"] = self.__get_agent_object(llm, agent_type, assistant_config) - if agent_type == "llama-index-rag": - logger.info("#### Setting up llama-index-rag agent ####") - extra_config = self.task_config["tools_config"]["llm_agent"].get("extra_config", {}) - vector_store_config = extra_config.get("vector_store", {}) - self.tools["llm_agent"] = LlamaIndexRag( - vector_id=vector_store_config.get("vector_id"), - temperature=extra_config.get("temperature", 0.1), - model=extra_config.get("model", "gpt-3.5-turbo-16k"), - buffer=40, - max_tokens=100, # You might want to make this configurable - provider_config=vector_store_config - ) - logger.info("Llama-index rag agent is created") - elif self.__is_multiagent(): - return self.__get_agent_object(llm, agent_type, assistant_config) + raise Exception(f'LLM {self.task_config["tools_config"]["llm_agent"]["provider"]} not supported') + + def __setup_tasks(self, llm): + if self.task_config["task_type"] == "conversation": + agent_type = self.task_config["tools_config"]["llm_agent"].get("agent_type", self.task_config["tools_config"]["llm_agent"]["agent_flow_type"]) + if agent_type == "streaming": + self.tools["llm_agent"] = StreamingContextualAgent(llm) + elif agent_type == "openai_assistant": + logger.info("setting up backend as openai_assistants") + self.tools["llm_agent"] = OpenAIAssistantAgent(llm) + elif agent_type in ("preprocessed", "formulaic"): + preprocessed = self.task_config["tools_config"]["llm_agent"]["agent_flow_type"] == "preprocessed" + logger.info(f"LLM TYPE {type(llm)}") + self.tools["llm_agent"] = GraphBasedConversationAgent(llm, context_data=self.context_data, + prompts=self.prompts, preprocessed=preprocessed) elif self.task_config["task_type"] == "extraction": logger.info("Setting up extraction agent") self.tools["llm_agent"] = ExtractionContextualAgent(llm, prompt=self.system_prompt) @@ -585,80 +485,66 @@ def __setup_tasks(self, llm = None, agent_type = None, assistant_config= None): self.tools["llm_agent"] = SummarizationContextualAgent(llm, prompt=self.system_prompt) self.summarized_data = None elif self.task_config["task_type"] == "webhook": + if "webhookURL" in self.task_config["tools_config"]["api_tools"]: webhook_url = self.task_config["tools_config"]["api_tools"]["webhookURL"] else: webhook_url = self.task_config["tools_config"]["api_tools"]["tools_params"]["webhook"]["url"] + logger.info(f"Webhook URL {webhook_url}") self.tools["webhook_agent"] = WebhookAgent(webhook_url=webhook_url) + + 
logger.info("prompt and config setup completed") ######################## # Helper methods ######################## - def __get_final_prompt(self, prompt, today): - enriched_prompt = prompt - if self.context_data is not None: - enriched_prompt = update_prompt_with_context(enriched_prompt, self.context_data) - notes = "### Note:\n" - if self._is_conversation_task() and self.use_fillers: - notes += f"1.{FILLER_PROMPT}\n" - return f"{enriched_prompt}\n{notes}\n{DATE_PROMPT.format(today)}" - async def load_prompt(self, assistant_name, task_id, local, **kwargs): logger.info("prompt and config setup started") - agent_type = self.task_config["tools_config"]["llm_agent"].get("agent_type", "simple_llm_agent") - if self.task_config["task_type"] == "webhook" or agent_type in ["openai_assistant", "llamaindex_rag_agent"]: + if self.task_config["task_type"] == "webhook" or self.task_config["tools_config"]["llm_agent"]["agent_flow_type"] == "openai_assistant": return self.is_local = local today = datetime.now().strftime("%A, %B %d, %Y") - prompt_responses = kwargs.get('prompt_responses', None) - if not prompt_responses: - prompt_responses = await get_prompt_responses(assistant_id=self.assistant_id, local=self.is_local) - logger.info(f"GOT prompt responses {prompt_responses}") - current_task = "task_{}".format(task_id + 1) - if self.__is_multiagent(): - logger.info(f"Getting {current_task} from prompt responses of type {type(prompt_responses)}, prompt responses key {prompt_responses.keys()}") - prompts = prompt_responses.get(current_task, None) - self.prompt_map = {} - for agent in self.task_config["tools_config"]["llm_agent"]['extra_config']['agent_map']: - prompt = prompts[agent]['system_prompt'] - prompt = self.__prefill_prompts(self.task_config, prompt, self.task_config['task_type']) - prompt = self.__get_final_prompt(prompt, today) - if agent == self.task_config["tools_config"]["llm_agent"]['extra_config']['default_agent']: - self.system_prompt = { - 'role': 'system', - 'content': prompt - } - self.prompt_map[agent] = prompt - logger.info(f"Initialised prompt dict {self.prompt_map}, Set default prompt {self.system_prompt}") + if "prompt" in self.task_config["tools_config"]["llm_agent"]: + #This will be tre when we have extraction or maybe never + self.prompts = { + "system_prompt": f'{self.task_config["tools_config"]["llm_agent"]["prompt"]} \n### Date\n Today\'s Date is {today}' + } + logger.info(f"Prompt given in llm_agent and hence storing the prompt") else: + prompt_responses = kwargs.get('prompt_responses', None) + if not prompt_responses: + prompt_responses = await get_prompt_responses(assistant_id=self.assistant_id, local=self.is_local) + current_task = "task_{}".format(task_id + 1) self.prompts = self.__prefill_prompts(self.task_config, prompt_responses.get(current_task, None), self.task_config['task_type']) - - if "system_prompt" in self.prompts: - # This isn't a graph based agent - enriched_prompt = self.prompts["system_prompt"] - logger.info("There's a system prompt") - if self.context_data is not None: - enriched_prompt = update_prompt_with_context(self.prompts["system_prompt"], self.context_data) - self.prompts["system_prompt"] = enriched_prompt - - notes = "### Note:\n" - - if self._is_conversation_task() and self.use_fillers: - notes += f"1.{FILLER_PROMPT}\n" - - self.system_prompt = { - 'role': "system", - 'content': f"{enriched_prompt}\n{notes}\n{DATE_PROMPT.format(today)}" - } - else: - self.system_prompt = { - 'role': "system", - 'content': "" - } + if 
self.task_config["tools_config"]["llm_agent"]['agent_flow_type'] == "preprocessed": + self.tools["llm_agent"].load_prompts_and_create_graph(self.prompts) + + if "system_prompt" in self.prompts: + # This isn't a graph based agent + enriched_prompt = self.prompts["system_prompt"] + logger.info("There's a system prompt") + if self.context_data is not None: + enriched_prompt = update_prompt_with_context(self.prompts["system_prompt"], self.context_data) + self.prompts["system_prompt"] = enriched_prompt + + notes = "### Note:\n" + + if self._is_conversation_task() and self.use_fillers: + notes += f"1.{FILLER_PROMPT}\n" + + self.system_prompt = { + 'role': "system", + 'content': f"{enriched_prompt}\n{notes}\n{DATE_PROMPT.format(today)}" + } + else: + self.system_prompt = { + 'role': "system", + 'content': "" + } if len(self.system_prompt['content']) == 0: self.history = [] if len(self.history) == 0 else self.history @@ -766,14 +652,11 @@ def _is_summarization_task(self): def _is_conversation_task(self): return self.task_config["task_type"] == "conversation" - def __is_openai_assistant_agent(self): - return self.task_config["tools_config"]["llm_agent"].get("agent_type", None) == "openai_assistant" - def _is_preprocessed_flow(self): - return "agent_flow_type" in self.task_config["tools_config"]["llm_agent"] and self.task_config["tools_config"]["llm_agent"]['agent_flow_type'] == "preprocessed" + return self.task_config["tools_config"]["llm_agent"]['agent_flow_type'] == "preprocessed" def _is_formulaic_flow(self): - return "agent_flow_type" in self.task_config["tools_config"]["llm_agent"] and self.task_config["tools_config"]["llm_agent"]['agent_flow_type'] == "formulaic" + return self.task_config["tools_config"]["llm_agent"]['agent_flow_type'] == "formulaic" def _get_next_step(self, sequence, origin): try: @@ -888,7 +771,7 @@ async def _process_conversation_preprocessed_task(self, message, sequence, meta_ messages.append({'role': 'user', 'content': message['data']}) logger.info(f"Starting LLM Agent {messages}") #Expose get current classification_response method from the agent class and use it for the response log - convert_to_request_log(message = format_messages(messages, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model=self.llm_agent_config["model"], is_cached= True, run_id= self.run_id) + convert_to_request_log(message = format_messages(messages, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model=self.task_config["tools_config"]["llm_agent"]["model"], is_cached= True, run_id= self.run_id) async for next_state in self.tools['llm_agent'].generate(messages, label_flow=self.label_flow): if next_state == "": meta_info["end_of_conversation"] = True @@ -930,7 +813,7 @@ async def __filler_classification_task(self, message): sequence, meta_info = self._extract_sequence_and_meta(message) next_step = self._get_next_step(sequence, "llm") start_time = time.perf_counter() - filler_class = await asyncio.to_thread(self.filler_classifier.classify(message['data'])) + filler_class = self.filler_classifier.classify(message['data']) logger.info(f"doing the classification task in {time.perf_counter() - start_time}") new_meta_info = copy.deepcopy(meta_info) self.current_filler = filler_class @@ -939,13 +822,6 @@ async def __filler_classification_task(self, message): await self._handle_llm_output(next_step, filler, should_bypass_synth, new_meta_info, is_filler = True) async def __execute_function_call(self, url, method, param, api_token, 
model_args, meta_info, next_step, called_fun, **resp): - self.toggle_blank_filler_message = False - - # only for testing - for i in range(10): - logger.info(f"sleep for {i} sec") - await asyncio.sleep(1) - if called_fun == "transfer_call": logger.info(f"Transfer call function called param {param}") call_sid = self.tools["input"].get_call_sid() @@ -967,7 +843,7 @@ async def __execute_function_call(self, url, method, param, api_token, model_arg response_text = await response.text() logger.info(f"Response from the server after call transfer: {response_text}") return - + response = await trigger_api(url= url, method=method.lower(), param= param, api_token= api_token, meta_info = meta_info, run_id = self.run_id, **resp) content = FUNCTION_CALL_PROMPT.format(called_fun, method, str(response)) model_args["messages"].append({"role":"system","content":content}) @@ -978,13 +854,14 @@ async def __execute_function_call(self, url, method, param, api_token, model_arg self.toggle_blank_filler_message = True if called_fun != "transfer_call": await self.__do_llm_generation(model_args["messages"], meta_info, next_step, should_trigger_function_call = True) - + + def __store_into_history(self, meta_info, messages, llm_response, should_trigger_function_call = False): if self.current_request_id in self.llm_rejected_request_ids: logger.info("##### User spoke while LLM was generating response") else: self.llm_response_generated = True - convert_to_request_log(message=llm_response, meta_info= meta_info, component="llm", direction="response", model=self.llm_agent_config["model"], run_id= self.run_id) + convert_to_request_log(message=llm_response, meta_info= meta_info, component="llm", direction="response", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) if should_trigger_function_call: #Now, we need to consider 2 things here #1. 
There was silence between function call and now @@ -1057,12 +934,12 @@ async def __do_llm_generation(self, messages, meta_info, next_step, should_bypas messages.append({"role": "assistant", "content": llm_response}) self.history = copy.deepcopy(messages) await self._handle_llm_output(next_step, llm_response, should_bypass_synth, meta_info) - convert_to_request_log(message = llm_response, meta_info= meta_info, component="llm", direction="response", model=self.tools["llm_agent"].get_model(), run_id= self.run_id) + convert_to_request_log(message = llm_response, meta_info= meta_info, component="llm", direction="response", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) if self.stream and llm_response != PRE_FUNCTION_CALL_MESSAGE: logger.info(f"Storing {llm_response} into history should_trigger_function_call {should_trigger_function_call}") self.__store_into_history(meta_info, messages, llm_response, should_trigger_function_call= should_trigger_function_call) - + async def _process_conversation_task(self, message, sequence, meta_info): next_step = None @@ -1073,20 +950,8 @@ async def _process_conversation_task(self, message, sequence, meta_info): next_step = self._get_next_step(sequence, "llm") meta_info['llm_start_time'] = time.time() route = None - - if self.__is_multiagent(): - tasks = [asyncio.create_task(run_in_seperate_thread(lambda: get_route_info(message['data'], self.agent_routing)))] - if self.route_layer is not None: - tasks.append(run_in_seperate_thread(lambda: get_route_info(message['data'], self.route_layer))) - tasks_op = await asyncio.gather(*tasks) - current_agent = tasks_op[0] - if self.route_layer is not None: - route = tasks_op[1] - - logger.info(f"Current agent {current_agent}") - self.tools['llm_agent'] = self.llm_agent_map[current_agent] - elif self.route_layer is not None: - route = await asyncio.to_thread(self.route_layer(message['data']).name) + if self.route_layer is not None: + route = self.route_layer(message['data']).name logger.info(f"Got route name {route}") if route is not None: @@ -1098,8 +963,8 @@ async def _process_conversation_task(self, message, sequence, meta_info): logger.info(f"Route {route} has a vector cache") relevant_utterance = self.vector_caches[route].get(message['data']) cache_response = self.route_responses_dict[route][relevant_utterance] - convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="request", model=self.llm_agent_config["model"], run_id= self.run_id) - convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="response", model=self.llm_agent_config["model"], is_cached= True, run_id= self.run_id) + convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="request", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) + convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="response", model=self.task_config["tools_config"]["llm_agent"]["model"], is_cached= True, run_id= self.run_id) messages = copy.deepcopy(self.history) messages += [{'role': 'user', 'content': message['data']},{'role': 'assistant', 'content': cache_response}] self.interim_history = copy.deepcopy(messages) @@ -1123,7 +988,8 @@ async def _process_conversation_task(self, message, sequence, meta_info): logger.info(f"Message {messages} history {self.history}") messages.append({'role': 'user', 'content': message['data']}) ### TODO CHECK IF 
THIS IS EVEN REQUIRED - convert_to_request_log(message=format_messages(messages, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model= self.tools["llm_agent"].get_model(), run_id= self.run_id) + convert_to_request_log(message=format_messages(messages, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) + await self.__do_llm_generation(messages, meta_info, next_step, should_bypass_synth) # TODO : Write a better check for completion prompt if self.use_llm_to_determine_hangup and not self.turn_based_conversation: @@ -1133,7 +999,7 @@ async def _process_conversation_task(self, message, sequence, meta_info): {'role': 'system', 'content': self.check_for_completion_prompt}, {'role': 'user', 'content': format_messages(self.history, use_system_prompt= True)}] logger.info(f"##### Answer from the LLM {answer}") - convert_to_request_log(message=format_messages(prompt, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model=self.llm_agent_config["model"], run_id= self.run_id) + convert_to_request_log(message=format_messages(prompt, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) convert_to_request_log(message=answer, meta_info= meta_info, component="llm", direction="response", model= self.check_for_completion_llm, run_id= self.run_id) if should_hangup: @@ -1181,6 +1047,9 @@ async def _run_llm_task(self, message): await asyncio.sleep(self.consider_next_transcript_after - time.time()) logger.info(f"Running preprocessedf task") await self._process_conversation_preprocessed_task(message, sequence, meta_info) + + elif self._is_formulaic_flow(): + await self._process_conversation_formulaic_task(message, sequence, meta_info) else: await self._process_conversation_task(message, sequence, meta_info) else: @@ -1643,13 +1512,8 @@ async def __process_output_loop(self): if "is_final_chunk_of_entire_response" in message['meta_info'] and message['meta_info']['is_final_chunk_of_entire_response']: self.started_transmitting_audio = False - logger.info("##### End of synthesizer stream and ") - - #If we're sending the message to check if user is still here, don't set asked_if_user_is_still_there to True - if message['meta_info']['text'] != self.blank_filler_message: - self.asked_if_user_is_still_there = False - - # self.asked_if_user_is_still_there = False + logger.info("##### End of synthesizer stream and ") + self.asked_if_user_is_still_there = False num_chunks = 0 self.turn_id +=1 if not self.first_message_passed: @@ -1692,17 +1556,15 @@ async def __check_for_completion(self): logger.info(f"{time_since_last_spoken_AI_word} seconds since last spoken time stamp and hence cutting the phone call and last transmitted timestampt ws {self.last_transmitted_timesatamp} and time since last spoken human word {self.time_since_last_spoken_human_word}") await self.__process_end_of_conversation() break - elif time_since_last_spoken_AI_word > self.time_blank_filler_message and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp : + elif time_since_last_spoken_AI_word > 6 and not self.asked_if_user_is_still_there and self.time_since_last_spoken_human_word < self.last_transmitted_timesatamp : logger.info(f"Asking if the user is still there") 
self.asked_if_user_is_still_there = True - - if self.toggle_blank_filler_message: - if self.should_record: - meta_info={'io': 'default', "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'wav'} - await self._synthesize(create_ws_data_packet(self.blank_filler_message, meta_info= meta_info)) - else: - meta_info={'io': self.tools["output"].get_provider(), "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'pcm'} - await self._synthesize(create_ws_data_packet(self.blank_filler_message, meta_info= meta_info)) + if self.should_record: + meta_info={'io': 'default', "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'wav'} + await self._synthesize(create_ws_data_packet("Hey, are you still there?", meta_info= meta_info)) + else: + meta_info={'io': self.tools["output"].get_provider(), "request_id": str(uuid.uuid4()), "cached": False, "sequence_id": -1, 'format': 'pcm'} + await self._synthesize(create_ws_data_packet("Hey, are you still there?", meta_info= meta_info)) #Just in case we need to clear messages sent before await self.tools["output"].handle_interruption() diff --git a/bolna/agent_types/__init__.py b/bolna/agent_types/__init__.py index 29e2d56b..ddf29dac 100644 --- a/bolna/agent_types/__init__.py +++ b/bolna/agent_types/__init__.py @@ -3,5 +3,4 @@ from .graph_based_conversational_agent import GraphBasedConversationAgent from .summarization_agent import SummarizationContextualAgent from .webhook_agent import WebhookAgent -from .openai_assistant import OpenAIAssistantAgent -from .llama_index_rag_agent import LlamaIndexRag \ No newline at end of file +from .openai_assistant import OpenAIAssistantAgent \ No newline at end of file diff --git a/bolna/agent_types/contextual_conversational_agent.py b/bolna/agent_types/contextual_conversational_agent.py index a40424ce..c6b12246 100644 --- a/bolna/agent_types/contextual_conversational_agent.py +++ b/bolna/agent_types/contextual_conversational_agent.py @@ -18,9 +18,6 @@ def __init__(self, llm): self.conversation_completion_llm = OpenAiLLM(model=os.getenv('CHECK_FOR_COMPLETION_LLM', llm.model)) self.history = [{'content': ""}] - def get_model(self): - return self.llm.get_model() - async def check_for_completion(self, messages, check_for_completion_prompt = CHECK_FOR_COMPLETION_PROMPT): prompt = [ {'role': 'system', 'content': check_for_completion_prompt}, diff --git a/bolna/agent_types/extraction_agent.py b/bolna/agent_types/extraction_agent.py index c5aa0460..5bedc287 100644 --- a/bolna/agent_types/extraction_agent.py +++ b/bolna/agent_types/extraction_agent.py @@ -12,9 +12,6 @@ def __init__(self, llm, prompt=None): self.is_inference_on = False self.has_intro_been_sent = False - def get_model(self): - return self.llm.get_model() - async def generate(self, history): logger.info("extracting json from the previous conversation data") json_data = await self.llm.generate(history, request_json=True) diff --git a/bolna/agent_types/llama_index_rag_agent.py b/bolna/agent_types/llama_index_rag_agent.py deleted file mode 100644 index 1ae40578..00000000 --- a/bolna/agent_types/llama_index_rag_agent.py +++ /dev/null @@ -1,234 +0,0 @@ -import os -import time -import asyncio -import logging -from typing import List, Tuple, Generator, AsyncGenerator - -from dotenv import load_dotenv, find_dotenv - -from llama_index.core import VectorStoreIndex, StorageContext -from llama_index.core.llms import ChatMessage -from llama_index.core.tools import QueryEngineTool, ToolMetadata -from 
llama_index.llms.openai import OpenAI -from llama_index.vector_stores.lancedb import LanceDBVectorStore -from llama_index.agent.openai import OpenAIAgent - -from .base_agent import BaseAgent -from pymongo import MongoClient -from bolna.helpers.logger_config import configure_logger -from bolna.helpers.data_ingestion_pipe import lance_db -from bolna.rag.base import DatabaseConnector - -from bolna.rag.mongodb_rag import MongoDBConfig, MongoDBConnector, RAGEngine as MongoDBRAGEngine - - -load_dotenv(find_dotenv(), override=True) -logger = configure_logger(__name__) - -class LlamaIndexRag(BaseAgent): - """ - A class that implements a RAG (Retrieval-Augmented Generation) system using LlamaIndex. - - This class sets up and manages an OpenAI-based language model, a vector store, and an agent - for performing document search and question answering tasks. - - Attributes: - vector_id (str): Identifier for the vector store. - temperature (float): Temperature setting for the language model. - model (str): The name of the OpenAI model to use. - buffer (int): Size of the token buffer for streaming responses. - max_tokens (int): Maximum number of tokens for the language model output. - OPENAI_KEY (str): OpenAI API key retrieved from environment variables. - llm (OpenAI): Instance of the OpenAI language model. - vector_store (LanceDBVectorStore): Vector store for document storage and retrieval. - vector_index (VectorStoreIndex): Index built on top of the vector store. - query_engine: Query engine for searching the vector index. - agent (OpenAIAgent): Agent that uses the query engine to answer questions. - """ - - def __init__(self, vector_id: str, temperature: float, model: str, buffer: int = 40, max_tokens: int = 100, provider_config: dict = None): - """ - Initialize the LlamaIndexRag instance. - - Args: - vector_id (str): Identifier for the vector store. - temperature (float): Temperature setting for the language model. - model (str): The name of the OpenAI model to use. - buffer (int, optional): Size of the token buffer for streaming responses. Defaults to 40. - max_tokens (int, optional): Maximum number of tokens for the language model output. Defaults to 100. 
- """ - super().__init__() - self.vector_id = vector_id - self.temperature = temperature - self.model = model - self.buffer = buffer - self.max_tokens = max_tokens - self.provider_config = provider_config - self.OPENAI_KEY = os.getenv('OPENAI_API_KEY') - # self.LANCE_DB = "/Users/vipul/Nova/Work/Bolna/LlamaIndex-MutiRAG/DataBase/dev" - self.LANCE_DB = os.getenv(LANCEDB_DIR) - self.provider = None - self.query_engine = None - - self._setup_llm() - self._setup_provider() - self._setup_agent() - - def _setup_llm(self): - """Set up the OpenAI language model.""" - self.llm = OpenAI( - model=self.model, - temperature=self.temperature, - api_key=self.OPENAI_KEY, - max_tokens=self.max_tokens - ) - - def _setup_provider(self): - """Based on the relevant provider config, set up the provider.""" - - provider_name = self.provider_config.get('provider') - logging.info(f"Provider Name : {provider_name}") - - if provider_name == 'mongodb': - logging.info(f"Setting up {provider_name} RAG") - config = MongoDBConfig(**self.provider_config['provider_config']) - connector = MongoDBConnector(config) - connector.connect() - connector.verify_data() - self.provider = MongoDBRAGEngine(connector) - self.provider.setup() - logger.info(f"{provider_name.capitalize()} RAG engine initialized") - # Add more providers here as elif statements - else: - logging.info(f"LanceDB RAG") - logging.info(f"URI : LanceDB {self.LANCE_DB}") - self.vector_store = LanceDBVectorStore(uri=self.LANCE_DB , table_name=self.provider_config['provider_config'].get('vector_id')) - logging.info(f"Table params : {self.provider_config['provider_config'].get('vector_id')}") - storage_context = StorageContext.from_defaults(vector_store=self.vector_store) - self.vector_index = VectorStoreIndex([], storage_context=storage_context) - self.query_engine = self.vector_index.as_query_engine() - logger.info("LanceDB vector store initialized") - - # def _setup_vector_store(self): - # """Set up the vector store and index.""" - # logger.info(f"LLAMA INDEX VALUES: {(lance_db, self.vector_id)}") - - - # #We want to incorprate the latest functionality of MONGODB in our llama-index-rag agent script : Task 1 - # #Lightweight server : Backlog : Task 2 - # #Trying out different set of dbs for making sure that bolna rag agent script (e.g llama-index-rag script) is modular and can be utilized for the various scopes - # # Proxy db : Task 3 -> We want to have a proxy db which can be utilized for any kind of db.. - - # #Extras : 1. Payload config might be different based on the dbs that we are trying right now... - # # 2. Ansh is doing with the filler improvements : line number 60 / constants.py - # # 3. - - - # # check for the provider here - # # logic is : check if the user has provided some provider config and if it is then we will - # # check for the relevant provider using the self. 
- - - # if self.provider_config and self.provider_config.get('provider') == 'mongodb': - # config = self.provider_config['provider_config'] - # client = MongoClient(config['connection_string']) - # self.vector_store = MongoDBAtlasVectorSearch( - # client=client, - # db_name=config['db_name'], - # collection_name=config['collection_name'], - # index_name=config['index_name'] - # ) - # logger.info("MongoDB vector store initialized") - # else: - # self.vector_store = LanceDBVectorStore(uri=lance_db, table_name=self.vector_id) - # logger.info("LanceDB vector store initialized") - - # storage_context = StorageContext.from_defaults(vector_store=self.vector_store) - # self.vector_index = VectorStoreIndex([], storage_context=storage_context) - - - - # self.query_engine = self.vector_index.as_query_engine() - - def _setup_agent(self): - """Set up the OpenAI agent with the query engine tool.""" - tools = [ - QueryEngineTool( - self.query_engine, - metadata=ToolMetadata( - name="search", - description="Search the document, pass the entire user message in the query", - ), - ) - ] - self.agent = OpenAIAgent.from_tools(tools=tools, verbose=True) - logger.info("LLAMA INDEX AGENT IS CREATED") - - @staticmethod - def convert_history(history: List[dict]) -> Tuple[ChatMessage, List[ChatMessage]]: - """ - Convert a list of message dictionaries to ChatMessage objects. - - Args: - history (List[dict]): A list of dictionaries containing message data. - - Returns: - Tuple[ChatMessage, List[ChatMessage]]: A tuple containing the latest message and the chat history. - """ - message = ChatMessage(role=history[-1]['role'], content=history[-1]['content']) - chat_history = [ChatMessage(role=msg['role'], content=msg['content']) for msg in history[:-1]] - return message, chat_history - - async def generate(self, message: List[dict], **kwargs) -> AsyncGenerator[Tuple[str, bool, float, bool], None]: - """ - Generate a response based on the input message and chat history. - - This method streams the generated response, yielding chunks of text along with metadata. - - Args: - message (List[dict]): A list of dictionaries containing the message data and chat history. - **kwargs: Additional keyword arguments (unused in this implementation). - - Yields: - Tuple[str, bool, float, bool]: A tuple containing: - - The generated text chunk. - - A boolean indicating if this is the final chunk. - - The latency of the first token. - - A boolean indicating if the response was truncated (always False in this implementation). - """ - logger.info(f"Generate function Input: {message}") - message, history = await asyncio.to_thread(self.convert_history, message) - start_time = time.time() - - if self.provider: - # Use provider-specific query method - response = self.provider.query(message.content) - yield response.response, True, time.time() - start_time, False - - else: - buffer = "" - latency = -1 - start_time = time.time() - - # llamaindex provides astream_chat, no need for to_thread as we are running this over cloud! 
- #token_generator = await asyncio.to_thread(self.agent.stream_chat, message.content, history) - token_generator = await self.agent.astream_chat(message.content, history) - - async for token in token_generator.async_response_gen(): - if latency < 0: - latency = time.time() - start_time - buffer += " " + token - if len(buffer.split()) >= self.buffer: - yield buffer.strip(), False, latency, False - logger.info(f"LLM BUFFER FULL BUFFER OUTPUT: {buffer}") - buffer = "" - - if buffer: - logger.info(f"LLM BUFFER FLUSH BUFFER OUTPUT: {buffer}") - yield buffer.strip(), True, latency, False - async def __del__(self): - if hasattr(self, 'provider') and hasattr(self.provider, 'disconnect'): - self.provider.disconnect() - - def get_model(self): - return "Model" \ No newline at end of file diff --git a/bolna/agent_types/openai_assistant.py b/bolna/agent_types/openai_assistant.py index 912fdd2b..18b3cc6e 100644 --- a/bolna/agent_types/openai_assistant.py +++ b/bolna/agent_types/openai_assistant.py @@ -1,11 +1,4 @@ -import json -import os -import time from dotenv import load_dotenv -from openai import AsyncOpenAI, OpenAI - -from bolna.constants import CHECKING_THE_DOCUMENTS_FILLER, PRE_FUNCTION_CALL_MESSAGE -from bolna.helpers.utils import convert_to_request_log from .base_agent import BaseAgent from bolna.helpers.logger_config import configure_logger @@ -14,175 +7,11 @@ class OpenAIAssistantAgent(BaseAgent): - def __init__(self, name, assistant_id, max_tokens = 100, temperature = 0.2, buffer_size = 100, **kwargs): + def __init__(self, llm): super().__init__() - self.name = name - self.assistant_id = assistant_id - self.custom_tools = kwargs.get("api_tools", None) - self.max_tokens = max_tokens - self.temperature = temperature - self.started_streaming = False - self.buffer_size = buffer_size - if self.custom_tools is not None: - self.trigger_function_call = True - self.api_params = self.custom_tools['tools_params'] - logger.info(f"Function dict {self.api_params}") - self.tools = self.custom_tools['tools'] - else: - self.trigger_function_call = False - - llm_key = kwargs.get('llm_key', os.getenv('OPENAI_API_KEY')) - if llm_key != "sk-": - llm_key = os.getenv('OPENAI_API_KEY') - else: - llm_key = kwargs['llm_key'] - self.async_client = AsyncOpenAI(api_key=llm_key) - api_key = llm_key - - logger.info(f"Initializing OpenAI assistant with assistant id {self.assistant_id}") - self.openai = OpenAI(api_key=api_key) - #self.thread_id = self.openai.beta.threads.create().id - self.model_args = {"max_completion_tokens": self.max_tokens, "temperature": self.temperature} - my_assistant = self.openai.beta.assistants.retrieve(self.assistant_id) - if my_assistant.tools is not None: - self.tools = [i for i in my_assistant.tools if i.type == "function"] - #logger.info(f'thread id : {self.thread_id}') - self.run_id = kwargs.get("run_id", None) - self.gave_out_prefunction_call_message = False - - def get_model(self): - return self.name - + self.llm = llm async def generate(self, message, synthesize=False, meta_info=None): - async for token in self.generate_assistant_stream(message, synthesize=synthesize, meta_info=meta_info): + async for token in self.llm.generate_assistant_stream(message, synthesize=synthesize, meta_info=meta_info): logger.info('Agent: {}'.format(token)) yield token - - async def generate_assistant_stream(self, message, synthesize=True, request_json=False, meta_info=None): - if len(message) == 0: - raise Exception("No messages provided") - - #response_format = self.get_response_format(request_json) - - 
answer, buffer, resp, called_fun, api_params, i = "", "", "", "", "", 0 - logger.info(f"request to open ai {message} max tokens {self.max_tokens} ") - - latency = False - start_time = time.time() - textual_response = False - - if self.trigger_function_call: - tools = self.tools - - - thread_id = self.openai.beta.threads.create(messages= message[1:-2]).id - - model_args = self.model_args - model_args["thread_id"] = thread_id - model_args["assistant_id"] = self.assistant_id - model_args["stream"] = True - #model_args["response_format"] = response_format - logger.info(f"request to open ai with thread {thread_id} & asst. id {self.assistant_id}") - - await self.async_client.beta.threads.messages.create(thread_id=model_args["thread_id"], role="user", content=message[-1]['content']) - - async for chunk in await self.async_client.beta.threads.runs.create(**model_args): - logger.info(f"chunk received : {chunk}") - if self.trigger_function_call and chunk.event == "thread.run.step.delta": - if chunk.data.delta.step_details.tool_calls[0].type == "file_search" or chunk.data.delta.step_details.tool_calls[0].type == "search_files": - yield CHECKING_THE_DOCUMENTS_FILLER, False, time.time() - start_time, False - continue - textual_response = False - if not self.started_streaming: - first_chunk_time = time.time() - latency = first_chunk_time - start_time - logger.info(f"LLM Latency: {latency:.2f} s") - self.started_streaming = True - if not self.gave_out_prefunction_call_message and not textual_response: - yield PRE_FUNCTION_CALL_MESSAGE, True, latency, False - self.gave_out_prefunction_call_message = True - if len(buffer) > 0: - yield buffer, False, latency, False - buffer = '' - yield buffer, False, latency, False - - buffer = '' - if chunk.data.delta.step_details.tool_calls[0].function.name and chunk.data.delta.step_details.tool_calls[0].function.arguments is not None: - logger.info(f"Should do a function call {chunk.data.delta.step_details.tool_calls[0].function.name}") - called_fun = str(chunk.data.delta.step_details.tool_calls[0].function.name) - i = [i for i in range(len(tools)) if called_fun == tools[i].function.name][0] - if (text_chunk := chunk.data.delta.step_details.tool_calls[0].function.arguments): - resp += text_chunk - logger.info(f"Response from LLM {resp}") - elif chunk.event == 'thread.message.delta': - if not self.started_streaming: - first_chunk_time = time.time() - latency = first_chunk_time - start_time - logger.info(f"LLM Latency: {latency:.2f} s") - self.started_streaming = True - textual_response = True - text_chunk = chunk.data.delta.content[0].text.value - answer += text_chunk - buffer += text_chunk - if len(buffer) >= self.buffer_size and synthesize: - buffer_words = buffer.split(" ") - text = ' '.join(buffer_words[:-1]) - - if not self.started_streaming: - self.started_streaming = True - yield text, False, latency, False - buffer = buffer_words[-1] - - if self.trigger_function_call and not textual_response and (all(key in resp for key in tools[i].function.parameters['properties'].keys())) and (called_fun in self.api_params): - self.gave_out_prefunction_call_message = False - logger.info(f"Function call parameters {resp}") - convert_to_request_log(resp, meta_info, self.model, "llm", direction="response", is_cached=False, run_id=self.run_id) - resp = json.loads(resp) - func_dict = self.api_params[called_fun] - logger.info(f"Payload to send {resp} func_dict {func_dict}") - url = func_dict['url'] - method = func_dict['method'] - param = func_dict['param'] - api_token = 
func_dict['api_token'] - api_call_return = { - "url": url, - "method":method.lower(), - "param": param, - "api_token":api_token, - "model_args": model_args, - "meta_info": meta_info, - "called_fun": called_fun, - **resp - } - - yield api_call_return, False, latency, True - - response = await self.trigger_api(url=url, method=method.lower(), param=param, api_token=api_token, **resp) - content = f"We did made a function calling for user. We hit the function : {called_fun}, we hit the url {url} and send a {method} request and it returned us the response as given below: {str(response)} \n\n . Kindly understand the above response and convey this response in a contextual form to user." - logger.info(f"Logging function call parameters ") - runs = await self.async_client.beta.threads.runs.list(thread_id=model_args["thread_id"]) - if runs.data and runs.data[0].status in ["in_progress", "queued", "requires_action"]: - await self.async_client.beta.threads.runs.cancel(thread_id=model_args["thread_id"], run_id=runs.data[0].id) - - await self.async_client.beta.threads.messages.create(thread_id=model_args["thread_id"], role="assistant", content=content) - async for chunk in await self.async_client.beta.threads.runs.create(**model_args): - logger.info(f"chunk received : {chunk}") - if chunk.event == 'thread.message.delta': - text_chunk = chunk.data.delta.content[0].text.value - answer += text_chunk - buffer += text_chunk - if len(buffer) >= self.buffer_size and synthesize: - buffer_words = buffer.split(" ") - text = ' '.join(buffer_words[:-1]) - - if not self.started_streaming: - self.started_streaming = True - yield text, False, latency, False - buffer = buffer_words[-1] - - if synthesize: # This is used only in streaming sense - yield buffer, True, latency, False - else: - yield answer, True, latency, False - self.started_streaming = False diff --git a/bolna/agent_types/summarization_agent.py b/bolna/agent_types/summarization_agent.py index 7a5e6208..7f568d4a 100644 --- a/bolna/agent_types/summarization_agent.py +++ b/bolna/agent_types/summarization_agent.py @@ -12,10 +12,6 @@ def __init__(self, llm, prompt=None): self.is_inference_on = False self.has_intro_been_sent = False - - def get_model(self): - return self.llm.get_model() - async def generate(self, history): summary = "" logger.info("extracting json from the previous conversation data") diff --git a/bolna/helpers/data_ingestion_pipe.py b/bolna/helpers/data_ingestion_pipe.py deleted file mode 100644 index a654f8e8..00000000 --- a/bolna/helpers/data_ingestion_pipe.py +++ /dev/null @@ -1,126 +0,0 @@ -from uuid import uuid4 -import os -from llama_index.embeddings.openai import OpenAIEmbedding -from llama_index.llms.openai import OpenAI -from llama_index.vector_stores.lancedb import LanceDBVectorStore -from llama_parse import LlamaParse -from llama_index.core.node_parser import ( - MarkdownElementNodeParser, - SentenceSplitter -) -from typing import Dict, Tuple -from llama_index.core import VectorStoreIndex -from llama_index.core import StorageContext -import dotenv -import asyncio -from .logger_config import configure_logger -import tempfile -import threading - -dotenv.load_dotenv() - -lance_db = "/tmp/RAG" -llama_cloud_key = os.getenv("LLAMA_CLOUD_API_KEY") -chatgpt = os.getenv("OPENAI_API_KEY") -logger = configure_logger(__name__) -ingestion_tasks = [] - -logger.info(f"{lance_db}") -async def ingestion_task(temp_file_name:str,table_name:str,chunk_size:int = 512,overlaping:int = 200): - parser = LlamaParse( - api_key=llama_cloud_key, - 
result_type="markdown", - ) - embed_model = OpenAIEmbedding(model="text-embedding-3-small",api_key=chatgpt) - llm = OpenAI(model="gpt-3.5-turbo", temperature=0.2,api_key=chatgpt) - logger.info(f"Emdeding model, LLM model and Llama Parser were loaded") - if LanceDBVectorStore(lance_db)._table_exists(table_name): - vector_store = LanceDBVectorStore(lance_db,table_name,mode="append") - logger.info(f"vector store is loaded") - docs = await parser.aload_data(temp_file_name) - node_parser = MarkdownElementNodeParser(num_workers=8,llm=llm) - nodes = await node_parser.aget_nodes_from_documents(docs) - nodes, objects = node_parser.get_nodes_and_objects(nodes) - nodes = await SentenceSplitter(chunk_size=chunk_size, chunk_overlap=overlaping).aget_nodes_from_documents( - nodes - ) - storage_context = StorageContext.from_defaults(vector_store=vector_store) - vector_index = VectorStoreIndex(nodes=nodes,storage_context=storage_context,embed_model=embed_model) - else: - vector_store = LanceDBVectorStore(lance_db,table_name) - logger.info(f"vector store is loaded") - docs = await parser.aload_data(temp_file_name) - node_parser = MarkdownElementNodeParser(num_workers=8,llm=llm) - nodes = await node_parser.aget_nodes_from_documents(docs) - nodes, objects = node_parser.get_nodes_and_objects(nodes) - nodes = await SentenceSplitter(chunk_size=chunk_size, chunk_overlap=overlaping).aget_nodes_from_documents( - nodes - ) - storage_context = StorageContext.from_defaults(vector_store=vector_store) - vector_index = VectorStoreIndex(nodes=nodes,storage_context=storage_context,embed_model=embed_model) - -def create_table(table_name,temp_file_name:str): - loop = asyncio.new_event_loop() - # table_name = str(uuid4()) - loop.run_until_complete(ingestion_task(temp_file_name=temp_file_name,table_name=table_name)) - -class TaskStatus: - SUCCESS = "SUCCESS" - WAIT = "WAIT" - PROCESSING = "PROCESSING" - ERROR = "ERROR" -class IngestionTask: - def __init__(self,file_name:str, chunk_size:int = 512, overlaping:int = 200) -> None: - self.chunk_size = chunk_size - self.overlaping = overlaping - self.file_name = file_name - - self._status:int = TaskStatus.WAIT - self._table_id:str = None - self._message:str = None -class IngestionPipeline: - def __init__(self,nuof_process:int=2) -> None: - self.task_queue:asyncio.queues.Queue = asyncio.queues.Queue() - # self.TaskIngestionThread = threading.Thread(target=self.__start) - self.task_keeping_status_dict:Dict[str:IngestionTask] = dict() - # self.TaskIngestionThread.start() - # ingestion_task = asyncio.get_event_loop().create_task(self.start()) - ingestion_tasks = [] - for _ in range(nuof_process): - ingestion_tasks.append(asyncio.create_task(self.start())) - # asyncio.gather(ingestion_tasks) - - async def add_task(self,task_id:str, task:IngestionTask): - self.task_keeping_status_dict[task_id] = task - await self.task_queue.put(task) - - async def check_task_status(self,task_id)->Tuple[int,str,str]: - task:IngestionTask = self.task_keeping_status_dict.get(task_id) - return task._status, task._table_id, task._message - - async def start(self): - logger.info("INGESTION PROCESS STARTED") - while True: - - task:IngestionTask = await self.task_queue.get() - - logger.info(f"got packet for processing") - task._status = TaskStatus.PROCESSING - table_id = str(uuid4()) - try: - await ingestion_task(task.file_name,table_id,task.chunk_size,task.overlaping) - task._table_id = table_id - task._message = "every thing run succesfully" - task._status = TaskStatus.SUCCESS - except Exception as e: - 
logger.info(f"ERROR: {e}") - task._message = "there is an error" - task._status = TaskStatus.ERROR - def _start(self): - # loop = asyncio.new_event_loop() - # try: - # loop.run_in_executor(self.start()) - # except Exception as e: - # logger.info(f"error: {e}") - asyncio.run(self.start()) - \ No newline at end of file diff --git a/bolna/helpers/utils.py b/bolna/helpers/utils.py index 25152291..2fcec132 100644 --- a/bolna/helpers/utils.py +++ b/bolna/helpers/utils.py @@ -542,13 +542,4 @@ def convert_to_request_log(message, meta_info, model, component = "transcriber", else: log['is_final'] = False #This is logged only for users to know final transcript from the transcriber log['engine'] = engine - asyncio.create_task(write_request_logs(log, run_id)) - -def get_route_info(message, route_layer): - route = route_layer(message) - logger.info(f"route gotten {route}") - return route.name - -async def run_in_seperate_thread(fun): - resp = await asyncio.to_thread(fun) - return resp \ No newline at end of file + asyncio.create_task(write_request_logs(log, run_id)) \ No newline at end of file diff --git a/bolna/ingestion_server/__init__.py b/bolna/ingestion_server/__init__.py deleted file mode 100644 index 052da0fb..00000000 --- a/bolna/ingestion_server/__init__.py +++ /dev/null @@ -1,177 +0,0 @@ -from bolna.ingestion_server.embeddings import EmbedProviders -from bolna.ingestion_server.rags import RAGProviders, BaseRAG -from bolna.ingestion_server.datachecks import RAGConfig, RAGTask, RAGTaskStatus -import asyncio -from threading import Thread -from bolna.ingestion_server.utils import configure_logger -from typing import Dict -from uuid import uuid4 -import tempfile -import os - -logger = configure_logger(__name__) - -class RAG: - """ - Retrieval-Augmented Generation (RAG) implementation. - - This class handles the ingestion and storage of documents for RAG. - """ - - def __init__(self, VectorDB: BaseRAG, Workers: int = 2) -> None: - """ - Initialize the RAG instance. - - Args: - VectorDB (BaseRAG): The vector database to use. - Workers (int, optional): Number of worker threads. Defaults to 2. 
- """ - self.file_process_task_queue: asyncio.Queue = asyncio.Queue() - self.file_store_task_queue: asyncio.Queue = asyncio.Queue() - - self.VectorDB: BaseRAG = VectorDB - self.Workers: int = Workers - - self.RAG_THREAD = Thread(target=self.start) - self.shutdown = False - - async def __shutdown_loop(self): - """Monitor the shutdown flag.""" - while not self.shutdown: - await asyncio.sleep(0.5) - - async def __ingestion_task(self): - """Process ingestion tasks from the queue.""" - while not self.shutdown: - task: RAGTask = await self.file_process_task_queue.get() - task._status = RAGTaskStatus.PROCESSING - try: - nodes = await self.VectorDB.generate_nodes_sentence_splitter(task.file_loc) - except Exception as e: - logger.error(f"ERROR in {e}") - task._status = RAGTaskStatus.ERROR - continue - task._nodes = nodes - await self.file_store_task_queue.put(task) - - async def __nodes_storage(self): - """Store processed nodes in the vector database.""" - while not self.shutdown: - task: RAGTask = await self.file_store_task_queue.get() - try: - index = await self.VectorDB.add_index(task._nodes) - except Exception as e: - logger.error(f"ERROR in {e}") - task._status = RAGTaskStatus.ERROR - continue - task._index = index - task._status = RAGTaskStatus.SUCESSFUL - - def start(self): - """Start the RAG processing loop.""" - loop = asyncio.new_event_loop() - ingestion_task_pool = [loop.create_task(self.__ingestion_task()) for _ in range(self.Workers)] - file_storage = loop.create_task(self.__nodes_storage()) - loop.run_until_complete(self.__shutdown_loop()) - file_storage.cancel() - for t in ingestion_task_pool: - t.cancel() - loop.close() - - -class RAGFactory: - """ - Factory class for creating and managing RAG instances. - """ - - def __init__(self) -> None: - """Initialize the RAGFactory.""" - self.RAGS: Dict[str, RAG] = dict() - - def make_rag(self, config: RAGConfig): - """ - Create a new RAG instance. - - Args: - config (RAGConfig): Configuration for the RAG instance. - - Returns: - str: Unique identifier for the created RAG instance. - """ - rag_name = f"RAG-{uuid4()}" - embedding_name = EmbedProviders[config.provider_config.embedding_name.provider](config.provider_config.embedding_name.embedding_model_name) - vector_db = RAGProviders[config.provider](embedding_name, config.provider_config) - rag = RAG(vector_db, config.provider_config.worker) - rag.RAG_THREAD.start() - self.RAGS[rag_name] = rag - return rag_name - - def stop_all(self): - """Stop all RAG instances.""" - for rag in self.RAGS.values(): - rag.shutdown = True - - def stop(self, rag_name: str): - """ - Stop a specific RAG instance. - - Args: - rag_name (str): Identifier of the RAG instance to stop. - - Raises: - ValueError: If the specified RAG instance doesn't exist. - """ - if rag_name in self.RAGS.keys(): - self.RAGS[rag_name].shutdown = True - self.RAGS.pop(rag_name) - else: - raise ValueError("No RAG with that ID exists") - - async def file_ingest(self, rag_name, file) -> RAGTask: - """ - Ingest a file into a RAG instance. - - Args: - rag_name (str): Identifier of the RAG instance. - file: File object to ingest. - - Returns: - RAGTask: Task object representing the ingestion process. - - Raises: - ValueError: If the specified RAG instance doesn't exist or if the file type is unsupported. 
- """ - if rag_name not in self.RAGS.keys(): - raise ValueError(f"RAG: {rag_name} does not exist") - if file.content_type not in ["application/pdf", "application/x-pdf"]: - raise ValueError("Only PDF files are supported for now") - - task_id = str(uuid4()) - temp_file = tempfile.NamedTemporaryFile() - temp_file.write(await file.read()) - prev = temp_file.name - file_name = f"/tmp/{task_id}.pdf" - os.rename(prev, file_name) - task = RAGTask(file_loc=file_name) - await self.RAGS[rag_name].file_process_task_queue.put(task) - - while task._status in [RAGTaskStatus.WAIT, RAGTaskStatus.PROCESSING]: - await asyncio.sleep(0.4) - - os.rename(file_name, prev) - return task - - async def retrieve_query(self, rag_name: str, index: str, query: str): - """ - Retrieve documents based on a query. - - Args: - rag_name (str): Identifier of the RAG instance. - index (str): Index to search in. - query (str): Query string. - - Returns: - List of relevant documents. - """ - rag = self.RAGS[rag_name] - return await rag.VectorDB.get_docs_index(query=query, index=index) \ No newline at end of file diff --git a/bolna/ingestion_server/datachecks/__init__.py b/bolna/ingestion_server/datachecks/__init__.py deleted file mode 100644 index 7b0b27fe..00000000 --- a/bolna/ingestion_server/datachecks/__init__.py +++ /dev/null @@ -1,49 +0,0 @@ -from pydantic import BaseModel -from typing import Union, Optional - -# Embedding config -class embeddings(BaseModel): - provider:str - embedding_model_name:Optional[str] = "" - -# DB Configs -class LanceDBConfig(BaseModel): - loc: Optional[str] = "" - -class MongoDBConfig(BaseModel): - uri: Optional[str] = "" - db: Optional[str] = "" - collection_name: Optional[str] = "" - -# Provider Configs -class ProviderConfig(BaseModel): - embedding_name:embeddings - chunk_size:int - overlapping:int - worker:int - similarity_top_k:int - rag: Union[LanceDBConfig,MongoDBConfig] - -# Rag Config -class RAGConfig(BaseModel): - provider:str - provider_config:ProviderConfig - -class Query(BaseModel): - provider:str - index:str - query:str - -# Utility checks for RAG -class RAGTaskStatus: - WAIT = "WAITING" - PROCESSING = "PROCESSING" - ERROR = "ERROR" - SUCESSFUL = "SUCESSFUL" - -class RAGTask(BaseModel): - file_loc:str - _status:str = RAGTaskStatus.WAIT - _message:str = "" - _index:str = "" - _nodes:list = [] diff --git a/bolna/ingestion_server/embeddings/__init__.py b/bolna/ingestion_server/embeddings/__init__.py deleted file mode 100644 index 8fcce722..00000000 --- a/bolna/ingestion_server/embeddings/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Dict - -from bolna.ingestion_server.embeddings.openai_emded import OpenAI -from bolna.ingestion_server.embeddings.base import BaseEmbed - -# EmbedProviders dictionary with the key being the name of the provider and the value being the class of the provider -EmbedProviders : Dict[str, BaseEmbed] = { - "OpenAI": OpenAI -} \ No newline at end of file diff --git a/bolna/ingestion_server/embeddings/base.py b/bolna/ingestion_server/embeddings/base.py deleted file mode 100644 index 37b4dd94..00000000 --- a/bolna/ingestion_server/embeddings/base.py +++ /dev/null @@ -1,10 +0,0 @@ -from abc import ABC, abstractmethod - -class BaseEmbed(ABC): - def __init__(self, name: str) -> None: - self.name = name - - @abstractmethod - def get_embedding(self): - """Method to retrieve the embedding representation.""" - raise NotImplementedError("Subclasses must implement this method.") \ No newline at end of file diff --git 
a/bolna/ingestion_server/embeddings/openai_emded.py b/bolna/ingestion_server/embeddings/openai_emded.py deleted file mode 100644 index cc0bcdc9..00000000 --- a/bolna/ingestion_server/embeddings/openai_emded.py +++ /dev/null @@ -1,31 +0,0 @@ -from bolna.ingestion_server.embeddings.base import BaseEmbed -from llama_index.embeddings.openai import OpenAIEmbedding -import dotenv -import os - -dotenv.load_dotenv() - -class OpenAI(BaseEmbed): - def __init__(self, model: str) -> None: - """Initialize the OpenAI embedding provider. - - Args: - model (str): The model name to be used for embeddings. - - Raises: - ValueError: If the OpenAI API key is not found in the environment variables. - """ - super().__init__("OpenAI") - self.model = model - api_key = os.getenv("OPENAI_API_KEY") - if api_key is None: - raise ValueError("OPENAI KEY IS NOT FOUND") - self.embedding_instance = OpenAIEmbedding(model=model, api_key=api_key) - - def get_embedding(self): - """Retrieve the embedding instance. - - Returns: - OpenAIEmbedding: The instance of the OpenAI embedding. - """ - return self.embedding_instance \ No newline at end of file diff --git a/bolna/ingestion_server/main.py b/bolna/ingestion_server/main.py deleted file mode 100644 index f37ec5ba..00000000 --- a/bolna/ingestion_server/main.py +++ /dev/null @@ -1,73 +0,0 @@ -from fastapi import FastAPI, UploadFile -from bolna.ingestion_server.datachecks import RAGConfig -from bolna.ingestion_server import RAGFactory -import uvicorn -import time -from typing import Dict -import dotenv - -dotenv.load_dotenv() -DB: Dict[str, RAGConfig] = {} - -rag_factory = RAGFactory() -app = FastAPI() - - -@app.get("/") -def heartbeat() -> float: - """Health check endpoint that returns the current server time.""" - return time.time() - - -@app.post("/create-rag") -def create_rag(request: RAGConfig) -> Dict[str, str]: - """Create a RAG configuration and return its ID. - - Args: - request (RAGConfig): The RAG configuration to create. - - Returns: - Dict[str, str]: A dictionary containing the created RAG ID. - """ - print(request) - rag_id = rag_factory.make_rag(request) - return {"rag_id": rag_id} - - -@app.post("/rag-upload-file/{rag_id}") -async def rag_upload_file(file: UploadFile, rag_id: str) -> Dict[str, str]: - """Upload a file for a specific RAG ID. - - Args: - file (UploadFile): The file to upload. - rag_id (str): The ID of the RAG to associate with the file. - - Returns: - Dict[str, str]: A dictionary containing the upload status and index. - """ - try: - task = await rag_factory.file_ingest(rag_name=rag_id, file=file) - return {"index": task._index, "status": task._status, "message": "DONE"} - except Exception as e: - return {"index": None, "status": "ERROR", "message": f"{e}"} - - -@app.get("/rag-retrive/{rag_id}/{index}") -async def rag_retrive(query: str, rag_id: str, index: str) -> list: - """Retrieve documents based on a query for a specific RAG ID and index. - - Args: - query (str): The query string to search for. - rag_id (str): The ID of the RAG to search in. - index (str): The index to search in. - - Returns: - list: A list of documents matching the query. 
- """ - docs = await rag_factory.retrive_query(rag_name=rag_id, index=index, query=query) - send_filter = [{"text": node.text, "score": node.score} for node in docs] - return send_filter - - -if __name__ == "__main__": - uvicorn.run("main:app", port=8000, host="0.0.0.0", reload=True) \ No newline at end of file diff --git a/bolna/ingestion_server/rags/__init__.py b/bolna/ingestion_server/rags/__init__.py deleted file mode 100644 index 3cd6f199..00000000 --- a/bolna/ingestion_server/rags/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from bolna.ingestion_server.rags.lancedb_rag import LanceDB -from bolna.ingestion_server.rags.mongoDB_rag import MongoDB -from bolna.ingestion_server.rags.base import BaseRAG - -from typing import Dict - -RAGProviders : Dict[str, BaseRAG] = { - "LanceDB": LanceDB, - "MongoDB": MongoDB -} \ No newline at end of file diff --git a/bolna/ingestion_server/rags/base.py b/bolna/ingestion_server/rags/base.py deleted file mode 100644 index 4b9c8cba..00000000 --- a/bolna/ingestion_server/rags/base.py +++ /dev/null @@ -1,156 +0,0 @@ -import os -import dotenv -from uuid import uuid4 - -from llama_parse import LlamaParse -from llama_index.core.node_parser import ( - MarkdownElementNodeParser, - SentenceSplitter, - TextSplitter -) -from llama_index.llms.openai import OpenAI -from llama_index.core import Settings - -from bolna.ingestion_server.embeddings import BaseEmbed -from bolna.ingestion_server.utils import configure_logger - - -dotenv.load_dotenv() - -logger = configure_logger(__name__) - -class BaseRAG: - """ - Base class for Retrieval-Augmented Generation (RAG) systems. - - Attributes: - provider (str): The provider for the RAG system. - base_embed (BaseEmbed): The embedding model used. - embeding_model: The actual embedding model instance. - chunk_size (int): Size of the chunks for splitting documents. - overlapping (int): Overlap size for chunking. - LLAMA_CLOUD (str): API key for Llama Cloud. - parse (LlamaParse): Instance of LlamaParse for data parsing. - OPENAI_API_KEY (str): API key for OpenAI. - llm (OpenAI): Instance of OpenAI model. - """ - - def __init__(self, provider: str, embedding_name: BaseEmbed, chunk_size: int, overlapping: int) -> None: - """ - Initializes the BaseRAG instance with the specified parameters. - - Args: - provider (str): The provider for the RAG system. - embedding_name (BaseEmbed): The embedding model used. - chunk_size (int): Size of the chunks for splitting documents. - overlapping (int): Overlap size for chunking. - - Raises: - ValueError: If required environment variables are not set. - """ - self.provider = provider - self.base_embed: BaseEmbed = embedding_name - self.embeding_model = self.base_embed.get_embedding() - self.chunk_size = chunk_size - self.overlapping = overlapping - self.LLAMA_CLOUD = os.getenv("LLAMA_CLOUD_API_KEY") - - if self.LLAMA_CLOUD is None: - raise ValueError("LLAMA_CLOUD_API_KEY is not set in .env") - - self.parse = LlamaParse(api_key=self.LLAMA_CLOUD, result_type="markdown") - - self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") - - if self.OPENAI_API_KEY is None: - raise ValueError("OPENAI_API_KEY is not set in .env") - - self.llm = OpenAI(model="gpt-3.5-turbo", temperature=0.2, api_key=self.OPENAI_API_KEY) - - Settings.embed_model = self.embeding_model - Settings.llm = self.llm - - def generate_index_name(self) -> str: - """Generates a unique index name using UUID. - - Returns: - str: A unique index name. 
- """ - return str(uuid4()) - - async def generate_nodes_sentence_splitter(self, file_loc: str): - """Generates nodes using a sentence splitter. - - Args: - file_loc (str): The file location to load data from. - - Returns: - nodes: The generated nodes after processing. - """ - docs = await self.parse.aload_data(file_path=file_loc) - node_parser = MarkdownElementNodeParser(num_workers=8, llm=self.llm) - nodes = await node_parser.aget_nodes_from_documents(docs) - nodes, _ = node_parser.get_nodes_and_objects(nodes) - nodes = await SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.overlapping).aget_nodes_from_documents(nodes) - return nodes - - async def generate_nodes_text_splitter(self, file_loc: str): - """Generates nodes using a text splitter. - - Args: - file_loc (str): The file location to load data from. - - Returns: - nodes: The generated nodes after processing. - """ - docs = await self.parse.aload_data(file_path=file_loc) - node_parser = MarkdownElementNodeParser(num_workers=8, llm=self.llm) - nodes = await node_parser.aget_nodes_from_documents(docs) - nodes, _ = node_parser.get_nodes_and_objects(nodes) - nodes = await TextSplitter(chunk_size=self.chunk_size, chunk_overlap=self.overlapping).aget_nodes_from_documents(nodes) - return nodes - - async def append_index(self, nodes) -> str: - """Appends nodes to the existing index. - - Args: - nodes: The nodes to append. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError - - async def add_index(self, nodes) -> str: - """Adds nodes to the index. - - Args: - nodes: The nodes to add. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError - - async def delete_index(self, index: str) -> bool: - """Deletes an index. - - Args: - index (str): The index to delete. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError - - async def get_docs_index(self, query: str, index: str): - """Retrieves documents from the index based on a query. - - Args: - query (str): The query to search for. - index (str): The index to search in. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError \ No newline at end of file diff --git a/bolna/ingestion_server/rags/lancedb_rag.py b/bolna/ingestion_server/rags/lancedb_rag.py deleted file mode 100644 index 42e5cdef..00000000 --- a/bolna/ingestion_server/rags/lancedb_rag.py +++ /dev/null @@ -1,90 +0,0 @@ -from typing import Any, Coroutine -from bolna.ingestion_server.embeddings.base import BaseEmbed -from bolna.ingestion_server.rags.base import BaseRAG -from llama_index.vector_stores.lancedb import LanceDBVectorStore -from llama_index.core import VectorStoreIndex, StorageContext -from bolna.ingestion_server.datachecks import ProviderConfig, LanceDBConfig -from llama_index.core.retrievers import VectorIndexRetriever - -class LanceDB(BaseRAG): - """ - LanceDB class for managing vector storage and retrieval using LanceDB. - - Attributes: - similarity_top_k (int): Number of top similar items to retrieve. - config (LanceDBConfig): Configuration for LanceDB. - loc (str): Location for the vector database. - path (str): Path to the vector database data. - """ - - def __init__(self, embedding_name: BaseEmbed, config: ProviderConfig) -> None: - """ - Initializes the LanceDB instance. - - Args: - embedding_name (BaseEmbed): The embedding model used. 
- config (ProviderConfig): Configuration for the provider. - """ - super().__init__("LanceDB", embedding_name, config.chunk_size, config.overlapping) - self.similarity_top_k = config.similarity_top_k - self.config: LanceDBConfig = config.rag - self.loc = self.config.loc - self.path = f"vectordb_data/{self.loc}" - - async def append_index(self, nodes) -> Coroutine[Any, Any, str]: - """Appends nodes to the existing index. - - Args: - nodes: The nodes to append. - - Returns: - Coroutine: A coroutine that returns None. - """ - return None - - async def add_index(self, nodes) -> str: - """Adds nodes to the index and creates a new table. - - Args: - nodes: The nodes to add. - - Returns: - str: The name of the created table. - """ - table_name = self.generate_index_name() - # TODO: add reranking in the DB - vector_store = LanceDBVectorStore(self.path, table_name=table_name) - storage_context = StorageContext.from_defaults(vector_store=vector_store) - vector_index = VectorStoreIndex(nodes=nodes, storage_context=storage_context, embed_model=self.embeding_model) - return table_name - - async def delete_index(self, index: str) -> bool: - """Deletes an index. - - Args: - index (str): The index to delete. - - Returns: - bool: Result of the deletion operation. - """ - return await super().delete_index(index) - - async def get_docs_index(self, query: str, index: str): - """Retrieves documents from the index based on a query. - - Args: - query (str): The query to search for. - index (str): The index to search in. - - Returns: - Retrieved documents based on the query. - """ - vector_store = LanceDBVectorStore(uri=self.path, table_name=index) - storage_context = StorageContext.from_defaults(vector_store=vector_store) - vector_index = VectorStoreIndex(nodes=[], storage_context=storage_context) - query_engine = VectorIndexRetriever(vector_index, similarity_top_k=self.similarity_top_k) - return query_engine.retrieve(query) - - # query_engine = vector_index.as_query_engine(llm=self.llm) - - diff --git a/bolna/ingestion_server/rags/mongoDB_rag.py b/bolna/ingestion_server/rags/mongoDB_rag.py deleted file mode 100644 index 713f53ef..00000000 --- a/bolna/ingestion_server/rags/mongoDB_rag.py +++ /dev/null @@ -1,96 +0,0 @@ -from typing import Any, Coroutine -from pymongo import MongoClient -from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch - -from bolna.ingestion_server.embeddings.base import BaseEmbed -from bolna.ingestion_server.rags.base import BaseRAG -from bolna.ingestion_server.datachecks import ProviderConfig, MongoDBConfig - -from llama_index.core import VectorStoreIndex, StorageContext -from llama_index.core.retrievers import VectorIndexRetriever - -class MongoDB(BaseRAG): - """ - MongoDB class for managing vector storage and retrieval using MongoDB. - - Attributes: - similarity_top_k (int): Number of top similar items to retrieve. - config (MongoDBConfig): Configuration for MongoDB. - client (MongoClient): MongoDB client instance. - """ - - def __init__(self, embedding_name: BaseEmbed, config: ProviderConfig) -> None: - """ - Initializes the MongoDB instance. - - Args: - embedding_name (BaseEmbed): The embedding model used. - config (ProviderConfig): Configuration for the provider. 
- """ - super().__init__("MongoDB", embedding_name, config.chunk_size, config.overlapping) - self.similarity_top_k = config.similarity_top_k - self.config: MongoDBConfig = config.rag - self.client = MongoClient(self.config.uri) - - async def append_index(self, nodes) -> Coroutine[Any, Any, str]: - """Appends nodes to the existing index. - - Args: - nodes: The nodes to append. - - Returns: - Coroutine: A coroutine that calls the base class method. - """ - return await super().append_index(nodes) - - async def get_docs_index(self, query: str, index: str): - """Retrieves documents from the index based on a query. - - Args: - query (str): The query to search for. - index (str): The index to search in. - - Returns: - Retrieved documents based on the query. - """ - vector_store = MongoDBAtlasVectorSearch( - self.client, - db_name=self.config.db, - collection_name=self.config.collection_name, - vector_index_name=index - ) - vector_store_context = StorageContext.from_defaults(vector_store=vector_store) - vector_store_index = VectorStoreIndex(nodes=[], storage_context=vector_store_context) - vector_store_retriever = VectorIndexRetriever(index=vector_store_index, similarity_top_k=self.similarity_top_k) - return vector_store_retriever.retrieve(query) - - async def delete_index(self, index: str) -> Coroutine[Any, Any, bool]: - """Deletes an index. - - Args: - index (str): The index to delete. - - Returns: - Coroutine: A coroutine that calls the base class method. - """ - return await super().delete_index(index) - - async def add_index(self, nodes) -> str: - """Adds nodes to the index and creates a new index. - - Args: - nodes: The nodes to add. - - Returns: - str: The name of the created index. - """ - index = self.generate_index_name() - vector_store = MongoDBAtlasVectorSearch( - self.client, - db_name=self.config.db, - collection_name=self.config.collection_name, - index_name=index - ) - vector_store_context = StorageContext.from_defaults(vector_store=vector_store) - vector_store_index = VectorStoreIndex(nodes=nodes, storage_context=vector_store_context, embed_model=self.embeding_model) - return index \ No newline at end of file diff --git a/bolna/ingestion_server/utils/__init__.py b/bolna/ingestion_server/utils/__init__.py deleted file mode 100644 index 6f2c0c32..00000000 --- a/bolna/ingestion_server/utils/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from bolna.ingestion_server.utils.log import configure_logger \ No newline at end of file diff --git a/bolna/ingestion_server/utils/log.py b/bolna/ingestion_server/utils/log.py deleted file mode 100644 index acc08150..00000000 --- a/bolna/ingestion_server/utils/log.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging - -VALID_LOGGING_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] - -def configure_logger(file_name, enabled=True, logging_level='INFO'): - """ - Configures a logger for the specified file. - - Parameters: - - file_name (str): The name of the file for which the logger is being configured. - - enabled (bool): Flag to enable or disable the logger. Default is True. - - logging_level (str): The logging level to set. Must be one of the valid levels. Default is 'INFO'. - - Returns: - - logger (logging.Logger): Configured logger instance. 
- """ - if logging_level not in VALID_LOGGING_LEVELS: - logging_level = "INFO" - - logging.basicConfig( - level=logging_level, - format="%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - logger = logging.getLogger(file_name) - - if not enabled: - logger.disabled = True - - return logger \ No newline at end of file diff --git a/bolna/llms/litellm.py b/bolna/llms/litellm.py index 14f0fbbe..a0689865 100644 --- a/bolna/llms/litellm.py +++ b/bolna/llms/litellm.py @@ -35,8 +35,6 @@ def __init__(self, model, max_tokens=30, buffer_size=40, if "api_version" in kwargs: self.model_args["api_version"] = kwargs["api_version"] - def get_model(self): - return self.model async def generate_stream(self, messages, synthesize=True, meta_info = None): answer, buffer = "", "" model_args = self.model_args.copy() diff --git a/bolna/llms/openai_llm.py b/bolna/llms/openai_llm.py index 816f5f1a..3507836f 100644 --- a/bolna/llms/openai_llm.py +++ b/bolna/llms/openai_llm.py @@ -52,7 +52,6 @@ def __init__(self, max_tokens=100, buffer_size=40, model="gpt-3.5-turbo-16k", te llm_key = kwargs['llm_key'] self.async_client = AsyncOpenAI(api_key=llm_key) api_key = llm_key - self.assistant_id = kwargs.get("assistant_id", None) if self.assistant_id: logger.info(f"Initializing OpenAI assistant with assistant id {self.assistant_id}") @@ -66,8 +65,7 @@ def __init__(self, max_tokens=100, buffer_size=40, model="gpt-3.5-turbo-16k", te self.run_id = kwargs.get("run_id", None) self.gave_out_prefunction_call_message = False - def get_model(self): - return self.model + async def generate_stream(self, messages, synthesize=True, request_json=False, meta_info = None): if len(messages) == 0: raise Exception("No messages provided") @@ -140,7 +138,6 @@ async def generate_stream(self, messages, synthesize=True, request_json=False, m method = func_dict['method'] param = func_dict['param'] api_token = func_dict['api_token'] - api_call_return = { "url": url, "method":method.lower(), @@ -204,7 +201,6 @@ async def generate_assistant_stream(self, message, synthesize=True, request_json answer, buffer, resp, called_fun, api_params, i = "", "", "", "", "", 0 logger.info(f"request to open ai {message} max tokens {self.max_tokens} ") - latency = False start_time = time.time() textual_response = False @@ -252,7 +248,6 @@ async def generate_assistant_stream(self, message, synthesize=True, request_json yield buffer, False, latency, False buffer = '' yield buffer, False, latency, False - buffer = '' if (text_chunk := chunk.data.delta.step_details.tool_calls[0].function.arguments): diff --git a/bolna/models.py b/bolna/models.py index 186f8b1b..b31fd03a 100644 --- a/bolna/models.py +++ b/bolna/models.py @@ -2,8 +2,6 @@ from typing import Optional, List, Union, Dict from pydantic import BaseModel, Field, validator, ValidationError, Json from pydantic_core import PydanticCustomError - -from bolna.agent_types.base_agent import BaseAgent from .providers import * AGENT_WELCOME_MESSAGE = "This call is being recorded for quality assurance and training. Please speak now." 
@@ -97,7 +95,7 @@ def validate_language(cls, value): class Synthesizer(BaseModel): provider: str - provider_config: Union[PollyConfig, XTTSConfig, ElevenLabsConfig, OpenAIConfig, FourieConfig, MeloConfig, StylettsConfig, DeepgramConfig] = Field(union_mode='smart') + provider_config: Union[PollyConfig, XTTSConfig, ElevenLabsConfig, OpenAIConfig, FourieConfig, MeloConfig, StylettsConfig, DeepgramConfig, AzureConfig] = Field(union_mode='smart') stream: bool = False buffer_size: Optional[int] = 40 # 40 characters in a buffer audio_format: Optional[str] = "pcm" @@ -105,12 +103,12 @@ class Synthesizer(BaseModel): @validator("provider") def validate_model(cls, value): - return validate_attribute(value, ["polly", "xtts", "elevenlabs", "openai", "deepgram", "melotts", "styletts"]) + return validate_attribute(value, ["polly", "xtts", "elevenlabs", "openai", "deepgram", "melotts", "styletts", "azuretts"]) class IOModel(BaseModel): provider: str - format: Optional[str] = "wav" + format: str @validator("provider") def validate_provider(cls, value): @@ -129,39 +127,16 @@ class Route(BaseModel): # Routes can be used for FAQs caching, prompt routing, guard rails, agent assist function calling class Routes(BaseModel): embedding_model: Optional[str] = "Snowflake/snowflake-arctic-embed-l" - routes: Optional[List[Route]] = [] + routes: List[Route] class OpenaiAssistants(BaseModel): name: Optional[str] = None assistant_id: str = None - max_tokens: Optional[int] =100 - temperature: Optional[float] = 0.2 - buffer_size: Optional[int] = 100 - provider: Optional[str] = "openai" - model: Optional[str] = "gpt-3.5-turbo" - -class MongoDBProviderConfig(BaseModel): - connection_string: Optional[str] = None - db_name: Optional[str] = None - collection_name: Optional[str] = None - index_name: Optional[str] = None - llm_model: Optional[str] = None - embedding_model: Optional[str] = None - embedding_dimensions: Optional[str] = None - -class LanceDBProviderConfig(BaseModel): - vector_id: str - -class VectorStore(BaseModel): - provider: str - provider_config: Union[LanceDBProviderConfig, MongoDBProviderConfig] - -class ExtraConfig(BaseModel): - vector_store : VectorStore class LLM(BaseModel): model: Optional[str] = "gpt-3.5-turbo" max_tokens: Optional[int] = 100 + agent_flow_type: Optional[str] = "streaming" family: Optional[str] = "openai" temperature: Optional[float] = 0.1 request_json: Optional[bool] = False @@ -173,54 +148,18 @@ class LLM(BaseModel): presence_penalty: Optional[float] = 0.0 provider: Optional[str] = "openai" base_url: Optional[str] = None - - -class SIMPLE_LLM_AGENT(LLM): - agent_flow_type: Optional[str] = "streaming" #It is used for backwards compatibility - routes: Optional[Routes] = None + routes: Optional[Routes] = None extraction_details: Optional[str] = None summarization_details: Optional[str] = None - -class Node(BaseModel): - id: str - type: str #Can be router or conversation for now - llm: LLM - exit_criteria: str - exit_response: Optional[str] = None - exit_prompt: Optional[str] = None - is_root: Optional[bool] = False - -class Edge(BaseModel): - start_node: str # Node ID - end_node: str - condition: Optional[tuple] = None #extracted value from previous step and it's value - -class LLM_AGENT_GRAPH(BaseModel): - nodes: List[Node] - edges: List[Edge] - -class AGENT_ROUTE_CONFIG(BaseModel): - utterances: List[str] - threshold: Optional[float] = 0.85 - -class MultiAgent(BaseModel): - agent_map: Dict[str, Union[LLM, OpenaiAssistants]] - agent_routing_config: Dict[str, AGENT_ROUTE_CONFIG] - 
default_agent: str - embedding_model: Optional[str] = "Snowflake/snowflake-arctic-embed-l" - -class LLM_AGENT(BaseModel): - agent_flow_type: str - agent_type: str #can be llamaindex_rag, simple_llm_agent, router_agent, dag_agent, openai_assistant, custom, etc - #extra_config: Union[OpenaiAssistants, LLM_AGENT_GRAPH, MultiAgent, LLM, SIMPLE_LLM_AGENT] - guardrails: Optional[Routes] = None #Just to reduce confusion - extra_config: Union[OpenaiAssistants, LLM_AGENT_GRAPH, MultiAgent, LLM] + backend: Optional[str] = "bolna" + extra_config: Optional[OpenaiAssistants] = None class MessagingModel(BaseModel): provider: str template: str + # Need to redefine it class CalendarModel(BaseModel): provider: str @@ -245,7 +184,7 @@ class ToolModel(BaseModel): tools_params: Dict[str, APIParams] class ToolsConfig(BaseModel): - llm_agent: Optional[Union[LLM_AGENT, SIMPLE_LLM_AGENT]] = None + llm_agent: Optional[LLM] = None synthesizer: Optional[Synthesizer] = None transcriber: Optional[Transcriber] = None input: Optional[IOModel] = None @@ -275,11 +214,6 @@ class ConversationConfig(BaseModel): use_fillers: Optional[bool] = False call_transfer_number: Optional[str] = "" - - time_blank_filler_message:Optional[int] = 6 - blank_filler_message:Optional[str] = "Hey, are you still there" - toggle_blank_filler_message:Optional[bool] = True - @validator('hangup_after_silence', pre=True, always=True) def set_hangup_after_silence(cls, v): return v if v is not None else 10 # Set default value if None is passed @@ -295,4 +229,4 @@ class AgentModel(BaseModel): agent_name: str agent_type: str = "other" tasks: List[Task] - agent_welcome_message: Optional[str] = AGENT_WELCOME_MESSAGE \ No newline at end of file + agent_welcome_message: Optional[str] = AGENT_WELCOME_MESSAGE diff --git a/bolna/rag/__init__.py b/bolna/rag/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/bolna/rag/base.py b/bolna/rag/base.py deleted file mode 100644 index 0ce0c51d..00000000 --- a/bolna/rag/base.py +++ /dev/null @@ -1,59 +0,0 @@ -from abc import ABC, abstractmethod -from llama_index.core import VectorStoreIndex -from llama_index.core.settings import Settings -from llama_index.llms.openai import OpenAI -from llama_index.embeddings.openai import OpenAIEmbedding - -class DatabaseConnector(ABC): - def __init__(self, db_name: str): - self.db_name = db_name - - @abstractmethod - def connect(self): - pass - - @abstractmethod - def disconnect(self): - pass - - @abstractmethod - def verify_data(self): - pass - - @abstractmethod - def setup_vector_store(self): - pass - - -class VectorSearchEngine: - def __init__(self, db_connector): - self.db_connector = db_connector - self.index = None - - def setup_llama_index(self): - embed_model = OpenAIEmbedding(model="text-embedding-3-small", dimensions=256) - llm = OpenAI() - Settings.llm = llm - Settings.embed_model = embed_model - - def create_index(self): - vector_store = self.db_connector.setup_vector_store() - self.index = VectorStoreIndex.from_vector_store(vector_store) - - def query(self, query_text, similarity_top_k=5): - if not self.index: - raise ValueError("Index not created. 
Call create_index() first.") - query_engine = self.index.as_query_engine(similarity_top_k=similarity_top_k) - return query_engine.query(query_text) - -# def run_queries(search_engine, queries): -# for query in queries: -# response = search_engine.query(query) -# print(f"Query: {query}") -# print(f"Response: {response}") -# print("Source nodes:") -# for node in response.source_nodes: -# print(f"Score: {node.score}") -# print(f"Content: {node.node.get_content()[:200]}...") -# print(f"Metadata: {node.node.metadata}") -# print() \ No newline at end of file diff --git a/bolna/rag/main.py b/bolna/rag/main.py deleted file mode 100644 index ecf230d2..00000000 --- a/bolna/rag/main.py +++ /dev/null @@ -1,155 +0,0 @@ -import os -import logging -import uuid -from typing import List, Optional, Dict -from fastapi import FastAPI, HTTPException, File, UploadFile -from pydantic import BaseModel -from llama_index.core import SimpleDirectoryReader -from llama_index.core.node_parser import SimpleNodeParser - -from bolna.rag.base import DatabaseConnector, VectorSearchEngine -from bolna.rag.mongodb_rag import MongoDBConnector - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -app = FastAPI() - -# Global variables -db_connector = None -search_engine = None - -class EmbeddingConfig(BaseModel): - model: str - -class DatabaseConfig(BaseModel): - connection_string: str - db_name: str - collection_name: str - index_name: str - llm_model: Optional[str] = "gpt-3.5-turbo" - embedding_model: Optional[str] = "text-embedding-3-small" - embedding_dimensions: Optional[int] = 256 - -class ProviderConfig(BaseModel): - provider: str - provider_config: DatabaseConfig # DatabaseConfig nested here - -class RAGConfig(BaseModel): - providers: ProviderConfig - openai_api_key: str - -class QueryRequest(BaseModel): - query: str - similarity_top_k: int = 3 - -@app.post("/setup") -async def setup_rag(config: RAGConfig): - global db_connector, search_engine - - logger.info("Setting up RAG system...") - os.environ["OPENAI_API_KEY"] = config.openai_api_key - - providers = config.providers # Corrected this line - if providers.provider.lower() == "mongodb": - db_config = providers.provider_config - db_connector = MongoDBConnector(db_config) - else: - raise HTTPException(status_code=400, detail="Unsupported provider") - - db_connector.connect() - doc_count = db_connector.collection.count_documents({}) - logger.info(f"Documents in collection: {doc_count}") - - if doc_count == 0: - logger.info("No documents found in the collection, ingestion is required.") - return {"message": "No documents found in the collection, ingestion is required."} - - search_engine = VectorSearchEngine(db_connector) - search_engine.setup_llama_index() - search_engine.create_index() - - logger.info("RAG system setup complete") - return { - "message": "RAG system setup complete", - "db_name": db_connector.db_name, - "collection_name": db_connector.collection_name - } - -@app.post("/query") -async def query(request: QueryRequest): - if not search_engine: - raise HTTPException(status_code=400, detail="RAG system not set up. 
Call /setup first.") - - logger.info(f"Received query: {request.query}") - response = search_engine.query(request.query, similarity_top_k=request.similarity_top_k) - - formatted_response = { - "query": request.query, - "response": str(response), - "source_nodes": [ - { - "score": node.score, - "content": node.node.get_content()[:20], - "metadata": node.node.metadata - } for node in response.source_nodes - ] - } - - logger.info("Query response generated") - return formatted_response - -@app.post("/ingest") -async def ingest(files: List[UploadFile] = File(...)): - if not db_connector or not search_engine: - raise HTTPException(status_code=400, detail="RAG system not set up. Call /setup first.") - - logger.info(f"Ingesting {len(files)} files") - - try: - ingestion_id = str(uuid.uuid4()) - temp_dir = f"temp_upload_{ingestion_id}" - os.makedirs(temp_dir, exist_ok=True) - - # Save uploaded files temporarily - for file in files: - file_path = os.path.join(temp_dir, file.filename) - with open(file_path, "wb") as buffer: - buffer.write(await file.read()) - - # Use SimpleDirectoryReader to load documents - documents = SimpleDirectoryReader(temp_dir).load_data() - - # Add ingestion_id to document metadata - for doc in documents: - doc.metadata["ingestion_id"] = ingestion_id - - # Create nodes - parser = SimpleNodeParser.from_defaults() - nodes = parser.get_nodes_from_documents(documents) - - # Generate embeddings and add to vector store - vector_store = db_connector.setup_vector_store() - vector_store.add(nodes) - - # Clean up temporary directory - for file in os.listdir(temp_dir): - os.remove(os.path.join(temp_dir, file)) - os.rmdir(temp_dir) - - logger.info(f"Ingested {len(nodes)} nodes with ingestion ID: {ingestion_id}") - return {"message": f"Ingested {len(nodes)} nodes", "ingestion_id": ingestion_id} - except Exception as e: - logger.error(f"Error during ingestion: {str(e)}") - raise HTTPException(status_code=500, detail=f"Ingestion failed: {str(e)}") - -@app.get("/status") -async def status(): - return { - "rag_system_ready": db_connector is not None and search_engine is not None - } - -# if __name__ == "__main__": -# import uvicorn -# uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/bolna/rag/mongodb_rag.py b/bolna/rag/mongodb_rag.py deleted file mode 100644 index c41f1578..00000000 --- a/bolna/rag/mongodb_rag.py +++ /dev/null @@ -1,125 +0,0 @@ -import os -import logging -from typing import List, Optional -from pydantic import BaseModel -from pymongo import MongoClient -from pymongo.errors import ConnectionFailure -from llama_index.core import VectorStoreIndex -from llama_index.core.settings import Settings -from llama_index.llms.openai import OpenAI -from llama_index.embeddings.openai import OpenAIEmbedding -from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch - -from bolna.rag.base import DatabaseConnector - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -class MongoDBConfig(BaseModel): - connection_string: str - db_name: str - collection_name: str - index_name: str - llm_model: Optional[str] = "gpt-3.5-turbo" # Default LLM model - embedding_model: Optional[str] = "text-embedding-3-small" # Default embedding model - embedding_dimensions: Optional[int] = 256 # Default dimensions - -class MongoDBConnector(DatabaseConnector): - def __init__(self, config: MongoDBConfig): - super().__init__(config.db_name) - self.config = config - self.client = None - self.db = None 
- self.collection = None - self.collection_name = config.collection_name # Add this line - - def connect(self): - try: - self.client = MongoClient(self.config.connection_string) - self.db = self.client[self.config.db_name] - self.collection = self.db[self.config.collection_name] - logger.info("Connection to MongoDB successful") - except ConnectionFailure as e: - logger.error(f"Connection failed: {e}") - raise - - def disconnect(self): - if self.client: - self.client.close() - logger.info("Disconnected from MongoDB") - - def verify_data(self): - doc_count = self.collection.count_documents({}) - logger.info(f"Total documents in collection: {doc_count}") - if doc_count > 0: - logger.info(f"Documents found in the collection.") - else: - logger.warning("No documents found in the collection.") - - def setup_vector_store(self): - return MongoDBAtlasVectorSearch( - self.client, - db_name=self.config.db_name, - collection_name=self.config.collection_name, - index_name=self.config.index_name - ) - -class RAGEngine: - def __init__(self, db_connector: MongoDBConnector): - self.db_connector = db_connector - self.index = None - - def setup(self): - embed_model = OpenAIEmbedding(model=self.db_connector.config.embedding_model, dimensions=self.db_connector.config.embedding_dimensions) - llm = OpenAI(model=self.db_connector.config.llm_model) - Settings.llm = llm - Settings.embed_model = embed_model - - vector_store = self.db_connector.setup_vector_store() - self.index = VectorStoreIndex.from_vector_store(vector_store) - - def query(self, query_text: str, similarity_top_k: int = 5): - if not self.index: - raise ValueError("Index not created. Call setup() first.") - query_engine = self.index.as_query_engine(similarity_top_k=similarity_top_k) - response = query_engine.query(query_text) - - logger.info(f"Query: {query_text}") - logger.info(f"Response: {response.response}") - for node in response.source_nodes: - logger.info(f"Source - Score: {node.score}, Content: {node.node.get_content()[:10]}") - #logger.info(f"Source - Score: {node.score}, Content: {node.node.get_content()[:10]}..., Metadata: {node.node.metadata}") - - return response - -# def main(): -# OPENAI_API_KEY = "***" -# os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY - -# config = MongoDBConfig( -# connection_string="**", -# db_name="**", -# collection_name="**", -# index_name="**", -# ) - -# try: -# db_connector = MongoDBConnector(config) -# db_connector.connect() -# db_connector.verify_data() - -# rag_engine = RAGEngine(db_connector) -# rag_engine.setup() - -# query = "Any romantic movie for me? you can give anything you want?" 
-# rag_engine.query(query) - -# except Exception as e: -# logger.error(f"An error occurred: {e}") -# finally: -# if 'db_connector' in locals(): -# db_connector.disconnect() - -# if __name__ == "__main__": -# main() diff --git a/bolna/rag/testing_connection.py b/bolna/rag/testing_connection.py deleted file mode 100644 index ab103cae..00000000 --- a/bolna/rag/testing_connection.py +++ /dev/null @@ -1,27 +0,0 @@ - -import pymongo - -def get_mongo_client(mongo_uri): - """Establish connection to the MongoDB.""" - try: - client = pymongo.MongoClient(mongo_uri) - print("Connection to MongoDB successful") - return client - except pymongo.errors.ConnectionFailure as e: - print(f"Connection failed: {e}") - return None - -mongo_uri = "mongodb+srv://vipul:qqgr4bwAYl5pZSU9@testing-rag.nqaknom.mongodb.net/" - -if not mongo_uri: - print("MONGO_URI not set in environment variables") - -mongo_client = get_mongo_client(mongo_uri) - -DB_NAME="movies" -COLLECTION_NAME="movies_records" - -db = mongo_client[DB_NAME] -collection = db[COLLECTION_NAME] -print(type(db)) -print(type(collection)) diff --git a/bolna/transcriber/base_transcriber.py b/bolna/transcriber/base_transcriber.py index 0a72a885..c640fdc2 100644 --- a/bolna/transcriber/base_transcriber.py +++ b/bolna/transcriber/base_transcriber.py @@ -21,7 +21,7 @@ def __init__(self, input_queue=None): self.current_request_id = None def update_meta_info(self): - self.meta_info['request_id'] = self.current_request_id if self.current_request_id else None # refer to the generate_request_id function + self.meta_info['request_id'] = self.current_request_id if self.current_request_id else None self.meta_info['previous_request_id'] = self.previous_request_id self.meta_info['origin'] = "transcriber" diff --git a/bolna/transcriber/deepgram_transcriber.py b/bolna/transcriber/deepgram_transcriber.py index b937d3b3..f847f633 100644 --- a/bolna/transcriber/deepgram_transcriber.py +++ b/bolna/transcriber/deepgram_transcriber.py @@ -353,10 +353,10 @@ async def transcribe(self): async with self.deepgram_connect() as deepgram_ws: if self.stream: self.sender_task = asyncio.create_task(self.sender_stream(deepgram_ws)) - self.heartbeat_task = asyncio.create_task(self.send_heartbeat(deepgram_ws)) # deepgram requires heartbeat signals in certain duration. + self.heartbeat_task = asyncio.create_task(self.send_heartbeat(deepgram_ws)) async for message in self.receiver(deepgram_ws): if self.connection_on: - await self.push_to_transcriber_queue(message) # sending the chunks from the twilio to the transcriber. + await self.push_to_transcriber_queue(message) else: logger.info("closing the deepgram connection") await self._close(deepgram_ws, data={"type": "CloseStream"}) diff --git a/local_setup/docker-compose.yml b/local_setup/docker-compose.yml index b9ad7989..24424cc7 100644 --- a/local_setup/docker-compose.yml +++ b/local_setup/docker-compose.yml @@ -6,8 +6,6 @@ services: build: context: . dockerfile: dockerfiles/bolna_server.Dockerfile - args: - BRANCH: ${BRANCH:-master} ports: - "5001:5001" depends_on: @@ -19,20 +17,6 @@ services: - $HOME/.aws/credentials:/root/.aws/credentials:ro - $HOME/.aws/config:/root/.aws/config:ro - ingestion-app: - image: ingestion-app:latest - build: - context: . 
- dockerfile: dockerfiles/ingestion_server.Dockerfile - ports: - - "8000:8000" - depends_on: - - redis - - ngrok - - bolna-app - env_file: - - .env - # redis service used as a persistent storage redis: image: redis:latest diff --git a/local_setup/dockerfiles/bolna_server.Dockerfile b/local_setup/dockerfiles/bolna_server.Dockerfile index 966a9c87..414aca2c 100644 --- a/local_setup/dockerfiles/bolna_server.Dockerfile +++ b/local_setup/dockerfiles/bolna_server.Dockerfile @@ -7,10 +7,7 @@ COPY ./quickstart_server.py /app RUN apt-get update && apt-get install libgomp1 git -y RUN apt-get -y update && apt-get -y upgrade && apt-get install -y --no-install-recommends ffmpeg RUN pip install -r requirements.txt - -ARG BRANCH=master -RUN pip install --force-reinstall git+https://github.com/bolna-ai/bolna@${BRANCH} - +RUN pip install --force-reinstall git+https://github.com/bolna-ai/bolna@master RUN pip install scipy==1.11.0 RUN pip install torch==2.0.1 RUN pip install torchaudio==2.0.1 diff --git a/local_setup/dockerfiles/ingestion_server.Dockerfile b/local_setup/dockerfiles/ingestion_server.Dockerfile deleted file mode 100644 index 4b410135..00000000 --- a/local_setup/dockerfiles/ingestion_server.Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -FROM python:3.10.13-slim - -WORKDIR /app -COPY ./requirements.txt /app -COPY ./quickstart_ingestion_server.py /app - -RUN apt-get update && apt-get install -y --no-install-recommends gcc -RUN pip install --no-cache-dir -r requirements.txt - -EXPOSE 8000 - -CMD ["uvicorn", "quickstart_ingestion_server:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/local_setup/ngrok-config.yml b/local_setup/ngrok-config.yml index 4907d3d9..c868f12b 100644 --- a/local_setup/ngrok-config.yml +++ b/local_setup/ngrok-config.yml @@ -11,7 +11,3 @@ tunnels: bolna-app: addr: bolna-app:5001 proto: http - ingestion-app: - addr: ingestion-app:8000 - proto: http - diff --git a/local_setup/quick_scripts/buildup_docker.sh b/local_setup/quick_scripts/buildup_docker.sh deleted file mode 100755 index dbc74825..00000000 --- a/local_setup/quick_scripts/buildup_docker.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash - -# Prompt the user for the branch name -read -p "Enter the branch name: " BRANCH - -# Build the images with the specified branch name -BRANCH=$BRANCH docker-compose build --no-cache - -# Run the containers in detached mode -docker-compose up -d - -# Append logs to log.txt -docker-compose logs -f >> log.txt & - -echo "All Docker images are built, and containers are up and running. Logs are being appended to log.txt." diff --git a/local_setup/quick_scripts/cleanup_docker.sh b/local_setup/quick_scripts/cleanup_docker.sh deleted file mode 100755 index f966ccb8..00000000 --- a/local_setup/quick_scripts/cleanup_docker.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash - -# Stop all running containers -docker stop $(docker ps -q) - -# Remove all containers -docker rm $(docker ps -a -q) - -# Remove all images -docker rmi $(docker images -q) - -echo "All Docker containers and images have been deleted." 
\ No newline at end of file diff --git a/local_setup/quick_scripts/update_packages.sh b/local_setup/quick_scripts/update_packages.sh deleted file mode 100755 index eb4f0bd8..00000000 --- a/local_setup/quick_scripts/update_packages.sh +++ /dev/null @@ -1,102 +0,0 @@ -#!/bin/bash - -set -e - -# Colors for output -GREEN='\033[0;32m' -YELLOW='\033[0;33m' -RED='\033[0;31m' -NC='\033[0m' # No Color - -# Function to print colored output -print_color() { - printf "${1}${2}${NC}\n" -} - -# Function to find the project root (where setup.py or pyproject.toml is located) -find_project_root() { - local dir=$(pwd) - while [[ "$dir" != "/" ]]; do - if [[ -f "$dir/setup.py" || -f "$dir/pyproject.toml" ]]; then - echo "$dir" - return 0 - fi - dir=$(dirname "$dir") - done - echo "Project root not found" >&2 - return 1 -} - -# Function to find and activate virtual environment -activate_venv() { - local project_root=$1 - local venv_dir - - # Check common virtual environment directory names - for venv in venv env .venv .env; do - if [[ -f "$project_root/$venv/bin/activate" ]]; then - venv_dir="$project_root/$venv" - break - fi - done - - if [[ -n "$venv_dir" ]]; then - print_color $YELLOW "Found virtual environment in $venv_dir" - source "$venv_dir/bin/activate" - print_color $GREEN "Virtual environment activated: $(which python)" - else - print_color $RED "No virtual environment found in the project root." - exit 1 - fi -} - -# Update the package -update_package() { - local project_root=$(find_project_root) - if [[ $? -ne 0 ]]; then - print_color $RED "Failed to find project root. Are you in the correct directory?" - exit 1 - fi - - print_color $GREEN "Project root found at: $project_root" - cd "$project_root" - - print_color $GREEN "Activating virtual environment..." - activate_venv "$project_root" - - print_color $GREEN "Updating package and dependencies..." - - # Update pip and setuptools - pip install --upgrade pip setuptools - - # Install the package in editable mode - if pip install -e .; then - print_color $GREEN "Package updated successfully!" - else - print_color $RED "An error occurred while updating the package." - exit 1 - fi - - # Check and install requirements if requirements.txt exists - if [[ -f "requirements.txt" ]]; then - print_color $GREEN "Installing requirements from requirements.txt..." - if pip install -r requirements.txt; then - print_color $GREEN "Requirements installed successfully!" - else - print_color $RED "An error occurred while installing the requirements." - exit 1 - fi - else - print_color $YELLOW "No requirements.txt found. Skipping requirements installation." - fi - -} - -# Main execution -main() { - update_package - print_color $GREEN "Update process completed successfully!" 
-} - -# Run the main function -main \ No newline at end of file diff --git a/local_setup/quickstart_ingestion_server.py b/local_setup/quickstart_ingestion_server.py deleted file mode 100644 index 592294dc..00000000 --- a/local_setup/quickstart_ingestion_server.py +++ /dev/null @@ -1,69 +0,0 @@ -from fastapi import FastAPI, UploadFile -from bolna.ingestion_server.datachecks import RAGConfig -from bolna.ingestion_server import RAGFactory -import uvicorn -import time -from typing import Dict -import dotenv - -dotenv.load_dotenv() -DB: Dict[str, RAGConfig] = {} - -rag_factory = RAGFactory() -app = FastAPI() - - -@app.get("/") -def heartbeat() -> float: - """Health check endpoint that returns the current server time.""" - return time.time() - - -@app.post("/create-rag") -def create_rag(request: RAGConfig) -> Dict[str, str]: - """Create a RAG configuration and return its ID. - - Args: - request (RAGConfig): The RAG configuration to create. - - Returns: - Dict[str, str]: A dictionary containing the created RAG ID. - """ - print(request) - rag_id = rag_factory.make_rag(request) - return {"rag_id": rag_id} - - -@app.post("/rag-upload-file/{rag_id}") -async def rag_upload_file(file: UploadFile, rag_id: str) -> Dict[str, str]: - """Upload a file for a specific RAG ID. - - Args: - file (UploadFile): The file to upload. - rag_id (str): The ID of the RAG to associate with the file. - - Returns: - Dict[str, str]: A dictionary containing the upload status and index. - """ - try: - task = await rag_factory.file_ingest(rag_name=rag_id, file=file) - return {"index": task._index, "status": task._status, "message": "DONE"} - except Exception as e: - return {"index": None, "status": "ERROR", "message": f"{e}"} - - -@app.get("/rag-retrive/{rag_id}/{index}") -async def rag_retrive(query: str, rag_id: str, index: str) -> list: - """Retrieve documents based on a query for a specific RAG ID and index. - - Args: - query (str): The query string to search for. - rag_id (str): The ID of the RAG to search in. - index (str): The index to search in. - - Returns: - list: A list of documents matching the query. 
- """ - docs = await rag_factory.retrieve_query(rag_name=rag_id, index=index, query=query) - send_filter = [{"text": node.text, "score": node.score} for node in docs] - return send_filter \ No newline at end of file diff --git a/local_setup/quickstart_server.py b/local_setup/quickstart_server.py index 475e8b1d..e0399703 100644 --- a/local_setup/quickstart_server.py +++ b/local_setup/quickstart_server.py @@ -2,7 +2,7 @@ import asyncio import uuid import traceback -from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Query, UploadFile +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware import redis.asyncio as redis from dotenv import load_dotenv @@ -12,11 +12,7 @@ from bolna.models import * from bolna.llms import LiteLLM from bolna.agent_manager.assistant_manager import AssistantManager -from bolna.helpers.data_ingestion_pipe import create_table, ingestion_task, ingestion_tasks, IngestionPipeline, IngestionTask, TaskStatus -import tempfile -import threading -from typing import Dict -import os + load_dotenv() logger = configure_logger(__name__) @@ -34,53 +30,11 @@ allow_headers=["*"] ) -logger.info(f"PROCESS ID {os.getpid()}") class CreateAgentPayload(BaseModel): agent_config: AgentModel agent_prompts: Optional[Dict[str, Dict[str, str]]] -ingestion_pipeline = IngestionPipeline() -@app.post("/ingestion-pipeline") -async def start_ingestion(table_id:str,file:UploadFile): - if file.content_type in ["application/pdf","application/x-pdf"]: - task_id = str(uuid.uuid4()) - - logger.info(f"added new task: {task_id}") - temp_file = tempfile.NamedTemporaryFile() - temp_file.write(await file.read()) - prev = temp_file.name - file_name = f"/tmp/{task_id}.pdf" - os.rename(prev,file_name) - - - task = IngestionTask(file_name) - await ingestion_pipeline.task_queue.put(task) - while task._status != TaskStatus.ERROR: - await asyncio.sleep(0.3) - logger.info(f"TASK: {task_id} status: {task._status}") - if task._status == TaskStatus.SUCCESS: - break - - - - # if table_id != "None": - # table = table_id - # else: - # table = str(uuid.uuid4()) - - # thread = threading.Thread(target=create_table,args=(table,file_name)) - # thread.start() - # while thread.is_alive(): - # await asyncio.sleep(0.3) - - - os.rename(file_name,prev) - - - return {"status":task._status,"message":task._message,"table_id":task._table_id} - - return {"status":TaskStatus.ERROR,"message":"only accept the pdf types for know","task_id":None} @app.post("/agent") async def create_agent(agent_data: CreateAgentPayload): @@ -139,4 +93,4 @@ async def websocket_endpoint(agent_id: str, websocket: WebSocket, user_agent: st active_websockets.remove(websocket) except Exception as e: traceback.print_exc() - logger.error(f"error in executing {e}") \ No newline at end of file + logger.error(f"error in executing {e}") diff --git a/local_setup/requirements.txt b/local_setup/requirements.txt index de8a1882..1838e052 100644 --- a/local_setup/requirements.txt +++ b/local_setup/requirements.txt @@ -3,4 +3,4 @@ fastapi==0.108.0 plivo==4.47.0 redis==5.0.1 twilio==8.9.0 -uvicorn==0.22.0 \ No newline at end of file +uvicorn==0.22.0 diff --git a/local_setup/telephony_server/twilio_api_server.py b/local_setup/telephony_server/twilio_api_server.py index 67bd873c..fc8874b1 100644 --- a/local_setup/telephony_server/twilio_api_server.py +++ b/local_setup/telephony_server/twilio_api_server.py @@ -22,8 +22,7 @@ def populate_ngrok_tunnels(): - response = 
requests.get("http://localhost:4040/api/tunnels") # ngrok interface - app_callback_url, websocket_url = None, None + response = requests.get("http://ngrok:4040/api/tunnels") # ngrok interface telephony_url, bolna_url = None, None if response.status_code == 200: @@ -63,7 +62,7 @@ async def make_call(request: Request): from_=twilio_phone_number, url=f"{telephony_host}/twilio_connect?bolna_host={bolna_host}&agent_id={agent_id}", method="POST", - record=False + record=True ) except Exception as e: print(f'make_call exception: {str(e)}') diff --git a/requirements.txt b/requirements.txt index f4291b3d..fbcdc52b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,100 +1,28 @@ -# Web Framework and API -fastapi==0.108.0 -uvicorn==0.22.0 -starlette==0.32.0.post1 - -# Data Validation and Settings Management -pydantic==2.8.2 -python-dotenv==1.0.0 - -# Async Libraries -aiohttp==3.9.1 aiobotocore==2.9.0 aiofiles==23.2.1 -aiocsv==1.3.1 -aiodynamo==23.10.1 -aiohttp-retry==2.8.3 -aioitertools==0.11.0 -aiormq==6.8.0 -aiosignal==1.3.1 - -# Database and Storage -redis==5.0.1 -pymongo==4.8.0 -lancedb==0.10.2 -SQLAlchemy==2.0.31 - -# Machine Learning and NLP -numpy==1.26.1 -scipy==1.11.4 -scikit-learn==1.5.1 -torch==2.1.2 -torchaudio==2.1.2 -transformers==4.39.3 -sentence-transformers==3.0.1 +aiohttp==3.9.1 +azure-cognitiveservices-speech==1.38.0 +daily-python==0.9.1 +fastapi==0.108.0 fastembed==0.2.7 -onnxruntime==1.18.1 -nltk==3.8.1 - -# LLM and Related Libraries -openai>=1.10.0 litellm==1.40.20 -llama-index==0.10.57 -llama-index-vector-stores-lancedb==0.1.7 -llama-index-vector-stores-mongodb==0.1.8 -cohere==5.3.2 - -# Text Processing -tiktoken>=0.6.0 -tokenizers==0.15.2 - -# Audio Processing -azure-cognitiveservices-speech==1.38.0 +numpy==1.26.1 +openai>=1.10.0 +pydantic==2.5.3 pydub==0.25.1 -soundfile==0.12.1 - -# Date and Time Handling python-dateutil==2.8.2 -pytz==2024.1 - -# HTTP and Networking +python-dotenv==1.0.0 +redis==5.0.1 requests==2.31.0 -httpx==0.25.2 -websockets==10.4 - -# File Handling and Parsing -python-multipart==0.0.6 -Pillow==10.4.0 -pypdf==4.3.1 - -# API Clients +tiktoken>=0.6.0 twilio==8.9.0 -daily-python==0.9.1 -slack_sdk==3.31.0 -stripe==8.0.0 - -# Task Queue and Job Processing -celery==5.3.6 -taskiq==0.11.0 - -# Logging and Monitoring -loguru==0.7.2 -sentry-sdk==2.9.0 - -# Development and Testing +uvicorn==0.22.0 +websockets==10.4 +onnxruntime>=1.16.3 +scipy==1.11.4 uvloop==0.19.0 -pytest==7.4.0 - -# Data Processing and Analysis -pandas==2.2.2 -pyarrow==15.0.0 - -# Utilities -python-dateutil==2.8.2 -tenacity==8.5.0 -ratelimiter==1.2.0.post0 - -# Optional Dependencies (uncomment if needed) -huggingface-hub==0.23.2 -semantic-router==0.0.46 -supabase==2.3.3 \ No newline at end of file +tokenizers +huggingface-hub +semantic-router +sentence_transformers +optimum[onnxruntime]