diff --git a/README.md b/README.md
index 4b9a35c..65a9b44 100644
--- a/README.md
+++ b/README.md
@@ -94,6 +94,66 @@ print(out)
 
 ---
 
+## `TogetherLLM` Documentation
+
+The `TogetherLLM` class provides a straightforward interface to the models hosted on Together AI. It supports running single tasks as well as concurrent and batch processing.
+
+### Initialization
+
+To use `TogetherLLM`, initialize it with your API key, the name of the model you want to use, and, optionally, a system prompt. The system prompt gives the model context for the tasks you will run.
+
+Here's an example of how to initialize `TogetherLLM`:
+```python
+import os
+from swarm_models import TogetherLLM
+
+model_runner = TogetherLLM(
+    api_key=os.environ.get("TOGETHER_API_KEY"),
+    model_name="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
+    system_prompt="You're Larry Fink",
+)
+```
+
+### Running Tasks
+
+Once initialized, you can run tasks on the model using the `run` method. This method takes a task string as an argument and returns the model's response.
+
+Here's an example of running a single task:
+```python
+task = "How do we allocate capital efficiently, in your opinion, Larry?"
+response = model_runner.run(task)
+print(response)
+```
+
+### Running Multiple Tasks Concurrently
+
+`TogetherLLM` also supports running multiple tasks concurrently using the `run_concurrently` method. This method takes a list of task strings and returns the responses in the same order as the input tasks.
+
+Here's an example of running multiple tasks concurrently:
+```python
+tasks = [
+    "What are the top-performing mutual funds in the last quarter?",
+    "How do I evaluate the risk of a mutual fund?",
+    "What are the fees associated with investing in a mutual fund?",
+    "Can you recommend a mutual fund for a beginner investor?",
+    "How do I diversify my portfolio with mutual funds?",
+]
+responses = model_runner.run_concurrently(tasks)
+for response in responses:
+    print(response)
+```
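+
+### Running Tasks in Batches
+
+For larger workloads, `TogetherLLM` provides a `run_batch` method. It splits a list of tasks into batches (10 tasks per batch by default) and runs each batch concurrently, returning a single flat list of responses.
+
+Here's an example of running the same `tasks` list in batches of two:
+```python
+responses = model_runner.run_batch(tasks, batch_size=2)
+for response in responses:
+    print(response)
+```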
+
 
 ## **Enterprise-Grade Features**
 
diff --git a/examples/ollama_example.py b/examples/ollama_example.py
index 29e62d7..5e4cf37 100644
--- a/examples/ollama_example.py
+++ b/examples/ollama_example.py
@@ -3,4 +3,4 @@
 
 model = OllamaModel(model_name="", host="")
 
-model.run("What is the theory of the universe")
\ No newline at end of file
+model.run("What is the theory of the universe")
diff --git a/pyproject.toml b/pyproject.toml
index c07c93c..a7ad1e1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "swarm-models"
-version = "0.1.0"
+version = "0.1.1"
 description = "Swarm Models - Pytorch"
 license = "MIT"
 authors = ["Kye Gomez "]
@@ -30,6 +30,7 @@ diffusers = "*"
 loguru = "*"
 pydantic = "*"
 langchain-community = "0.0.29"
+together = "*"
diff --git a/requirements.txt b/requirements.txt
index a5772cc..909eda5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,5 @@ transformers
 diffusers
 loguru
 pydantic
-langchain-community=="0.0.29"
\ No newline at end of file
+langchain-community==0.0.29
+together
\ No newline at end of file
diff --git a/swarm_models/__init__.py b/swarm_models/__init__.py
index 7686854..5bac302 100644
--- a/swarm_models/__init__.py
+++ b/swarm_models/__init__.py
@@ -28,7 +28,6 @@
 )
 from swarm_models.popular_llms import ReplicateChat as Replicate
 from swarm_models.qwen import QwenVLMultiModal  # noqa: E402
-from swarm_models.together import TogetherLLM  # noqa: E402
 from swarm_models.model_types import (  # noqa: E402
     AudioModality,
     ImageModality,
@@ -42,6 +41,7 @@
 from swarm_models.ollama_model import OllamaModel
 from swarm_models.sam_two import GroundedSAMTwo
 from swarm_models.utils import *  # NOQA
+from swarm_models.together_llm import TogetherLLM
 
 __all__ = [
     "BaseLLM",
diff --git a/swarm_models/model_router.py b/swarm_models/model_router.py
index 612352b..8943c89 100644
--- a/swarm_models/model_router.py
+++ b/swarm_models/model_router.py
@@ -1,8 +1,4 @@
-from typing import List, Union
-
-from swarm_models.base_embedding_model import BaseEmbeddingModel
-from swarm_models.base_llm import BaseLLM
-from swarm_models.base_multimodal_model import BaseMultiModalModel
+from typing import List, Callable
 from swarm_models.fuyu import Fuyu  # noqa: E402
 from swarm_models.gpt4_vision_api import GPT4VisionAPI  # noqa: E402
 from swarm_models.huggingface import HuggingfaceLLM  # noqa: E402
@@ -13,7 +9,6 @@
 from swarm_models.llava import LavaMultiModal  # noqa: E402
 from swarm_models.nougat import Nougat  # noqa: E402
 from swarm_models.openai_embeddings import OpenAIEmbeddings
-from swarm_models.openai_function_caller import OpenAIFunctionCaller
 from swarm_models.openai_tts import OpenAITTS  # noqa: E402
 from swarm_models.palm import GooglePalm as Palm  # noqa: E402
 from swarm_models.popular_llms import Anthropic as Anthropic
@@ -35,20 +30,16 @@
 from swarm_models.sampling_params import SamplingParams
-from swarm_models.together import TogetherLLM  # noqa: E402
+from swarm_models.together_llm import TogetherLLM  # noqa: E402
 from swarm_models.vilt import Vilt  # noqa: E402
-from swarm_models.structs.base_structure import BaseStructure
-from swarm_models.utils.loguru_logger import logger
+from loguru import logger
 
-# New type BaseLLM and BaseEmbeddingModel and BaseMultimodalModel
-omni_model_type = Union[
-    BaseLLM, BaseEmbeddingModel, BaseMultiModalModel, callable
-]
-list_of_omni_model_type = List[omni_model_type]
+# # New type BaseLLM and BaseEmbeddingModel and BaseMultimodalModel
+# omni_model_type = Union[
+#     BaseLLM, BaseEmbeddingModel, BaseMultiModalModel, callable
+# ]
+# list_of_Callable = List[Callable]
 
 
 models = [
-    BaseLLM,
-    BaseEmbeddingModel,
-    BaseMultiModalModel,
     Fuyu,
     GPT4VisionAPI,
     HuggingfaceLLM,
@@ -73,28 +64,28 @@
     TogetherLLM,
     Vilt,
     FireWorksAI,
-    OpenAIFunctionCaller,
+    # OpenAIFunctionCaller,
 ]
 
 
-class ModelRouter(BaseStructure):
+class ModelRouter:
     """
     A router for managing multiple models.
 
     Attributes:
         model_router_id (str): The ID of the model router.
         model_router_description (str): The description of the model router.
-        model_pool (List[omni_model_type]): The list of models in the model pool.
+        model_pool (List[Callable]): The list of models in the model pool.
 
     Methods:
         check_for_models(): Checks if there are any models in the model pool.
-        add_model(model: omni_model_type): Adds a model to the model pool.
-        add_models(models: List[omni_model_type]): Adds multiple models to the model pool.
-        get_model_by_name(model_name: str) -> omni_model_type: Retrieves a model from the model pool by its name.
-        get_multiple_models_by_name(model_names: List[str]) -> List[omni_model_type]: Retrieves multiple models from the model pool by their names.
-        get_model_pool() -> List[omni_model_type]: Retrieves the entire model pool.
-        get_model_by_index(index: int) -> omni_model_type: Retrieves a model from the model pool by its index.
-        get_model_by_id(model_id: str) -> omni_model_type: Retrieves a model from the model pool by its ID.
+        add_model(model: Callable): Adds a model to the model pool.
+        add_models(models: List[Callable]): Adds multiple models to the model pool.
+ get_model_by_name(model_name: str) -> Callable: Retrieves a model from the model pool by its name. + get_multiple_models_by_name(model_names: List[str]) -> List[Callable]: Retrieves multiple models from the model pool by their names. + get_model_pool() -> List[Callable]: Retrieves the entire model pool. + get_model_by_index(index: int) -> Callable: Retrieves a model from the model pool by its index. + get_model_by_id(model_id: str) -> Callable: Retrieves a model from the model pool by its ID. dict() -> dict: Returns a dictionary representation of the model router. """ @@ -103,7 +94,7 @@ def __init__( self, model_router_id: str = "model_router", model_router_description: str = "A router for managing multiple models.", - model_pool: List[omni_model_type] = models, + model_pool: List[Callable] = models, verbose: bool = False, *args, **kwargs, @@ -130,12 +121,12 @@ def check_for_models(self): if len(self.model_pool) == 0: raise ValueError("No models found in model pool.") - def add_model(self, model: omni_model_type): + def add_model(self, model: Callable): """ Adds a model to the model pool. Args: - model (omni_model_type): The model to be added. + model (Callable): The model to be added. Returns: str: A success message indicating that the model has been added to the model pool. @@ -144,12 +135,12 @@ def add_model(self, model: omni_model_type): self.model_pool.append(model) return "Model successfully added to model pool." - def add_models(self, models: List[omni_model_type]): + def add_models(self, models: List[Callable]): """ Adds multiple models to the model pool. Args: - models (List[omni_model_type]): The models to be added. + models (List[Callable]): The models to be added. Returns: str: A success message indicating that the models have been added to the model pool. @@ -158,59 +149,9 @@ def add_models(self, models: List[omni_model_type]): self.model_pool.extend(models) return "Models successfully added to model pool." - # def query_model_from_langchain(self, model_name: str, *args, **kwargs): - # """ - # Query a model from langchain community. - - # Args: - # model_name (str): The name of the model. - # *args: Additional positional arguments to be passed to the model. - # **kwargs: Additional keyword arguments to be passed to the model. - - # Returns: - # omni_model_type: The model object. - - # Raises: - # ValueError: If the model with the given name is not found in the model pool. - # """ - # from langchain_community.llms import __getattr__ - - # logger.info( - # f"Querying model {model_name} from langchain community." - # ) - # model = __getattr__(model_name)(*args, **kwargs) - # model = self.refactor_model_class_if_invoke_class(model) - - # return model - - def get_model_by_name(self, model_name: str) -> omni_model_type: - """ - Retrieves a model from the model pool by its name. - - Args: - model_name (str): The name of the model. - - Returns: - omni_model_type: The model object. - - Raises: - ValueError: If the model with the given name is not found in the model pool. - """ - logger.info(f"Retrieving model {model_name} from model pool.") - for model in self.model_pool: - if model_name in [ - model.name, - model.model_id, - model.model_name, - ]: - return model - raise ValueError( - f"Model {model_name} not found in model pool." - ) - def get_multiple_models_by_name( self, model_names: List[str] - ) -> List[omni_model_type]: + ) -> List[Callable]: """ Retrieves multiple models from the model pool by their names. 
@@ -218,7 +159,7 @@ def get_multiple_models_by_name(
             model_names (List[str]): The names of the models.
 
         Returns:
-            List[omni_model_type]: The list of model objects.
+            List[Callable]: The list of model objects.
 
         Raises:
             ValueError: If any of the models with the given names are not found in the model pool.
@@ -231,16 +172,16 @@
             models.append(self.get_model_by_name(model_name))
         return models
 
-    def get_model_pool(self) -> List[omni_model_type]:
+    def get_model_pool(self) -> List[Callable]:
         """
         Retrieves the entire model pool.
 
         Returns:
-            List[omni_model_type]: The list of model objects in the model pool.
+            List[Callable]: The list of model objects in the model pool.
         """
         return self.model_pool
 
-    def get_model_by_index(self, index: int) -> omni_model_type:
+    def get_model_by_index(self, index: int) -> Callable:
         """
         Retrieves a model from the model pool by its index.
 
@@ -248,42 +189,48 @@
             index (int): The index of the model in the model pool.
 
         Returns:
-            omni_model_type: The model object.
+            Callable: The model object.
 
         Raises:
             IndexError: If the index is out of range.
         """
         return self.model_pool[index]
 
-    def get_model_by_id(self, model_id: str) -> omni_model_type:
+    def get_model_by_name(self, model_name: str) -> Callable:
        """
-        Retrieves a model from the model pool by its ID.
+        Retrieves a model from the model pool by its name.
 
         Args:
-            model_id (str): The ID of the model.
+            model_name (str): The name of the model.
 
         Returns:
-            omni_model_type: The model object.
+            Callable: The model object, or None if no model with the given name is found.
 
-        Raises:
-            ValueError: If the model with the given ID is not found in the model pool.
+        Note:
+            Matches against a model's `name`, `model_id`, or `model_name` attribute.
         """
-        name = model_id
+        logger.info(f"Retrieving model {model_name} from model pool.")
         for model in self.model_pool:
-            if (
-                hasattr(model, "model_id")
-                and name == model.model_id
-                or hasattr(model, "model_name")
-                and name == model.model_name
-                or hasattr(model, "name")
-                and name == model.name
-                or hasattr(model, "model")
-                and name == model.model
-            ):
+            # Create a list of possible names to check
+            model_names = []
+
+            # Check for the existence of attributes before accessing them
+            if hasattr(model, "name"):
+                model_names.append(model.name)
+            if hasattr(model, "model_id"):
+                model_names.append(model.model_id)
+            if hasattr(model, "model_name"):
+                model_names.append(model.model_name)
+
+            # Check if the model_name is in the list of model names
+            if model_name in model_names:
                 return model
-        raise ValueError(f"Model {model_id} not found in model pool.")
 
-    def refactor_model_class_if_invoke(self):
+        return None
+
+    def refactor_model_class_if_invoke(self, *args, **kwargs):
         """
         Refactors the model class if it has an 'invoke' method.
 
@@ -294,8 +241,18 @@
         """
         for model in self.model_pool:
             if hasattr(model, "invoke"):
                 model.run = model.invoke
                 model.__call__ = model.invoke
+                logger.info(
+                    f"Refactored model {model.name} to have run and __call__ methods."
+                )
+
+                # Update the model in the model pool
+                self.model_pool[self.model_pool.index(model)] = model
+
+            if hasattr(model, "generate"):
+                model.run = model.generate
+                model.__call__ = model.generate
                 logger.info(
                     f"Refactored model {model.name} to have run and __call__ methods."
                 )
@@ -326,11 +283,7 @@
         return model
 
     def find_model_by_name_and_run(
-        self,
-        model_name: str = None,
-        task: str = None,
-        *args,
-        **kwargs,
+        self, model_name: str, task: str, *args, **kwargs
     ) -> str:
         """
         Finds a model by its name and runs a task on it.
 
         Args:
             model_name (str): The name of the model.
             task (str): The task to be run on the model.
-            *args: Additional positional arguments to be passed to the task.
-            **kwargs: Additional keyword arguments to be passed to the task.
 
         Returns:
             str: The result of running the task on the model.
@@ -348,7 +299,24 @@
             ValueError: If the model with the given name is not found in the model pool.
         """
         model = self.get_model_by_name(model_name)
-        return model.run(task, *args, **kwargs)
+
+        if model is None:
+            raise ValueError(
+                f"Model '{model_name}' not found in the model pool."
+            )
+
+        return model.run(task, *args, **kwargs)
+
+
+def run_model_router(
+    model_name: str, task: str, *args, **kwargs
+) -> str:
+    logger.info(
+        f"Running model router with {model_name} on task: {task}"
+    )
+    return ModelRouter().find_model_by_name_and_run(
+        model_name, task, *args, **kwargs
+    )
 
 
 # model = ModelRouter()
@@ -356,4 +324,5 @@
 # print(model.get_model_pool())
 # print(model.get_model_by_index(0))
 # print(model.get_model_by_id("stability-ai/stable-diffusion:"))
-# # print(model.get_multiple_models_by_name(["gpt-4o", "gpt-4"]))
+# print(model.get_multiple_models_by_name(["gpt-4o", "gpt-4"]))
+# print(run_model_router("gpt-4o-mini", "what's your name?"))
diff --git a/swarm_models/together.py b/swarm_models/together.py
deleted file mode 100644
index 8bccaf7..0000000
--- a/swarm_models/together.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import logging
-import os
-from typing import Optional
-
-import requests
-from dotenv import load_dotenv
-
-from swarm_models.base_llm import BaseLLM
-
-# Load environment variables
-load_dotenv()
-
-
-def together_api_key_env():
-    """Get the API key from the environment."""
-    return os.getenv("TOGETHER_API_KEY")
-
-
-class TogetherLLM(BaseLLM):
-    """
-    GPT-4 Vision API
-
-    This class is a wrapper for the OpenAI API. It is used to run the GPT-4 Vision model.
-
-    Parameters
-    ----------
-    together_api_key : str
-        The OpenAI API key. Defaults to the together_api_key environment variable.
-    max_tokens : int
-        The maximum number of tokens to generate. Defaults to 300.
-
-
-    Methods
-    -------
-    encode_image(img: str)
-        Encode image to base64.
-    run(task: str, img: str)
-        Run the model.
-    __call__(task: str, img: str)
-        Run the model.
-
-    Examples:
-    ---------
-    >>> from swarm_models import GPT4VisionAPI
-    >>> llm = GPT4VisionAPI()
-    >>> task = "What is the color of the object?"
- >>> img = "https://i.imgur.com/2M2ZGwC.jpeg" - >>> llm.run(task, img) - - - """ - - def __init__( - self, - together_api_key: str = together_api_key_env, - model_name: str = "mistralai/Mixtral-8x7B-Instruct-v0.1", - logging_enabled: bool = False, - max_workers: int = 10, - max_tokens: str = 300, - api_endpoint: str = "https://api.together.xyz", - beautify: bool = False, - streaming_enabled: Optional[bool] = False, - meta_prompt: Optional[bool] = False, - system_prompt: Optional[str] = None, - *args, - **kwargs, - ): - super(TogetherLLM).__init__(*args, **kwargs) - self.together_api_key = together_api_key - self.logging_enabled = logging_enabled - self.model_name = model_name - self.max_workers = max_workers - self.max_tokens = max_tokens - self.api_endpoint = api_endpoint - self.beautify = beautify - self.streaming_enabled = streaming_enabled - self.meta_prompt = meta_prompt - self.system_prompt = system_prompt - - if self.logging_enabled: - logging.basicConfig(level=logging.DEBUG) - else: - # Disable debug logs for requests and urllib3 - logging.getLogger("requests").setLevel(logging.WARNING) - logging.getLogger("urllib3").setLevel(logging.WARNING) - - if self.meta_prompt: - self.system_prompt = self.meta_prompt_init() - - # Function to handle vision tasks - def run(self, task: str = None, *args, **kwargs): - """Run the model.""" - try: - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.together_api_key}", - } - payload = { - "model": self.model_name, - "messages": [ - { - "role": "system", - "content": [self.system_prompt], - }, - { - "role": "user", - "content": task, - }, - ], - "max_tokens": self.max_tokens, - **kwargs, - } - response = requests.post( - self.api_endpoint, - headers=headers, - json=payload, - *args, - **kwargs, - ) - - out = response.json() - content = ( - out["choices"][0] - .get("message", {}) - .get("content", None) - ) - if self.streaming_enabled: - content = self.stream_response(content) - - return content - - except Exception as error: - print( - f"Error with the request: {error}, make sure you" - " double check input types and positions" - ) - return None diff --git a/swarm_models/together_llm.py b/swarm_models/together_llm.py new file mode 100644 index 0000000..9d8c9d4 --- /dev/null +++ b/swarm_models/together_llm.py @@ -0,0 +1,140 @@ +from concurrent.futures import ThreadPoolExecutor + +import loguru +from dotenv import load_dotenv +from together import Together + +load_dotenv() + + +class TogetherLLM: + """ + A class to run models with various arguments, including support for concurrent and batch processing. + + Attributes: + api_key (str): The API key for the model service. + model_name (str): The name of the model to run. + """ + + def __init__( + self, + api_key: str, + model_name: str, + system_prompt: str = None, + *args, + **kwargs, + ): + """ + Initializes the ModelRunner with an API key and model name. + + Args: + api_key (str): The API key for the model service. + model_name (str): The name of the model to run. + """ + self.api_key = api_key + self.model_name = model_name + self.system_prompt = system_prompt + self.client = Together(api_key=self.api_key, *args, **kwargs) + loguru.logger.add("model_runner.log", rotation="10 MB") + + def run(self, task: str, *args, **kwargs) -> str: + """ + Runs the model with the given task and returns the response. + + Args: + task (str): The task to pass to the model. + **kwargs: Additional keyword arguments to pass to the model. 
+
+        Returns:
+            str: The content of the first response choice.
+        """
+        try:
+            # Only include a system message when a system prompt was provided
+            messages = [{"role": "user", "content": task}]
+            if self.system_prompt is not None:
+                messages.insert(
+                    0,
+                    {"role": "system", "content": self.system_prompt},
+                )
+            response = self.client.chat.completions.create(
+                model=self.model_name,
+                messages=messages,
+                *args,
+                **kwargs,
+            )
+            loguru.logger.info(
+                f"Model {self.model_name} run successfully with task: {task}"
+            )
+            return response.choices[0].message.content
+        except Exception as e:
+            loguru.logger.error(
+                f"Error running model {self.model_name} with task: {task}: {e}"
+            )
+            return "Error running model."
+
+    def run_concurrently(self, tasks: list, **kwargs) -> list:
+        """
+        Runs the model concurrently with multiple tasks and returns a list of responses.
+
+        Args:
+            tasks (list): A list of tasks to pass to the model.
+            **kwargs: Additional keyword arguments to pass to the model.
+
+        Returns:
+            list: A list of responses, each being the content of the first response choice.
+        """
+        responses = []
+        with ThreadPoolExecutor() as executor:
+            futures = [
+                executor.submit(self.run, task, **kwargs)
+                for task in tasks
+            ]
+            for future in futures:
+                try:
+                    response = future.result()
+                    responses.append(response)
+                except Exception as e:
+                    loguru.logger.error(
+                        f"Error running model concurrently: {e}"
+                    )
+                    responses.append("Error running model.")
+        return responses
+
+    def run_batch(
+        self, tasks: list, batch_size: int = 10, **kwargs
+    ) -> list:
+        """
+        Runs the model in batches with multiple tasks and returns a list of responses.
+
+        Args:
+            tasks (list): A list of tasks to pass to the model.
+            batch_size (int): The size of each batch to process concurrently.
+            **kwargs: Additional keyword arguments to pass to the model.
+
+        Returns:
+            list: A list of responses, each being the content of the first response choice.
+ """ + responses = [] + for i in range(0, len(tasks), batch_size): + batch_tasks = tasks[i : i + batch_size] + batch_responses = self.run_concurrently( + batch_tasks, **kwargs + ) + responses.extend(batch_responses) + return responses + + +# # Example usage +# if __name__ == "__main__": +# model_runner = TogetherLLM( +# api_key=os.environ.get("TOGETHER_API_KEY"), +# model_name="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo", +# system_prompt="You're Larry fink", +# ) +# tasks = [ +# "What are the top-performing mutual funds in the last quarter?", +# "How do I evaluate the risk of a mutual fund?", +# "What are the fees associated with investing in a mutual fund?", +# "Can you recommend a mutual fund for a beginner investor?", +# "How do I diversify my portfolio with mutual funds?", +# ] +# # response_contents = model_runner.run_concurrently(tasks) +# # for response_content in response_contents: +# # print(response_content) +# print(model_runner.run("How do we allocate capital efficiently in your opion Larry?")) diff --git a/together_example.py b/together_example.py new file mode 100644 index 0000000..f596d85 --- /dev/null +++ b/together_example.py @@ -0,0 +1,22 @@ +import os +from swarm_models import TogetherLLM + + +# Example usage +if __name__ == "__main__": + model_runner = TogetherLLM( + api_key=os.environ.get("TOGETHER_API_KEY"), + model_name="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo", + system_prompt="You're Larry fink", + ) + tasks = [ + "What are the top-performing mutual funds in the last quarter?", + "How do I evaluate the risk of a mutual fund?", + "What are the fees associated with investing in a mutual fund?", + "Can you recommend a mutual fund for a beginner investor?", + "How do I diversify my portfolio with mutual funds?", + ] + # response_contents = model_runner.run_concurrently(tasks) + # for response_content in response_contents: + # print(response_content) + print(model_runner.run("How do we allocate capital efficiently in your opion Larry?"))