diff --git a/aios/llm_core/adapter.py b/aios/llm_core/adapter.py
index a884aba6..111491ce 100644
--- a/aios/llm_core/adapter.py
+++ b/aios/llm_core/adapter.py
@@ -1,7 +1,7 @@
 from aios.llm_core.cores.base import BaseLLM
 from aios.llm_core.registry import MODEL_REGISTRY
-
+from aios.llm_core.cores.local.ollama import OllamaLLM
 
 
 class LLMAdapter:
     """Parameters for LLMs
@@ -37,7 +37,11 @@ def __init__(self,
         # For locally-deployed LLM
         else:
             if use_backend == "ollama" or llm_name.startswith("ollama"):
-                pass
+                self.model = OllamaLLM(
+                    llm_name=llm_name,
+                    log_mode=log_mode,
+                    use_context_manager=use_context_manager
+                )
                 # ollama here
             elif use_backend == "vllm":
                 # VLLM here
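
Note (not part of the patch): a minimal sketch of how the new dispatch branch is expected to be exercised. The full LLMAdapter constructor signature is not shown in this hunk, so the keyword arguments below are assumptions based on the names used inside it.

    from aios.llm_core.adapter import LLMAdapter

    # A name starting with "ollama" (or use_backend="ollama") should now build
    # an OllamaLLM core instead of hitting the old `pass` placeholder.
    adapter = LLMAdapter(
        llm_name="ollama/llama3",
        use_backend="ollama",
        log_mode="console",
        use_context_manager=False,
    )
    # adapter.model is now an OllamaLLM instance.
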
diff --git a/aios/llm_core/cores/api/anthropic.py b/aios/llm_core/cores/api/anthropic.py
new file mode 100644
index 00000000..d8b00873
--- /dev/null
+++ b/aios/llm_core/cores/api/anthropic.py
@@ -0,0 +1,213 @@
+import re
+import json
+import time
+import anthropic
+from typing import List, Dict, Any
+
+from aios.llm_core.cores.base import BaseLLM
+
+from cerebrum.llm.communication import Response
+
+
+class ClaudeLLM(BaseLLM):
+    """
+    ClaudeLLM class for interacting with Anthropic's Claude models.
+
+    This class provides methods for processing queries using Claude models,
+    including handling of tool calls and message formatting.
+
+    Attributes:
+        model (anthropic.Anthropic): The Anthropic client for API calls.
+        tokenizer (None): Placeholder for a tokenizer; not used in this implementation.
+    """
+
+    def __init__(
+        self,
+        llm_name: str,
+        max_gpu_memory: Dict[int, str] = None,
+        eval_device: str = None,
+        max_new_tokens: int = 256,
+        log_mode: str = "console",
+        use_context_manager: bool = False,
+    ):
+        """
+        Initialize the ClaudeLLM instance.
+
+        Args:
+            llm_name (str): Name of the Claude model to use.
+            max_gpu_memory (Dict[int, str], optional): GPU memory configuration.
+            eval_device (str, optional): Device for evaluation.
+            max_new_tokens (int, optional): Maximum number of new tokens to generate.
+            log_mode (str, optional): Logging mode, defaults to "console".
+            use_context_manager (bool, optional): Whether to use the context manager.
+        """
+        super().__init__(
+            llm_name,
+            max_gpu_memory=max_gpu_memory,
+            eval_device=eval_device,
+            max_new_tokens=max_new_tokens,
+            log_mode=log_mode,
+            use_context_manager=use_context_manager,
+        )
+
+    def load_llm_and_tokenizer(self) -> None:
+        """
+        Load the Anthropic client for API calls.
+        """
+        self.model = anthropic.Anthropic()
+        self.tokenizer = None
+
+    def convert_tools(self, tools):
+        """
+        Convert OpenAI-style tool definitions into the Anthropic tool format,
+        which expects an `input_schema` field instead of `parameters`.
+        """
+        anthropic_tools = []
+        for tool in tools:
+            anthropic_tool = tool["function"]
+            anthropic_tool["input_schema"] = anthropic_tool["parameters"]
+            anthropic_tool.pop("parameters")
+            anthropic_tools.append(anthropic_tool)
+        return anthropic_tools
+
+    def address_syscall(self, llm_syscall, temperature: float = 0.0):
+        """
+        Process an LLM system call using the Claude model.
+
+        Args:
+            llm_syscall (Any): The LLM system call containing the query and tools.
+            temperature (float, optional): Sampling temperature for generation.
+
+        Returns:
+            Response: The model's reply, or an error message if the API call fails.
+
+        Raises:
+            AssertionError: If the model name doesn't contain 'claude'.
+        """
+        assert re.search(
+            r"claude", self.model_name, re.IGNORECASE
+        ), "Model name must contain 'claude'"
+        llm_syscall.set_status("executing")
+        llm_syscall.set_start_time(time.time())
+        messages = llm_syscall.query.messages
+        tools = llm_syscall.query.tools
+
+        self.logger.log(f"{messages}", level="info")
+        self.logger.log(
+            f"{llm_syscall.agent_name} is switched to executing.", level="executing"
+        )
+
+        if tools:
+            tools = self.convert_tools(tools)
+
+        anthropic_messages = self._convert_to_anthropic_messages(messages)
+        self.logger.log(f"{anthropic_messages}", level="info")
+
+        try:
+            response = self.model.messages.create(
+                model=self.model_name,
+                messages=anthropic_messages,
+                max_tokens=self.max_new_tokens,
+                temperature=temperature,
+                # NOTE: converted tools are not yet forwarded to the API
+                # tools=tools,
+            )
+
+            response_message = response.content[0].text
+            self.logger.log(f"API Response: {response_message}", level="info")
+            tool_calls = self.parse_tool_calls(response_message) if tools else None
+
+            response = Response(
+                response_message=response_message, tool_calls=tool_calls
+            )
+
+        except anthropic.APIError as e:
+            error_message = f"Anthropic API error: {str(e)}"
+            self.logger.log(error_message, level="warning")
+            response = Response(response_message=f"Error: {str(e)}", tool_calls=None)
+
+        except Exception as e:
+            error_message = f"Unexpected error: {str(e)}"
+            self.logger.log(error_message, level="warning")
+            response = Response(
+                response_message=f"Unexpected error: {str(e)}", tool_calls=None
+            )
+
+        return response
+
+    def _convert_to_anthropic_messages(
+        self, messages: List[Dict[str, str]]
+    ) -> List[Dict[str, str]]:
+        """
+        Convert messages to the format expected by the Anthropic API.
+
+        Args:
+            messages (List[Dict[str, str]]): Original messages.
+
+        Returns:
+            List[Dict[str, str]]: Converted messages for the Anthropic API.
+        """
+        anthropic_messages = []
+        for message in messages:
+            if message["role"] == "system":
+                # The Messages API does not accept a "system" role inside `messages`,
+                # so the system prompt is replayed as a user/assistant exchange here.
+                anthropic_messages.append(
+                    {"role": "user", "content": f"System: {message['content']}"}
+                )
+                anthropic_messages.append(
+                    {
+                        "role": "assistant",
+                        "content": "Understood. I will follow these instructions.",
+                    }
+                )
+            else:
+                anthropic_messages.append(
+                    {
+                        "role": "user" if message["role"] == "user" else "assistant",
+                        "content": message["content"],
+                    }
+                )
+        return anthropic_messages
+
+    def tool_calling_output_format(
+        self, tool_calling_messages: str
+    ) -> List[Dict[str, Any]]:
+        """
+        Parse the tool calling output from the model's response.
+
+        Args:
+            tool_calling_messages (str): The model's response containing tool calls.
+
+        Returns:
+            List[Dict[str, Any]]: Parsed tool calls, or None if parsing fails.
+        """
+        try:
+            json_content = json.loads(tool_calling_messages)
+            if (
+                isinstance(json_content, list)
+                and len(json_content) > 0
+                and "name" in json_content[0]
+            ):
+                return json_content
+        except json.JSONDecodeError:
+            pass
+        return super().tool_calling_output_format(tool_calling_messages)
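
Note (not part of the patch): what `convert_tools` does to a single OpenAI-style tool definition; "get_weather" is a hypothetical tool name used only for illustration.

    # OpenAI-style tool definition:
    openai_tool = {
        "function": {
            "name": "get_weather",
            "description": "Look up the weather for a city",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        }
    }

    # convert_tools keeps the inner "function" dict but renames
    # "parameters" -> "input_schema", mutating the dict in place:
    anthropic_tool = openai_tool["function"]
    anthropic_tool["input_schema"] = anthropic_tool.pop("parameters")
    # -> {"name": "get_weather", "description": "...", "input_schema": {...}}
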
diff --git a/aios/llm_core/cores/local/hf.py b/aios/llm_core/cores/local/hf.py
new file mode 100644
index 00000000..3802f54d
--- /dev/null
+++ b/aios/llm_core/cores/local/hf.py
@@ -0,0 +1,277 @@
+# Run models from huggingface using the transformers library
+
+import re
+import time
+
+import torch
+from transformers import AutoModelForCausalLM, AutoTokenizer
+
+from aios.llm_core.cores.base import BaseLLM
+
+from cerebrum.llm.communication import Response
+
+from aios.utils import get_from_env
+
+
+class HfNativeLLM(BaseLLM):
+
+    def load_llm_and_tokenizer(self) -> None:
+        """ Fetch the model from huggingface and run it locally """
+        self.max_gpu_memory = self.convert_map(self.max_gpu_memory)
+
+        self.auth_token = get_from_env("HF_AUTH_TOKENS")
+
+        # only causal LMs for now
+        self.model = AutoModelForCausalLM.from_pretrained(
+            self.model_name,
+            device_map="auto",
+            max_memory=self.max_gpu_memory,
+            use_auth_token=self.auth_token
+        )
+        self.tokenizer = AutoTokenizer.from_pretrained(
+            self.model_name,
+            use_auth_token=self.auth_token
+        )
+        self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
+
+    def parse_tool_callings(self, result):
+        """ Return the last JSON-like tool-call list found in the raw output """
+        pattern = r'\[\{.*?\}\]'
+        matches = re.findall(pattern, result)
+        return matches[-1] if matches else None
+
+    def address_syscall(self,
+                        llm_syscall,
+                        temperature=0.0) -> None:
+        llm_syscall.set_status("executing")
+        llm_syscall.set_start_time(time.time())
+        self.logger.log(
+            f"{llm_syscall.agent_name} is switched to executing.\n",
+            level="executing"
+        )
+
+        messages = llm_syscall.query.messages
+        tools = llm_syscall.query.tools
+        message_return_type = llm_syscall.query.message_return_type
+
+        # the context manager (suspend/restore) works only with open-weight LLMs
+        if self.context_manager.check_restoration(llm_syscall.get_pid()):
+            restored_context = self.context_manager.gen_recover(
+                llm_syscall.get_pid()
+            )
+            start_idx = restored_context["start_idx"]
+            beams = restored_context["beams"]
+            beam_scores = restored_context["beam_scores"]
+            beam_attention_mask = restored_context["beam_attention_mask"]
+
+            outputs = self.llm_generate(
+                search_mode="beam_search",
+                beam_size=1,
+                beams=beams,
+                beam_scores=beam_scores,
+                beam_attention_mask=beam_attention_mask,
+                max_new_tokens=self.max_new_tokens,
+                start_idx=start_idx,
+                timestamp=llm_syscall.get_time_limit()
+            )
+        else:
+            # otherwise build the prompt from the (tool-augmented) messages
+            if tools:
+                messages = self.tool_calling_input_format(messages, tools)
+
+            prompt = self.tokenizer.apply_chat_template(
+                messages,
+                tokenize=False
+            )
+
+            input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
+
+            attention_mask = input_ids != self.tokenizer.pad_token_id
+            input_ids = input_ids.to(self.eval_device)
+            attention_mask = attention_mask.to(self.eval_device)
+
+            outputs = self.llm_generate(
+                input_ids=input_ids,
+                attention_mask=attention_mask,
+                search_mode="beam_search",
+                beam_size=1,
+                max_new_tokens=self.max_new_tokens,
+                start_idx=0,
+                timestamp=llm_syscall.get_time_limit()
+            )
+            # TODO: temporary; strip the prompt tokens from the returned sequence
+            outputs["result"] = outputs["result"][input_ids.shape[-1]:]
+
+        output_ids = outputs["result"]
+
+        # decode the generated token ids back into text
+        result = self.tokenizer.decode(output_ids,
+                                       skip_special_tokens=False)
+
+        if outputs["finished_flag"]:  # generation finished within the time limit
+
+            if self.context_manager.check_restoration(
+                    llm_syscall.get_pid()):
+                self.context_manager.clear_restoration(
+                    llm_syscall.get_pid()
+                )
+
+            if tools:
+                tool_calls = self.parse_tool_calls(
+                    result
+                )
+                llm_syscall.set_response(
+                    Response(
+                        response_message=None,
+                        tool_calls=tool_calls
+                    )
+                )
+            else:
+                llm_syscall.set_response(
+                    Response(
+                        response_message=result
+                    )
+                )
+            llm_syscall.set_status("done")
+
+        else:
+            # the module automatically suspends when the time limit is reached
+            self.logger.log(
+                f"{llm_syscall.agent_name} is switched to suspending due to the reach of time limit ({llm_syscall.get_time_limit()}s).\n",
+                level="suspending"
+            )
+            self.context_manager.gen_snapshot(
+                llm_syscall.get_pid(),
+                context={
+                    "start_idx": outputs["start_idx"],
+                    "beams": outputs["beams"],
+                    "beam_scores": outputs["beam_scores"],
+                    "beam_attention_mask": outputs["beam_attention_mask"]
+                }
+            )
+            if message_return_type == "json":
+                result = self.parse_json_format(result)
+            llm_syscall.set_response(
+                Response(
+                    response_message=result
+                )
+            )
+            llm_syscall.set_status("suspending")
+
+        llm_syscall.set_end_time(time.time())
+
+    def llm_generate(self,
+                     input_ids: torch.Tensor = None,
+                     attention_mask: torch.Tensor = None,
+                     beams: torch.Tensor = None,
+                     beam_scores: torch.Tensor = None,
+                     beam_attention_mask: torch.Tensor = None,
+                     beam_size: int = None,
+                     max_new_tokens: int = None,
+                     search_mode: str = None,
+                     start_idx: int = 0,
+                     timestamp: int = None
+                     ):
+        """ Only beam-search generation is supported for now """
+        if search_mode == "beam_search":
+            output_ids = self.beam_search(
+                input_ids=input_ids,
+                attention_mask=attention_mask,
+                beam_size=beam_size,
+                beams=beams,
+                beam_scores=beam_scores,
+                beam_attention_mask=beam_attention_mask,
+                max_new_tokens=max_new_tokens,
+                start_idx=start_idx,
+                timestamp=timestamp
+            )
+            return output_ids
+        else:
+            # TODO: greedy decoding support
+            raise NotImplementedError
+
+    def beam_search(self,
+                    input_ids: torch.Tensor = None,
+                    attention_mask: torch.Tensor = None,
+                    beams=None,
+                    beam_scores=None,
+                    beam_attention_mask=None,
+                    beam_size: int = None,
+                    max_new_tokens: int = None,
+                    start_idx: int = 0,
+                    timestamp: int = None  # time budget in seconds before suspension
+                    ):
+        """
+        Beam search keeps multiple candidate token sequences at each step and
+        scores whole sequences, as opposed to greedily picking the single best
+        token at every step.
+        """
+
+        if beams is None or beam_scores is None or beam_attention_mask is None:
+            beams = input_ids.repeat(beam_size, 1)
+            beam_attention_mask = attention_mask.repeat(beam_size, 1)
+            beam_scores = torch.zeros(beam_size, device=self.eval_device)
+
+        start_time = time.time()
+
+        finished_flag = False
+
+        idx = start_idx
+
+        for step in range(start_idx, max_new_tokens):
+            with torch.no_grad():
+                # Obtain logits for the last tokens across all beams
+                outputs = self.model(beams, attention_mask=beam_attention_mask)
+                next_token_logits = outputs.logits[:, -1, :]
+
+                # Apply softmax to convert logits to probabilities
+                next_token_probs = torch.softmax(next_token_logits, dim=-1)
+
+                # Calculate scores for all possible next tokens for each beam
+                next_token_scores = beam_scores.unsqueeze(-1) + torch.log(next_token_probs)
+
+                # Flatten to treat the beam and token dimensions as one
+                next_token_scores_flat = next_token_scores.view(-1)
+
+                # Select top overall scores to find the next beams
+                top_scores, top_indices = torch.topk(next_token_scores_flat,
+                                                     beam_size, sorted=True)
+
+                # Determine the next beams and their corresponding tokens
+                beam_indices = top_indices // next_token_probs.size(1)  # which beam each top token came from
+                token_indices = top_indices % next_token_probs.size(1)  # which vocabulary token was selected
+
+                # Update beams, scores, and attention masks
+                beams = torch.cat([beams[beam_indices], token_indices.unsqueeze(-1)], dim=-1)
+                beam_attention_mask = torch.cat([beam_attention_mask[beam_indices], torch.ones_like(token_indices).unsqueeze(-1)], dim=-1)
+                beam_scores = top_scores
+
+            # Check for stopping criteria: time limit reached, suspend generation
+            if timestamp is not None and time.time() - start_time >= timestamp:
+                idx = step
+                break
+
+            # Check for completion: every beam ended with EOS
+            if torch.all(beams[:, -1] == self.tokenizer.eos_token_id):
+                idx = step
+                finished_flag = True
+                break
+
+            if step + 1 == max_new_tokens:
+                idx = step
+                finished_flag = True
+                break
+
+        best_beam_idx = beam_scores.argmax()
+
+        best_beam = beams[best_beam_idx]
+
+        outputs = {
+            "finished_flag": finished_flag,
+            "start_idx": idx,
+            "beams": beams,
+            "beam_scores": beam_scores,
+            "beam_attention_mask": beam_attention_mask,
+            "result": best_beam
+        }
+
+        return outputs
\ No newline at end of file
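
Note (not part of the patch): the `//` and `%` bookkeeping in beam_search recovers which beam and which vocabulary token each top-k entry came from after the scores are flattened. A tiny standalone sketch with toy numbers (vocabulary of 5, 2 beams):

    import torch

    vocab_size, beam_size = 5, 2
    # Flattened scores have shape (beam_size * vocab_size,); index i corresponds
    # to beam i // vocab_size and token i % vocab_size.
    scores = torch.tensor([0.1, 0.2, 0.9, 0.0, 0.3,   # beam 0
                           0.8, 0.1, 0.2, 0.7, 0.0])  # beam 1
    top_scores, top_indices = torch.topk(scores, beam_size, sorted=True)
    print(top_indices)                # tensor([2, 5])
    print(top_indices // vocab_size)  # beams:  tensor([0, 1])
    print(top_indices % vocab_size)   # tokens: tensor([2, 0])
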
diff --git a/aios/llm_core/cores/local/ollama.py b/aios/llm_core/cores/local/ollama.py
new file mode 100644
index 00000000..1b53b241
--- /dev/null
+++ b/aios/llm_core/cores/local/ollama.py
@@ -0,0 +1,109 @@
+# Wrapper around ollama for locally-served LLMs
+
+import re
+import time
+
+import ollama
+
+from aios.llm_core.cores.base import BaseLLM
+
+from cerebrum.llm.communication import Response
+
+
+class OllamaLLM(BaseLLM):
+
+    def __init__(
+        self,
+        llm_name: str,
+        max_gpu_memory: dict = None,
+        eval_device: str = None,
+        max_new_tokens: int = 256,
+        log_mode: str = "console",
+        use_context_manager=False,
+    ):
+        super().__init__(
+            llm_name,
+            max_gpu_memory,
+            eval_device,
+            max_new_tokens,
+            log_mode,
+            use_context_manager
+        )
+
+    def load_llm_and_tokenizer(self) -> None:
+        # the model is served by the local ollama daemon; nothing to load here
+        self.model = None
+        self.tokenizer = None
+
+    def address_syscall(self, llm_syscall, temperature=0.0):
+        """ Thin wrapper around the ollama chat API """
+        # ensure the model is an ollama-served one
+        assert re.search(
+            r"ollama", self.model_name, re.IGNORECASE
+        ), "Model name must contain 'ollama'"
+
+        llm_syscall.set_status("executing")
+        llm_syscall.set_start_time(time.time())
+        messages = llm_syscall.query.messages
+        tools = llm_syscall.query.tools
+        message_return_type = llm_syscall.query.message_return_type
+        self.logger.log(
+            f"{llm_syscall.agent_name} is switched to executing.\n", level="executing"
+        )
+
+        # with and without the extra overhead of tool handling
+        if tools:
+            messages = self.tool_calling_input_format(messages, tools)
+            try:
+                response = ollama.chat(
+                    model=self.model_name.split("/")[-1], messages=messages
+                )
+
+                tool_calls = self.parse_tool_calls(response["message"]["content"])
+
+                if tool_calls:
+                    llm_syscall.set_response(
+                        Response(
+                            response_message=None, tool_calls=tool_calls, finished=True
+                        )
+                    )
+                else:
+                    llm_syscall.set_response(
+                        Response(
+                            response_message=response["message"]["content"],
+                            finished=True,
+                        )
+                    )
+            except Exception as e:
+                llm_syscall.set_response(
+                    Response(
+                        response_message=f"An unexpected error occurred: {e}",
+                        finished=True,
+                    )
+                )
+
+        else:
+            try:
+                response = ollama.chat(
+                    model=self.model_name.split("/")[-1],
+                    messages=messages,
+                    options=ollama.Options(num_predict=self.max_new_tokens),
+                )
+                result = response["message"]["content"]
+
+                if message_return_type == "json":
+                    result = self.parse_json_format(result)
+
+                llm_syscall.set_response(
+                    Response(response_message=result, finished=True)
+                )
+
+            except Exception as e:
+                llm_syscall.set_response(
+                    Response(
+                        response_message=f"An unexpected error occurred: {e}",
+                        finished=True,
+                    )
+                )
+
+        llm_syscall.set_status("done")
+        llm_syscall.set_end_time(time.time())
+        return
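
Note (not part of the patch): the core strips the registry-style prefix before calling the local daemon, so a name like "ollama/llama3.1" becomes the ollama tag "llama3.1". A minimal sketch of the underlying call, assuming the tag has already been pulled locally (the model tag and prompt are illustrative):

    import ollama

    llm_name = "ollama/llama3.1"          # registry-style name used by AIOS
    model_tag = llm_name.split("/")[-1]   # -> "llama3.1", the tag ollama knows

    response = ollama.chat(
        model=model_tag,
        messages=[{"role": "user", "content": "Say hello in one word."}],
        options=ollama.Options(num_predict=32),  # mirrors max_new_tokens
    )
    print(response["message"]["content"])
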
diff --git a/aios/llm_core/cores/local/vllm.py b/aios/llm_core/cores/local/vllm.py
new file mode 100644
index 00000000..3e0a3a6d
--- /dev/null
+++ b/aios/llm_core/cores/local/vllm.py
@@ -0,0 +1,117 @@
+import time
+
+# vllm itself could be dynamically imported, similar to the other local cores
+
+from aios.llm_core.cores.base import BaseLLM
+
+from aios.utils import get_from_env
+
+from cerebrum.llm.communication import Response
+
+from transformers import AutoTokenizer
+
+
+class vLLM(BaseLLM):
+
+    def __init__(
+        self,
+        llm_name: str,
+        max_gpu_memory: dict = None,
+        eval_device: str = None,
+        max_new_tokens: int = 256,
+        log_mode: str = "console",
+        use_context_manager=False,
+    ):
+        super().__init__(
+            llm_name,
+            max_gpu_memory,
+            eval_device,
+            max_new_tokens,
+            log_mode,
+            use_context_manager
+        )
+
+    def load_llm_and_tokenizer(self) -> None:
+        """ Fetch the model from huggingface and serve it through vLLM """
+        # requires max_gpu_memory to be provided (e.g. {0: "24GB", 1: "24GB"})
+        self.available_gpus = list(self.max_gpu_memory.keys())
+        self.gpu_nums = len(self.available_gpus)
+        try:
+            import vllm
+        except ImportError:
+            raise ImportError(
+                "Could not import the vllm python package. "
+                "Please install it with `pip install vllm`."
+            )
+
+        # only causal LMs for now
+        self.model = vllm.LLM(
+            model=self.model_name,
+            download_dir=get_from_env("HF_HOME"),
+            tensor_parallel_size=self.gpu_nums,
+        )
+        self.tokenizer = AutoTokenizer.from_pretrained(
+            self.model_name,
+        )
+        self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
+
+        self.sampling_params = vllm.SamplingParams(
+            temperature=0.8,
+            top_p=0.95,
+            max_tokens=self.max_new_tokens,
+        )
+
+    def address_syscall(self, llm_syscall, temperature=0.0) -> None:
+        llm_syscall.set_status("executing")
+        llm_syscall.set_start_time(time.time())
+        self.logger.log(
+            f"{llm_syscall.agent_name} is switched to executing.\n", level="executing"
+        )
+
+        messages = llm_syscall.query.messages
+        tools = llm_syscall.query.tools
+        message_return_type = llm_syscall.query.message_return_type
+
+        if tools:
+            messages = self.tool_calling_input_format(messages, tools)
+            prompt = self.tokenizer.apply_chat_template(
+                messages,
+                tokenize=False,
+            )
+            response = self.model.generate(prompt, self.sampling_params)
+            result = response[0].outputs[0].text
+
+            tool_calls = self.parse_tool_calls(result)
+            if tool_calls:
+                llm_syscall.set_response(
+                    Response(
+                        response_message=None, tool_calls=tool_calls, finished=True
+                    )
+                )
+            else:
+                llm_syscall.set_response(
+                    Response(response_message=result, finished=True)
+                )
+
+        else:
+            prompt = self.tokenizer.apply_chat_template(messages, tokenize=False)
+            response = self.model.generate(prompt, self.sampling_params)
+
+            result = response[0].outputs[0].text
+            if message_return_type == "json":
+                result = self.parse_json_format(result)
+
+            llm_syscall.set_response(Response(response_message=result, finished=True))
+
+        llm_syscall.set_status("done")
+
+        llm_syscall.set_end_time(time.time())
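
Note (not part of the patch): in this core the per-GPU memory map only determines the degree of tensor parallelism; the memory amounts themselves are not passed to vLLM. A small sketch with an assumed two-GPU configuration:

    # Illustrative only: key count of the GPU map drives tensor parallelism.
    max_gpu_memory = {0: "24GB", 1: "24GB"}      # assumed two-GPU setup
    tensor_parallel_size = len(max_gpu_memory)   # -> 2, forwarded to vllm.LLM(...)

    # Generation then follows the offline-inference pattern used above:
    #   outputs = self.model.generate(prompt, self.sampling_params)
    #   text = outputs[0].outputs[0].text
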
diff --git a/aios/llm_core/registry.py b/aios/llm_core/registry.py
index 34d88614..5dc7c08f 100644
--- a/aios/llm_core/registry.py
+++ b/aios/llm_core/registry.py
@@ -1,6 +1,6 @@
 from aios.llm_core.cores.api.google import GeminiLLM
 from aios.llm_core.cores.api.openai import GPTLLM
-
+from aios.llm_core.cores.api.anthropic import ClaudeLLM
 
 MODEL_REGISTRY = {
     # Gemini
@@ -13,4 +13,9 @@
     'gpt-4o': GPTLLM,
     'gpt-4o-2024-05-13': GPTLLM,
     'gpt-4o-mini': GPTLLM,
+
+    # Claude
+    'claude-3-5-sonnet-latest': ClaudeLLM,
+    'claude-3-5-haiku-latest': ClaudeLLM,
+    'claude-3-opus-latest': ClaudeLLM
 }
\ No newline at end of file
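
Note (not part of the patch): with these entries registered, resolving a Claude model name through the registry yields the new core; a minimal sketch:

    from aios.llm_core.registry import MODEL_REGISTRY
    from aios.llm_core.cores.api.anthropic import ClaudeLLM

    core_cls = MODEL_REGISTRY["claude-3-5-sonnet-latest"]
    assert core_cls is ClaudeLLM
    # The adapter can then instantiate it like any other API-backed core
    # (an ANTHROPIC_API_KEY must be available for anthropic.Anthropic()).
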