diff --git a/aios/llm_cores/adapter.py b/aios/llm_cores/adapter.py
new file mode 100644
index 00000000..3c49ae79
--- /dev/null
+++ b/aios/llm_cores/adapter.py
@@ -0,0 +1,83 @@
+
+from aios.llm_cores.base import BaseLLM
+from aios.llm_cores.registry import API_MODEL_REGISTRY
+# from .llm_classes.hf_native_llm import HfNativeLLM
+
+# standard implementation of LLM methods
+# from .llm_classes.ollama_llm import OllamaLLM
+# from .llm_classes.vllm import vLLM
+
+class LLMAdapter:
+    """Adapter that resolves an LLM name to a concrete LLM implementation.
+
+    Args:
+        llm_name (str): Name of the LLM.
+        max_gpu_memory (dict, optional): Maximum GPU memory that can be allocated to the LLM. Defaults to None.
+        eval_device (str, optional): Device the LLM is bound to for inference. Defaults to None.
+        max_new_tokens (int, optional): Maximum number of tokens generated by the LLM. Defaults to 256.
+        use_backend (str, optional): Backend used to speed up inference of open-source LLMs.
+            Defaults to None. Choices are ["vllm", "ollama"].
+    """
+
+    def __init__(self,
+                 llm_name: str,
+                 max_gpu_memory: dict = None,
+                 eval_device: str = None,
+                 max_new_tokens: int = 256,
+                 use_backend: str = None
+                 ):
+
+        self.model: BaseLLM | None = None
+
+        # For API-based LLMs
+        if llm_name in API_MODEL_REGISTRY:
+            self.model = API_MODEL_REGISTRY[llm_name](
+                llm_name=llm_name,
+            )
+        # For locally-deployed LLMs
+        else:
+            if use_backend == "ollama" or llm_name.startswith("ollama"):
+                # self.model = OllamaLLM(
+                #     llm_name=llm_name,
+                #     max_gpu_memory=max_gpu_memory,
+                #     eval_device=eval_device,
+                #     max_new_tokens=max_new_tokens,
+                #     log_mode=log_mode
+                # )
+                pass
+
+            elif use_backend == "vllm":
+                # self.model = vLLM(
+                #     llm_name=llm_name,
+                #     max_gpu_memory=max_gpu_memory,
+                #     eval_device=eval_device,
+                #     max_new_tokens=max_new_tokens,
+                #     log_mode=log_mode
+                # )
+                pass
+            else:  # use a huggingface LLM without a serving backend
+                # self.model = HfNativeLLM(
+                #     llm_name=llm_name,
+                #     max_gpu_memory=max_gpu_memory,
+                #     eval_device=eval_device,
+                #     max_new_tokens=max_new_tokens,
+                #     log_mode=log_mode
+                # )
+                pass
+
+    # def execute(self,
+    #             agent_process,
+    #             temperature=0.0) -> None:
+    #     """Address a request sent from the agent.
+
+    #     Args:
+    #         agent_process: AgentProcess object that contains the request sent from the agent.
+    #         temperature (float, optional): Parameter to control the randomness of LLM output. Defaults to 0.0.
+    #     """
+    #     self.model.execute(agent_process, temperature)
+
+    def get_model(self) -> BaseLLM | None:
+        return self.model
+
+
+
\ No newline at end of file
diff --git a/aios/llm_cores/base.py b/aios/llm_cores/base.py
new file mode 100644
index 00000000..f5a0b346
--- /dev/null
+++ b/aios/llm_cores/base.py
@@ -0,0 +1,119 @@
+import json
+import re
+
+# abc allows defining abstract classes
+from abc import ABC, abstractmethod
+
+from aios.utils.id_generator import generator_tool_call_id
+from pyopenagi.utils.chat_template import LLMQuery as Query
+
+
+class BaseLLM(ABC):
+    def __init__(self,
+                 llm_name: str,
+                 max_gpu_memory: dict = None,
+                 eval_device: str = None,
+                 max_new_tokens: int = 256,
+                 ):
+        self.max_gpu_memory = max_gpu_memory
+        self.eval_device = eval_device
+        self.max_new_tokens = max_new_tokens
+
+        self.model_name = llm_name
+
+        self.load_llm_and_tokenizer()
+
+
+
+    def convert_map(self, mapping: dict) -> dict:
+        """Helper utility to convert the keys of a map to int."""
+        new_map = {}
+        for k, v in mapping.items():
+            new_map[int(k)] = v
+        return new_map
+
+    def check_model_type(self, model_name):
+        # TODO: add more model types
+        return "causal_lm"
+
+
+    @abstractmethod
+    def load_llm_and_tokenizer(self) -> None:  # load model from config
+        # raise NotImplementedError
+        """Load the model and tokenizer for each type of LLM.
+        """
+        return
+
+    # only used for open-source LLMs
+    def tool_calling_input_format(self, messages: list, tools: list) -> list:
+        """Integrate tool information into the messages for open-source LLMs.
+
+        Args:
+            messages (list): messages with different roles
+            tools (list): tool information
+        """
+        prefix_prompt = "In and only in the current step, you need to call tools. Available tools are: "
+        tool_prompt = json.dumps(tools)
+        suffix_prompt = "".join(
+            [
+                'You must call functions that are available. To call a function, respond '
+                'immediately and only with a list of JSON objects in the following format:'
+                '{[{"name":"function_name_value","parameters":{"parameter_name1":"parameter_value1",'
+                '"parameter_name2":"parameter_value2"}}]}'
+            ]
+        )
+
+        # translate tool call messages for models that don't support tool calls
+        for message in messages:
+            if "tool_calls" in message:
+                message["content"] = json.dumps(message.pop("tool_calls"))
+            elif message["role"] == "tool":
+                message["role"] = "user"
+                tool_call_id = message.pop("tool_call_id")
+                content = message.pop("content")
+                message["content"] = f"The result of the execution of function (id: {tool_call_id}) is: {content}. "
+
+        messages[-1]["content"] += (prefix_prompt + tool_prompt + suffix_prompt)
+        return messages
+
+    def parse_json_format(self, message: str) -> str:
+        json_array_pattern = r'\[\s*\{.*?\}\s*\]'
+        json_object_pattern = r'\{\s*.*?\s*\}'
+
+        match_array = re.search(json_array_pattern, message)
+
+        if match_array:
+            json_array_substring = match_array.group(0)
+
+            try:
+                json_array_data = json.loads(json_array_substring)
+                return json.dumps(json_array_data)
+            except json.JSONDecodeError:
+                pass
+
+        match_object = re.search(json_object_pattern, message)
+
+        if match_object:
+            json_object_substring = match_object.group(0)
+
+            try:
+                json_object_data = json.loads(json_object_substring)
+                return json.dumps(json_object_data)
+            except json.JSONDecodeError:
+                pass
+        return '[]'
+
+    def parse_tool_calls(self, message):
+        # add tool call id and type for models that don't support tool calls
+        tool_calls = json.loads(self.parse_json_format(message))
+        for tool_call in tool_calls:
+            tool_call["id"] = generator_tool_call_id()
+            tool_call["type"] = "function"
+        return tool_calls
+
+    def execute(self, query: Query):
+        return self.process(query)
+
+    @abstractmethod
+    def process(self, query: Query):
+        raise NotImplementedError
\ No newline at end of file
diff --git a/aios/llm_cores/providers/api/anthropic.py b/aios/llm_cores/providers/api/anthropic.py
new file mode 100644
index 00000000..990af057
--- /dev/null
+++ b/aios/llm_cores/providers/api/anthropic.py
@@ -0,0 +1,141 @@
+import re
+import json
+import anthropic
+from typing import List, Dict, Any
+
+from cerebrum.llm.base import BaseLLM
+from cerebrum.utils.chat import Query, Response
+
+class ClaudeLLM(BaseLLM):
+    """
+    ClaudeLLM class for interacting with Anthropic's Claude models.
+
+    This class provides methods for processing queries using Claude models,
+    including handling of tool calls and message formatting.
+
+    Attributes:
+        model (anthropic.Anthropic): The Anthropic client for API calls.
+        tokenizer (None): Placeholder for a tokenizer, not used in this implementation.
+    """
+
+    def __init__(self, llm_name: str,
+                 max_gpu_memory: Dict[int, str] = None,
+                 eval_device: str = None,
+                 max_new_tokens: int = 256):
+        """
+        Initialize the ClaudeLLM instance.
+
+        Args:
+            llm_name (str): Name of the Claude model to use.
+            max_gpu_memory (Dict[int, str], optional): GPU memory configuration.
+            eval_device (str, optional): Device for evaluation.
+            max_new_tokens (int, optional): Maximum number of new tokens
+                to generate.
+        """
+        super().__init__(llm_name,
+                         max_gpu_memory=max_gpu_memory,
+                         eval_device=eval_device,
+                         max_new_tokens=max_new_tokens,)
+
+    def load_llm_and_tokenizer(self) -> None:
+        """
+        Load the Anthropic client for API calls.
+        """
+        self.model = anthropic.Anthropic()
+        self.tokenizer = None
+
+    def process(self, query: Query):
+        """
+        Process a query using the Claude model.
+
+        Args:
+            query (Query): Query containing the messages to send and,
+                optionally, the tools available for tool calling.
+
+        Raises:
+            AssertionError: If the model name doesn't contain 'claude'.
+            anthropic.APIError: If there's an error with the Anthropic API call.
+            Exception: For any other unexpected errors.
+        """
+        assert re.search(r'claude', self.model_name, re.IGNORECASE), "Model name must contain 'claude'"
+        messages = query.messages
+        tools = query.tools
+
+        print(messages)
+
+        if tools:
+            messages = self.tool_calling_input_format(messages, tools)
+
+        anthropic_messages = self._convert_to_anthropic_messages(messages)
+
+        try:
+            response = self.model.messages.create(
+                model=self.model_name,
+                messages=anthropic_messages,
+                max_tokens=self.max_new_tokens,
+                temperature=0.0
+            )
+
+            response_message = response.content[0].text
+            # self.logger.log(f"API Response: {response_message}", level="info")
+            tool_calls = self.parse_tool_calls(response_message) if tools else None
+
+            return Response(
+                response_message=response_message,
+                tool_calls=tool_calls
+            )
+        except anthropic.APIError as e:
+            error_message = f"Anthropic API error: {str(e)}"
+            print(error_message)
+            return Response(
+                response_message=f"Error: {str(e)}",
+                tool_calls=None
+            )
+        except Exception as e:
+            error_message = f"Unexpected error: {str(e)}"
+            print(error_message)
+            return Response(
+                response_message=f"Unexpected error: {str(e)}",
+                tool_calls=None
+            )
+
+
+    def _convert_to_anthropic_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
+        """
+        Convert messages to the format expected by the Anthropic API.
+
+        Args:
+            messages (List[Dict[str, str]]): Original messages.
+
+        Returns:
+            List[Dict[str, str]]: Converted messages for the Anthropic API.
+        """
+        anthropic_messages = []
+        for message in messages:
+            if message['role'] == 'system':
+                anthropic_messages.append({"role": "user", "content": f"System: {message['content']}"})
+                anthropic_messages.append({"role": "assistant", "content": "Understood. I will follow these instructions."})
+            else:
+                anthropic_messages.append({
+                    "role": "user" if message['role'] == 'user' else "assistant",
+                    "content": message['content']
+                })
+        return anthropic_messages
+
+    def tool_calling_output_format(self, tool_calling_messages: str) -> List[Dict[str, Any]]:
+        """
+        Parse the tool calling output from the model's response.
+
+        Args:
+            tool_calling_messages (str): The model's response containing tool calls.
+
+        Returns:
+            List[Dict[str, Any]]: Parsed tool calls, or None if parsing fails.
+        """
+        try:
+            json_content = json.loads(tool_calling_messages)
+            if isinstance(json_content, list) and len(json_content) > 0 and 'name' in json_content[0]:
+                return json_content
+        except json.JSONDecodeError:
+            pass
+        return super().tool_calling_output_format(tool_calling_messages)
\ No newline at end of file
diff --git a/aios/llm_cores/providers/api/bedrock.py b/aios/llm_cores/providers/api/bedrock.py
new file mode 100644
index 00000000..e69de29b
diff --git a/aios/llm_cores/providers/api/google.py b/aios/llm_cores/providers/api/google.py
new file mode 100644
index 00000000..930bd049
--- /dev/null
+++ b/aios/llm_cores/providers/api/google.py
@@ -0,0 +1,101 @@
+# wrapper around Google's Gemini models
+
+import re
+import time
+import json
+
+from cerebrum.llm.base import BaseLLM
+from cerebrum.utils.chat import Query, Response
+from cerebrum.utils.llm import get_from_env
+
+
+class GeminiLLM(BaseLLM):
+    def __init__(self, llm_name: str,
+                 max_gpu_memory: dict = None,
+                 eval_device: str = None,
+                 max_new_tokens: int = 256,):
+        super().__init__(llm_name,
+                         max_gpu_memory,
+                         eval_device,
+                         max_new_tokens,)
+
+    def load_llm_and_tokenizer(self) -> None:
+        """Dynamic import because the module is only needed for this provider."""
+        assert re.search(r'gemini', self.model_name, re.IGNORECASE)
+        try:
+            import google.generativeai as genai
+            gemini_api_key = get_from_env("GEMINI_API_KEY")
+            genai.configure(api_key=gemini_api_key)
+            self.model = genai.GenerativeModel(self.model_name)
+            self.tokenizer = None
+        except ImportError:
+            raise ImportError(
+                "Could not import the google.generativeai python package. "
+                "Please install it with `pip install google-generativeai`."
+            )
+
+    def convert_messages(self, messages):
+        if messages:
+            gemini_messages = []
+            for m in messages:
+                gemini_messages.append(
+                    {
+                        "role": "user" if m["role"] in ["user", "system"] else "model",
+                        "parts": {"text": m["content"]}
+                    }
+                )
+        else:
+            gemini_messages = None
+        return gemini_messages
+
+    def process(self, query: Query):
+        """Wrapper around the Gemini generate_content API,
+        with optional tool calling support."""
+
+        # agent_process.set_status("executing")
+        # agent_process.set_start_time(time.time())
+        messages = query.messages
+        tools = query.tools
+        message_return_type = query.message_return_type
+
+        if tools:
+            messages = self.tool_calling_input_format(messages, tools)
+
+        # convert roles to fit the Gemini role types
+        messages = self.convert_messages(
+            messages=messages,
+        )
+
+        # self.logger.log(
+        #     f"{agent_process.agent_name} is switched to executing.\n",
+        #     level = "executing"
+        # )
+
+        outputs = self.model.generate_content(
+            json.dumps({"contents": messages})
+        )
+
+        try:
+            result = outputs.candidates[0].content.parts[0].text
+            if tools:
+                tool_calls = self.parse_tool_calls(result)
+                if tool_calls:
+                    return Response(
+                        response_message=None,
+                        tool_calls=tool_calls
+                    )
+
+                else:
+                    return Response(
+                        response_message=result,
+                    )
+
+            else:
+                if message_return_type == "json":
+                    result = self.parse_json_format(result)
+                return Response(
+                    response_message=result,
+                )
+        except IndexError:
+            raise IndexError(
+                f"{self.model_name} cannot generate a valid result, please try again")
diff --git a/aios/llm_cores/providers/api/groq.py b/aios/llm_cores/providers/api/groq.py
new file mode 100644
index 00000000..e69de29b
diff --git a/aios/llm_cores/providers/api/openai.py b/aios/llm_cores/providers/api/openai.py
new file mode 100644
index 00000000..a6e002f1
--- /dev/null
+++ b/aios/llm_cores/providers/api/openai.py
@@ -0,0 +1,100 @@
+import re
+import time
+
+# could be dynamically imported similar to other models
+from openai import OpenAI
+
+from cerebrum.llm.base import BaseLLM
+from cerebrum.utils.chat import Query, Response
+
+import openai
+import json
+
+
+class GPTLLM(BaseLLM):
+    def __init__(self, llm_name: str,
+                 max_gpu_memory: dict = None,
+                 eval_device: str = None,
+                 max_new_tokens: int = 1024,):
+        super().__init__(llm_name,
+                         max_gpu_memory,
+                         eval_device,
+                         max_new_tokens,)
+
+    def load_llm_and_tokenizer(self) -> None:
+        self.model = OpenAI()
+        self.tokenizer = None
+
+    def parse_tool_calls(self, tool_calls):
+        if tool_calls:
+            parsed_tool_calls = []
+            for tool_call in tool_calls:
+                function_name = tool_call.function.name
+                function_args = json.loads(tool_call.function.arguments)
+                parsed_tool_calls.append(
+                    {
+                        "name": function_name,
+                        "parameters": function_args,
+                        "type": tool_call.type,
+                        "id": tool_call.id
+                    }
+                )
+            return parsed_tool_calls
+        return None
+
+    def process(self, query: Query):
+        """Wrapper around the OpenAI chat completions API."""
+
+        # ensure the model name refers to a GPT model
+        assert re.search(r'gpt', self.model_name, re.IGNORECASE)
+        # agent_process.set_status("executing")
+        # agent_process.set_start_time(time.time())
+        messages = query.messages
+        # print(messages)
+        # self.logger.log(
+        #     f"{agent_process.agent_name} is switched to executing.\n",
+        #     level = "executing"
+        # )
+        # time.sleep(10)
+        try:
+            response = self.model.chat.completions.create(
+                model=self.model_name,
+                messages=messages,
+                tools=query.tools,
+                # tool_choice = "required" if agent_process.query.tools else None,
+                max_tokens=self.max_new_tokens,
+                # response_format = {"type": "json_object"}
+            )
+            # print(response_message)
+            response_message = response.choices[0].message.content
+            # print(response_message)
+            tool_calls = self.parse_tool_calls(
+                response.choices[0].message.tool_calls
+            )
+            # print(tool_calls)
+            # print(response.choices[0].message)
+            return Response(
+                response_message=response_message,
+                tool_calls=tool_calls
+            )
+        except openai.APIConnectionError as e:
+            return Response(
+                response_message=f"Server connection error: {e.__cause__}"
+            )
+
+        except openai.RateLimitError as e:
+            return Response(
+                response_message=f"OpenAI RATE LIMIT error {e.status_code}: {e.response}"
+            )
+        except openai.APIStatusError as e:
+            return Response(
+                response_message=f"OpenAI STATUS error {e.status_code}: {e.response}"
+            )
+        except openai.BadRequestError as e:
+            return Response(
+                response_message=f"OpenAI BAD REQUEST error {e.status_code}: {e.response}"
+            )
+        except Exception as e:
+            return Response(
+                response_message=f"An unexpected error occurred: {e}"
+            )
diff --git a/aios/llm_cores/providers/local/huggingface.py b/aios/llm_cores/providers/local/huggingface.py
new file mode 100644
index 00000000..e69de29b
diff --git a/aios/llm_cores/providers/local/ollama.py b/aios/llm_cores/providers/local/ollama.py
new file mode 100644
index 00000000..e69de29b
diff --git a/aios/llm_cores/providers/local/vllm.py b/aios/llm_cores/providers/local/vllm.py
new file mode 100644
index 00000000..e69de29b
diff --git a/aios/llm_cores/registry.py b/aios/llm_cores/registry.py
new file mode 100644
index 00000000..7100a489
--- /dev/null
+++ b/aios/llm_cores/registry.py
@@ -0,0 +1,33 @@
+# Registry of all proprietary (API-accessed) LLM models
+
+
+
+# used for the closed-source LLM model registry
+from cerebrum.llm.providers.api.anthropic import ClaudeLLM
+from cerebrum.llm.providers.api.google import GeminiLLM
+from cerebrum.llm.providers.api.openai import GPTLLM
+
+
+API_MODEL_REGISTRY = {
+    # Gemini
+    "gemini-1.5-flash": GeminiLLM,
+    "gemini-1.5-pro": GeminiLLM,
+
+    # GPT
+    'gpt-3.5-turbo': GPTLLM,
+    'gpt-4-turbo': GPTLLM,
+    'gpt-4o': GPTLLM,
+    'gpt-4o-2024-05-13': GPTLLM,
+    'gpt-4o-mini': GPTLLM,
+
+    # Claude
+    'claude-3-5-sonnet-20240620': ClaudeLLM,
+
+    # Amazon Bedrock
+    # 'bedrock/anthropic.claude-3-haiku-20240307-v1:0': BedrockLLM,
+
+    # Groq
+    # 'llama3-groq-8b-8192-tool-use-preview': GroqLLM,
+    # 'llama3-70b-8192': GroqLLM,
+    # 'mixtral-8x7b-32768' : GroqLLM
+}
\ No newline at end of file
diff --git a/server.py b/server.py
index 0a8b331d..6481a37f 100644
--- a/server.py
+++ b/server.py
@@ -1,6 +1,7 @@
 from collections import OrderedDict
 from fastapi import Depends, FastAPI, Query
 from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel
 
 from aios.hooks.modules.scheduler import useFIFOScheduler
 from aios.hooks.modules.agent import useFactory
@@ -15,6 +16,8 @@
 from aios.core.schema import CoreSchema
 from aios.hooks.types.parser import ParserQuery
 
+from aios.llm_cores.adapter import LLMAdapter
+
 from aios.utils.utils import (
     parse_global_args,
 )
@@ -29,6 +32,8 @@
 
 import json
 
+from pyopenagi.utils.chat_template import LLMQuery
+
 load_dotenv()
 
 app = FastAPI()
@@ -126,6 +131,21 @@
 async def set_kernel(req: LLMParams):
     setLLMState(useCore(**req))
 
+class LLMCoreCallParams(BaseModel):
+    llm_name: str
+    query: LLMQuery
+
+@app.post("/call_llm_core")
+async def call_llm_core(req: LLMCoreCallParams):
+    llm = LLMAdapter(req.llm_name)
+    core = llm.get_model()
+
+    res = core.execute(req.query)
+
+    return {
+        'response': res
+    }
+
 @app.post("/add_agent")
 async def add_agent(
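
A minimal, hypothetical sketch of calling the new /call_llm_core endpoint added in server.py. The host/port and the exact LLMQuery fields (messages, tools, message_return_type) are assumptions inferred from how BaseLLM.process reads the query; adjust them to the real LLMQuery schema and to however the server is launched.

# Hypothetical client-side call; assumes the server runs locally on port 8000
# and that LLMQuery accepts the fields shown below.
import requests

payload = {
    "llm_name": "gpt-4o-mini",  # must be a key in API_MODEL_REGISTRY
    "query": {
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Say hello in one sentence."},
        ],
        "tools": None,
        "message_return_type": "text",
    },
}

resp = requests.post("http://localhost:8000/call_llm_core", json=payload)
print(resp.json()["response"])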