diff --git a/.env.sample b/.env.sample index f42c3842..5327fb3d 100644 --- a/.env.sample +++ b/.env.sample @@ -5,13 +5,21 @@ OPENAI_MODEL=gpt-3.5-turbo REDIS_URL=redis://redis:6379 WHISPER_URL= STYLE_TTS= +BUCKET_NAME=bolna-user-prompts-sandbox + +# RAG credentials +LANCEDB_DIR= +LLAMA_CLOUD_API_KEY= + +# AWS credentials +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= # Twilio credentials TWILIO_ACCOUNT_SID= TWILIO_AUTH_TOKEN= TWILIO_PHONE_NUMBER= - # Plivo credentials PLIVO_AUTH_ID= PLIVO_AUTH_TOKEN= diff --git a/.gitignore b/.gitignore index 33986b54..e0b6486a 100644 --- a/.gitignore +++ b/.gitignore @@ -167,4 +167,5 @@ agent_data/**/mp3 */__pycache__/ */*/__pycache__/ logs/ -agent_data/ \ No newline at end of file +agent_data/ +testing/ \ No newline at end of file diff --git a/bolna/agent_manager/task_manager.py b/bolna/agent_manager/task_manager.py index 0e03322e..5d860224 100644 --- a/bolna/agent_manager/task_manager.py +++ b/bolna/agent_manager/task_manager.py @@ -11,6 +11,7 @@ from datetime import datetime import aiohttp +from bolna.bolna.agent_types.llama_index_rag_agent import LlamaIndexRag from bolna.constants import ACCIDENTAL_INTERRUPTION_PHRASES, DEFAULT_USER_ONLINE_MESSAGE, DEFAULT_USER_ONLINE_MESSAGE_TRIGGER_DURATION, FILLER_DICT, PRE_FUNCTION_CALL_MESSAGE from bolna.helpers.function_calling_helpers import trigger_api @@ -52,7 +53,12 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con logger.info(f"API TOOLS is present {task['tools_config']['api_tools']}") self.kwargs['api_tools'] = task['tools_config']['api_tools'] - if self.__has_extra_config(): + # if self.__has_extra_config(): + # pass + # #self.kwargs['assistant_id'] = task['tools_config']["llm_agent"]['extra_config']['assistant_id'] + # #logger.info(f"Assistant id for agent is {self.kwargs['assistant_id']}") + + if self.__is_openai_assistant(): self.kwargs['assistant_id'] = task['tools_config']["llm_agent"]['extra_config']['assistant_id'] logger.info(f"Assistant id for agent is {self.kwargs['assistant_id']}") @@ -166,12 +172,44 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con #self.stream = not turn_based_conversation #Currently we are allowing only realtime conversation based usecases. Hence it'll always be true unless connected through dashboard self.is_local = False self.llm_config = None - if self.task_config["tools_config"]["llm_agent"] is not None: - self.llm_config = { - "model": self.task_config["tools_config"]["llm_agent"]["model"], - "max_tokens": self.task_config["tools_config"]["llm_agent"]["max_tokens"], - "provider": self.task_config["tools_config"]["llm_agent"]["provider"] - } + self.llm_config_map = {} + self.llm_agent_map = {} + if self.__is_multiagent(): + logger.info(f"Gotta write the code for this shit as well") + for agent, config in self.task_config["tools_config"]["llm_agent"]['extra_config']['agent_map'].items(): + self.llm_config_map[agent] = config.copy() + self.llm_config_map[agent]['buffer_size'] = self.task_config["tools_config"]["synthesizer"]['buffer_size'] + if 'assistant_id' in config: + self.llm_config_map[agent]['agent_type'] = "openai_assistant" + + elif not self.__is_openai_assistant(): + logger.info(f"NOT OPEN AI ASSISTANT") + if self.task_config["tools_config"]["llm_agent"] is not None: + if self.__is_knowledgebase_agent(): + self.llm_agent_config = self.task_config["tools_config"]["llm_agent"] + self.llm_config = { + "model": self.llm_agent_config['extra_config']['model'], + "max_tokens": self.llm_agent_config['extra_config']['max_tokens'], + "provider": self.llm_agent_config['extra_config']['provider'], + } + else: + agent_type = self.task_config["tools_config"]["llm_agent"].get("agent_type", None) + if not agent_type: + self.llm_agent_config = self.task_config["tools_config"]["llm_agent"] + else: + self.llm_agent_config = self.task_config["tools_config"]["llm_agent"]['extra_config'] + + self.llm_config = { + "model": self.llm_agent_config['model'], + "max_tokens": self.llm_agent_config['max_tokens'], + "provider": self.llm_agent_config['provider'], + } + + + logger.info(f"SETTING FOLLOW UP TASK DETAILS {self.llm_agent_config}") + + + # Output stuff self.output_task = None @@ -230,8 +268,12 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con else: self.__setup_routes(self.routes) logger.info(f"Time to setup routes {time.time() - start_time}") - - + + if self.__is_multiagent(): + guardrails_meta = self.kwargs.pop('routes', None) + self.agent_routing = guardrails_meta['agent_routing_config']['route_layer'] + self.default_agent = task['tools_config']['llm_agent']['extra_config']['default_agent'] + logger.info(f"Inisialised with default agent {self.default_agent}, agent_routing {self.agent_routing}") # for long pauses and rushing if self.conversation_config is not None: self.minimum_wait_duration = self.task_config["tools_config"]["transcriber"]["endpointing"] @@ -307,13 +349,61 @@ def __init__(self, assistant_name, task_id, task, ws, input_parameters=None, con logger.info("Not using fillers to decrease latency") else: self.filler_preset_directory = f"{os.getenv('FILLERS_PRESETS_DIR')}/{self.synthesizer_voice.lower()}" + # setting transcriber + self.__setup_transcriber() + # setting synthesizer + self.__setup_synthesizer(self.llm_config) - def __has_extra_config(self): + # setting llm + if self.llm_config is not None: + logger.info(f"LLM CONFIG IS NOT NONE {self.task_config['task_type']} llm agent config {self.llm_agent_config}") + llm = self.__setup_llm(self.llm_config) + #Setup tasks + agent_params = { + 'llm': llm, + 'agent_type': self.llm_agent_config.get("agent_type","simple_llm_agent") + } + self.__setup_tasks(**agent_params) + + elif self.__is_multiagent(): + # Setup task for multiagent conversation + for agent in self.task_config["tools_config"]["llm_agent"]['extra_config']['agent_map']: + if 'routes' in self.llm_config_map[agent]: + del self.llm_config_map[agent]['routes'] #Remove routes from here as it'll create conflict ahead + llm = self.__setup_llm(self.llm_config_map[agent]) + agent_type = self.llm_config_map[agent].get("agent_type","simple_llm_agent") + logger.info(f"Getting response for {llm} and agent type {agent_type} and {agent}") + agent_params = { + 'llm': llm, + 'agent_type': agent_type + } + if agent_type == "openai_assistant": + agent_params['assistant_config'] = self.llm_config_map[agent] + llm_agent = self.__setup_tasks(**agent_params) + self.llm_agent_map[agent] = llm_agent + elif self.__is_openai_assistant(): + # if self.task_config['tools_config']["llm_agent"].get("agent_type", None) is None: + # assistant_config = {"assistant_id": self.task_config['tools_config']["llm_agent"]['assistant_id']} + self.__setup_tasks(agent_type = "openai_assistant", assistant_config= task['tools_config']["llm_agent"]['extra_config']) + + def __is_openai_assistant(self): if self.task_config["task_type"] == "webhook": return False extra_config = self.task_config['tools_config']["llm_agent"].get("extra_config", None) return False if extra_config is None else True + def __is_knowledgebase_agent(self): + if self.task_config["task_type"] == "webhook": + return False + agent_type = self.task_config['tools_config']["llm_agent"].get("agent_type", None) + return agent_type == "knowledgebase_agent" + + def __is_knowledgebase_agent(self): + if self.task_config["task_type"] == "webhook": + return False + agent_type = self.task_config['tools_config']["llm_agent"].get("agent_type", None) + return agent_type == "knowledgebase_agent" + def __setup_routes(self, routes): embedding_model = routes.get("embedding_model", os.getenv("ROUTE_EMBEDDING_MODEL")) self.route_encoder = FastEmbedEncoder(name=embedding_model) @@ -468,19 +558,35 @@ def __setup_llm(self, llm_config): else: raise Exception(f'LLM {self.task_config["tools_config"]["llm_agent"]["provider"]} not supported') - def __setup_tasks(self, llm): - if self.task_config["task_type"] == "conversation": - agent_type = self.task_config["tools_config"]["llm_agent"].get("agent_type", self.task_config["tools_config"]["llm_agent"]["agent_flow_type"]) - if agent_type == "streaming": - self.tools["llm_agent"] = StreamingContextualAgent(llm) - elif agent_type == "openai_assistant": - logger.info("setting up backend as openai_assistants") - self.tools["llm_agent"] = OpenAIAssistantAgent(llm) - elif agent_type in ("preprocessed", "formulaic"): - preprocessed = self.task_config["tools_config"]["llm_agent"]["agent_flow_type"] == "preprocessed" - logger.info(f"LLM TYPE {type(llm)}") - self.tools["llm_agent"] = GraphBasedConversationAgent(llm, context_data=self.context_data, - prompts=self.prompts, preprocessed=preprocessed) + def __get_agent_object(self, llm, agent_type, assistant_config = None ): + if agent_type == "simple_llm_agent": + logger.info(f"Simple llm agent") + llm_agent = StreamingContextualAgent(llm) + elif agent_type == "openai_assistant": + logger.info(f"setting up backend as openai_assistants {assistant_config}") + llm_agent = OpenAIAssistantAgent(**assistant_config) + elif agent_type == "knowledgebase_agent": + logger.info("#### Setting up knowledgebase_agent agent ####") + extra_config = self.task_config["tools_config"]["llm_agent"].get("extra_config", {}) + vector_store_config = extra_config.get("vector_store", {}) + llm_agent = LlamaIndexRag( + vector_id=vector_store_config.get("vector_id"), + temperature=extra_config.get("temperature", 0.1), + model=extra_config.get("model", "gpt-3.5-turbo-16k"), + buffer=40, + max_tokens=100, # You might want to make this configurable + provider_config=vector_store_config + ) + logger.info("Llama-index rag agent is created") + else: + raise(f"{agent_type} Agent type is not created yet") + return llm_agent + + def __setup_tasks(self, llm = None, agent_type = None, assistant_config= None): + if self.task_config["task_type"] == "conversation" and not self.__is_multiagent(): + self.tools["llm_agent"] = self.__get_agent_object(llm, agent_type, assistant_config) + elif self.__is_multiagent(): + return self.__get_agent_object(llm, agent_type, assistant_config) elif self.task_config["task_type"] == "extraction": logger.info("Setting up extraction agent") self.tools["llm_agent"] = ExtractionContextualAgent(llm, prompt=self.system_prompt) @@ -507,7 +613,8 @@ def __setup_tasks(self, llm): ######################## async def load_prompt(self, assistant_name, task_id, local, **kwargs): logger.info("prompt and config setup started") - if self.task_config["task_type"] == "webhook" or self.task_config["tools_config"]["llm_agent"]["agent_flow_type"] == "openai_assistant": + agent_type = self.task_config["tools_config"]["llm_agent"].get("agent_type", "simple_llm_agent") + if self.task_config["task_type"] == "webhook" or agent_type in ["openai_assistant", "knowledgebase_agent"]: return self.is_local = local today = datetime.now().strftime("%A, %B %d, %Y") @@ -891,7 +998,7 @@ def __store_into_history(self, meta_info, messages, llm_response, should_trigger logger.info("##### User spoke while LLM was generating response") else: self.llm_response_generated = True - convert_to_request_log(message=llm_response, meta_info= meta_info, component="llm", direction="response", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) + convert_to_request_log(message=llm_response, meta_info= meta_info, component="llm", direction="response", model=self.llm_config['model'], run_id= self.run_id) if should_trigger_function_call: #Now, we need to consider 2 things here #1. There was silence between function call and now @@ -996,8 +1103,8 @@ async def _process_conversation_task(self, message, sequence, meta_info): logger.info(f"Route {route} has a vector cache") relevant_utterance = self.vector_caches[route].get(message['data']) cache_response = self.route_responses_dict[route][relevant_utterance] - convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="request", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) - convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="response", model=self.task_config["tools_config"]["llm_agent"]["model"], is_cached= True, run_id= self.run_id) + convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="request", model=self.llm_config["model"], run_id= self.run_id) + convert_to_request_log(message = message['data'], meta_info= meta_info, component="llm", direction="response", model=self.llm_config["model"], is_cached= True, run_id= self.run_id) messages = copy.deepcopy(self.history) messages += [{'role': 'user', 'content': message['data']},{'role': 'assistant', 'content': cache_response}] self.interim_history = copy.deepcopy(messages) @@ -1032,7 +1139,7 @@ async def _process_conversation_task(self, message, sequence, meta_info): {'role': 'system', 'content': self.check_for_completion_prompt}, {'role': 'user', 'content': format_messages(self.history, use_system_prompt= True)}] logger.info(f"##### Answer from the LLM {answer}") - convert_to_request_log(message=format_messages(prompt, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model=self.task_config["tools_config"]["llm_agent"]["model"], run_id= self.run_id) + convert_to_request_log(message=format_messages(prompt, use_system_prompt= True), meta_info= meta_info, component="llm", direction="request", model=self.llm_agent_config["model"], run_id= self.run_id) convert_to_request_log(message=answer, meta_info= meta_info, component="llm", direction="response", model= self.check_for_completion_llm, run_id= self.run_id) if should_hangup: diff --git a/bolna/agent_types/data_ingestion_pipe.py b/bolna/agent_types/data_ingestion_pipe.py new file mode 100644 index 00000000..a654f8e8 --- /dev/null +++ b/bolna/agent_types/data_ingestion_pipe.py @@ -0,0 +1,126 @@ +from uuid import uuid4 +import os +from llama_index.embeddings.openai import OpenAIEmbedding +from llama_index.llms.openai import OpenAI +from llama_index.vector_stores.lancedb import LanceDBVectorStore +from llama_parse import LlamaParse +from llama_index.core.node_parser import ( + MarkdownElementNodeParser, + SentenceSplitter +) +from typing import Dict, Tuple +from llama_index.core import VectorStoreIndex +from llama_index.core import StorageContext +import dotenv +import asyncio +from .logger_config import configure_logger +import tempfile +import threading + +dotenv.load_dotenv() + +lance_db = "/tmp/RAG" +llama_cloud_key = os.getenv("LLAMA_CLOUD_API_KEY") +chatgpt = os.getenv("OPENAI_API_KEY") +logger = configure_logger(__name__) +ingestion_tasks = [] + +logger.info(f"{lance_db}") +async def ingestion_task(temp_file_name:str,table_name:str,chunk_size:int = 512,overlaping:int = 200): + parser = LlamaParse( + api_key=llama_cloud_key, + result_type="markdown", + ) + embed_model = OpenAIEmbedding(model="text-embedding-3-small",api_key=chatgpt) + llm = OpenAI(model="gpt-3.5-turbo", temperature=0.2,api_key=chatgpt) + logger.info(f"Emdeding model, LLM model and Llama Parser were loaded") + if LanceDBVectorStore(lance_db)._table_exists(table_name): + vector_store = LanceDBVectorStore(lance_db,table_name,mode="append") + logger.info(f"vector store is loaded") + docs = await parser.aload_data(temp_file_name) + node_parser = MarkdownElementNodeParser(num_workers=8,llm=llm) + nodes = await node_parser.aget_nodes_from_documents(docs) + nodes, objects = node_parser.get_nodes_and_objects(nodes) + nodes = await SentenceSplitter(chunk_size=chunk_size, chunk_overlap=overlaping).aget_nodes_from_documents( + nodes + ) + storage_context = StorageContext.from_defaults(vector_store=vector_store) + vector_index = VectorStoreIndex(nodes=nodes,storage_context=storage_context,embed_model=embed_model) + else: + vector_store = LanceDBVectorStore(lance_db,table_name) + logger.info(f"vector store is loaded") + docs = await parser.aload_data(temp_file_name) + node_parser = MarkdownElementNodeParser(num_workers=8,llm=llm) + nodes = await node_parser.aget_nodes_from_documents(docs) + nodes, objects = node_parser.get_nodes_and_objects(nodes) + nodes = await SentenceSplitter(chunk_size=chunk_size, chunk_overlap=overlaping).aget_nodes_from_documents( + nodes + ) + storage_context = StorageContext.from_defaults(vector_store=vector_store) + vector_index = VectorStoreIndex(nodes=nodes,storage_context=storage_context,embed_model=embed_model) + +def create_table(table_name,temp_file_name:str): + loop = asyncio.new_event_loop() + # table_name = str(uuid4()) + loop.run_until_complete(ingestion_task(temp_file_name=temp_file_name,table_name=table_name)) + +class TaskStatus: + SUCCESS = "SUCCESS" + WAIT = "WAIT" + PROCESSING = "PROCESSING" + ERROR = "ERROR" +class IngestionTask: + def __init__(self,file_name:str, chunk_size:int = 512, overlaping:int = 200) -> None: + self.chunk_size = chunk_size + self.overlaping = overlaping + self.file_name = file_name + + self._status:int = TaskStatus.WAIT + self._table_id:str = None + self._message:str = None +class IngestionPipeline: + def __init__(self,nuof_process:int=2) -> None: + self.task_queue:asyncio.queues.Queue = asyncio.queues.Queue() + # self.TaskIngestionThread = threading.Thread(target=self.__start) + self.task_keeping_status_dict:Dict[str:IngestionTask] = dict() + # self.TaskIngestionThread.start() + # ingestion_task = asyncio.get_event_loop().create_task(self.start()) + ingestion_tasks = [] + for _ in range(nuof_process): + ingestion_tasks.append(asyncio.create_task(self.start())) + # asyncio.gather(ingestion_tasks) + + async def add_task(self,task_id:str, task:IngestionTask): + self.task_keeping_status_dict[task_id] = task + await self.task_queue.put(task) + + async def check_task_status(self,task_id)->Tuple[int,str,str]: + task:IngestionTask = self.task_keeping_status_dict.get(task_id) + return task._status, task._table_id, task._message + + async def start(self): + logger.info("INGESTION PROCESS STARTED") + while True: + + task:IngestionTask = await self.task_queue.get() + + logger.info(f"got packet for processing") + task._status = TaskStatus.PROCESSING + table_id = str(uuid4()) + try: + await ingestion_task(task.file_name,table_id,task.chunk_size,task.overlaping) + task._table_id = table_id + task._message = "every thing run succesfully" + task._status = TaskStatus.SUCCESS + except Exception as e: + logger.info(f"ERROR: {e}") + task._message = "there is an error" + task._status = TaskStatus.ERROR + def _start(self): + # loop = asyncio.new_event_loop() + # try: + # loop.run_in_executor(self.start()) + # except Exception as e: + # logger.info(f"error: {e}") + asyncio.run(self.start()) + \ No newline at end of file diff --git a/bolna/agent_types/llama_index_rag_agent.py b/bolna/agent_types/llama_index_rag_agent.py new file mode 100644 index 00000000..fb4b32e1 --- /dev/null +++ b/bolna/agent_types/llama_index_rag_agent.py @@ -0,0 +1,192 @@ +import os +import time +import asyncio +import logging +from typing import List, Tuple, Generator, AsyncGenerator + +from dotenv import load_dotenv, find_dotenv + +from llama_index.core import VectorStoreIndex, StorageContext +from llama_index.core.llms import ChatMessage +from llama_index.core.tools import QueryEngineTool, ToolMetadata +from llama_index.llms.openai import OpenAI +from llama_index.vector_stores.lancedb import LanceDBVectorStore +from llama_index.agent.openai import OpenAIAgent + +from .base_agent import BaseAgent +from pymongo import MongoClient +from bolna.helpers.logger_config import configure_logger +from bolna.helpers.data_ingestion_pipe import lance_db +from bolna.rag.base import DatabaseConnector + +from bolna.models import * +from bolna.rag.mongodb_rag import MongoDBConnector, RAGEngine as MongoDBRAGEngine + + +load_dotenv(find_dotenv(), override=True) +logger = configure_logger(__name__) + +class LlamaIndexRag(BaseAgent): + """ + A class that implements a RAG (Retrieval-Augmented Generation) system using LlamaIndex. + + This class sets up and manages an OpenAI-based language model, a vector store, and an agent + for performing document search and question answering tasks. + + Attributes: + vector_id (str): Identifier for the vector store. + temperature (float): Temperature setting for the language model. + model (str): The name of the OpenAI model to use. + buffer (int): Size of the token buffer for streaming responses. + max_tokens (int): Maximum number of tokens for the language model output. + OPENAI_KEY (str): OpenAI API key retrieved from environment variables. + llm (OpenAI): Instance of the OpenAI language model. + vector_store (LanceDBVectorStore): Vector store for document storage and retrieval. + vector_index (VectorStoreIndex): Index built on top of the vector store. + query_engine: Query engine for searching the vector index. + agent (OpenAIAgent): Agent that uses the query engine to answer questions. + """ + + def __init__(self, vector_id: str, temperature: float, model: str, buffer: int = 40, max_tokens: int = 100, provider_config: dict = None): + """ + Initialize the LlamaIndexRag instance. + + Args: + vector_id (str): Identifier for the vector store. + temperature (float): Temperature setting for the language model. + model (str): The name of the OpenAI model to use. + buffer (int, optional): Size of the token buffer for streaming responses. Defaults to 40. + max_tokens (int, optional): Maximum number of tokens for the language model output. Defaults to 100. + """ + super().__init__() + self.vector_id = vector_id + self.temperature = temperature + self.model = model + self.buffer = buffer + self.max_tokens = max_tokens + self.provider_config = provider_config + self.OPENAI_KEY = os.getenv('OPENAI_API_KEY') + self.provider = None + self.query_engine = None + + self.LANCEDB_DIR = os.getenv('LANCEDB_DIR') + + self._setup_llm() + self._setup_provider() + self._setup_agent() + + def _setup_llm(self): + """Set up the OpenAI language model.""" + self.llm = OpenAI( + model=self.model, + temperature=self.temperature, + api_key=self.OPENAI_KEY, + max_tokens=self.max_tokens + ) + + def _setup_provider(self): + """Based on the relevant provider config, set up the provider.""" + + provider_name = self.provider_config.get('provider') + + if provider_name == 'mongodb': + logging.info(f"Setting up {provider_name} RAG") + config = MongoDBProviderConfig(**self.provider_config['provider_config']) + connector = MongoDBConnector(config) + connector.connect() + connector.verify_data() + self.provider = MongoDBRAGEngine(connector) + self.provider.setup() + logger.info(f"{provider_name.capitalize()} RAG engine initialized") + else: + logging.info(f"Setting up {provider_name} RAG") + self.vector_store = LanceDBVectorStore(uri=self.LANCEDB_DIR, table_name=self.provider_config['provider_config'].get('vector_id')) + logging.info(f"Table params : {self.provider_config['provider_config'].get('vector_id')}") + storage_context = StorageContext.from_defaults(vector_store=self.vector_store) + self.vector_index = VectorStoreIndex([], storage_context=storage_context) + self.query_engine = self.vector_index.as_query_engine() + logger.info("LanceDB vector store initialized") + + def _setup_agent(self): + """Set up the OpenAI agent with the query engine tool.""" + tools = [ + QueryEngineTool( + self.query_engine, + metadata=ToolMetadata( + name="search", + description="Search the document, pass the entire user message in the query", + ), + ) + ] + self.agent = OpenAIAgent.from_tools(tools=tools, verbose=True) + logger.info("LLAMA INDEX AGENT IS CREATED") + + @staticmethod + def convert_history(history: List[dict]) -> Tuple[ChatMessage, List[ChatMessage]]: + """ + Convert a list of message dictionaries to ChatMessage objects. + + Args: + history (List[dict]): A list of dictionaries containing message data. + + Returns: + Tuple[ChatMessage, List[ChatMessage]]: A tuple containing the latest message and the chat history. + """ + message = ChatMessage(role=history[-1]['role'], content=history[-1]['content']) + chat_history = [ChatMessage(role=msg['role'], content=msg['content']) for msg in history[:-1]] + return message, chat_history + + async def generate(self, message: List[dict], **kwargs) -> AsyncGenerator[Tuple[str, bool, float, bool], None]: + """ + Generate a response based on the input message and chat history. + + This method streams the generated response, yielding chunks of text along with metadata. + + Args: + message (List[dict]): A list of dictionaries containing the message data and chat history. + **kwargs: Additional keyword arguments (unused in this implementation). + + Yields: + Tuple[str, bool, float, bool]: A tuple containing: + - The generated text chunk. + - A boolean indicating if this is the final chunk. + - The latency of the first token. + - A boolean indicating if the response was truncated (always False in this implementation). + """ + logger.info(f"Generate function Input: {message}") + message, history = await asyncio.to_thread(self.convert_history, message) + start_time = time.time() + + # mongodb + if self.provider: + # Use provider-specific query method + response = self.provider.query(message.content) + yield response.response, True, time.time() - start_time, False + + else: + buffer = "" + latency = -1 + start_time = time.time() + + # llamaindex provides astream_chat, no need for to_thread as we are running this over cloud! + #token_generator = await asyncio.to_thread(self.agent.stream_chat, message.content, history) + token_generator = await self.agent.astream_chat(message.content, history) + + async for token in token_generator.async_response_gen(): + if latency < 0: + latency = time.time() - start_time + buffer += " " + token + if len(buffer.split()) >= self.buffer: + yield buffer.strip(), False, latency, False + logger.info(f"LLM BUFFER FULL BUFFER OUTPUT: {buffer}") + buffer = "" + + if buffer: + logger.info(f"LLM BUFFER FLUSH BUFFER OUTPUT: {buffer}") + yield buffer.strip(), True, latency, False + async def __del__(self): + if hasattr(self, 'provider') and hasattr(self.provider, 'disconnect'): + self.provider.disconnect() + + def get_model(self): + return "Model" \ No newline at end of file diff --git a/bolna/agent_types/logger_config.py b/bolna/agent_types/logger_config.py new file mode 100644 index 00000000..01be24c2 --- /dev/null +++ b/bolna/agent_types/logger_config.py @@ -0,0 +1,20 @@ +import logging + +VALID_LOGGING_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + + +def configure_logger(file_name, enabled=True, logging_level='INFO'): + if logging_level not in VALID_LOGGING_LEVELS: + logging_level = "INFO" + + logging.basicConfig( + level=logging_level, + format="%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + logger = logging.getLogger(file_name) + + if not enabled: + logger.disabled = True + return logger \ No newline at end of file diff --git a/bolna/helpers/utils.py b/bolna/helpers/utils.py index 2fcec132..6c4ca6a6 100644 --- a/bolna/helpers/utils.py +++ b/bolna/helpers/utils.py @@ -151,6 +151,7 @@ async def delete_s3_file_by_prefix(bucket_name,file_key): async def store_file(bucket_name=None, file_key=None, file_data=None, content_type="json", local=False, preprocess_dir=None): if not local: + logger.info(f"Saving at {bucket_name}") session = AioSession() async with AsyncExitStack() as exit_stack: diff --git a/bolna/models.py b/bolna/models.py index 4872a929..25b14266 100644 --- a/bolna/models.py +++ b/bolna/models.py @@ -1,18 +1,17 @@ import json from typing import Optional, List, Union, Dict -from pydantic import BaseModel, Field, validator, ValidationError, Json +from pydantic import BaseModel, Field, validator, field_validator, ValidationError, Json from pydantic_core import PydanticCustomError + from .providers import * AGENT_WELCOME_MESSAGE = "This call is being recorded for quality assurance and training. Please speak now." - def validate_attribute(value, allowed_values): if value not in allowed_values: raise ValidationError(f"Invalid provider {value}. Supported values: {allowed_values}") return value - class PollyConfig(BaseModel): voice: str engine: str @@ -108,7 +107,7 @@ def validate_model(cls, value): class IOModel(BaseModel): provider: str - format: str + format: Optional[str] = "wav" @validator("provider") def validate_provider(cls, value): @@ -127,16 +126,43 @@ class Route(BaseModel): # Routes can be used for FAQs caching, prompt routing, guard rails, agent assist function calling class Routes(BaseModel): embedding_model: Optional[str] = "Snowflake/snowflake-arctic-embed-l" - routes: List[Route] + routes: Optional[List[Route]] = [] class OpenaiAssistants(BaseModel): name: Optional[str] = None assistant_id: str = None + max_tokens: Optional[int] =100 + temperature: Optional[float] = 0.2 + buffer_size: Optional[int] = 100 + provider: Optional[str] = "openai" + model: Optional[str] = "gpt-3.5-turbo" + +class MongoDBProviderConfig(BaseModel): + connection_string: Optional[str] = None + db_name: Optional[str] = None + collection_name: Optional[str] = None + index_name: Optional[str] = None + llm_model: Optional[str] = "gpt-3.5-turbo" + embedding_model: Optional[str] = "text-embedding-3-small" + embedding_dimensions: Optional[int] = 256 + +class LanceDBProviderConfig(BaseModel): + vector_id: str + +class VectorStore(BaseModel): + provider: str + provider_config: Union[LanceDBProviderConfig, MongoDBProviderConfig] + + @field_validator('provider_config', mode='before') + def validate_provider_config(cls, value, info): + vector_provider = info.data.get('provider') + if vector_provider not in ['mongodb', 'lancedb']: + raise ValueError('Unsupported provider for vector_store') + return value class LLM(BaseModel): model: Optional[str] = "gpt-3.5-turbo" max_tokens: Optional[int] = 100 - agent_flow_type: Optional[str] = "streaming" family: Optional[str] = "openai" temperature: Optional[float] = 0.1 request_json: Optional[bool] = False @@ -148,11 +174,79 @@ class LLM(BaseModel): presence_penalty: Optional[float] = 0.0 provider: Optional[str] = "openai" base_url: Optional[str] = None - routes: Optional[Routes] = None + + +class SIMPLE_LLM_AGENT(LLM): + agent_flow_type: Optional[str] = "streaming" #It is used for backwards compatibility + routes: Optional[Routes] = None extraction_details: Optional[str] = None summarization_details: Optional[str] = None - backend: Optional[str] = "bolna" - extra_config: Optional[OpenaiAssistants] = None + + +class Node(BaseModel): + id: str + type: str #Can be router or conversation for now + llm: LLM + exit_criteria: str + exit_response: Optional[str] = None + exit_prompt: Optional[str] = None + is_root: Optional[bool] = False + +class Edge(BaseModel): + start_node: str # Node ID + end_node: str + condition: Optional[tuple] = None #extracted value from previous step and it's value + +class LLM_AGENT_GRAPH(BaseModel): + nodes: List[Node] + edges: List[Edge] + +class AGENT_ROUTE_CONFIG(BaseModel): + utterances: List[str] + threshold: Optional[float] = 0.85 + +class MultiAgent(BaseModel): + agent_map: Dict[str, Union[LLM, OpenaiAssistants]] + agent_routing_config: Dict[str, AGENT_ROUTE_CONFIG] + default_agent: str + embedding_model: Optional[str] = "Snowflake/snowflake-arctic-embed-l" + +class KnowledgebaseAgent(LLM): + vector_store: VectorStore + provider: Optional[str] = "openai" + model: Optional[str] = "gpt-3.5-turbo" + + +class LLM_AGENT(BaseModel): + agent_flow_type: str + agent_type: str + guardrails: Optional[Routes] = None + extra_config: Union[OpenaiAssistants, KnowledgebaseAgent, LLM_AGENT_GRAPH, MultiAgent, SIMPLE_LLM_AGENT] + + @field_validator('extra_config', mode='before') + def validate_extra_config(cls, value, info): + agent_type = info.data.get('agent_type') + + valid_config_types = { + 'openai_assistant': OpenaiAssistants, + 'knowledgebase_agent': KnowledgebaseAgent, + 'llm_agent_graph': LLM_AGENT_GRAPH, + 'multiagent': MultiAgent, + 'simple_llm_agent': SIMPLE_LLM_AGENT, + } + + if agent_type not in valid_config_types: + raise ValueError(f'Unsupported agent_type: {agent_type}') + + expected_type = valid_config_types[agent_type] + + if not isinstance(value, dict): + raise ValueError(f"extra_config must be a dict, got {type(value)}") + + try: + return expected_type(**value) + except Exception as e: + raise ValueError(f"Failed to create {expected_type.__name__} from extra_config: {str(e)}") class MessagingModel(BaseModel): @@ -184,7 +278,7 @@ class ToolModel(BaseModel): tools_params: Dict[str, APIParams] class ToolsConfig(BaseModel): - llm_agent: Optional[LLM] = None + llm_agent: Optional[Union[LLM_AGENT, SIMPLE_LLM_AGENT]] = None synthesizer: Optional[Synthesizer] = None transcriber: Optional[Transcriber] = None input: Optional[IOModel] = None @@ -232,4 +326,4 @@ class AgentModel(BaseModel): agent_name: str agent_type: str = "other" tasks: List[Task] - agent_welcome_message: Optional[str] = AGENT_WELCOME_MESSAGE + agent_welcome_message: Optional[str] = AGENT_WELCOME_MESSAGE \ No newline at end of file diff --git a/bolna/rag/base.py b/bolna/rag/base.py new file mode 100644 index 00000000..47d62fea --- /dev/null +++ b/bolna/rag/base.py @@ -0,0 +1,47 @@ +from abc import ABC, abstractmethod +from llama_index.core import VectorStoreIndex +from llama_index.core.settings import Settings +from llama_index.llms.openai import OpenAI +from llama_index.embeddings.openai import OpenAIEmbedding + +class DatabaseConnector(ABC): + def __init__(self, db_name: str): + self.db_name = db_name + + @abstractmethod + def connect(self): + pass + + @abstractmethod + def disconnect(self): + pass + + @abstractmethod + def verify_data(self): + pass + + @abstractmethod + def setup_vector_store(self): + pass + + +class VectorSearchEngine: + def __init__(self, db_connector): + self.db_connector = db_connector + self.index = None + + def setup_llama_index(self): + embed_model = OpenAIEmbedding(model="text-embedding-3-small", dimensions=256) + llm = OpenAI() + Settings.llm = llm + Settings.embed_model = embed_model + + def create_index(self): + vector_store = self.db_connector.setup_vector_store() + self.index = VectorStoreIndex.from_vector_store(vector_store) + + def query(self, query_text, similarity_top_k=5): + if not self.index: + raise ValueError("Index not created. Call create_index() first.") + query_engine = self.index.as_query_engine(similarity_top_k=similarity_top_k) + return query_engine.query(query_text) \ No newline at end of file diff --git a/bolna/rag/mongodb_rag.py b/bolna/rag/mongodb_rag.py new file mode 100644 index 00000000..4ab060fc --- /dev/null +++ b/bolna/rag/mongodb_rag.py @@ -0,0 +1,79 @@ +import os +import logging +from typing import List, Optional +from pydantic import BaseModel +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure +from llama_index.core import VectorStoreIndex +from llama_index.core.settings import Settings +from llama_index.llms.openai import OpenAI +from llama_index.embeddings.openai import OpenAIEmbedding +from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch + +from bolna.models import * +from bolna.rag.base import DatabaseConnector + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class MongoDBConnector(DatabaseConnector): + def __init__(self, config: MongoDBProviderConfig): + super().__init__(config.db_name) + self.config = config + self.client = None + self.db = None + self.collection = None + self.collection_name = config.collection_name # Add this line + + def connect(self): + try: + self.client = MongoClient(self.config.connection_string) + self.db = self.client[self.config.db_name] + self.collection = self.db[self.config.collection_name] + logger.info("Connection to MongoDB successful") + except ConnectionFailure as e: + logger.error(f"Connection failed: {e}") + raise + + def disconnect(self): + if self.client: + self.client.close() + logger.info("Disconnected from MongoDB") + + def verify_data(self): + doc_count = self.collection.count_documents({}) + logger.info(f"Total documents in collection: {doc_count}") + if doc_count > 0: + logger.info(f"Documents found in the collection.") + else: + logger.warning("No documents found in the collection.") + + def setup_vector_store(self): + return MongoDBAtlasVectorSearch( + self.client, + db_name=self.config.db_name, + collection_name=self.config.collection_name, + vector_index_name=self.config.index_name + ) + +class RAGEngine: + def __init__(self, db_connector: MongoDBConnector): + self.db_connector = db_connector + self.index = None + + def setup(self): + embed_model = OpenAIEmbedding(model=self.db_connector.config.embedding_model, dimensions=self.db_connector.config.embedding_dimensions) + llm = OpenAI(model=self.db_connector.config.llm_model) + Settings.llm = llm + Settings.embed_model = embed_model + + vector_store = self.db_connector.setup_vector_store() + self.index = VectorStoreIndex.from_vector_store(vector_store) + + def query(self, query_text: str, similarity_top_k: int = 5): + if not self.index: + raise ValueError("Index not created. Call setup() first.") + query_engine = self.index.as_query_engine(similarity_top_k=similarity_top_k) + response = query_engine.query(query_text) + return response \ No newline at end of file diff --git a/examples/batch-calling/README.md b/examples/batch-calling/README.md deleted file mode 100644 index 86623e57..00000000 --- a/examples/batch-calling/README.md +++ /dev/null @@ -1 +0,0 @@ -https://docs.bolna.dev/batch-calling#example-request \ No newline at end of file diff --git a/examples/batch-calling/batch_calling.py b/examples/batch-calling/batch_calling.py deleted file mode 100644 index 84719c1d..00000000 --- a/examples/batch-calling/batch_calling.py +++ /dev/null @@ -1,110 +0,0 @@ -import asyncio -import os -from dotenv import load_dotenv -import aiohttp - -# Load environment variables from .env file -load_dotenv() - -# Load from .env -host = "https://api.bolna.dev" -api_key = os.getenv("api_key", None) -agent_id = 'ee153a6c-19f8-3a61-989a-9146a31c7834' #agent_id in which we want to create the batch -file_path = '/path/of/csv/file' -schedule_time = '2024-06-01T04:10:00+05:30' - - -async def schedule_batch(api_key, agent_id, batch_id=None, scheduled_at=None): - print("now scheduling batch for batch id : {}".format(batch_id)) - url = f"{host}/batches/schedule" - headers = {'Authorization': f'Bearer {api_key}'} - data = { - 'agent_id': agent_id, - 'batch_id': batch_id, - 'scheduled_at': scheduled_at - } - - try: - async with aiohttp.ClientSession() as session: - async with session.post(url, headers=headers, data=data) as response: - response_data = await response.json() - if response.status == 200: - return response_data - else: - raise Exception(f"Error scheduling batch: {response_data}") - except aiohttp.ClientError as e: - print(f"HTTP Client Error: {str(e)}") - except Exception as e: - print(f"Unexpected error: {str(e)}") - - -async def get_batch_status(api_key, agent_id, batch_id=None): - print("now getting batch status for batch id : {}".format(batch_id)) - url = f"{host}/batches/{agent_id}/{batch_id}" - headers = {'Authorization': f'Bearer {api_key}'} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, headers=headers) as response: - response_data = await response.json() - if response.status == 200: - return response_data - else: - raise Exception(f"Error getting batch status: {response_data}") - except aiohttp.ClientError as e: - print(f"HTTP Client Error: {str(e)}") - except Exception as e: - print(f"Unexpected error: {str(e)}") - - -async def get_batch_executions(api_key, agent_id, batch_id=None): - print("now getting batch executions for batch id : {}".format(batch_id)) - url = f"{host}/batches/{agent_id}/{batch_id}/executions" - headers = {'Authorization': f'Bearer {api_key}'} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, headers=headers) as response: - response_data = await response.json() - if response.status == 200: - return response_data - else: - raise Exception(f"Error getting batch executions: {response_data}") - except aiohttp.ClientError as e: - print(f"HTTP Client Error: {str(e)}") - except Exception as e: - print(f"Unexpected error: {str(e)}") - - -async def create_batch(): - url = f"{host}/batches" - headers = {'Authorization': f'Bearer {api_key}'} - - with open(file_path, 'rb') as f: - form_data = aiohttp.FormData() - form_data.add_field('agent_id', agent_id) - form_data.add_field('file', f, filename=os.path.basename(file_path), content_type='application/octet-stream') - - async with aiohttp.ClientSession() as session: - async with session.post(url, headers=headers, data=form_data) as response: - response_data = await response.json() - if response_data.get('state') == 'created': - batch_id = response_data.get('batch_id') - res = await schedule_batch(api_key, agent_id, batch_id, scheduled_at=schedule_time) - if res.get('state') == 'scheduled': - check = True - while check: - # Checking the current status every 1 minute - await asyncio.sleep(60) - res = await get_batch_status(api_key, agent_id, batch_id) - if res.get('status') == 'completed': - check = False - break - if not check: - res = await get_batch_executions(api_key, agent_id, batch_id) - print(res) - return res - - -if __name__ == "__main__": - asyncio.run(create_batch()) diff --git a/examples/function-calling/example_json.json b/examples/function-calling/example_json.json deleted file mode 100644 index b2298e1f..00000000 --- a/examples/function-calling/example_json.json +++ /dev/null @@ -1,216 +0,0 @@ -{ - "agent_config": { - "agent_name": "An agent with routes", - "agent_welcome_message": "This call is being recorded for quality assurance and training. Please speak now.", - "tasks": [ - { - "tools_config": { - "output": { - "format": "wav", - "provider": "twilio" - }, - "input": { - "format": "wav", - "provider": "twilio" - }, - "synthesizer": { - "audio_format": "wav", - "provider": "polly", - "stream": true, - "caching": true, - "provider_config": { - "voice": "Danielle", - "engine": "neural", - "language": "en-US" - }, - "buffer_size": 100.0 - }, - "llm_agent": { - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": null, - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": false, - "routes": { - "embedding_model": "Snowflake/snowflake-arctic-embed-l", - "routes": [ - { - "route_name": "politics", - "utterances": [ - "Are you a Trump supporter", - "How many black people live in my neighborhood", - "Are you a democrat?", - "Are you a republican", - "Are you black?", - "What is the gender split of my society", - "Are you a democrat?", - "Tell me about your political ideologies", - "Who is winning the elections this year?", - "Are there hispanics in the area", - "I do not like democrats", - "I don't want faggots", - "Don't give me homosexuals", - "I need a white hair dresser only" - ], - "response": "Hey, thanks but I don't want to entertain hate speech", - "score_threshold": 0.90 - } - ] - }, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.2, - "model": "gpt-4o", - "family": "openai" - }, - "transcriber": { - "sampling_rate": 16000.0, - "endpointing": 400.0, - "task": "transcribe", - "keywords": "", - "stream": true, - "modeltype": "nova-2", - "model": "deepgram", - "language": "en", - "encoding": "linear16" - }, - "api_tools": { - "tools": "[{\"name\":\"check_availability_of_slots\",\"description\":\"Fetch the available free slots of appointment booking before booking the appointment\",\"parameters\":{\"type\":\"object\",\"properties\":{\"startTime\":{\"type\":\"string\",\"description\":\"It is an ISO FORMATTED DATE on which time user is available (convert it automatically to hr:min such as 3:30 PM is 15:30)\"},\"endTime\":{\"type\":\"string\",\"description\":\"It is an ISO FORMATTED DATE. endDate is always 15 minutes more than startDate always i.e. increment one day to the startDate. such if startDate is 2024-05-15T16:30:00.000Z then endDate is 2024-05-15T16:45:00.000Z\"},\"eventTypeId\":{\"type\":\"integer\",\"description\":\"it is the type of event. For women's haircut use event type id as 814889 and for men's haircut use 798483 \"}},\"required\":[\"startTime\",\"eventTypeId\", \"endTime\"]}},{\"name\":\"book_appointment\",\"description\":\"Use this tool to book an appointment with given details and save the appointment in the calendar.\",\"parameters\":{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\",\"description\":\"name of the person.\"},\"email\":{\"type\":\"string\",\"description\":\"email name of the person.\"},\"address\":{\"type\":\"string\",\"description\":\"address name of the person.\"},\"preferred_date\":{\"type\":\"string\",\"description\":\"Preferred date provided by the person, ask the date when user wants to book an appointment such as tomorrow or day after tomorrow. and convert the user's response into a python readable format i.e. yyyy-mm-dd\"},\"preferred_time\":{\"type\":\"string\",\"description\":\"Preferred time provided by the person, ask the time when users wants to book an appointment such as 9am or 10:30am and convert it to python readable format for example if users said 9am then it is 09:00 or 1:30PM then it 13:30 i.e. in hr:min \"},\"timezone\":{\"type\":\"string\",\"description\":\"fetch timezone by yourself, don't ask user, find timezone based on the address provided by the user\"},\"eventTypeId\":{\"type\":\"integer\",\"description\":\"it is the type of event. For women's haircut use event type id as 814889 and for men's haircut use 798483 \"}},\"required\":[\"name\",\"email\",\"address\",\"eventTypeId\",\"preferred_date\",\"preferred_time\",\"timezone\"]}}, {\"name\":\"transfer_call\",\"description\":\"Transfer calls to human agent if user asks to transfer the call or if it's not in your instructions on how to deal with a certain question user is asking\"}]", - "tools_params": { - "check_availability_of_slots": { - "url": "https://api.cal.com/v1/slots?apiKey=", - "method": "GET", - "api_token": null, - "param": "{\"eventTypeId\":%(eventTypeId)d,\"startTime\":\"%(startTime)s\",\"endTime\":\"%(endTime)s\"}" - }, - "book_appointment": { - "url": "https://api.cal.com/v1/bookings?apiKey=", - "method": "POST", - "api_token": null, - "param": "{\"responses\":{\"name\":\"%(name)s\",\"email\":\"%(email)s\",\"location\":{\"optionValue\":\"\",\"value\":\"inPerson\"}},\"start\":\"%(preferred_date)sT%(preferred_time)s:00.000Z\",\"eventTypeId\":%(eventTypeId)d,\"timeZone\":\"%(timezone)s\",\"language\":\"en\",\"metadata\":{}}" - } - } - } - }, - "task_config": { - "hangup_after_LLMCall": false, - "hangup_after_silence": 10.0, - "interruption_backoff_period": 0.0, - "backchanneling": true, - "backchanneling_start_delay": 5.0, - "optimize_latency": true, - "incremental_delay": 100.0, - "call_cancellation_prompt": null, - "number_of_words_for_interruption": 3.0, - "backchanneling_message_gap": 5.0 - }, - "task_type": "conversation", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "transcriber", - "llm", - "synthesizer" - ] - ] - } - }, - { - "tools_config": { - "output": null, - "input": null, - "synthesizer": null, - "llm_agent": { - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": null, - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": true, - "routes": null, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.1, - "model": "gpt-3.5-turbo-1106", - "family": "openai" - }, - "transcriber": null, - "api_tools": null - }, - "task_config": {}, - "task_type": "summarization", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "llm" - ] - ] - } - }, - { - "tools_config": { - "output": null, - "input": null, - "synthesizer": null, - "llm_agent": { - "extraction_json": "{\n\"slot\": \"The slot booked by the user\"\n}", - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": "slot: Slot booked by user", - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": true, - "routes": null, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.1, - "model": "gpt-3.5-turbo-1106", - "family": "openai" - }, - "transcriber": null, - "api_tools": null - }, - "task_config": {}, - "task_type": "extraction", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "llm" - ] - ] - } - } - ], - "agent_type": "Lead Qualification" - }, - "agent_prompts": { - "task_1": { - "system_prompt": "### Agent Description You're an, Mellisa, a helpful agent whose job is to book appointments for Boston Barber Co. at Beacon Hill. There are two type of appointments available - 1. Haircut for men. event id - 798483 2. Appointment for women - 814889 ### About store - Shop is opened Tuesday to Sunday from 9 am to 9pm. - For premium treatment one beer is on the house ### Flow Users can ask you to find available slots & booking for an appointment. You will ask the users about their availability i.e. when they are available the date and time and check if that slot is available or not then you will ask other details as mentioned in function calling and proceed with this information to do the function calling for finding available slots. If slots are available then you must tell only those slots or that slot to user which is or are very close to the user's availability. ### You have access to following functions/tools 1. **check_availability_of_slots** - To check availability of slots from the calendar before booking the appointment. 2. **book_appointment** - Use this tool to book an appointment with given details and save the appointment in the calendar. ### Important instructions 1. MAKE SURE YOU GET ALL THE REQUIRED DETAILS BEFORE DOING A FUNCTION CALL. 2. PLEASE MAKES SURE YOUR RESPONSES ARE GEARED TO BE SYNTHESIZED BY THE SYNTHESISER IN AN EXPRESSIVE WAY. 3. Just speak 1 sentence at a time" - }, - "task_2": { - "system_prompt": "You are given a conversation. Kindly summarize it in a professional tone" - }, - "task_3": { - "system_prompt": "Extract the information from the given conversation betweenn an bot and a human. \n### JSON structure {\n\"slot\": \"The slot booked by the user\"\n}" - } - } -} \ No newline at end of file diff --git a/examples/openai-assistants/README.md b/examples/openai-assistants/README.md deleted file mode 100644 index 15b860db..00000000 --- a/examples/openai-assistants/README.md +++ /dev/null @@ -1,233 +0,0 @@ -## Create a RAG based assistant - -### Step 1 :Create an assistant via the API (only openai supported yet) - -```sh -curl --request POST \ - --url https://api.bolna.dev/assistant \ - --header 'Authorization: Bearer bn-***' \ - --header 'Content-Type: multipart/form-data' \ - --header 'User-Agent: insomnia/8.5.1' \ - --form 'name=A simple insurance agent assistant with human fallback' \ - --form 'instructions=You are a Max Bupa Life Insurance virtual agent. Answer policy questions using policy documents, check available appointment slots, and book appointments. Greet users warmly, provide clear policy details, confirm appointments, and maintain professionalism and user confidentiality.' \ - --form model=gpt-3.5-turbo \ - --form 'tools=[{"name":"check_availability_of_slots","description":"Fetch the available free slots of appointment booking before booking the appointment","parameters":{"type":"object","properties":{"startTime":{"type":"string","description":"It is an ISO FORMATTED DATE on which time user is available (convert it automatically to hr:min such as 3:30 PM is 15:30)"},"endTime":{"type":"string","description":"It is an ISO FORMATTED DATE. endDate is always 15 minutes more than startDate always i.e. increment one day to the startDate. such if startDate is 2024-05-15T16:30:00.000Z then endDate is 2024-05-15T16:45:00.000Z"},"eventTypeId":{"type":"integer","description":"it is the type of event. use event type id as 814889 "}},"required":["startTime","eventTypeId", "endTime"]}},{"name":"book_appointment","description":"Use this tool to book an appointment with given details and save the appointment in the calendar.","parameters":{"type":"object","properties":{"name":{"type":"string","description":"name of the person."},"email":{"type":"string","description":"email name of the person."},"address":{"type":"string","description":"address name of the person."},"preferred_date":{"type":"string","description":"Preferred date provided by the person, ask the date when user wants to book an appointment such as tomorrow or day after tomorrow. and convert the user'\''s response into a python readable format i.e. yyyy-mm-dd"},"preferred_time":{"type":"string","description":"Preferred time provided by the person, ask the time when users wants to book an appointment such as 9am or 10:30am and convert it to python readable format for example if users said 9am then it is 09:00 or 1:30PM then it 13:30 i.e. in hr:min "},"timezone":{"type":"string","description":"fetch timezone by yourself, don'\''t ask user, find timezone based on the address provided by the user"},"eventTypeId":{"type":"integer","description":"it is the type of event. use event type id as 814889"}},"required":["name","email","address","eventTypeId","preferred_date","preferred_time","timezone"]}}, {"name":"transfer_call","description":"Transfer calls to human agent if user asks to transfer the call or if it'\''s not in your instructions on how to deal with a certain question user is asking"}]' \ - --form 'file=@/Users/marmikpandya/Desktop/indie-hacking/Health Companion-Health Insurance Plan_GEN617.pdf' -``` - -### Step 2 - Create an assistant - -```json -{ - "agent_config": { - "agent_name": "Openai assistant agent agent with call transfer", - "agent_status": "processed", - "agent_welcome_message": "This call is being recorded for quality assurance and training. Please speak now.", - "tasks": [ - { - "tools_config": { - "output": { - "format": "wav", - "provider": "twilio" - }, - "input": { - "format": "wav", - "provider": "twilio" - }, - "synthesizer": { - "audio_format": "wav", - "provider": "deepgram", - "stream": true, - "caching": true, - "provider_config": { - "voice": "Asteria", - "model": "aura-asteria-en" - }, - "buffer_size": 100.0 - }, - "llm_agent": { - "agent_flow_type": "openai_assistant", - "extra_config": { - "name": "a Insurance rag agent", - "assistant_id": "asst_Djy3W6i1n4fZpTMcylSiyVpE" - } - }, - "transcriber": { - "sampling_rate": 16000, - "endpointing": 123.0, - "task": "transcribe", - "keywords": null, - "stream": true, - "provider": "deepgram", - "model": "nova2", - "language": "en", - "encoding": "linear16" - }, - "api_tools": { - "tools": "[{\"name\":\"check_availability_of_slots\",\"description\":\"Fetch the available free slots of appointment booking before booking the appointment\",\"parameters\":{\"type\":\"object\",\"properties\":{\"startTime\":{\"type\":\"string\",\"description\":\"It is an ISO FORMATTED DATE on which time user is available (convert it automatically to hr:min such as 3:30 PM is 15:30)\"},\"endTime\":{\"type\":\"string\",\"description\":\"It is an ISO FORMATTED DATE. endDate is always 15 minutes more than startDate always i.e. increment one day to the startDate. such if startDate is 2024-05-15T16:30:00.000Z then endDate is 2024-05-15T16:45:00.000Z\"},\"eventTypeId\":{\"type\":\"integer\",\"description\":\"it is the type of event. For women's haircut use event type id as 814889 and for men's haircut use 798483 \"}},\"required\":[\"startTime\",\"eventTypeId\", \"endTime\"]}},{\"name\":\"book_appointment\",\"description\":\"Use this tool to book an appointment with given details and save the appointment in the calendar.\",\"parameters\":{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\",\"description\":\"name of the person.\"},\"email\":{\"type\":\"string\",\"description\":\"email name of the person.\"},\"address\":{\"type\":\"string\",\"description\":\"address name of the person.\"},\"preferred_date\":{\"type\":\"string\",\"description\":\"Preferred date provided by the person, ask the date when user wants to book an appointment such as tomorrow or day after tomorrow. and convert the user's response into a python readable format i.e. yyyy-mm-dd\"},\"preferred_time\":{\"type\":\"string\",\"description\":\"Preferred time provided by the person, ask the time when users wants to book an appointment such as 9am or 10:30am and convert it to python readable format for example if users said 9am then it is 09:00 or 1:30PM then it 13:30 i.e. in hr:min \"},\"timezone\":{\"type\":\"string\",\"description\":\"fetch timezone by yourself, don't ask user, find timezone based on the address provided by the user\"},\"eventTypeId\":{\"type\":\"integer\",\"description\":\"it is the type of event. For women's haircut use event type id as 814889 and for men's haircut use 798483 \"}},\"required\":[\"name\",\"email\",\"address\",\"eventTypeId\",\"preferred_date\",\"preferred_time\",\"timezone\"]}}, {\"name\":\"transfer_call\",\"description\":\"Transfer calls to human agent if user asks to transfer the call or if it's not in your instructions on how to deal with a certain question user is asking\"}]", - "tools_params": { - "book_appointment": { - "method": "POST", - "param": "{\"responses\":{\"name\":\"%(name)s\",\"email\":\"%(email)s\",\"location\":{\"optionValue\":\"\",\"value\":\"inPerson\"}},\"start\":\"%(preferred_date)sT%(preferred_time)s:00.000Z\",\"eventTypeId\":%(eventTypeId)d,\"timeZone\":\"%(timezone)s\",\"language\":\"en\",\"metadata\":{}}", - "url": "https://api.cal.com/v1/bookings?apiKey=", - "api_token": null - }, - "check_availability_of_slots": { - "method": "GET", - "param": "{\"eventTypeId\":%(eventTypeId)d,\"startTime\":\"%(startTime)s\",\"endTime\":\"%(endTime)s\"}", - "url": "https://api.cal.com/v1/slots?apiKey=", - "api_token": null - }, - "transfer_call": { - "method": "POST", - "param": null, - "url": "https://webhook.site/c5dfe764-8888-4be5-a28c-8cfe4d54d475", - "api_token": null - } - } - } - }, - "task_config": { - "ambient_noise_track": "office-ambience", - "hangup_after_LLMCall": false, - "hangup_after_silence": 10.0, - "ambient_noise": false, - "interruption_backoff_period": 0.0, - "backchanneling": false, - "backchanneling_start_delay": 5.0, - "optimize_latency": true, - "incremental_delay": 100.0, - "call_cancellation_prompt": null, - "number_of_words_for_interruption": 3.0, - "backchanneling_message_gap": 5.0, - "use_fillers": false - - }, - "task_type": "conversation", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "transcriber", - "llm", - "synthesizer" - ] - ] - } - }, - { - "tools_config": { - "output": null, - "input": null, - "synthesizer": null, - "llm_agent": { - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": null, - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": true, - "routes": null, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.1, - "model": "gpt-3.5-turbo-1106", - "family": "openai" - }, - "transcriber": null, - "api_tools": null - }, - "task_config": { - "ambient_noise_track": "convention_hall", - "hangup_after_LLMCall": false, - "hangup_after_silence": 30.0, - "ambient_noise": true, - "interruption_backoff_period": 100.0, - "backchanneling": false, - "backchanneling_start_delay": 5.0, - "optimize_latency": true, - "incremental_delay": 100.0, - "call_cancellation_prompt": null, - "number_of_words_for_interruption": 1.0, - "backchanneling_message_gap": 5.0, - "call_terminate": 300 - }, - "task_type": "summarization", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "llm" - ] - ] - } - }, - { - "tools_config": { - "output": null, - "input": null, - "synthesizer": null, - "llm_agent": { - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": "slot: Slot booked by user", - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": true, - "routes": null, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.1, - "model": "gpt-3.5-turbo-1106", - "family": "openai" - }, - "transcriber": null, - "api_tools": null - }, - "task_config": { - "ambient_noise_track": "office-ambience", - "hangup_after_LLMCall": false, - "hangup_after_silence": 10.0, - "ambient_noise": true, - "interruption_backoff_period": 100.0, - "backchanneling": false, - "backchanneling_start_delay": 5.0, - "optimize_latency": true, - "incremental_delay": 100.0, - "call_cancellation_prompt": null, - "number_of_words_for_interruption": 1.0, - "backchanneling_message_gap": 5.0 - }, - "task_type": "extraction", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "llm" - ] - ] - } - } - ], - "agent_type": "Lead Qualification" - }, - "agent_prompts": { - "task_2" : { - "system_prompt" : "summarize the conversation" - }, - "task_3" : { - "system_prompt" : "Extract the user sentiment in json with the key 'sentiment' " - } - } -} -``` \ No newline at end of file diff --git a/examples/webrtc/README.md b/examples/webrtc/README.md deleted file mode 100644 index 3644f3c3..00000000 --- a/examples/webrtc/README.md +++ /dev/null @@ -1,125 +0,0 @@ -# Bolna With Daily -Seamlessly merge [Bolna](https://github.com/bolna-ai/bolna) with [Daily](https://github.com/daily-co) for websocket connection. This is docker compose by which you can host bolna server Daily together in cloud just by clone this repo and follow these simple steps to deploy, but before that you have to make sure that you have [docker](https://docs.docker.com/engine/install/) and [docker compose](https://docs.docker.com/compose/install/) and make a .env file refer to .env-sample. - -in .env file kindly save -DAILY_API_KEY = YOUR_DAILY_API_KEY - -### Start Serices -```shell -docker compose up -d -``` - -note: make sure that your all service were runing - -`let assume your server IP is 192.168.1.10` - -### Creating Agent -for creating agent you have to execute following command mention below -```shell -curl --location 'http://192.168.1.10:5001/agent' \ ---header 'Content-Type: application/json' \ ---data '{ - "agent_config": { - "agent_name": "Alfred", - "agent_type": "other", - "agent_welcome_message": "Welcome", - "tasks": [ - { - "task_type": "conversation", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "transcriber", - "llm", - "synthesizer" - ] - ] - }, - "tools_config": { - "input": { - "format": "wav", - "provider": "daily" - }, - "output": { - "format": "wav", - "provider": "daily" - }, - "llm_agent": { - "max_tokens": 100, - "presence_penalty": 0, - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": false, - "min_p": 0.1, - "frequency_penalty": 0, - "provider": "openai", - "top_k": 0, - "temperature": 0.2, - "model": "gpt-3.5-turbo", - "family": "openai" - }, - "synthesizer": { - "audio_format": "wav", - "buffer_size": 150, - "caching": true, - "provider": "polly", - "provider_config": { - "engine": "neural", - "language": "en-US", - "voice": "Danielle" - }, - "stream": true - }, - "transcriber": { - "encoding": "linear16", - "endpointing": 100, - "keywords": "", - "language": "en", - "model": "nova-2", - "provider": "deepgram", - "sampling_rate": 16000, - "stream": true, - "task": "transcribe" - } - }, - "task_config": { - "hangup_after_silence": 30 - } - } - ] - }, - "agent_prompts": { - "task_1": { - "system_prompt": "You are assistant at Dr. Sharma clinic you have to book an appointment" - } - } -} -' - -``` -below given is the response -``` -{"agent_id":"dcfe02de-bOdf-4589-b15b-64c76f0077d0", "state" : "created" } -``` -save / copy the agent_id we have to use in next step - -### Connect frontend to the websocket call - -Example frontent - https://codefile.io/f/rWIsJ0GztJ - -1. Connect your frontend with the (’/chat/v1/{agent_id}’) websocket and use the agent_id you saved in above step. It's that easy! -2. Our daily web-socket call will be up and running, so you can start chatting with the agent through your frontend! - -### Stop Services -```shell -docker compose down -``` - -### Conservation DEMO -This is demo using below prompt to the LLM -```json -"task_1": { - "system_prompt": "You are assistant at Dr. Sharma clinic you have to book an appointment" -} -``` diff --git a/examples/webrtc/docker-compose.yml b/examples/webrtc/docker-compose.yml deleted file mode 100644 index 3ed46100..00000000 --- a/examples/webrtc/docker-compose.yml +++ /dev/null @@ -1,28 +0,0 @@ -services: - - # main bolna service - bolna-app: - image: bolna-app:latest - build: - context: . - dockerfile: dockerfiles/bolna_server.Dockerfile - ports: - - "5001:5001" - depends_on: - - redis - env_file: - - .env - volumes: - - ../agent_data:/app/agent_data - - $HOME/.aws/credentials:/root/.aws/credentials:ro - - $HOME/.aws/config:/root/.aws/config:ro - environment: - - REDIS_URL=redis://redis:6379 - - # redis service used as a persistent storage - redis: - image: redis:latest - ports: - - "6379:6379" - - \ No newline at end of file diff --git a/examples/webrtc/dockerfiles/bolna_server.Dockerfile b/examples/webrtc/dockerfiles/bolna_server.Dockerfile deleted file mode 100644 index 414aca2c..00000000 --- a/examples/webrtc/dockerfiles/bolna_server.Dockerfile +++ /dev/null @@ -1,19 +0,0 @@ -FROM python:3.10.13-slim - -WORKDIR /app -COPY ./requirements.txt /app -COPY ./quickstart_server.py /app - -RUN apt-get update && apt-get install libgomp1 git -y -RUN apt-get -y update && apt-get -y upgrade && apt-get install -y --no-install-recommends ffmpeg -RUN pip install -r requirements.txt -RUN pip install --force-reinstall git+https://github.com/bolna-ai/bolna@master -RUN pip install scipy==1.11.0 -RUN pip install torch==2.0.1 -RUN pip install torchaudio==2.0.1 -RUN pip install pydub==0.25.1 -RUN pip install ffprobe -RUN pip install aiofiles - -EXPOSE 5001 -CMD ["uvicorn", "quickstart_server:app", "--host", "0.0.0.0", "--port", "5001"] diff --git a/examples/webrtc/quickstart_server.py b/examples/webrtc/quickstart_server.py deleted file mode 100644 index 07e2882b..00000000 --- a/examples/webrtc/quickstart_server.py +++ /dev/null @@ -1,122 +0,0 @@ -import os -import asyncio -import time -import uuid -import traceback -import aiohttp -from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Query -from fastapi.middleware.cors import CORSMiddleware -import redis.asyncio as redis -from dotenv import load_dotenv -from bolna.helpers.utils import store_file -from bolna.prompts import * -from bolna.helpers.logger_config import configure_logger -from bolna.models import * -from bolna.llms import LiteLLM -from bolna.agent_manager.assistant_manager import AssistantManager - -load_dotenv() -logger = configure_logger(__name__) - -redis_pool = redis.ConnectionPool.from_url(os.getenv('REDIS_URL'), decode_responses=True) -redis_client = redis.Redis.from_pool(redis_pool) -active_websockets: List[WebSocket] = [] - -app = FastAPI() - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"] -) - - -class CreateAgentPayload(BaseModel): - agent_config: AgentModel - agent_prompts: Optional[Dict[str, Dict[str, str]]] - - -@app.post("/agent") -async def create_agent(agent_data: CreateAgentPayload): - agent_uuid = str(uuid.uuid4()) - data_for_db = agent_data.agent_config.model_dump() - data_for_db["assistant_status"] = "seeding" - agent_prompts = agent_data.agent_prompts - logger.info(f'Data for DB {data_for_db}') - - if len(data_for_db['tasks']) > 0: - logger.info("Setting up follow up tasks") - for index, task in enumerate(data_for_db['tasks']): - if task['task_type'] == "extraction": - extraction_prompt_llm = os.getenv("EXTRACTION_PROMPT_GENERATION_MODEL") - extraction_prompt_generation_llm = LiteLLM(model=extraction_prompt_llm, max_tokens=2000) - extraction_prompt = await extraction_prompt_generation_llm.generate( - messages=[ - {'role': 'system', 'content': EXTRACTION_PROMPT_GENERATION_PROMPT}, - {'role': 'user', 'content': data_for_db["tasks"][index]['tools_config']["llm_agent"]['extraction_details']} - ]) - data_for_db["tasks"][index]["tools_config"]["llm_agent"]['extraction_json'] = extraction_prompt - - stored_prompt_file_path = f"{agent_uuid}/conversation_details.json" - await asyncio.gather( - redis_client.set(agent_uuid, json.dumps(data_for_db)), - store_file(file_key=stored_prompt_file_path, file_data=agent_prompts, local=True) - ) - - return {"agent_id": agent_uuid, "state": "created"} - - -############################################################################################# -# Websocket -############################################################################################# -@app.websocket("/chat/v1/{agent_id}") -async def websocket_endpoint(agent_id: str, websocket: WebSocket, user_agent: str = Query(None)): - logger.info("Connected to ws") - await websocket.accept() - active_websockets.append(websocket) - agent_config, context_data = None, None - try: - retrieved_agent_config = await redis_client.get(agent_id) - logger.info( - f"Retrieved agent config: {retrieved_agent_config}") - async with aiohttp.ClientSession() as session: - token = os.getenv('DAILY_API_KEY') - headers = { - 'Content-Type': 'application/json', - 'Authorization': f'Bearer {token}' - } - payload = { - 'properties': { - 'max_participants': 2, - 'start_video_off': True, - 'enable_screenshare': False, - 'enable_recording': 'cloud', - 'exp': int(time.time()) + 3600 # room will expire after 1 hour automatically, - } - } - - async with session.post("https://api.daily.co/v1/rooms/", headers=headers, json=payload) as response: - response_json = await response.json() - if response.status != 200: - res = {"connection": False, "type": "closing"} - else: - room_url = response_json['url'] - res = {"connection": True, "type": "setup", "room_url": room_url} - active_websockets.remove(websocket) - await websocket.send_json(res) - agent_config = json.loads(retrieved_agent_config) - except Exception as e: - traceback.print_exc() - raise HTTPException(status_code=404, detail="Agent not found") - assistant_manager = AssistantManager(agent_config, websocket, agent_id, room_url=room_url) - - try: - async for index, task_output in assistant_manager.run(local=True): - logger.info(task_output) - except WebSocketDisconnect: - active_websockets.remove(websocket) - except Exception as e: - traceback.print_exc() - logger.error(f"error in executing {e}") \ No newline at end of file diff --git a/examples/webrtc/requirements.txt b/examples/webrtc/requirements.txt deleted file mode 100644 index c0522987..00000000 --- a/examples/webrtc/requirements.txt +++ /dev/null @@ -1,7 +0,0 @@ -python-dotenv==1.0.0 -fastapi==0.108.0 -plivo==4.47.0 -redis==5.0.1 -twilio==8.9.0 -uvicorn==0.22.0 -daily-python==0.9.1 \ No newline at end of file diff --git a/examples/whisper-melo-llama3/.env-sample b/examples/whisper-melo-llama3/.env-sample deleted file mode 100644 index 481490ce..00000000 --- a/examples/whisper-melo-llama3/.env-sample +++ /dev/null @@ -1,15 +0,0 @@ -TWILIO_ACCOUNT_SID= -TWILIO_AUTH_TOKEN= -TWILIO_PHONE_NUMBER= - -OPENAI_API_KEY= - -ENVIRONMENT=local -WEBSOCKET_URL= -APP_CALLBACK_URL= - -REDIS_URL=redis://redis:6379 - -WHISPER_URL=ws://whisper-app:9000 - -MELO_TTS=http://melo-app:8000/connection \ No newline at end of file diff --git a/examples/whisper-melo-llama3/Readme.md b/examples/whisper-melo-llama3/Readme.md deleted file mode 100644 index ae93f6bd..00000000 --- a/examples/whisper-melo-llama3/Readme.md +++ /dev/null @@ -1,198 +0,0 @@ -# Bolna With MeloTTS and WhisperASR -Introducing our Dockerized solution! Seamlessly merge [Bolna](https://github.com/bolna-ai/bolna) with [Whisper ASR](https://github.com/bolna-ai/streaming-whisper-server) and [Melo TTS](https://github.com/anshjoseph/MiloTTS-Server) for telephone provider we use Twillo and for tunning we use ngrok. This is docker compose by which you can host bolna server Whisper ASR, Melo TTS together in cloud just by clone this repo and follow these simple steps to deploy ,but before that you have to make sure that you have [docker](https://docs.docker.com/engine/install/) and [docker compose](https://docs.docker.com/compose/install/) and make a .env file refer to .env-sample and also put ngrok auth token in ngrok-config.yml file - - -### Dockerfiles - -- - Whisper ASR Server - -- - Melo TTS Server - - - - -### Start Services -```shell -# build latest `twilio-app` and `bolna-app` -docker compose build twilio-app bolna-app - -# run containers -docker compose up -d -``` -the output something like this -![alt text](./img/docker_up.png "docker compose up -d") - -note: make sure that your all service were runing - -### Creating Agent -for creating agent you have to execute following command mention below - -
-Agent create API
- -```shell -curl --location 'http://0.0.0.0:5001/agent' \ ---header 'Content-Type: application/json' \ ---data '{ - "agent_config": { - "agent_name": "Bolna Agent", - "agent_welcome_message": "Hey how are you!", - "tasks": [ - { - "tools_config": { - "output": { - "format": "wav", - "provider": "twilio" - }, - "input": { - "format": "wav", - "provider": "twilio" - }, - "synthesizer": { - "provider": "melotts", - "provider_config": { - "voice": "Alex", - "sample_rate": 8000, - "sdp_ratio" : 0.2, - "noise_scale" : 0.6, - "noise_scale_w" : 0.8, - "speed" : 1.0 - }, - "stream": true, - "buffer_size": 123, - "audio_format": "pcm" - }, - "llm_agent": { - "model": "deepinfra/meta-llama/Meta-Llama-3-70B-Instruct", - "max_tokens": 123, - "agent_flow_type": "streaming", - "use_fallback": true, - "family": "llama", - "temperature": 0.1, - "request_json": true, - "provider": "deepinfra" - }, - "transcriber": { - "encoding": "linear16", - "language": "en", - "model": "whisper", - "stream": true, - "modeltype":"distil-large-v3", - "keywords":"ansh,joseph,hola", - "task": "transcribe", - "provider":"whisper" - }, - "api_tools": null - }, - "task_config": { - "ambient_noise_track": "office-ambience", - "hangup_after_LLMCall": false, - "hangup_after_silence": 10.0, - "ambient_noise": false, - "interruption_backoff_period": 0.0, - "backchanneling": false, - "backchanneling_start_delay": 5.0, - "optimize_latency": true, - "incremental_delay": 100.0, - "call_cancellation_prompt": null, - "number_of_words_for_interruption": 3.0, - "backchanneling_message_gap": 5.0, - "use_fillers": false - }, - "task_type": "conversation", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "transcriber", - "llm", - "synthesizer" - ] - ] - } - } - ], - "agent_type": "Lead Qualification" - }, - "agent_prompts": { - "task_1": { - "system_prompt": "Ask if they are coming for party tonight" - } - } -}' - -``` -
- -below given is the response -![alt text](./img/agent_res.png "agent response") - -Copy this `agent_id` we have to use in next step while invoking the call - -
-Invoke call API
- -```shell -curl --location 'http://0.0.0.0:8001/call' \ ---header 'Content-Type: application/json' \ ---data '{ - "agent_id": "bf2a9e9c-6038-4104-85c4-b71a0d1478c9", - "recipient_phone_number": "+1XXXXXXXXXX" -}' -``` -
- -You should hear your phone ringing now. - -### Stop Services -```shell -docker compose down -``` -![alt text](./img/docker_dw.png "docker compose up -d") - - -### Changing the voice MeloTTS - -by default we resrtict Melo EN but there were 5 option for voice as mention below -- ['EN-US'](./audio/audio_sample/EN_US.wav) -- ['EN-BR'](./audio/audio_sample/EN-BR.wav) -- ['EN-AU'](./audio/audio_sample/EN-AU.wav) -- ['EN-Default'](./audio/audio_sample/EN-Default.wav) -- ['EN_INDIA'](./audio/audio_sample/EN_INDIA.wav) - -you have to just change the following section mention below -```JSON -{ - "synthesizer": { - "provider": "melo", - "provider_config": { - "voice": "", - "sample_rate": 8000, - "sdp_ratio": 0.2, - "noise_scale": 0.6, - "noise_scale_w": 0.8, - "speed": 1.0 - }, - "stream": true, - "buffer_size": 123, - "audio_format": "pcm" - } -} -``` - - -### Conservation DENO -This is demo using below prompt to the LLM -```json -"task_1": { - "system_prompt": "You are assistant at Dr. Sharma clinic you have to book an appointment" -} -``` - - - -[chat GPT 3.5 turbo 16k demo](./audio/demo_audio.mp3) - -you can give prompt as per your use case \ No newline at end of file diff --git a/examples/whisper-melo-llama3/audio/audio_sample/EN-AU.wav b/examples/whisper-melo-llama3/audio/audio_sample/EN-AU.wav deleted file mode 100644 index d1e3a144..00000000 Binary files a/examples/whisper-melo-llama3/audio/audio_sample/EN-AU.wav and /dev/null differ diff --git a/examples/whisper-melo-llama3/audio/audio_sample/EN-BR.wav b/examples/whisper-melo-llama3/audio/audio_sample/EN-BR.wav deleted file mode 100644 index a437d853..00000000 Binary files a/examples/whisper-melo-llama3/audio/audio_sample/EN-BR.wav and /dev/null differ diff --git a/examples/whisper-melo-llama3/audio/audio_sample/EN-Default.wav b/examples/whisper-melo-llama3/audio/audio_sample/EN-Default.wav deleted file mode 100644 index c8a64426..00000000 Binary files a/examples/whisper-melo-llama3/audio/audio_sample/EN-Default.wav and /dev/null differ diff --git a/examples/whisper-melo-llama3/audio/audio_sample/EN_INDIA.wav b/examples/whisper-melo-llama3/audio/audio_sample/EN_INDIA.wav deleted file mode 100644 index 7b075920..00000000 Binary files a/examples/whisper-melo-llama3/audio/audio_sample/EN_INDIA.wav and /dev/null differ diff --git a/examples/whisper-melo-llama3/audio/audio_sample/EN_US.wav b/examples/whisper-melo-llama3/audio/audio_sample/EN_US.wav deleted file mode 100644 index 9117a469..00000000 Binary files a/examples/whisper-melo-llama3/audio/audio_sample/EN_US.wav and /dev/null differ diff --git a/examples/whisper-melo-llama3/audio/demo_audio.mp3 b/examples/whisper-melo-llama3/audio/demo_audio.mp3 deleted file mode 100644 index 57c55387..00000000 Binary files a/examples/whisper-melo-llama3/audio/demo_audio.mp3 and /dev/null differ diff --git a/examples/whisper-melo-llama3/docker-compose.yml b/examples/whisper-melo-llama3/docker-compose.yml deleted file mode 100644 index 0752847a..00000000 --- a/examples/whisper-melo-llama3/docker-compose.yml +++ /dev/null @@ -1,83 +0,0 @@ -services: - - # main bolna service - bolna-app: - image: bolna-app:latest - build: - context: . - dockerfile: dockerfiles/bolna_server.Dockerfile - ports: - - "5001:5001" - depends_on: - - redis - env_file: - - .env - volumes: - - ../agent_data:/app/agent_data - - $HOME/.aws/credentials:/root/.aws/credentials:ro - - $HOME/.aws/config:/root/.aws/config:ro - - # redis service used as a persistent storage - redis: - image: redis:latest - ports: - - "6379:6379" - - # ngrok for local tunneling - ngrok: - image: ngrok/ngrok:latest - restart: unless-stopped - command: - - "start" - - "--all" - - "--config" - - "/etc/ngrok.yml" - volumes: - - ./ngrok-config.yml:/etc/ngrok.yml - ports: - - 4040:4040 - - ### Telephony servers ### - twilio-app: - image: twilio-app:latest - build: - context: . - dockerfile: dockerfiles/twilio_server.Dockerfile - ports: - - "8001:8001" - depends_on: - - redis - env_file: - - .env - - ### whisper servers ### - whisper-app: - image: bolnadev/whisper_server:latest - build: - context: . - dockerfile: dockerfiles/whisper_server.Dockerfile - ports: - - "9002:9000" - deploy: - resources: - reservations: - devices: - - driver: nvidia - count: 1 - capabilities: [gpu] - ### Melo TTS ### - melo-app: - image: bolnadev/melo_server:latest - build: - context: . - dockerfile: dockerfiles/melo_server.Dockerfile - ports: - - "8002:8000" - deploy: - resources: - reservations: - devices: - - driver: nvidia - count: 1 - capabilities: [gpu] - diff --git a/examples/whisper-melo-llama3/dockerfiles/bolna_server.Dockerfile b/examples/whisper-melo-llama3/dockerfiles/bolna_server.Dockerfile deleted file mode 100644 index 414aca2c..00000000 --- a/examples/whisper-melo-llama3/dockerfiles/bolna_server.Dockerfile +++ /dev/null @@ -1,19 +0,0 @@ -FROM python:3.10.13-slim - -WORKDIR /app -COPY ./requirements.txt /app -COPY ./quickstart_server.py /app - -RUN apt-get update && apt-get install libgomp1 git -y -RUN apt-get -y update && apt-get -y upgrade && apt-get install -y --no-install-recommends ffmpeg -RUN pip install -r requirements.txt -RUN pip install --force-reinstall git+https://github.com/bolna-ai/bolna@master -RUN pip install scipy==1.11.0 -RUN pip install torch==2.0.1 -RUN pip install torchaudio==2.0.1 -RUN pip install pydub==0.25.1 -RUN pip install ffprobe -RUN pip install aiofiles - -EXPOSE 5001 -CMD ["uvicorn", "quickstart_server:app", "--host", "0.0.0.0", "--port", "5001"] diff --git a/examples/whisper-melo-llama3/dockerfiles/melo_server.Dockerfile b/examples/whisper-melo-llama3/dockerfiles/melo_server.Dockerfile deleted file mode 100644 index f016c63e..00000000 --- a/examples/whisper-melo-llama3/dockerfiles/melo_server.Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM python:3.10.13-slim -WORKDIR /app - -RUN apt-get update && apt-get install libgomp1 git -y -RUN apt-get -y update && apt-get -y upgrade && apt-get install -y --no-install-recommends ffmpeg -RUN git clone https://github.com/bolna-ai/MeloTTS -RUN pip install fastapi uvicorn torchaudio -RUN cp -a MeloTTS/. . -RUN python -m pip cache purge -RUN pip install --no-cache-dir txtsplit torch torchaudio cached_path transformers==4.27.4 mecab-python3==1.0.5 num2words==0.5.12 unidic_lite unidic mecab-python3==1.0.5 pykakasi==2.2.1 fugashi==1.3.0 g2p_en==2.1.0 anyascii==0.3.2 jamo==0.4.1 gruut[de,es,fr]==2.2.3 g2pkk>=0.1.1 librosa==0.9.1 pydub==0.25.1 eng_to_ipa==0.0.2 inflect==7.0.0 unidecode==1.3.7 pypinyin==0.50.0 cn2an==0.5.22 jieba==0.42.1 langid==1.1.6 tqdm tensorboard==2.16.2 loguru==0.7.2 -RUN python -m unidic download -EXPOSE 8000 -CMD ["python3", "Server.py"] diff --git a/examples/whisper-melo-llama3/dockerfiles/twilio_server.Dockerfile b/examples/whisper-melo-llama3/dockerfiles/twilio_server.Dockerfile deleted file mode 100644 index d368fbb9..00000000 --- a/examples/whisper-melo-llama3/dockerfiles/twilio_server.Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM python:3.10.13-slim - -WORKDIR /app -COPY ./requirements.txt /app -COPY ./telephony_server/twilio_api_server.py /app - -RUN pip install --no-cache-dir -r requirements.txt - -EXPOSE 8001 - -CMD ["uvicorn", "twilio_api_server:app", "--host", "0.0.0.0", "--port", "8001"] diff --git a/examples/whisper-melo-llama3/dockerfiles/whisper_server.Dockerfile b/examples/whisper-melo-llama3/dockerfiles/whisper_server.Dockerfile deleted file mode 100644 index 7b33c322..00000000 --- a/examples/whisper-melo-llama3/dockerfiles/whisper_server.Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM python:3.10.13-slim - -RUN apt-get update && apt-get install libgomp1 git -y -RUN apt-get -y update && apt-get -y upgrade && apt-get install -y --no-install-recommends ffmpeg -RUN apt-get -y install build-essential -RUN apt-get -y install portaudio19-dev -RUN git clone https://github.com/bolna-ai/streaming-whisper-server.git -WORKDIR streaming-whisper-server -RUN pip install -e . -RUN pip install transformers - -RUN ct2-transformers-converter --model openai/whisper-small --copy_files preprocessor_config.json --output_dir ./Server/ASR/whisper_small --quantization float16 -WORKDIR Server -EXPOSE 9000 -CMD ["python3", "Server.py", "-p", "9000"] diff --git a/examples/whisper-melo-llama3/img/agent_res.png b/examples/whisper-melo-llama3/img/agent_res.png deleted file mode 100644 index 320b4222..00000000 Binary files a/examples/whisper-melo-llama3/img/agent_res.png and /dev/null differ diff --git a/examples/whisper-melo-llama3/img/docker_dw.png b/examples/whisper-melo-llama3/img/docker_dw.png deleted file mode 100644 index aaabc378..00000000 Binary files a/examples/whisper-melo-llama3/img/docker_dw.png and /dev/null differ diff --git a/examples/whisper-melo-llama3/img/docker_up.png b/examples/whisper-melo-llama3/img/docker_up.png deleted file mode 100644 index 78fe8726..00000000 Binary files a/examples/whisper-melo-llama3/img/docker_up.png and /dev/null differ diff --git a/examples/whisper-melo-llama3/ngrok-config.yml b/examples/whisper-melo-llama3/ngrok-config.yml deleted file mode 100644 index 5742c498..00000000 --- a/examples/whisper-melo-llama3/ngrok-config.yml +++ /dev/null @@ -1,10 +0,0 @@ -region: us -version: '2' -authtoken: -tunnels: - twilio-app: - addr: twilio-app:8001 - proto: http - bolna-app: - addr: bolna-app:5001 - proto: http diff --git a/examples/whisper-melo-llama3/quickstart_server.py b/examples/whisper-melo-llama3/quickstart_server.py deleted file mode 100644 index e0399703..00000000 --- a/examples/whisper-melo-llama3/quickstart_server.py +++ /dev/null @@ -1,96 +0,0 @@ -import os -import asyncio -import uuid -import traceback -from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Query -from fastapi.middleware.cors import CORSMiddleware -import redis.asyncio as redis -from dotenv import load_dotenv -from bolna.helpers.utils import store_file -from bolna.prompts import * -from bolna.helpers.logger_config import configure_logger -from bolna.models import * -from bolna.llms import LiteLLM -from bolna.agent_manager.assistant_manager import AssistantManager - -load_dotenv() -logger = configure_logger(__name__) - -redis_pool = redis.ConnectionPool.from_url(os.getenv('REDIS_URL'), decode_responses=True) -redis_client = redis.Redis.from_pool(redis_pool) -active_websockets: List[WebSocket] = [] - -app = FastAPI() - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"] -) - - -class CreateAgentPayload(BaseModel): - agent_config: AgentModel - agent_prompts: Optional[Dict[str, Dict[str, str]]] - - -@app.post("/agent") -async def create_agent(agent_data: CreateAgentPayload): - agent_uuid = str(uuid.uuid4()) - data_for_db = agent_data.agent_config.model_dump() - data_for_db["assistant_status"] = "seeding" - agent_prompts = agent_data.agent_prompts - logger.info(f'Data for DB {data_for_db}') - - if len(data_for_db['tasks']) > 0: - logger.info("Setting up follow up tasks") - for index, task in enumerate(data_for_db['tasks']): - if task['task_type'] == "extraction": - extraction_prompt_llm = os.getenv("EXTRACTION_PROMPT_GENERATION_MODEL") - extraction_prompt_generation_llm = LiteLLM(model=extraction_prompt_llm, max_tokens=2000) - extraction_prompt = await extraction_prompt_generation_llm.generate( - messages=[ - {'role': 'system', 'content': EXTRACTION_PROMPT_GENERATION_PROMPT}, - {'role': 'user', 'content': data_for_db["tasks"][index]['tools_config']["llm_agent"]['extraction_details']} - ]) - data_for_db["tasks"][index]["tools_config"]["llm_agent"]['extraction_json'] = extraction_prompt - - stored_prompt_file_path = f"{agent_uuid}/conversation_details.json" - await asyncio.gather( - redis_client.set(agent_uuid, json.dumps(data_for_db)), - store_file(file_key=stored_prompt_file_path, file_data=agent_prompts, local=True) - ) - - return {"agent_id": agent_uuid, "state": "created"} - - -############################################################################################# -# Websocket -############################################################################################# -@app.websocket("/chat/v1/{agent_id}") -async def websocket_endpoint(agent_id: str, websocket: WebSocket, user_agent: str = Query(None)): - logger.info("Connected to ws") - await websocket.accept() - active_websockets.append(websocket) - agent_config, context_data = None, None - try: - retrieved_agent_config = await redis_client.get(agent_id) - logger.info( - f"Retrieved agent config: {retrieved_agent_config}") - agent_config = json.loads(retrieved_agent_config) - except Exception as e: - traceback.print_exc() - raise HTTPException(status_code=404, detail="Agent not found") - - assistant_manager = AssistantManager(agent_config, websocket, agent_id) - - try: - async for index, task_output in assistant_manager.run(local=True): - logger.info(task_output) - except WebSocketDisconnect: - active_websockets.remove(websocket) - except Exception as e: - traceback.print_exc() - logger.error(f"error in executing {e}") diff --git a/examples/whisper-melo-llama3/requirements.txt b/examples/whisper-melo-llama3/requirements.txt deleted file mode 100644 index 7e9bdc5f..00000000 --- a/examples/whisper-melo-llama3/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -python-dotenv==1.0.0 -fastapi==0.108.0 -redis==5.0.1 -twilio==8.9.0 -uvicorn==0.22.0 diff --git a/examples/whisper-melo-llama3/telephony_server/twilio_api_server.py b/examples/whisper-melo-llama3/telephony_server/twilio_api_server.py deleted file mode 100644 index bcdd3912..00000000 --- a/examples/whisper-melo-llama3/telephony_server/twilio_api_server.py +++ /dev/null @@ -1,94 +0,0 @@ -import os -import json -import requests -import uuid -from twilio.twiml.voice_response import VoiceResponse, Connect -from twilio.rest import Client -from dotenv import load_dotenv -import redis.asyncio as redis -from fastapi import FastAPI, HTTPException, Query, Request -from fastapi.responses import PlainTextResponse -from logging import Logger -logger = Logger(__name__) -app = FastAPI() -load_dotenv() -port = 8001 - -twilio_account_sid = os.getenv('TWILIO_ACCOUNT_SID') -twilio_auth_token = os.getenv('TWILIO_AUTH_TOKEN') -twilio_phone_number = os.getenv('TWILIO_PHONE_NUMBER') - -# Initialize Twilio client -twilio_client = Client(twilio_account_sid, twilio_auth_token) - - -def populate_ngrok_tunnels(): - response = requests.get("http://ngrok:4040/api/tunnels") # ngrok interface - app_callback_url, websocket_url = None, None - - if response.status_code == 200: - data = response.json() - - for tunnel in data['tunnels']: - if tunnel['name'] == 'twilio-app': - app_callback_url = tunnel['public_url'] - elif tunnel['name'] == 'bolna-app': - websocket_url = tunnel['public_url'].replace('https:', 'wss:') - - return app_callback_url, websocket_url - else: - print(f"Error: Unable to fetch data. Status code: {response.status_code}") - - -@app.post('/call') -async def make_call(request: Request): - try: - call_details = await request.json() - agent_id = call_details.get('agent_id', None) - - if not agent_id: - raise HTTPException(status_code=404, detail="Agent not provided") - - if not call_details or "recipient_phone_number" not in call_details: - raise HTTPException(status_code=404, detail="Recipient phone number not provided") - - app_callback_url, websocket_url = populate_ngrok_tunnels() - - print(f'app_callback_url: {app_callback_url}') - print(f'websocket_url: {websocket_url}') - print("tokens") - print(f"{twilio_account_sid} {twilio_auth_token} {twilio_phone_number}") - try: - call = twilio_client.calls.create( - to=call_details.get('recipient_phone_number'), - from_=twilio_phone_number, - url=f"{app_callback_url}/twilio_callback?ws_url={websocket_url}&agent_id={agent_id}", - method="POST", - record=False - ) - except Exception as e: - print(e) - return PlainTextResponse(f"error {e}", status_code=200) - - return PlainTextResponse("done", status_code=200) - - except Exception as e: - print(f"Exception occurred in make_call: {e}") - raise HTTPException(status_code=500, detail="Internal Server Error") - - -@app.post('/twilio_callback') -async def twilio_callback(ws_url: str = Query(...), agent_id: str = Query(...)): - try: - response = VoiceResponse() - - connect = Connect() - websocket_twilio_route = f'{ws_url}/chat/v1/{agent_id}' - connect.stream(url=websocket_twilio_route) - print(f"websocket connection done to {websocket_twilio_route}") - response.append(connect) - - return PlainTextResponse(str(response), status_code=200, media_type='text/xml') - - except Exception as e: - print(f"Exception occurred in twilio_callback: {e}") diff --git a/examples/with-fillers/README.md b/examples/with-fillers/README.md deleted file mode 100644 index ba2aae2a..00000000 --- a/examples/with-fillers/README.md +++ /dev/null @@ -1,21 +0,0 @@ -### With Fillers - -Fillers can be used to give an agent time to think or to make sure with expressive TTS like elevenlabs, you're able to ensir elow latency. This is the example task config fot the agent to enable the same. - -```json - "task_config": { - "ambient_noise_track": "office-ambience", - "hangup_after_LLMCall": false, - "hangup_after_silence": 10.0, - "ambient_noise": true, - "interruption_backoff_period": 0.0, - "backchanneling": true, - "backchanneling_start_delay": 5.0, - "optimize_latency": true, - "incremental_delay": 300.0, - "call_cancellation_prompt": null, - "number_of_words_for_interruption": 3.0, - "backchanneling_message_gap": 5.0, - "use_fillers": true - }, -``` \ No newline at end of file diff --git a/examples/with-fillers/example_agent.json b/examples/with-fillers/example_agent.json deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/with-guardrails/README.md b/examples/with-guardrails/README.md deleted file mode 100644 index 074530b9..00000000 --- a/examples/with-guardrails/README.md +++ /dev/null @@ -1,31 +0,0 @@ -### With Guardrails - -You can add guardrails to make sure agent doesn't respond to unwanted queries -```json - "routes": { - "embedding_model": "Snowflake/snowflake-arctic-embed-l", - "routes": [ - { - "route_name": "politics", - "utterances": [ - "Are you a Trump supporter", - "How many black people live in my neighborhood", - "Are you a democrat?", - "Are you a republican", - "Are you black?", - "What is the gender split of my society", - "Are you a democrat?", - "Tell me about your political ideologies", - "Who is winning the elections this year?", - "Are there hispanics in the area", - "I do not like democrats", - "I don't want faggots", - "Don't give me homosexuals", - "I need a white hair dresser only" - ], - "response": "Hey, thanks but I don't want to entertain hate speech", - "score_threshold": 0.90 - } - ] - }, -``` \ No newline at end of file diff --git a/examples/with-guardrails/example_agent.json b/examples/with-guardrails/example_agent.json deleted file mode 100644 index 88c73ce0..00000000 --- a/examples/with-guardrails/example_agent.json +++ /dev/null @@ -1,199 +0,0 @@ -{ - "agent_config": { - "agent_name": "An agent with routes", - "agent_welcome_message": "This call is being recorded for quality assurance and training. Please speak now.", - "tasks": [ - { - "tools_config": { - "output": { - "format": "wav", - "provider": "twilio" - }, - "input": { - "format": "wav", - "provider": "twilio" - }, - "synthesizer": { - "audio_format": "wav", - "provider": "polly", - "stream": true, - "caching": true, - "provider_config": { - "voice": "Danielle", - "engine": "neural", - "language": "en-US" - }, - "buffer_size": 100.0 - }, - "llm_agent": { - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": null, - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": false, - "routes": { - "embedding_model": "Snowflake/snowflake-arctic-embed-l", - "routes": [ - { - "route_name": "politics", - "utterances": [ - "Are you a Trump supporter", - "How many black people live in my neighborhood", - "Are you a democrat?", - "Are you a republican", - "Are you black?", - "What is the gender split of my society", - "Are you a democrat?", - "Tell me about your political ideologies", - "Who is winning the elections this year?", - "Are there hispanics in the area", - "I do not like democrats", - "I don't want faggots", - "Don't give me homosexuals", - "I need a white hair dresser only" - ], - "response": "Hey, thanks but I don't want to entertain hate speech", - "score_threshold": 0.90 - } - ] - }, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.2, - "model": "gpt-4o", - "family": "openai" - }, - "transcriber": { - "sampling_rate": 16000.0, - "endpointing": 400.0, - "task": "transcribe", - "keywords": "", - "stream": true, - "modeltype": "nova-2", - "model": "deepgram", - "language": "en", - "encoding": "linear16" - } - }, - "task_config": { - "hangup_after_LLMCall": false, - "hangup_after_silence": 10.0, - "interruption_backoff_period": 0.0, - "backchanneling": true, - "backchanneling_start_delay": 5.0, - "optimize_latency": true, - "incremental_delay": 100.0, - "call_cancellation_prompt": null, - "number_of_words_for_interruption": 3.0, - "backchanneling_message_gap": 5.0 - }, - "task_type": "conversation", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "transcriber", - "llm", - "synthesizer" - ] - ] - } - }, - { - "tools_config": { - "output": null, - "input": null, - "synthesizer": null, - "llm_agent": { - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": null, - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": true, - "routes": null, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.1, - "model": "gpt-3.5-turbo-1106", - "family": "openai" - }, - "transcriber": null, - "api_tools": null - }, - "task_config": {}, - "task_type": "summarization", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "llm" - ] - ] - } - }, - { - "tools_config": { - "output": null, - "input": null, - "synthesizer": null, - "llm_agent": { - "extraction_json": "{\n\"slot\": \"The slot booked by the user\"\n}", - "max_tokens": 100.0, - "presence_penalty": 0.0, - "summarization_details": null, - "base_url": null, - "extraction_details": "slot: Slot booked by user", - "top_p": 0.9, - "agent_flow_type": "streaming", - "request_json": true, - "routes": null, - "min_p": 0.1, - "frequency_penalty": 0.0, - "stop": null, - "provider": "openai", - "top_k": 0.0, - "temperature": 0.1, - "model": "gpt-3.5-turbo-1106", - "family": "openai" - }, - "transcriber": null, - "api_tools": null - }, - "task_config": {}, - "task_type": "extraction", - "toolchain": { - "execution": "parallel", - "pipelines": [ - [ - "llm" - ] - ] - } - } - ], - "agent_type": "Lead Qualification" - }, - "agent_prompts": { - "task_1": { - "system_prompt": "### Agent Description You're an, Mellisa, a helpful agent whose job is to book appointments for Boston Barber Co. at Beacon Hill. There are two type of appointments available - 1. Haircut for men. event id - 798483 2. Appointment for women - 814889 ### About store - Shop is opened Tuesday to Sunday from 9 am to 9pm. - For premium treatment one beer is on the house ### Flow Users can ask you to find available slots & booking for an appointment. You will ask the users about their availability i.e. when they are available the date and time and check if that slot is available or not then you will ask other details as mentioned in function calling and proceed with this information to do the function calling for finding available slots. If slots are available then you must tell only those slots or that slot to user which is or are very close to the user's availability. ### You have access to following functions/tools 1. **check_availability_of_slots** - To check availability of slots from the calendar before booking the appointment. 2. **book_appointment** - Use this tool to book an appointment with given details and save the appointment in the calendar. ### Important instructions 1. MAKE SURE YOU GET ALL THE REQUIRED DETAILS BEFORE DOING A FUNCTION CALL. 2. PLEASE MAKES SURE YOUR RESPONSES ARE GEARED TO BE SYNTHESIZED BY THE SYNTHESISER IN AN EXPRESSIVE WAY. 3. Just speak 1 sentence at a time" - }, - "task_2": { - "system_prompt": "You are given a conversation. Kindly summarize it in a professional tone" - }, - "task_3": { - "system_prompt": "Extract the information from the given conversation betweenn an bot and a human. \n### JSON structure {\n\"slot\": \"The slot booked by the user\"\n}" - } - } -} \ No newline at end of file diff --git a/local_setup/.env.sample b/local_setup/.env.sample new file mode 100644 index 00000000..1216f976 --- /dev/null +++ b/local_setup/.env.sample @@ -0,0 +1,28 @@ +DEEPGRAM_AUTH_TOKEN= +ELEVENLABS_API_KEY= +OPENAI_API_KEY= +OPENAI_MODEL=gpt-3.5-turbo +REDIS_URL=redis://redis:6379 +WHISPER_URL= +STYLE_TTS= +BUCKET_NAME=bolna-user-prompts-sandbox + +# RAG credentials +LANCEDB_DIR= +LLAMA_CLOUD_API_KEY= + +# AWS credentials +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= + +# You can make a call with either of Twilio or Plivo providers. + +# Twilio credentials +TWILIO_ACCOUNT_SID= +TWILIO_AUTH_TOKEN= +TWILIO_PHONE_NUMBER= + +# Plivo credentials +PLIVO_AUTH_ID= +PLIVO_AUTH_TOKEN= +PLIVO_PHONE_NUMBER= \ No newline at end of file diff --git a/local_setup/docker-compose.yml b/local_setup/docker-compose.yml index 24424cc7..6b128948 100644 --- a/local_setup/docker-compose.yml +++ b/local_setup/docker-compose.yml @@ -6,6 +6,8 @@ services: build: context: . dockerfile: dockerfiles/bolna_server.Dockerfile + args: + BRANCH: ${BRANCH:-develop} ports: - "5001:5001" depends_on: @@ -15,7 +17,7 @@ services: volumes: - ../agent_data:/app/agent_data - $HOME/.aws/credentials:/root/.aws/credentials:ro - - $HOME/.aws/config:/root/.aws/config:ro + - $HOME/.aws/config:/root/.aws/credentials:ro # redis service used as a persistent storage redis: @@ -52,16 +54,16 @@ services: env_file: - .env - plivo-app: - image: plivo-app:latest - build: - context: . - dockerfile: dockerfiles/plivo_server.Dockerfile - ports: - - "8002:8002" - depends_on: - - redis - - ngrok - - bolna-app - env_file: - - .env + # plivo-app: + # image: plivo-app:latest + # build: + # context: . + # dockerfile: dockerfiles/plivo_server.Dockerfile + # ports: + # - "8002:8002" + # depends_on: + # - redis + # - ngrok + # - bolna-app + # env_file: + # - .env \ No newline at end of file diff --git a/local_setup/dockerfiles/bolna_server.Dockerfile b/local_setup/dockerfiles/bolna_server.Dockerfile index 7b8fb2ea..74c96417 100644 --- a/local_setup/dockerfiles/bolna_server.Dockerfile +++ b/local_setup/dockerfiles/bolna_server.Dockerfile @@ -2,13 +2,18 @@ FROM python:3.10.13-slim WORKDIR /app -RUN apt-get update && apt-get install -y --no-install-recommends \ - libgomp1 \ - git \ - ffmpeg -RUN --mount=type=cache,target=/root/.cache/pip \ - pip install git+https://github.com/bolna-ai/bolna@master -COPY quickstart_server.py /app/ +RUN apt-get update && apt-get install libgomp1 git -y +RUN apt-get -y update && apt-get -y upgrade && apt-get install -y --no-install-recommends ffmpeg +RUN pip install -r requirements.txt + +ARG BRANCH=develop +RUN pip install --force-reinstall git+https://github.com/bolna-ai/bolna@${BRANCH} + +RUN pip install torch==2.0.1 +RUN pip install torchaudio==2.0.1 +RUN pip install pydub==0.25.1 +RUN pip install ffprobe +RUN pip install aiofiles EXPOSE 5001 diff --git a/local_setup/dockerfiles/ingestion_server.Dockerfile b/local_setup/dockerfiles/ingestion_server.Dockerfile new file mode 100644 index 00000000..28ecb5b4 --- /dev/null +++ b/local_setup/dockerfiles/ingestion_server.Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10.13-slim + +WORKDIR /app +COPY ./requirements.txt /app +COPY ./ingestion_server /app/ingestion_server + +RUN apt-get update && apt-get install -y --no-install-recommends gcc +RUN pip install --no-cache-dir -r requirements.txt + +ENV PYTHONPATH=/app + +WORKDIR /app/ingestion_server + +EXPOSE 8000 + +CMD ["uvicorn", "quickstart_ingestion_server:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/local_setup/ingestion_server/__init__.py b/local_setup/ingestion_server/__init__.py new file mode 100644 index 00000000..5a6641ab --- /dev/null +++ b/local_setup/ingestion_server/__init__.py @@ -0,0 +1,177 @@ +from embeddings import EmbedProviders +from rags import RAGProviders, BaseRAG +from datachecks import RAGConfig, RAGTask, RAGTaskStatus +import asyncio +from threading import Thread +from utils import configure_logger +from typing import Dict +from uuid import uuid4 +import tempfile +import os + +logger = configure_logger(__name__) + +class RAG: + """ + Retrieval-Augmented Generation (RAG) implementation. + + This class handles the ingestion and storage of documents for RAG. + """ + + def __init__(self, VectorDB: BaseRAG, Workers: int = 2) -> None: + """ + Initialize the RAG instance. + + Args: + VectorDB (BaseRAG): The vector database to use. + Workers (int, optional): Number of worker threads. Defaults to 2. + """ + self.file_process_task_queue: asyncio.Queue = asyncio.Queue() + self.file_store_task_queue: asyncio.Queue = asyncio.Queue() + + self.VectorDB: BaseRAG = VectorDB + self.Workers: int = Workers + + self.RAG_THREAD = Thread(target=self.start) + self.shutdown = False + + async def __shutdown_loop(self): + """Monitor the shutdown flag.""" + while not self.shutdown: + await asyncio.sleep(0.5) + + async def __ingestion_task(self): + """Process ingestion tasks from the queue.""" + while not self.shutdown: + task: RAGTask = await self.file_process_task_queue.get() + task._status = RAGTaskStatus.PROCESSING + try: + nodes = await self.VectorDB.generate_nodes_sentence_splitter(task.file_loc) + except Exception as e: + logger.error(f"ERROR in {e}") + task._status = RAGTaskStatus.ERROR + continue + task._nodes = nodes + await self.file_store_task_queue.put(task) + + async def __nodes_storage(self): + """Store processed nodes in the vector database.""" + while not self.shutdown: + task: RAGTask = await self.file_store_task_queue.get() + try: + index = await self.VectorDB.add_index(task._nodes) + except Exception as e: + logger.error(f"ERROR in {e}") + task._status = RAGTaskStatus.ERROR + continue + task._index = index + task._status = RAGTaskStatus.SUCESSFUL + + def start(self): + """Start the RAG processing loop.""" + loop = asyncio.new_event_loop() + ingestion_task_pool = [loop.create_task(self.__ingestion_task()) for _ in range(self.Workers)] + file_storage = loop.create_task(self.__nodes_storage()) + loop.run_until_complete(self.__shutdown_loop()) + file_storage.cancel() + for t in ingestion_task_pool: + t.cancel() + loop.close() + + +class RAGFactory: + """ + Factory class for creating and managing RAG instances. + """ + + def __init__(self) -> None: + """Initialize the RAGFactory.""" + self.RAGS: Dict[str, RAG] = dict() + + def make_rag(self, config: RAGConfig): + """ + Create a new RAG instance. + + Args: + config (RAGConfig): Configuration for the RAG instance. + + Returns: + str: Unique identifier for the created RAG instance. + """ + rag_name = f"RAG-{uuid4()}" + embedding_name = EmbedProviders[config.provider_config.embedding_name.provider](config.provider_config.embedding_name.embedding_model_name) + vector_db = RAGProviders[config.provider](embedding_name, config.provider_config) + rag = RAG(vector_db, config.provider_config.worker) + rag.RAG_THREAD.start() + self.RAGS[rag_name] = rag + return rag_name + + def stop_all(self): + """Stop all RAG instances.""" + for rag in self.RAGS.values(): + rag.shutdown = True + + def stop(self, rag_name: str): + """ + Stop a specific RAG instance. + + Args: + rag_name (str): Identifier of the RAG instance to stop. + + Raises: + ValueError: If the specified RAG instance doesn't exist. + """ + if rag_name in self.RAGS.keys(): + self.RAGS[rag_name].shutdown = True + self.RAGS.pop(rag_name) + else: + raise ValueError("No RAG with that ID exists") + + async def file_ingest(self, rag_name, file) -> RAGTask: + """ + Ingest a file into a RAG instance. + + Args: + rag_name (str): Identifier of the RAG instance. + file: File object to ingest. + + Returns: + RAGTask: Task object representing the ingestion process. + + Raises: + ValueError: If the specified RAG instance doesn't exist or if the file type is unsupported. + """ + if rag_name not in self.RAGS.keys(): + raise ValueError(f"RAG: {rag_name} does not exist") + if file.content_type not in ["application/pdf", "application/x-pdf"]: + raise ValueError("Only PDF files are supported for now") + + task_id = str(uuid4()) + temp_file = tempfile.NamedTemporaryFile() + temp_file.write(await file.read()) + prev = temp_file.name + file_name = f"/tmp/{task_id}.pdf" + os.rename(prev, file_name) + task = RAGTask(file_loc=file_name) + await self.RAGS[rag_name].file_process_task_queue.put(task) + + while task._status in [RAGTaskStatus.WAIT, RAGTaskStatus.PROCESSING]: + await asyncio.sleep(0.4) + + os.rename(file_name, prev) + return task + + async def retrieve_query(self, rag_name: str, index: str, query: str): + """ + Retrieve documents based on a query. + + Args: + rag_name (str): Identifier of the RAG instance. + index (str): Index to search in. + query (str): Query string. + + Returns: + List of relevant documents. + """ + rag = self.RAGS[rag_name] + return await rag.VectorDB.get_docs_index(query=query, index=index) \ No newline at end of file diff --git a/local_setup/ingestion_server/datachecks/__init__.py b/local_setup/ingestion_server/datachecks/__init__.py new file mode 100644 index 00000000..88ee7511 --- /dev/null +++ b/local_setup/ingestion_server/datachecks/__init__.py @@ -0,0 +1,60 @@ +from pydantic import BaseModel, Field, root_validator +from typing import Union, Optional + +# Embedding config +class embeddings(BaseModel): + provider:str + embedding_model_name:Optional[str] = "" + +# DB Configs +class LanceDBConfig(BaseModel): + loc: Optional[str] = "" + +class MongoDBConfig(BaseModel): + index: Optional[str] = "" + uri: Optional[str] = "" + db: Optional[str] = "" + collection_name: Optional[str] = "" + +# Provider Configs +class ProviderConfig(BaseModel): + embedding_name:embeddings + chunk_size:int + overlapping:int + worker:int + similarity_top_k:int + rag: Union[MongoDBConfig, LanceDBConfig] = Field(union_mode="left_to_right") + + @root_validator(pre=True) + def check_config_type(cls, values): + config = values.get('rag') + if isinstance(config, dict): + if 'uri' in config and 'collection_name' in config: + values['rag'] = MongoDBConfig(**config) + else: + values['rag'] = LanceDBConfig(**config) + return values + +# Rag Config +class RAGConfig(BaseModel): + provider:str + provider_config:ProviderConfig + +class Query(BaseModel): + provider:str + index:str + query:str + +# Utility checks for RAG +class RAGTaskStatus: + WAIT = "WAITING" + PROCESSING = "PROCESSING" + ERROR = "ERROR" + SUCESSFUL = "SUCESSFUL" + +class RAGTask(BaseModel): + file_loc:str + _status:str = RAGTaskStatus.WAIT + _message:str = "" + _index:str = "" + _nodes:list = [] \ No newline at end of file diff --git a/local_setup/ingestion_server/embeddings/__init__.py b/local_setup/ingestion_server/embeddings/__init__.py new file mode 100644 index 00000000..2a0487f5 --- /dev/null +++ b/local_setup/ingestion_server/embeddings/__init__.py @@ -0,0 +1,9 @@ +from typing import Dict + +from embeddings.openai_emded import OpenAI +from embeddings.base import BaseEmbed + +# EmbedProviders dictionary with the key being the name of the provider and the value being the class of the provider +EmbedProviders : Dict[str, BaseEmbed] = { + "OpenAI": OpenAI +} \ No newline at end of file diff --git a/local_setup/ingestion_server/embeddings/base.py b/local_setup/ingestion_server/embeddings/base.py new file mode 100644 index 00000000..37b4dd94 --- /dev/null +++ b/local_setup/ingestion_server/embeddings/base.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod + +class BaseEmbed(ABC): + def __init__(self, name: str) -> None: + self.name = name + + @abstractmethod + def get_embedding(self): + """Method to retrieve the embedding representation.""" + raise NotImplementedError("Subclasses must implement this method.") \ No newline at end of file diff --git a/local_setup/ingestion_server/embeddings/openai_emded.py b/local_setup/ingestion_server/embeddings/openai_emded.py new file mode 100644 index 00000000..0a562e8e --- /dev/null +++ b/local_setup/ingestion_server/embeddings/openai_emded.py @@ -0,0 +1,31 @@ +from embeddings.base import BaseEmbed +from llama_index.embeddings.openai import OpenAIEmbedding +import dotenv +import os + +dotenv.load_dotenv() + +class OpenAI(BaseEmbed): + def __init__(self, model: str) -> None: + """Initialize the OpenAI embedding provider. + + Args: + model (str): The model name to be used for embeddings. + + Raises: + ValueError: If the OpenAI API key is not found in the environment variables. + """ + super().__init__("OpenAI") + self.model = model + api_key = os.getenv("OPENAI_API_KEY") + if api_key is None: + raise ValueError("OPENAI KEY IS NOT FOUND") + self.embedding_instance = OpenAIEmbedding(model=model, api_key=api_key) + + def get_embedding(self): + """Retrieve the embedding instance. + + Returns: + OpenAIEmbedding: The instance of the OpenAI embedding. + """ + return self.embedding_instance \ No newline at end of file diff --git a/local_setup/ingestion_server/payload.markdown b/local_setup/ingestion_server/payload.markdown new file mode 100644 index 00000000..1f456649 --- /dev/null +++ b/local_setup/ingestion_server/payload.markdown @@ -0,0 +1,19 @@ + +## LanceDB + +{ + "provider": "LanceDB", + "provider_config": { + "embedding_name": { + "provider": "OpenAI", + "embedding_model_name": "text-embedding-3-small" + }, + "chunk_size": 512, + "overlapping": 200, + "worker": 2, + "similarity_top_k": 2, + "rag": { + "loc": "dev" + } + } +} \ No newline at end of file diff --git a/local_setup/ingestion_server/quickstart_ingestion_server.py b/local_setup/ingestion_server/quickstart_ingestion_server.py new file mode 100644 index 00000000..1c71b503 --- /dev/null +++ b/local_setup/ingestion_server/quickstart_ingestion_server.py @@ -0,0 +1,261 @@ +import json +import logging +import time +import dotenv + +from fastapi import FastAPI, File, UploadFile, Form +from typing import Dict, Any, Optional, Union + +import uvicorn + +from embeddings import EmbedProviders +from rags import RAGProviders, BaseRAG +from datachecks import RAGConfig, RAGTask, RAGTaskStatus +import asyncio +from threading import Thread +from utils import configure_logger +from uuid import uuid4 +import tempfile +import os + +logger = configure_logger(__name__) + + +dotenv.load_dotenv() +DB: Dict[str, RAGConfig] = {} + + + +class RAG: + """ + Retrieval-Augmented Generation (RAG) implementation. + + This class handles the ingestion and storage of documents for RAG. + """ + + def __init__(self, VectorDB: BaseRAG, Workers: int = 2) -> None: + """ + Initialize the RAG instance. + + Args: + VectorDB (BaseRAG): The vector database to use. + Workers (int, optional): Number of worker threads. Defaults to 2. + """ + self.file_process_task_queue: asyncio.Queue = asyncio.Queue() + self.file_store_task_queue: asyncio.Queue = asyncio.Queue() + + self.VectorDB: BaseRAG = VectorDB + self.Workers: int = Workers + + self.RAG_THREAD = Thread(target=self.start) + self.shutdown = False + + async def __shutdown_loop(self): + """Monitor the shutdown flag.""" + while not self.shutdown: + await asyncio.sleep(0.5) + + async def __ingestion_task(self): + """Process ingestion tasks from the queue.""" + while not self.shutdown: + task: RAGTask = await self.file_process_task_queue.get() + task._status = RAGTaskStatus.PROCESSING + try: + nodes = await self.VectorDB.generate_nodes_sentence_splitter(task.file_loc) + except Exception as e: + logger.error(f"ERROR in {e}") + task._status = RAGTaskStatus.ERROR + continue + task._nodes = nodes + await self.file_store_task_queue.put(task) + + async def __nodes_storage(self): + """Store processed nodes in the vector database.""" + while not self.shutdown: + task: RAGTask = await self.file_store_task_queue.get() + try: + index = await self.VectorDB.add_index(task._nodes) + except Exception as e: + logger.error(f"ERROR in {e}") + task._status = RAGTaskStatus.ERROR + continue + task._index = index + task._status = RAGTaskStatus.SUCESSFUL + + def start(self): + """Start the RAG processing loop.""" + loop = asyncio.new_event_loop() + ingestion_task_pool = [loop.create_task(self.__ingestion_task()) for _ in range(self.Workers)] + file_storage = loop.create_task(self.__nodes_storage()) + loop.run_until_complete(self.__shutdown_loop()) + file_storage.cancel() + for t in ingestion_task_pool: + t.cancel() + loop.close() + + +class RAGFactory: + """ + Factory class for creating and managing RAG instances. + """ + + def __init__(self) -> None: + """Initialize the RAGFactory.""" + self.RAGS: Dict[str, RAG] = dict() + + def make_rag(self, config: RAGConfig): + """ + Create a new RAG instance. + + Args: + config (RAGConfig): Configuration for the RAG instance. + + Returns: + str: Unique identifier for the created RAG instance. + """ + rag_name = f"RAG-{uuid4()}" + embedding_name = EmbedProviders[config.provider_config.embedding_name.provider](config.provider_config.embedding_name.embedding_model_name) + vector_db = RAGProviders[config.provider](embedding_name, config.provider_config) + rag = RAG(vector_db, config.provider_config.worker) + rag.RAG_THREAD.start() + self.RAGS[rag_name] = rag + return rag_name + + def stop_all(self): + """Stop all RAG instances.""" + for rag in self.RAGS.values(): + rag.shutdown = True + + def stop(self, rag_name: str): + """ + Stop a specific RAG instance. + + Args: + rag_name (str): Identifier of the RAG instance to stop. + + Raises: + ValueError: If the specified RAG instance doesn't exist. + """ + if rag_name in self.RAGS.keys(): + self.RAGS[rag_name].shutdown = True + self.RAGS.pop(rag_name) + else: + raise ValueError("No RAG with that ID exists") + + async def file_ingest(self, rag_name, file) -> RAGTask: + """ + Ingest a file into a RAG instance. + + Args: + rag_name (str): Identifier of the RAG instance. + file: File object to ingest. + + Returns: + RAGTask: Task object representing the ingestion process. + + Raises: + ValueError: If the specified RAG instance doesn't exist or if the file type is unsupported. + """ + if rag_name not in self.RAGS.keys(): + raise ValueError(f"RAG: {rag_name} does not exist") + if file.content_type not in ["application/pdf", "application/x-pdf"]: + raise ValueError("Only PDF files are supported for now") + + task_id = str(uuid4()) + temp_file = tempfile.NamedTemporaryFile() + temp_file.write(await file.read()) + prev = temp_file.name + file_name = f"/tmp/{task_id}.pdf" + os.rename(prev, file_name) + task = RAGTask(file_loc=file_name) + await self.RAGS[rag_name].file_process_task_queue.put(task) + + while task._status in [RAGTaskStatus.WAIT, RAGTaskStatus.PROCESSING]: + await asyncio.sleep(0.4) + + os.rename(file_name, prev) + return task + + async def retrieve_query(self, rag_name: str, index: str, query: str): + """ + Retrieve documents based on a query. + + Args: + rag_name (str): Identifier of the RAG instance. + index (str): Index to search in. + query (str): Query string. + + Returns: + List of relevant documents. + """ + rag = self.RAGS[rag_name] + return await rag.VectorDB.get_docs_index(query=query, index=index) + +rag_factory = RAGFactory() +app = FastAPI() + +@app.get("/") +def heartbeat() -> float: + """Health check endpoint that returns the current server time.""" + return time.time() + +@app.post("/make-rag") +async def make_rag( + file: UploadFile = File(...), + config: str = Form(...) +) -> Dict[str, Any]: + """ + Create a RAG configuration, return its ID, and ingest the uploaded file. + + Args: + file (UploadFile): The file to upload and ingest. + config (str): The RAG configuration as a JSON string. + + Returns: + Dict[str, Any]: A dictionary containing the created RAG ID, upload status, and index. + """ + try: + # Parse the JSON string into a RAGConfig object + config = json.loads(config) + logger.info(f"Ingestion Config : {config}") + rag_config = RAGConfig(**config) + logger.info(f"RAG Config : {rag_config}") + + # Create RAG configuration + rag_id = rag_factory.make_rag(rag_config) + logging.info(f"Rag id {rag_id}") + # Ingest the file + task = await rag_factory.file_ingest(rag_name=rag_id, file=file) + + return { + "rag_id": rag_id, + "index": task._index, + "status": task._status, + "message": "RAG created and file ingested successfully" + } + except Exception as e: + logging.error(f"Something went wrong {e}") + return { + "rag_id": None, + "index": None, + "status": "ERROR", + "message": "Invalid JSON in config parameter" + } + +@app.get("/rag-retrive/{rag_id}/{index}") +async def rag_retrive(query: str, rag_id: str, index: str) -> list: + """Retrieve documents based on a query for a specific RAG ID and index. + Args: + query (str): The query string to search for. + rag_id (str): The ID of the RAG to search in. + index (str): The index to search in. + Returns: + list: A list of documents matching the query. + """ + docs = await rag_factory.retrieve_query(rag_name=rag_id, index=index, query=query) + send_filter = [{"text": node.text, "score": node.score} for node in docs] + return send_filter + + +if __name__ == "__main__": + uvicorn.run("quickstart_ingestion_server:app", port=8000, host="0.0.0.0", reload=True) \ No newline at end of file diff --git a/local_setup/ingestion_server/rags/__init__.py b/local_setup/ingestion_server/rags/__init__.py new file mode 100644 index 00000000..8588c135 --- /dev/null +++ b/local_setup/ingestion_server/rags/__init__.py @@ -0,0 +1,10 @@ +from rags.lancedb_rag import LanceDB +from rags.mongoDB_rag import MongoDB +from rags.base import BaseRAG + +from typing import Dict + +RAGProviders : Dict[str, BaseRAG] = { + "LanceDB": LanceDB, + "MongoDB": MongoDB +} \ No newline at end of file diff --git a/local_setup/ingestion_server/rags/base.py b/local_setup/ingestion_server/rags/base.py new file mode 100644 index 00000000..1da99427 --- /dev/null +++ b/local_setup/ingestion_server/rags/base.py @@ -0,0 +1,156 @@ +import os +import dotenv +from uuid import uuid4 + +from llama_parse import LlamaParse +from llama_index.core.node_parser import ( + MarkdownElementNodeParser, + SentenceSplitter, + TextSplitter +) +from llama_index.llms.openai import OpenAI +from llama_index.core import Settings + +from embeddings import BaseEmbed +from utils import configure_logger + + +dotenv.load_dotenv() + +logger = configure_logger(__name__) + +class BaseRAG: + """ + Base class for Retrieval-Augmented Generation (RAG) systems. + + Attributes: + provider (str): The provider for the RAG system. + base_embed (BaseEmbed): The embedding model used. + embeding_model: The actual embedding model instance. + chunk_size (int): Size of the chunks for splitting documents. + overlapping (int): Overlap size for chunking. + LLAMA_CLOUD (str): API key for Llama Cloud. + parse (LlamaParse): Instance of LlamaParse for data parsing. + OPENAI_API_KEY (str): API key for OpenAI. + llm (OpenAI): Instance of OpenAI model. + """ + + def __init__(self, provider: str, embedding_name: BaseEmbed, chunk_size: int, overlapping: int) -> None: + """ + Initializes the BaseRAG instance with the specified parameters. + + Args: + provider (str): The provider for the RAG system. + embedding_name (BaseEmbed): The embedding model used. + chunk_size (int): Size of the chunks for splitting documents. + overlapping (int): Overlap size for chunking. + + Raises: + ValueError: If required environment variables are not set. + """ + self.provider = provider + self.base_embed: BaseEmbed = embedding_name + self.embeding_model = self.base_embed.get_embedding() + self.chunk_size = chunk_size + self.overlapping = overlapping + self.LLAMA_CLOUD = os.getenv("LLAMA_CLOUD_API_KEY") + + if self.LLAMA_CLOUD is None: + raise ValueError("LLAMA_CLOUD_API_KEY is not set in .env") + + self.parse = LlamaParse(api_key=self.LLAMA_CLOUD, result_type="markdown") + + self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") + + if self.OPENAI_API_KEY is None: + raise ValueError("OPENAI_API_KEY is not set in .env") + + self.llm = OpenAI(model="gpt-3.5-turbo", temperature=0.2, api_key=self.OPENAI_API_KEY) + + Settings.embed_model = self.embeding_model + Settings.llm = self.llm + + def generate_index_name(self) -> str: + """Generates a unique index name using UUID. + + Returns: + str: A unique index name. + """ + return str(uuid4()) + + async def generate_nodes_sentence_splitter(self, file_loc: str): + """Generates nodes using a sentence splitter. + + Args: + file_loc (str): The file location to load data from. + + Returns: + nodes: The generated nodes after processing. + """ + docs = await self.parse.aload_data(file_path=file_loc) + node_parser = MarkdownElementNodeParser(num_workers=8, llm=self.llm) + nodes = await node_parser.aget_nodes_from_documents(docs) + nodes, _ = node_parser.get_nodes_and_objects(nodes) + nodes = await SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.overlapping).aget_nodes_from_documents(nodes) + return nodes + + async def generate_nodes_text_splitter(self, file_loc: str): + """Generates nodes using a text splitter. + + Args: + file_loc (str): The file location to load data from. + + Returns: + nodes: The generated nodes after processing. + """ + docs = await self.parse.aload_data(file_path=file_loc) + node_parser = MarkdownElementNodeParser(num_workers=8, llm=self.llm) + nodes = await node_parser.aget_nodes_from_documents(docs) + nodes, _ = node_parser.get_nodes_and_objects(nodes) + nodes = await TextSplitter(chunk_size=self.chunk_size, chunk_overlap=self.overlapping).aget_nodes_from_documents(nodes) + return nodes + + async def append_index(self, nodes) -> str: + """Appends nodes to the existing index. + + Args: + nodes: The nodes to append. + + Raises: + NotImplementedError: This method should be implemented in subclasses. + """ + raise NotImplementedError + + async def add_index(self, nodes) -> str: + """Adds nodes to the index. + + Args: + nodes: The nodes to add. + + Raises: + NotImplementedError: This method should be implemented in subclasses. + """ + raise NotImplementedError + + async def delete_index(self, index: str) -> bool: + """Deletes an index. + + Args: + index (str): The index to delete. + + Raises: + NotImplementedError: This method should be implemented in subclasses. + """ + raise NotImplementedError + + async def get_docs_index(self, query: str, index: str): + """Retrieves documents from the index based on a query. + + Args: + query (str): The query to search for. + index (str): The index to search in. + + Raises: + NotImplementedError: This method should be implemented in subclasses. + """ + raise NotImplementedError \ No newline at end of file diff --git a/local_setup/ingestion_server/rags/lancedb_rag.py b/local_setup/ingestion_server/rags/lancedb_rag.py new file mode 100644 index 00000000..bac1cfed --- /dev/null +++ b/local_setup/ingestion_server/rags/lancedb_rag.py @@ -0,0 +1,105 @@ +import os +from typing import Any, Coroutine +from embeddings.base import BaseEmbed +from rags.base import BaseRAG +from llama_index.vector_stores.lancedb import LanceDBVectorStore +from llama_index.core import VectorStoreIndex, StorageContext +from datachecks import ProviderConfig, LanceDBConfig +from llama_index.core.retrievers import VectorIndexRetriever + +class LanceDB(BaseRAG): + """ + LanceDB class for managing vector storage and retrieval using LanceDB. + + Attributes: + similarity_top_k (int): Number of top similar items to retrieve. + config (LanceDBConfig): Configuration for LanceDB. + loc (str): Location for the vector database. + path (str): Path to the vector database data. + """ + + def __init__(self, embedding_name: BaseEmbed, config: ProviderConfig) -> None: + """ + Initializes the LanceDB instance. + + Args: + embedding_name (BaseEmbed): The embedding model used. + config (ProviderConfig): Configuration for the provider. + """ + super().__init__("LanceDB", embedding_name, config.chunk_size, config.overlapping) + self.similarity_top_k = config.similarity_top_k + self.config: LanceDBConfig = config.rag + self.loc = self.config.loc + + # self.path = f"s3://lancedb-example-data/{self.loc}" : use this to config your lancedb dir path + + self.path = os.getenv('LANCEDB_DIR') + print(f"Initialized LanceDB with path: {self.path}") + self.verify_shared_directory() + + def verify_shared_directory(self): + if os.path.exists(self.path): + print(f"Directory {self.path} exists.") + print("Contents of the directory:") + for item in os.listdir(self.path): + print(item) + else: + print(f"Directory {self.path} does not exist.") + + async def append_index(self, nodes) -> Coroutine[Any, Any, str]: + """Appends nodes to the existing index. + + Args: + nodes: The nodes to append. + + Returns: + Coroutine: A coroutine that returns None. + """ + return None + + async def add_index(self, nodes) -> str: + """Adds nodes to the index and creates a new table. + + Args: + nodes: The nodes to add. + + Returns: + str: The name of the created table. + """ + table_name = self.generate_index_name() + # TODO: add reranking in the DB + print(f"LanceDB Dir Path is : {self.path}") + vector_store = LanceDBVectorStore(self.path, table_name=table_name) + storage_context = StorageContext.from_defaults(vector_store=vector_store) + vector_index = VectorStoreIndex(nodes=nodes, storage_context=storage_context, embed_model=self.embeding_model) + return table_name + + async def delete_index(self, index: str) -> bool: + """Deletes an index. + + Args: + index (str): The index to delete. + + Returns: + bool: Result of the deletion operation. + """ + return await super().delete_index(index) + + async def get_docs_index(self, query: str, index: str): + """Retrieves documents from the index based on a query. + + Args: + query (str): The query to search for. + index (str): The index to search in. + + Returns: + Retrieved documents based on the query. + """ + vector_store = LanceDBVectorStore(uri=self.path, table_name=index) + storage_context = StorageContext.from_defaults(vector_store=vector_store) + vector_index = VectorStoreIndex(nodes=[], storage_context=storage_context) + query_engine = VectorIndexRetriever(vector_index, similarity_top_k=self.similarity_top_k) + return query_engine.retrieve(query) + + # query_engine = vector_index.as_query_engine(llm=self.llm) + diff --git a/local_setup/ingestion_server/rags/mongoDB_rag.py b/local_setup/ingestion_server/rags/mongoDB_rag.py new file mode 100644 index 00000000..be7abead --- /dev/null +++ b/local_setup/ingestion_server/rags/mongoDB_rag.py @@ -0,0 +1,96 @@ +from typing import Any, Coroutine +from pymongo import MongoClient +from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch + +from embeddings.base import BaseEmbed +from rags.base import BaseRAG +from datachecks import ProviderConfig, MongoDBConfig + +from llama_index.core import VectorStoreIndex, StorageContext +from llama_index.core.retrievers import VectorIndexRetriever + +class MongoDB(BaseRAG): + """ + MongoDB class for managing vector storage and retrieval using MongoDB. + + Attributes: + similarity_top_k (int): Number of top similar items to retrieve. + config (MongoDBConfig): Configuration for MongoDB. + client (MongoClient): MongoDB client instance. + """ + + def __init__(self, embedding_name: BaseEmbed, config: ProviderConfig) -> None: + """ + Initializes the MongoDB instance. + + Args: + embedding_name (BaseEmbed): The embedding model used. + config (ProviderConfig): Configuration for the provider. + """ + super().__init__("MongoDB", embedding_name, config.chunk_size, config.overlapping) + self.similarity_top_k = config.similarity_top_k + self.config: MongoDBConfig = config.rag + self.client = MongoClient(self.config.uri) + + async def append_index(self, nodes) -> Coroutine[Any, Any, str]: + """Appends nodes to the existing index. + + Args: + nodes: The nodes to append. + + Returns: + Coroutine: A coroutine that calls the base class method. + """ + return await super().append_index(nodes) + + async def get_docs_index(self, query: str, index: str): + """Retrieves documents from the index based on a query. + + Args: + query (str): The query to search for. + index (str): The index to search in. + + Returns: + Retrieved documents based on the query. + """ + vector_store = MongoDBAtlasVectorSearch( + self.client, + db_name=self.config.db, + collection_name=self.config.collection_name, + vector_index_name=self.config.index + ) + vector_store_context = StorageContext.from_defaults(vector_store=vector_store) + vector_store_index = VectorStoreIndex(nodes=[], storage_context=vector_store_context) + vector_store_retriever = VectorIndexRetriever(index=vector_store_index, similarity_top_k=self.similarity_top_k) + return vector_store_retriever.retrieve(query) + + async def delete_index(self, index: str) -> Coroutine[Any, Any, bool]: + """Deletes an index. + + Args: + index (str): The index to delete. + + Returns: + Coroutine: A coroutine that calls the base class method. + """ + return await super().delete_index(index) + + async def add_index(self, nodes) -> str: + """Adds nodes to the index and creates a new index. + + Args: + nodes: The nodes to add. + + Returns: + str: The name of the created index. + """ + # index = self.generate_index_name() + vector_store = MongoDBAtlasVectorSearch( + self.client, + db_name=self.config.db, + collection_name=self.config.collection_name, + index_name=self.config.index + ) + vector_store_context = StorageContext.from_defaults(vector_store=vector_store) + vector_store_index = VectorStoreIndex(nodes=nodes, storage_context=vector_store_context, embed_model=self.embeding_model) + return self.config.index \ No newline at end of file diff --git a/local_setup/ingestion_server/utils/__init__.py b/local_setup/ingestion_server/utils/__init__.py new file mode 100644 index 00000000..80c88a37 --- /dev/null +++ b/local_setup/ingestion_server/utils/__init__.py @@ -0,0 +1 @@ +from .log import configure_logger \ No newline at end of file diff --git a/local_setup/ingestion_server/utils/log.py b/local_setup/ingestion_server/utils/log.py new file mode 100644 index 00000000..acc08150 --- /dev/null +++ b/local_setup/ingestion_server/utils/log.py @@ -0,0 +1,31 @@ +import logging + +VALID_LOGGING_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + +def configure_logger(file_name, enabled=True, logging_level='INFO'): + """ + Configures a logger for the specified file. + + Parameters: + - file_name (str): The name of the file for which the logger is being configured. + - enabled (bool): Flag to enable or disable the logger. Default is True. + - logging_level (str): The logging level to set. Must be one of the valid levels. Default is 'INFO'. + + Returns: + - logger (logging.Logger): Configured logger instance. + """ + if logging_level not in VALID_LOGGING_LEVELS: + logging_level = "INFO" + + logging.basicConfig( + level=logging_level, + format="%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + logger = logging.getLogger(file_name) + + if not enabled: + logger.disabled = True + + return logger \ No newline at end of file diff --git a/local_setup/quick_scripts/buildup_docker.sh b/local_setup/quick_scripts/buildup_docker.sh new file mode 100755 index 00000000..c188a35c --- /dev/null +++ b/local_setup/quick_scripts/buildup_docker.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Prompt the user for the branch name +read -p "Enter the branch name: " BRANCH + +# Build and run containers for bolna_server +BRANCH=$BRANCH docker-compose build --no-cache bolna-app +docker-compose up -d --build bolna-app + +# Build and run containers for other services +for dockerfile in local_setup/dockerfiles/*.Dockerfile; do + service_name=$(basename "$dockerfile" .Dockerfile) + docker-compose build --no-cache "$service_name" + docker-compose up -d --build "$service_name" +done + +# Append logs to log.txt +docker-compose logs -f >> log.txt & + +echo "All Docker images are built, and containers are up and running. Logs are being appended to log.txt." \ No newline at end of file diff --git a/local_setup/quickstart_server.py b/local_setup/quickstart_server.py index e0399703..4e7cab59 100644 --- a/local_setup/quickstart_server.py +++ b/local_setup/quickstart_server.py @@ -12,8 +12,12 @@ from bolna.models import * from bolna.llms import LiteLLM from bolna.agent_manager.assistant_manager import AssistantManager - -load_dotenv() +from bolna.helpers.data_ingestion_pipe import create_table, ingestion_task, ingestion_tasks, IngestionPipeline, IngestionTask, TaskStatus +import tempfile +import threading +from typing import Dict +import os +load_dotenv(override=True) logger = configure_logger(__name__) redis_pool = redis.ConnectionPool.from_url(os.getenv('REDIS_URL'), decode_responses=True) diff --git a/local_setup/telephony_server/requirements.txt b/local_setup/telephony_server/requirements.txt index 0f3540fa..9628139f 100644 --- a/local_setup/telephony_server/requirements.txt +++ b/local_setup/telephony_server/requirements.txt @@ -5,3 +5,10 @@ redis==5.0.1 requests==2.31.0 twilio==8.9.0 uvicorn==0.22.0 +pylance==0.16.0 +pymongo==4.8.0 +python-multipart==0.0.6 +llama-index==0.10.65 +llama-index-vector-stores-lancedb==0.1.7 +llama-index-vector-stores-mongodb==0.1.8 +llama-index-llms-openai==0.1.29 diff --git a/local_setup/telephony_server/twilio_api_server.py b/local_setup/telephony_server/twilio_api_server.py index fc8874b1..4274e20a 100644 --- a/local_setup/telephony_server/twilio_api_server.py +++ b/local_setup/telephony_server/twilio_api_server.py @@ -6,11 +6,11 @@ from twilio.rest import Client from dotenv import load_dotenv import redis.asyncio as redis -from fastapi import FastAPI, HTTPException, Query, Request +from fastapi import FastAPI, HTTPException, Query, Request, Header from fastapi.responses import PlainTextResponse app = FastAPI() -load_dotenv() +load_dotenv(override=True) port = 8001 twilio_account_sid = os.getenv('TWILIO_ACCOUNT_SID') @@ -23,6 +23,7 @@ def populate_ngrok_tunnels(): response = requests.get("http://ngrok:4040/api/tunnels") # ngrok interface + app_callback_url, websocket_url = None, None telephony_url, bolna_url = None, None if response.status_code == 200: @@ -74,6 +75,47 @@ async def make_call(request: Request): raise HTTPException(status_code=500, detail="Internal Server Error") +# # Use this Endpoint to call the playground endpoint directly! +# @app.post('/call') +# async def make_call_v2(request: Request, authorization: str = Header(...)): +# try: +# call_details = await request.json() +# agent_id = call_details.get('agent_id') +# print(f"Agent ID : {agent_id}") +# recipient_phone_number = call_details.get('recipient_phone_number') +# user_data = call_details.get('user_data', {}) + +# if not agent_id: +# raise HTTPException(status_code=400, detail="Agent ID not provided") + +# if not recipient_phone_number: +# raise HTTPException(status_code=400, detail="Recipient phone number not provided") + +# # Prepare the payload for the new endpoint +# payload = { +# "agent_id": agent_id, +# "recipient_phone_number": recipient_phone_number, +# } + +# headers = { +# 'Authorization': authorization, +# 'Content-Type': 'application/json' +# } + +# # Send the request to the new endpoint +# response = requests.post("https://api.bolna.dev/call", headers=headers, json=payload) +# print(f"Response status code: {response.status_code}") +# print(f"Response content: {response.content}") + +# if response.status_code != 200: +# raise HTTPException(status_code=response.status_code, detail=f"Failed to make call: {response.text}") + +# return PlainTextResponse("Call initiated successfully", status_code=200) + +# except Exception as e: +# print(f"Exception occurred in make_call_v2: {e}") +# raise HTTPException(status_code=500, detail=str(e)) + @app.post('/twilio_connect') async def twilio_connect(bolna_host: str = Query(...), agent_id: str = Query(...)): try: diff --git a/requirements.txt b/requirements.txt index 2c18dae0..d18ecf0d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,8 +22,21 @@ websockets==10.4 onnxruntime>=1.16.3 scipy==1.11.4 uvloop==0.19.0 -tokenizers==0.15.2 -huggingface-hub==0.20.1 -semantic-router==0.0.46 -sentence-transformers==3.0.1 +tokenizers +huggingface-hub +semantic-router +sentence_transformers optimum[onnxruntime] +lancedb==0.12.0 +llama-index==0.10.65 +llama-index-vector-stores-lancedb==0.1.7 +llama-index-vector-stores-mongodb==0.1.8 +python-multipart==0.0.9 +sentry_sdk==2.13.0 + +# tokenizers==0.15.2 +# huggingface-hub==0.20.1 +# semantic-router==0.0.46 +# sentence-transformers==3.0.1 +# optimum[onnxruntime] +