Skip to content

Commit

Permalink
Merge branch 'main' into homepage
Browse files Browse the repository at this point in the history
  • Loading branch information
BRama10 authored Sep 24, 2024
2 parents 30cab80 + ee95077 commit 2b5931f
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 80 deletions.
78 changes: 75 additions & 3 deletions aios/hooks/llm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from concurrent.futures import ThreadPoolExecutor, Future
from contextlib import contextmanager

from typing import Any
from random import randint
Expand All @@ -7,7 +8,8 @@

from aios.scheduler.fifo_scheduler import FIFOScheduler

from aios.hooks.types.llm import AgentSubmitDeclaration, FactoryParams, LLMParams, SchedulerParams, LLMRequestQueue, QueueGetMessage, QueueAddMessage, QueueCheckEmpty
from aios.hooks.types.llm import AgentSubmitDeclaration, FactoryParams, LLMParams, SchedulerParams, LLMRequestQueue, \
QueueGetMessage, QueueAddMessage, QueueCheckEmpty
from aios.hooks.validate import validate

from aios.hooks.stores import queue as QueueStore, processes as ProcessStore
Expand All @@ -19,10 +21,12 @@

ids = []


@validate(LLMParams)
def useKernel(params: LLMParams) -> LLM:
    """Build and return an LLM kernel from a validated parameter model."""
    kernel_kwargs = params.model_dump()
    return LLM(**kernel_kwargs)


def useLLMRequestQueue() -> tuple[LLMRequestQueue, QueueGetMessage, QueueAddMessage, QueueCheckEmpty]:
r_str = generate_random_string()
_ = LLMRequestQueue()
Expand All @@ -38,13 +42,12 @@ def addMessage(message: str):
def isEmpty():
return QueueStore.isEmpty(_)


return _, getMessage, addMessage, isEmpty


@validate(SchedulerParams)
def useFIFOScheduler(params: SchedulerParams):
if params.get_queue_message is None:

from aios.hooks.stores._global import global_llm_req_queue_get_message

params.get_queue_message = global_llm_req_queue_get_message
Expand Down Expand Up @@ -109,3 +112,72 @@ def awaitAgentExecution(process_id: str) -> dict[str, Any]:
raise ValueError(f"Process with ID '{process_id}' not found.")

return submitAgent, awaitAgentExecution


@contextmanager
@validate(SchedulerParams)
def fifo_scheduler(params: SchedulerParams):
    """
    A context manager that starts and stops a FIFO scheduler.

    When no queue getter is supplied, falls back to the global LLM request
    queue (same fallback as ``useFIFOScheduler``). The scheduler is started
    on entry and guaranteed to be stopped on exit, even if the managed
    block raises.

    Args:
        params (SchedulerParams): The parameters for the scheduler.
    """
    if params.get_queue_message is None:
        # Deferred import mirrors useFIFOScheduler; presumably avoids an
        # import-time cycle with the global store — TODO confirm.
        from aios.hooks.stores._global import global_llm_req_queue_get_message
        params.get_queue_message = global_llm_req_queue_get_message

    scheduler = FIFOScheduler(**params.model_dump())

    scheduler.start()
    try:
        yield
    finally:
        # Without try/finally, an exception inside the with-block would skip
        # stop() and leave the scheduler running after the block exits.
        scheduler.stop()


@contextmanager
def aios_starter(
    llm_name,
    max_gpu_memory,
    eval_device,
    max_new_tokens,
    scheduler_log_mode,
    agent_log_mode,
    llm_kernel_log_mode,
    use_backend
):
    """
    Context manager that boots an LLM kernel and a FIFO scheduler, and
    exposes the agent-execution helpers while both are running.

    Args:
        llm_name (str): The name of the LLM kernel to use.
        max_gpu_memory (str): The maximum amount of GPU memory to use.
        eval_device (str): The device to evaluate the LLM on.
        max_new_tokens (int): The maximum number of new tokens to generate.
        scheduler_log_mode (str): The log mode for the scheduler.
        agent_log_mode (str): The log mode for the agents.
        llm_kernel_log_mode (str): The log mode for the LLM kernel.
        use_backend (str): The backend to use for running the LLM kernel.

    Yields:
        tuple: ``(submitAgent, awaitAgentExecution)`` — a callable that
        submits an agent task and returns its process id, and a callable
        that blocks until that agent finishes and returns its result.
    """
    kernel = useKernel(
        llm_name=llm_name,
        max_gpu_memory=max_gpu_memory,
        eval_device=eval_device,
        max_new_tokens=max_new_tokens,
        log_mode=llm_kernel_log_mode,
        use_backend=use_backend,
    )

    # The factory's worker pool lets submitted agents run concurrently.
    submit_agent, await_agent_execution = useFactory(
        log_mode=agent_log_mode,
        max_workers=64,
    )

    # get_queue_message=None makes fifo_scheduler fall back to the
    # global LLM request queue.
    with fifo_scheduler(
        llm=kernel,
        log_mode=scheduler_log_mode,
        get_queue_message=None,
    ):
        yield submit_agent, await_agent_execution
