Skip to content

Commit

Permalink
AgentOps Runtime Logging Implementation (#2682)
Browse files Browse the repository at this point in the history
* add agentops req

* track conversable agents with agentops

* track tool usage

* track message sending

* remove record from parent

* remove record

* simple example

* notebook example

* remove spacing change

* optional dependency

* documentation

* remove extra import

* optional import

* record if agentops

* if agentops

* wrap function auto name

* install agentops before notebook test

* documentation fixes

* notebook metadata

* notebook metadata

* pre-commit hook changes

* doc link fixes

* git lfs

* autogen tag

* bump agentops version

* log tool events

* notebook fixes

* docs

* formatting

* Updated ecosystem manual

* Update notebook for clarity

* cleaned up notebook

* updated precommit recommendations

* Fixed links to screenshots and examples

* removed unused files

* changed notebook hyperlink

* update docusaurus link path

* reverted setup.py

* change setup again

* undo changes

* revert conversable agent

* removed file not in branch

* Updated notebook to look nicer

* change letter

* revert setup

* revert setup again

* change ref link

* change reflink

* remove optional dependency

* removed duplicated section

* Addressed clarity comments from Howard

* minor updates to wording

* formatting and pr fixes

* added info markdown cell

* better docs

* notebook

* observability docs

* pre-commit fixes

* example images in notebook

* example images in docs

* example images in docs

* delete agentops ong

* doc updates

* docs updates

* docs updates

* use agent as extra_kwarg

* add logging tests

* pass function properly

* create table

* dummy function name

* log chat completion source name

* safe serialize

* test fixes

* formatting

* type checks

---------

Co-authored-by: reibs <areibman@gmail.com>
Co-authored-by: Chi Wang <wang.chi@microsoft.com>
Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
Co-authored-by: Howard Gil <howardbgil@gmail.com>
Co-authored-by: Alex Reibman <meta.alex.r@gmail.com>
  • Loading branch information
6 people authored and victordibia committed Jul 30, 2024
1 parent f11a884 commit 83a7162
Show file tree
Hide file tree
Showing 16 changed files with 1,145 additions and 191 deletions.
9 changes: 4 additions & 5 deletions autogen/agentchat/conversable_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ..function_utils import get_function_schema, load_basemodels_if_needed, serialize_to_str
from ..io.base import IOStream
from ..oai.client import ModelClient, OpenAIWrapper
from ..runtime_logging import log_event, log_new_agent, logging_enabled
from ..runtime_logging import log_event, log_function_use, log_new_agent, logging_enabled
from .agent import Agent, LLMAgent
from .chat import ChatResult, a_initiate_chats, initiate_chats
from .utils import consolidate_chat_info, gather_usage_summary
Expand Down Expand Up @@ -1357,9 +1357,7 @@ def _generate_oai_reply_from_client(self, llm_client, messages, cache) -> Union[

# TODO: #1143 handle token limit exceeded error
response = llm_client.create(
context=messages[-1].pop("context", None),
messages=all_messages,
cache=cache,
context=messages[-1].pop("context", None), messages=all_messages, cache=cache, agent=self
)
extracted_response = llm_client.extract_text_or_completion_object(response)[0]

Expand Down Expand Up @@ -2528,13 +2526,14 @@ def _wrap_function(self, func: F) -> F:
@functools.wraps(func)
def _wrapped_func(*args, **kwargs):
retval = func(*args, **kwargs)

log_function_use(self, func, kwargs, retval)
return serialize_to_str(retval)

@load_basemodels_if_needed
@functools.wraps(func)
async def _a_wrapped_func(*args, **kwargs):
retval = await func(*args, **kwargs)
log_function_use(self, func, kwargs, retval)
return serialize_to_str(retval)

wrapped_func = _a_wrapped_func if inspect.iscoroutinefunction(func) else _wrapped_func
Expand Down
21 changes: 18 additions & 3 deletions autogen/logger/base_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import sqlite3
import uuid
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Dict, List, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypeVar, Union

from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion

if TYPE_CHECKING:
from autogen import Agent, ConversableAgent, OpenAIWrapper

F = TypeVar("F", bound=Callable[..., Any])
ConfigItem = Dict[str, Union[str, List[str]]]
LLMConfig = Dict[str, Union[None, float, int, ConfigItem, List[ConfigItem]]]

Expand All @@ -32,6 +33,7 @@ def log_chat_completion(
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
source: Union[str, Agent],
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
Expand All @@ -49,9 +51,10 @@ def log_chat_completion(
invocation_id (uuid): A unique identifier for the invocation to the OpenAIWrapper.create method call
client_id (int): A unique identifier for the underlying OpenAI client instance
wrapper_id (int): A unique identifier for the OpenAIWrapper instance
request (dict): A dictionary representing the the request or call to the OpenAI client endpoint
source (str or Agent): The source/creator of the event as a string name or an Agent instance
request (dict): A dictionary representing the request or call to the OpenAI client endpoint
response (str or ChatCompletion): The response from OpenAI
is_chached (int): 1 if the response was a cache hit, 0 otherwise
is_cached (int): 1 if the response was a cache hit, 0 otherwise
cost(float): The cost for OpenAI response
start_time (str): A string representing the moment the request was initiated
"""
Expand Down Expand Up @@ -104,6 +107,18 @@ def log_new_client(
"""
...

@abstractmethod
def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any) -> None:
"""
Log the use of a registered function (could be a tool)
Args:
source (str or Agent): The source/creator of the event as a string name or an Agent instance
function (F): The function information
args (dict): The function args to log
returns (any): The return
"""

@abstractmethod
def stop(self) -> None:
"""
Expand Down
44 changes: 43 additions & 1 deletion autogen/logger/file_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
import threading
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union

from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion
Expand All @@ -21,9 +21,21 @@

logger = logging.getLogger(__name__)

F = TypeVar("F", bound=Callable[..., Any])

__all__ = ("FileLogger",)


def safe_serialize(obj: Any) -> str:
def default(o: Any) -> str:
if hasattr(o, "to_json"):
return str(o.to_json())
else:
return f"<<non-serializable: {type(o).__qualname__}>>"

return json.dumps(obj, default=default)


class FileLogger(BaseLogger):
def __init__(self, config: Dict[str, Any]):
self.config = config
Expand Down Expand Up @@ -59,6 +71,7 @@ def log_chat_completion(
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
source: Union[str, Agent],
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
Expand All @@ -69,6 +82,11 @@ def log_chat_completion(
Log a chat completion.
"""
thread_id = threading.get_ident()
source_name = None
if isinstance(source, str):
source_name = source
else:
source_name = source.name
try:
log_data = json.dumps(
{
Expand All @@ -82,6 +100,7 @@ def log_chat_completion(
"start_time": start_time,
"end_time": get_current_ts(),
"thread_id": thread_id,
"source_name": source_name,
}
)

Expand Down Expand Up @@ -204,6 +223,29 @@ def log_new_client(
except Exception as e:
self.logger.error(f"[file_logger] Failed to log event {e}")

def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any) -> None:
"""
Log a registered function(can be a tool) use from an agent or a string source.
"""
thread_id = threading.get_ident()

try:
log_data = json.dumps(
{
"source_id": id(source),
"source_name": str(source.name) if hasattr(source, "name") else source,
"agent_module": source.__module__,
"agent_class": source.__class__.__name__,
"timestamp": get_current_ts(),
"thread_id": thread_id,
"input_args": safe_serialize(args),
"returns": safe_serialize(returns),
}
)
self.logger.info(log_data)
except Exception as e:
self.logger.error(f"[file_logger] Failed to log event {e}")

def get_connection(self) -> None:
"""Method is intentionally left blank because there is no specific connection needed for the FileLogger."""
pass
Expand Down
57 changes: 54 additions & 3 deletions autogen/logger/sqlite_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sqlite3
import threading
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, TypeVar, Union

from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion
Expand All @@ -25,6 +25,18 @@

__all__ = ("SqliteLogger",)

F = TypeVar("F", bound=Callable[..., Any])


def safe_serialize(obj: Any) -> str:
def default(o: Any) -> str:
if hasattr(o, "to_json"):
return str(o.to_json())
else:
return f"<<non-serializable: {type(o).__qualname__}>>"

return json.dumps(obj, default=default)


class SqliteLogger(BaseLogger):
schema_version = 1
Expand All @@ -49,6 +61,7 @@ def start(self) -> str:
client_id INTEGER,
wrapper_id INTEGER,
session_id TEXT,
source_name TEXT,
request TEXT,
response TEXT,
is_cached INEGER,
Expand Down Expand Up @@ -118,6 +131,18 @@ class TEXT, -- type or class name of cli
"""
self._run_query(query=query)

query = """
CREATE TABLE IF NOT EXISTS function_calls (
source_id INTEGER,
source_name TEXT,
function_name TEXT,
args TEXT DEFAULT NULL,
returns TEXT DEFAULT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
);
"""
self._run_query(query=query)

current_verion = self._get_current_db_version()
if current_verion is None:
self._run_query(
Expand Down Expand Up @@ -192,6 +217,7 @@ def log_chat_completion(
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
source: Union[str, Agent],
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
Expand All @@ -208,10 +234,16 @@ def log_chat_completion(
else:
response_messages = json.dumps(to_dict(response), indent=4)

source_name = None
if isinstance(source, str):
source_name = source
else:
source_name = source.name

query = """
INSERT INTO chat_completions (
invocation_id, client_id, wrapper_id, session_id, request, response, is_cached, cost, start_time, end_time
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
invocation_id, client_id, wrapper_id, session_id, request, response, is_cached, cost, start_time, end_time, source_name
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
args = (
invocation_id,
Expand All @@ -224,6 +256,7 @@ def log_chat_completion(
cost,
start_time,
end_time,
source_name,
)

self._run_query(query=query, args=args)
Expand Down Expand Up @@ -335,6 +368,24 @@ def log_new_wrapper(self, wrapper: OpenAIWrapper, init_args: Dict[str, Union[LLM
)
self._run_query(query=query, args=args)

def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any) -> None:

if self.con is None:
return

query = """
INSERT INTO function_calls (source_id, source_name, function_name, args, returns, timestamp) VALUES (?, ?, ?, ?, ?, ?)
"""
query_args: Tuple[Any, ...] = (
id(source),
source.name if hasattr(source, "name") else source,
function.__name__,
safe_serialize(args),
safe_serialize(returns),
get_current_ts(),
)
self._run_query(query=query, args=query_args)

def log_new_client(
self, client: Union[AzureOpenAI, OpenAI, GeminiClient], wrapper: OpenAIWrapper, init_args: Dict[str, Any]
) -> None:
Expand Down
6 changes: 6 additions & 0 deletions autogen/oai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class OpenAIWrapper:
"""A wrapper class for openai client."""

extra_kwargs = {
"agent",
"cache",
"cache_seed",
"filter_func",
Expand Down Expand Up @@ -542,6 +543,7 @@ def create(self, **config: Any) -> ModelClient.ModelClientResponseProtocol:
Note that the cache argument overrides the legacy cache_seed argument: if this argument is provided,
then the cache_seed argument is ignored. If this argument is not provided or None,
then the cache_seed argument is used.
- agent (AbstractAgent | None): The object responsible for creating a completion if an agent.
- (Legacy) cache_seed (int | None) for using the DiskCache. Default to 41.
An integer cache_seed is useful when implementing "controlled randomness" for the completion.
None for no caching.
Expand Down Expand Up @@ -589,6 +591,7 @@ def yes_or_no_filter(context, response):
cache = extra_kwargs.get("cache")
filter_func = extra_kwargs.get("filter_func")
context = extra_kwargs.get("context")
agent = extra_kwargs.get("agent")

total_usage = None
actual_usage = None
Expand Down Expand Up @@ -626,6 +629,7 @@ def yes_or_no_filter(context, response):
invocation_id=invocation_id,
client_id=id(client),
wrapper_id=id(self),
agent=agent,
request=params,
response=response,
is_cached=1,
Expand Down Expand Up @@ -658,6 +662,7 @@ def yes_or_no_filter(context, response):
invocation_id=invocation_id,
client_id=id(client),
wrapper_id=id(self),
agent=agent,
request=params,
response=f"error_code:{error_code}, config {i} failed",
is_cached=0,
Expand Down Expand Up @@ -688,6 +693,7 @@ def yes_or_no_filter(context, response):
invocation_id=invocation_id,
client_id=id(client),
wrapper_id=id(self),
agent=agent,
request=params,
response=response,
is_cached=0,
Expand Down
15 changes: 13 additions & 2 deletions autogen/runtime_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import sqlite3
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, TypeVar, Union

from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion
Expand All @@ -20,6 +20,8 @@
autogen_logger = None
is_logging = False

F = TypeVar("F", bound=Callable[..., Any])


def start(
logger: Optional[BaseLogger] = None,
Expand Down Expand Up @@ -56,6 +58,7 @@ def log_chat_completion(
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
agent: Union[str, Agent],
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
Expand All @@ -67,7 +70,7 @@ def log_chat_completion(
return

autogen_logger.log_chat_completion(
invocation_id, client_id, wrapper_id, request, response, is_cached, cost, start_time
invocation_id, client_id, wrapper_id, agent, request, response, is_cached, cost, start_time
)


Expand All @@ -87,6 +90,14 @@ def log_event(source: Union[str, Agent], name: str, **kwargs: Dict[str, Any]) ->
autogen_logger.log_event(source, name, **kwargs)


def log_function_use(agent: Union[str, Agent], function: F, args: Dict[str, Any], returns: any):
if autogen_logger is None:
logger.error("[runtime logging] log_function_use: autogen logger is None")
return

autogen_logger.log_function_use(agent, function, args, returns)


def log_new_wrapper(wrapper: OpenAIWrapper, init_args: Dict[str, Union[LLMConfig, List[LLMConfig]]]) -> None:
if autogen_logger is None:
logger.error("[runtime logging] log_new_wrapper: autogen logger is None")
Expand Down
Loading

0 comments on commit 83a7162

Please sign in to comment.