Commit: log tool events
bboynton97 committed May 14, 2024
1 parent ecc4113 commit 3db70ce
Showing 8 changed files with 519 additions and 8 deletions.
9 changes: 6 additions & 3 deletions autogen/agentchat/conversable_agent.py
@@ -35,6 +35,7 @@
from .agent import Agent, LLMAgent
from .chat import ChatResult, a_initiate_chats, initiate_chats
from .utils import consolidate_chat_info, gather_usage_summary
from autogen.runtime_logging import log_function_use

__all__ = ("ConversableAgent",)

@@ -1345,6 +1346,7 @@ def _generate_oai_reply_from_client(self, llm_client, messages, cache) -> Union[
context=messages[-1].pop("context", None),
messages=all_messages,
cache=cache,
source=self
)
extracted_response = llm_client.extract_text_or_completion_object(response)[0]

@@ -2394,7 +2396,7 @@ def register_function(self, function_map: Dict[str, Union[Callable, None]]):
self._function_map.update(function_map)
self._function_map = {k: v for k, v in self._function_map.items() if v is not None}

def update_function_signature(self, func_sig: Union[str, Dict], is_remove: None):
def update_function_signature(self, func_sig: Union[str, Dict], is_remove: bool = False):
"""update a function_signature in the LLM configuration for function_call.
Args:
@@ -2438,7 +2440,7 @@ def update_function_signature(self, func_sig: Union[str, Dict], is_remove: None)

self.client = OpenAIWrapper(**self.llm_config)

def update_tool_signature(self, tool_sig: Union[str, Dict], is_remove: None):
def update_tool_signature(self, tool_sig: Union[str, Dict], is_remove: bool = False):
"""update a tool_signature in the LLM configuration for tool_call.
Args:
@@ -2506,13 +2508,14 @@ def _wrap_function(self, func: F) -> F:
@functools.wraps(func)
def _wrapped_func(*args, **kwargs):
retval = func(*args, **kwargs)

log_function_use(self, func, kwargs, retval)
return serialize_to_str(retval)

@load_basemodels_if_needed
@functools.wraps(func)
async def _a_wrapped_func(*args, **kwargs):
retval = await func(*args, **kwargs)
log_function_use(self, func, kwargs, retval)
return serialize_to_str(retval)

wrapped_func = _a_wrapped_func if inspect.iscoroutinefunction(func) else _wrapped_func
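In practice, this wrapper means every function registered on an agent now emits a tool event at call time. A minimal sketch of a registration that would exercise the new `log_function_use` hook (the agent names and `config_list` contents are illustrative, not part of this commit):

```python
import autogen

# Illustrative agents; any llm_config with a valid config_list works here.
assistant = autogen.ConversableAgent("assistant", llm_config={"config_list": [{"model": "gpt-4"}]})
executor = autogen.ConversableAgent("executor", llm_config=False)

def get_weather(city: str) -> str:
    """Toy tool: the wrapped version logs each call before serializing the result."""
    return f"Sunny in {city}"

# register_for_llm advertises the tool to the model; register_for_execution
# routes it through _wrap_function, so each invocation now calls log_function_use.
assistant.register_for_llm(name="get_weather", description="Look up the weather")(get_weather)
executor.register_for_execution(name="get_weather")(get_weather)
```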
124 changes: 124 additions & 0 deletions autogen/logger/agentops_logger.py
@@ -0,0 +1,124 @@
from __future__ import annotations

import logging
import sqlite3
import threading
import uuid
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar, Union
from uuid import uuid4

import agentops
from agentops import ActionEvent, LLMEvent, ToolEvent
from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion

from autogen.logger.base_logger import BaseLogger, LLMConfig
from autogen.logger.logger_utils import get_current_ts

if TYPE_CHECKING:
    from autogen import Agent, ConversableAgent, OpenAIWrapper

logger = logging.getLogger(__name__)
lock = threading.Lock()

__all__ = ("AgentOpsLogger",)

F = TypeVar("F", bound=Callable[..., Any])


class AgentOpsLogger(BaseLogger):
    # Maps each AutoGen agent (by id) to the AgentOps agent created for it.
    agent_store: List[Dict[str, str]] = []

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        logger.info("Starting AgentOps logger")

    def start(self) -> str:
        pass

    def _get_agentops_id_from_agent(self, autogen_id: str) -> Optional[str]:
        for agent in self.agent_store:
            if agent["autogen_id"] == autogen_id:
                return agent["agentops_id"]
        return None

    def log_chat_completion(
        self,
        invocation_id: uuid.UUID,
        client_id: int,
        wrapper_id: int,
        source: Union[str, Agent],
        request: Dict[str, Union[float, str, List[Dict[str, str]]]],
        response: Union[str, ChatCompletion],
        is_cached: int,
        cost: float,
        start_time: str,
    ) -> None:
        end_time = get_current_ts()
        completion = response.choices[-1]

        llm_event = LLMEvent(prompt=request["messages"], completion=completion.message, model=response.model)
        llm_event.init_timestamp = start_time
        llm_event.end_timestamp = end_time
        llm_event.agent_id = self._get_agentops_id_from_agent(str(id(source)))
        agentops.record(llm_event)

    def log_new_agent(self, agent: ConversableAgent, init_args: Dict[str, Any]) -> None:
        ao_agent_id = agentops.create_agent(agent.name, str(uuid4()))
        self.agent_store.append({"agentops_id": ao_agent_id, "autogen_id": str(id(agent))})

    def log_event(self, source: Union[str, Agent], name: str, **kwargs: Dict[str, Any]) -> None:
        event = ActionEvent(action_type=name)
        event.agent_id = self._get_agentops_id_from_agent(str(id(source)))
        agentops.record(event)

    def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any) -> None:
        event = ToolEvent()
        event.agent_id = self._get_agentops_id_from_agent(str(id(source)))
        event.function = function
        event.params = args
        event.returns = returns
        # Registered functions carry a "_name" attribute; fall back to __name__.
        event.name = getattr(function, "_name", getattr(function, "__name__", None))
        agentops.record(event)

    def log_new_wrapper(self, wrapper: OpenAIWrapper, init_args: Dict[str, Union[LLMConfig, List[LLMConfig]]]) -> None:
        pass

    def log_new_client(
        self, client: Union[AzureOpenAI, OpenAI], wrapper: OpenAIWrapper, init_args: Dict[str, Any]
    ) -> None:
        pass

    def stop(self) -> None:
        # AgentOps owns the session lifecycle; there is no connection to close.
        pass

    def get_connection(self) -> Union[None, sqlite3.Connection]:
        # This logger is not backed by a database.
        return None
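For reference, a sketch of how this logger would be switched on. The `agentops.init` call and the placeholder API key are assumptions about the surrounding setup; the commit itself only forwards events once a session exists:

```python
import agentops
from autogen import runtime_logging

# Assumed prerequisite: an active AgentOps session for create_agent/record to attach to.
agentops.init(api_key="<AGENTOPS_API_KEY>")  # placeholder key

# Route AutoGen runtime logging through the new AgentOps backend.
runtime_logging.start(logger_type="agentops", config={})

# ... run chats as usual: completions become LLMEvents, replies become
# ActionEvents, and registered tool calls become ToolEvents ...

runtime_logging.stop()
```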
23 changes: 20 additions & 3 deletions autogen/logger/base_logger.py
@@ -3,14 +3,15 @@
import sqlite3
import uuid
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Dict, List, Union
from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeVar, Callable

from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion

if TYPE_CHECKING:
from autogen import Agent, ConversableAgent, OpenAIWrapper

F = TypeVar("F", bound=Callable[..., Any])
ConfigItem = Dict[str, Union[str, List[str]]]
LLMConfig = Dict[str, Union[None, float, int, ConfigItem, List[ConfigItem]]]

@@ -32,6 +33,7 @@ def log_chat_completion(
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
source: Union[str, Agent],
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
@@ -49,9 +51,10 @@ def log_chat_completion(
invocation_id (uuid): A unique identifier for the invocation to the OpenAIWrapper.create method call
client_id (int): A unique identifier for the underlying OpenAI client instance
wrapper_id (int): A unique identifier for the OpenAIWrapper instance
request (dict): A dictionary representing the the request or call to the OpenAI client endpoint
source (str or Agent): The source/creator of the event as a string name or an Agent instance
request (dict): A dictionary representing the request or call to the OpenAI client endpoint
response (str or ChatCompletion): The response from OpenAI
is_chached (int): 1 if the response was a cache hit, 0 otherwise
is_cached (int): 1 if the response was a cache hit, 0 otherwise
cost (float): The cost for the OpenAI response
start_time (str): A string representing the moment the request was initiated
"""
@@ -104,6 +107,20 @@ def log_new_client(
"""
...

@abstractmethod
def log_function_use(
    self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any
) -> None:
    """
    Log the use of a registered function (which could be a tool).
    Args:
    source (str or Agent): The source/creator of the event as a string name or an Agent instance
    function (F): The function information
    args (dict): The function args to log
    returns (Any): The return value
    """
    ...
@abstractmethod
def stop(self) -> None:
"""
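Because `log_function_use` is now abstract, every `BaseLogger` implementation has to provide it. A hypothetical sketch of the shape such an override could take (illustrative only; a real subclass must also implement the other abstract methods):

```python
from typing import Any, Dict, Union

class FunctionUsePrinter:
    """Illustrative only: shows the new hook's contract, not a full BaseLogger."""

    def log_function_use(self, source: Union[str, Any], function: Any, args: Dict[str, Any], returns: Any) -> None:
        name = getattr(function, "_name", getattr(function, "__name__", "<unknown>"))
        caller = source if isinstance(source, str) else getattr(source, "name", repr(source))
        print(f"[{caller}] {name}({args}) -> {returns!r}")
```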
3 changes: 3 additions & 0 deletions autogen/logger/logger_factory.py
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional

from autogen.logger.agentops_logger import AgentOpsLogger
from autogen.logger.base_logger import BaseLogger
from autogen.logger.sqlite_logger import SqliteLogger

@@ -14,5 +15,7 @@ def get_logger(logger_type: str = "sqlite", config: Optional[Dict[str, Any]] = N

if logger_type == "sqlite":
return SqliteLogger(config)
elif logger_type == "agentops":
return AgentOpsLogger(config)
else:
raise ValueError(f"[logger_factory] Unknown logger type: {logger_type}")
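The factory keeps its existing contract: known names construct a logger, unknown names raise. A usage sketch, assuming the existing `LoggerFactory.get_logger` entry point and the SQLite logger's `dbname` config key:

```python
from autogen.logger.logger_factory import LoggerFactory

sqlite_logger = LoggerFactory.get_logger(logger_type="sqlite", config={"dbname": "logs.db"})
agentops_logger = LoggerFactory.get_logger(logger_type="agentops", config={})
```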
1 change: 1 addition & 0 deletions autogen/logger/sqlite_logger.py
@@ -191,6 +191,7 @@ def log_chat_completion(
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
source: Union[str, Agent],
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
5 changes: 5 additions & 0 deletions autogen/oai/client.py
@@ -282,6 +282,7 @@ def create(self, params: Dict[str, Any]) -> ChatCompletion:
# If streaming is not enabled, send a regular chat completion request
params = params.copy()
params["stream"] = False
params.pop("source", None)  # "source" exists only for logging; drop it before calling the API
response = completions.create(**params)

return response
@@ -533,6 +534,7 @@ def create(self, **config: Any) -> ModelClient.ModelClientResponseProtocol:
Note that the cache argument overrides the legacy cache_seed argument: if this argument is provided,
then the cache_seed argument is ignored. If this argument is not provided or None,
then the cache_seed argument is used.
- source (Agent | None): The agent responsible for creating the completion request, if any; used for logging.
- (Legacy) cache_seed (int | None) for using the DiskCache. Default to 41.
An integer cache_seed is useful when implementing "controlled randomness" for the completion.
None for no caching.
@@ -617,6 +619,7 @@ def yes_or_no_filter(context, response):
invocation_id=invocation_id,
client_id=id(client),
wrapper_id=id(self),
source=full_config.get("source"),
request=params,
response=response,
is_cached=1,
@@ -649,6 +652,7 @@ def yes_or_no_filter(context, response):
invocation_id=invocation_id,
client_id=id(client),
wrapper_id=id(self),
source=full_config.get("source"),
request=params,
response=f"error_code:{error_code}, config {i} failed",
is_cached=0,
@@ -679,6 +683,7 @@ def yes_or_no_filter(context, response):
invocation_id=invocation_id,
client_id=id(client),
wrapper_id=id(self),
source=full_config.get("source"),
request=params,
response=response,
is_cached=0,
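Tracing the new parameter end to end: `ConversableAgent` passes `source=self`, `OpenAIWrapper.create` forwards it to the logger via `full_config`, and the underlying client strips it before the request reaches the OpenAI SDK. A sketch of the calling convention (the config values are placeholders):

```python
from autogen import OpenAIWrapper

wrapper = OpenAIWrapper(config_list=[{"model": "gpt-4"}])  # illustrative config

# "source" is consumed by runtime logging; it is recorded with the completion
# and removed from params before the OpenAI client is called.
response = wrapper.create(
    messages=[{"role": "user", "content": "hello"}],
    source=None,  # normally the calling ConversableAgent instance
)
```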
17 changes: 15 additions & 2 deletions autogen/runtime_logging.py
@@ -3,7 +3,7 @@
import logging
import sqlite3
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, TypeVar, Callable

from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion
@@ -19,6 +19,8 @@
autogen_logger = None
is_logging = False

F = TypeVar("F", bound=Callable[..., Any])


def start(logger_type: str = "sqlite", config: Optional[Dict[str, Any]] = None) -> str:
global autogen_logger
@@ -39,6 +41,7 @@ def log_chat_completion(
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
source: Union[str, Agent],
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
@@ -50,7 +53,7 @@ def log_chat_completion(
return

autogen_logger.log_chat_completion(
invocation_id, client_id, wrapper_id, request, response, is_cached, cost, start_time
invocation_id, client_id, wrapper_id, source, request, response, is_cached, cost, start_time
)


Expand All @@ -70,6 +73,16 @@ def log_event(source: Union[str, Agent], name: str, **kwargs: Dict[str, Any]) ->
autogen_logger.log_event(source, name, **kwargs)


def log_function_use(
    source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any
) -> None:
if autogen_logger is None:
logger.error("[runtime logging] log_function_use: autogen logger is None")
return

autogen_logger.log_function_use(source, function, args, returns)


def log_new_wrapper(wrapper: OpenAIWrapper, init_args: Dict[str, Union[LLMConfig, List[LLMConfig]]]) -> None:
if autogen_logger is None:
logger.error("[runtime logging] log_new_wrapper: autogen logger is None")
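These module-level shims share one guard: if `start()` was never called, they log an error and return, so the new call sites in `ConversableAgent._wrap_function` stay safe when logging is disabled. For instance (illustrative values):

```python
import autogen.runtime_logging as rl

def double(x: int) -> int:
    return x * 2

# No logger started: this records nothing and only emits an error log entry.
rl.log_function_use(source="assistant", function=double, args={"x": 21}, returns=42)
```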