121 changes: 44 additions & 77 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import os
import warnings

from aios.hooks.llm import useFactory, useKernel, useFIFOScheduler
from aios.hooks.llm import aios_starter

from aios.utils.utils import delete_directories
from dotenv import load_dotenv
Expand All @@ -30,86 +30,53 @@ def main():
warnings.filterwarnings("ignore")
parser = parse_global_args()
args = parser.parse_args()

llm_name = args.llm_name
max_gpu_memory = args.max_gpu_memory
eval_device = args.eval_device
max_new_tokens = args.max_new_tokens
scheduler_log_mode = args.scheduler_log_mode
agent_log_mode = args.agent_log_mode
llm_kernel_log_mode = args.llm_kernel_log_mode
use_backend = args.use_backend
load_dotenv()

llm = useKernel(
llm_name=llm_name,
max_gpu_memory=max_gpu_memory,
eval_device=eval_device,
max_new_tokens=max_new_tokens,
log_mode=llm_kernel_log_mode,
use_backend=use_backend
)

# run agents concurrently for maximum efficiency using a scheduler

startScheduler, stopScheduler = useFIFOScheduler(
llm=llm,
log_mode=scheduler_log_mode,
get_queue_message=None
)

submitAgent, awaitAgentExecution = useFactory(
log_mode=agent_log_mode,
max_workers=64
)

startScheduler()

# register your agents and submit agent tasks
""" submitAgent(
agent_name="example/academic_agent",
task_input="Find recent papers on the impact of social media on mental health in adolescents."
)
"""

"""
submitAgent(
agent_name="om-raheja/transcribe_agent",
task_input="listen to my yap for 5 seconds and write a response to it"
)
"""

"""
submitAgent(
agent_name="example/cocktail_mixlogist",
task_input="Create a cocktail for a summer garden party. Guests enjoy refreshing, citrusy flavors. Available ingredients include vodka, gin, lime, lemon, mint, and various fruit juices."
)
"""

"""
submitAgent(
agent_name="example/cook_therapist",
task_input="Develop a low-carb, keto-friendly dinner that is flavorful and satisfying."
)
"""

agent_tasks = [
["example/academic_agent", "Tell me what is the prollm paper mainly about"]
# ["example/cocktail_mixlogist", "Create a cocktail for a summer garden party. Guests enjoy refreshing, citrusy flavors. Available ingredients include vodka, gin, lime, lemon, mint, and various fruit juices."]
]

agent_ids = []
for agent_name, task_input in agent_tasks:
agent_id = submitAgent(
agent_name=agent_name,
task_input=task_input
with aios_starter(**vars(args)) as (submit_agent, await_agent_execution):

# register your agents and submit agent tasks
""" submitAgent(
agent_name="example/academic_agent",
task_input="Find recent papers on the impact of social media on mental health in adolescents."
)
"""

"""
submitAgent(
agent_name="om-raheja/transcribe_agent",
task_input="listen to my yap for 5 seconds and write a response to it"
)
agent_ids.append(agent_id)

for agent_id in agent_ids:
awaitAgentExecution(agent_id)
"""

stopScheduler()
"""
submitAgent(
agent_name="example/cocktail_mixlogist",
task_input="Create a cocktail for a summer garden party. Guests enjoy refreshing, citrusy flavors. Available ingredients include vodka, gin, lime, lemon, mint, and various fruit juices."
)
"""

"""
submitAgent(
agent_name="example/cook_therapist",
task_input="Develop a low-carb, keto-friendly dinner that is flavorful and satisfying."
)
"""

agent_tasks = [
["example/academic_agent", "Tell me what is the prollm paper mainly about"]
# ["example/cocktail_mixlogist", "Create a cocktail for a summer garden party. Guests enjoy refreshing, citrusy flavors. Available ingredients include vodka, gin, lime, lemon, mint, and various fruit juices."]
]

agent_ids = []
for agent_name, task_input in agent_tasks:
agent_id = submit_agent(
agent_name=agent_name,
task_input=task_input
)
agent_ids.append(agent_id)

for agent_id in agent_ids:
await_agent_execution(agent_id)

clean_cache(root_directory="./")

Expand Down

0 comments on commit 2b5931f

Please sign in to comment.