From 82a9b1930b30f1d89cb60192d983943d35b17fcd Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 00:57:44 -0800 Subject: [PATCH 1/9] optionally bypass create new session on client init --- agentops/client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/agentops/client.py b/agentops/client.py index 98b90657..32c4ea12 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -47,7 +47,9 @@ def __init__(self, api_key: Optional[str] = None, endpoint: Optional[str] = 'https://agentops-server-v2.fly.dev', max_wait_time: Optional[int] = 1000, max_queue_size: Optional[int] = 100, - override=True): + override=True, + bypass_new_session=False + ): # Get API key from env if api_key is None: @@ -74,7 +76,8 @@ def __init__(self, api_key: Optional[str] = None, # Override sys.excepthook sys.excepthook = self.handle_exception - self._start_session(tags) + if not bypass_new_session: + self._start_session(tags) if override: if 'openai' in sys.modules: From 6e73b52da5e88d5b1ae56ca5602f34051d7f9f22 Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 13:56:54 -0800 Subject: [PATCH 2/9] set and add tags then update the session --- agentops/client.py | 15 +++++++++++++++ agentops/worker.py | 12 ++++++++++++ 2 files changed, 27 insertions(+) diff --git a/agentops/client.py b/agentops/client.py index 32c4ea12..2b30c72d 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -158,6 +158,21 @@ def sync_wrapper(*args, **kwargs): return decorator + def add_tags(self, tags: List[str]): + if self.session.tags is not None: + self.session.tags.extend(tags) + else: + self.session.tags = tags + + self.worker.update_session(self.session) + + def set_tags(self, tags: List[str]): + if self.session is None: + raise Exception("You must create a session before assigning tags") + + self.session.tags = tags + self.worker.update_session(self.session) + def _record_event_sync(self, func, event_name, tags, *args, **kwargs): init_time = get_ISO_time() func_args = inspect.signature(func).parameters diff --git a/agentops/worker.py b/agentops/worker.py index 430807af..e4a1bd58 100644 --- a/agentops/worker.py +++ b/agentops/worker.py @@ -99,6 +99,18 @@ def end_session(self, session: Session) -> None: self.config.api_key, self.config.org_key) + def update_session(self, session: Session) -> None: + with self.lock: + payload = { + "session": session.__dict__ + } + + HttpClient.post(f'{self.config.endpoint}/sessions', + json.dumps(filter_unjsonable( + payload)).encode("utf-8"), + self.config.api_key, + self.config.org_key) + def run(self) -> None: while not self.stop_flag.is_set(): time.sleep(self.config.max_wait_time / 1000) From ae22e072c9245ddc48aa16e0df5b1f5bc7a1706d Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 14:38:42 -0800 Subject: [PATCH 3/9] set and add tags then update the session --- agentops/client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/agentops/client.py b/agentops/client.py index 2b30c72d..fff9ab9d 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -77,7 +77,10 @@ def __init__(self, api_key: Optional[str] = None, sys.excepthook = self.handle_exception if not bypass_new_session: - self._start_session(tags) + self.start_session(tags) + else: + self.session = None + self.worker = None if override: if 'openai' in sys.modules: @@ -159,6 +162,9 @@ def sync_wrapper(*args, **kwargs): return decorator def add_tags(self, tags: List[str]): + if self.session is None: + return print("You must create a session before assigning tags") + if self.session.tags is not None: self.session.tags.extend(tags) else: @@ -168,7 +174,7 @@ def add_tags(self, tags: List[str]): def set_tags(self, tags: List[str]): if self.session is None: - raise Exception("You must create a session before assigning tags") + return print("You must create a session before assigning tags") self.session.tags = tags self.worker.update_session(self.session) @@ -260,7 +266,7 @@ async def _record_event_async(self, func, event_name, tags, *args, **kwargs): return returns - def _start_session(self, tags: Optional[List[str]] = None): + def start_session(self, tags: Optional[List[str]] = None): """ Start a new session for recording events. From 9c9f75f6ee2b0c43df1f86cee385125c87ee703c Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 14:54:52 -0800 Subject: [PATCH 4/9] allow starting new sessions --- agentops/client.py | 6 +++++- agentops/session.py | 10 +++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/agentops/client.py b/agentops/client.py index fff9ab9d..01e41620 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -274,6 +274,9 @@ def start_session(self, tags: Optional[List[str]] = None): tags (List[str], optional): Tags that can be used for grouping or sorting later. e.g. ["test_run"]. """ + if self.session is not None: + return print("Session already started. End this session before starting a new one.") + self.session = Session(str(uuid4()), tags) self.worker = Worker(self.config) self.worker.start_session(self.session) @@ -294,9 +297,10 @@ def end_session(self, end_state: str = Field("Indeterminate", video (str, optional): The video screen recording of the session """ if not self.session.has_ended: + self.session.video = video self.session.end_session(end_state, rating, end_state_reason) self.worker.end_session(self.session) - self.session.video = video + self.session = None else: logging.info("Warning: The session has already been ended.") diff --git a/agentops/session.py b/agentops/session.py index d437ac88..0c79b681 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -1,13 +1,7 @@ from .helpers import get_ISO_time -from pydantic import BaseModel, Field from typing import Optional, List -class SessionState(BaseModel): - end_state: str = Field(..., pattern="^(Success|Fail|Indeterminate)$") - end_state_reason: Optional[str] = None - - class Session: """ Represents a session of events, with a start and end state. @@ -26,6 +20,9 @@ class Session: """ def __init__(self, session_id: str, tags: Optional[List[str]] = None): + self.end_timestamp = None + self.rating = None + self.end_state = None self.session_id = session_id self.init_timestamp = get_ISO_time() self.tags = tags @@ -50,7 +47,6 @@ def end_session(self, end_state: str = "Indeterminate", rating: Optional[str] = rating (str, optional): The rating for the session. end_state_reason (str, optional): The reason for ending the session. Provides context for why the session ended. """ - SessionState(end_state=end_state, end_state_reason=end_state_reason) self.end_state = end_state self.rating = rating self.end_state_reason = end_state_reason From 97fc25e8c90b90bbfbb595cb26fb5bc4e065fd13 Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 15:55:50 -0800 Subject: [PATCH 5/9] session and client are private, tags are in sync --- agentops/client.py | 53 ++++++++++++++------------ agentops/langchain_callback_handler.py | 4 +- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/agentops/client.py b/agentops/client.py index 01e41620..9f189cc3 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -38,7 +38,7 @@ class Client: max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100. override (bool): Whether to override and LLM calls to emit as events. Attributes: - session (Session, optional): A Session is a grouping of events (e.g. a run of your agent). + _session (Session, optional): A Session is a grouping of events (e.g. a run of your agent). """ def __init__(self, api_key: Optional[str] = None, @@ -79,8 +79,9 @@ def __init__(self, api_key: Optional[str] = None, if not bypass_new_session: self.start_session(tags) else: - self.session = None - self.worker = None + self._session = None + self._worker = None + self._tags = tags if override: if 'openai' in sys.modules: @@ -128,9 +129,9 @@ def record(self, event: Event): event (Event): The event to record. """ - if not self.session.has_ended: - self.worker.add_event( - {'session_id': self.session.session_id, **event.__dict__}) + if not self._session is None and not self._session.has_ended: + self._worker.add_event( + {'session_id': self._session.session_id, **event.__dict__}) else: logging.info("This event was not recorded because the previous session has been ended" + " Start a new session to record again.") @@ -162,22 +163,24 @@ def sync_wrapper(*args, **kwargs): return decorator def add_tags(self, tags: List[str]): - if self.session is None: + if self._session is None: return print("You must create a session before assigning tags") - if self.session.tags is not None: - self.session.tags.extend(tags) + if self._tags is not None: + self._tags.extend(tags) else: - self.session.tags = tags + self._tags = tags - self.worker.update_session(self.session) + self._session.tags = self._tags + self._worker.update_session(self._session) def set_tags(self, tags: List[str]): - if self.session is None: + if self._session is None: return print("You must create a session before assigning tags") - self.session.tags = tags - self.worker.update_session(self.session) + self._tags = tags + self._session.tags = tags + self._worker.update_session(self._session) def _record_event_sync(self, func, event_name, tags, *args, **kwargs): init_time = get_ISO_time() @@ -274,12 +277,14 @@ def start_session(self, tags: Optional[List[str]] = None): tags (List[str], optional): Tags that can be used for grouping or sorting later. e.g. ["test_run"]. """ - if self.session is not None: + if self._session is not None: return print("Session already started. End this session before starting a new one.") - self.session = Session(str(uuid4()), tags) - self.worker = Worker(self.config) - self.worker.start_session(self.session) + self._session = Session(str(uuid4()), tags) + if self._tags: + self.set_tags(self._tags) + self._worker = Worker(self.config) + self._worker.start_session(self._session) def end_session(self, end_state: str = Field("Indeterminate", description="End state of the session", @@ -296,16 +301,16 @@ def end_session(self, end_state: str = Field("Indeterminate", end_state_reason (str, optional): The reason for ending the session. video (str, optional): The video screen recording of the session """ - if not self.session.has_ended: - self.session.video = video - self.session.end_session(end_state, rating, end_state_reason) - self.worker.end_session(self.session) - self.session = None + if not self._session.has_ended: + self._session.video = video + self._session.end_session(end_state, rating, end_state_reason) + self._worker.end_session(self._session) + self._session = None else: logging.info("Warning: The session has already been ended.") def cleanup(self, end_state_reason: Optional[str] = None): # Only run cleanup function if session is created - if hasattr(self, "session") and not self.session.has_ended: + if hasattr(self, "session") and not self._session.has_ended: self.end_session(end_state='Fail', end_state_reason=end_state_reason) diff --git a/agentops/langchain_callback_handler.py b/agentops/langchain_callback_handler.py index b1c53f47..d9cda192 100644 --- a/agentops/langchain_callback_handler.py +++ b/agentops/langchain_callback_handler.py @@ -305,7 +305,7 @@ def on_retry( @property def session_id(self): - return self.ao_client.session.session_id + return self.ao_client._session.session_id class AsyncLangchainCallbackHandler(AsyncCallbackHandler): @@ -596,4 +596,4 @@ async def on_retry( @property def session_id(self): - return self.ao_client.session.session_id + return self.ao_client._session.session_id From cb599ea4bc853d1e89b9ec01a76a44502d6a19e2 Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 16:11:06 -0800 Subject: [PATCH 6/9] tag fix --- agentops/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/agentops/client.py b/agentops/client.py index 9f189cc3..aec8075d 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -280,9 +280,7 @@ def start_session(self, tags: Optional[List[str]] = None): if self._session is not None: return print("Session already started. End this session before starting a new one.") - self._session = Session(str(uuid4()), tags) - if self._tags: - self.set_tags(self._tags) + self._session = Session(str(uuid4()), tags or self._tags) self._worker = Worker(self.config) self._worker.start_session(self._session) @@ -301,11 +299,13 @@ def end_session(self, end_state: str = Field("Indeterminate", end_state_reason (str, optional): The reason for ending the session. video (str, optional): The video screen recording of the session """ + if self._session is None: + return print("Session has already been ended.") if not self._session.has_ended: self._session.video = video self._session.end_session(end_state, rating, end_state_reason) self._worker.end_session(self._session) - self._session = None + # self._session = None else: logging.info("Warning: The session has already been ended.") From 18df31fd6c04f0c70e08d3ae984df367430da16f Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 16:23:46 -0800 Subject: [PATCH 7/9] fix no bypass --- agentops/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agentops/client.py b/agentops/client.py index aec8075d..dad7e156 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -76,10 +76,10 @@ def __init__(self, api_key: Optional[str] = None, # Override sys.excepthook sys.excepthook = self.handle_exception + self._session = None if not bypass_new_session: self.start_session(tags) else: - self._session = None self._worker = None self._tags = tags From 2d8b9a32eb7d5812d4db77e97abb74a7c21c119e Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 16:41:05 -0800 Subject: [PATCH 8/9] no has_ended --- agentops/client.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/agentops/client.py b/agentops/client.py index dad7e156..1cdfd591 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -129,7 +129,7 @@ def record(self, event: Event): event (Event): The event to record. """ - if not self._session is None and not self._session.has_ended: + if not self._session is None: self._worker.add_event( {'session_id': self._session.session_id, **event.__dict__}) else: @@ -301,16 +301,14 @@ def end_session(self, end_state: str = Field("Indeterminate", """ if self._session is None: return print("Session has already been ended.") - if not self._session.has_ended: - self._session.video = video - self._session.end_session(end_state, rating, end_state_reason) - self._worker.end_session(self._session) - # self._session = None - else: - logging.info("Warning: The session has already been ended.") + + self._session.video = video + self._session.end_session(end_state, rating, end_state_reason) + self._worker.end_session(self._session) + # self._session = None def cleanup(self, end_state_reason: Optional[str] = None): # Only run cleanup function if session is created - if hasattr(self, "session") and not self._session.has_ended: + if hasattr(self, "session"): self.end_session(end_state='Fail', end_state_reason=end_state_reason) From 774cdf6cfd15380799cbc024ef8ed896ebff13f9 Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Wed, 21 Feb 2024 17:23:33 -0800 Subject: [PATCH 9/9] cannot create new sessions after end --- agentops/client.py | 44 ++++++++++++++++++++++---------------------- agentops/session.py | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/agentops/client.py b/agentops/client.py index 1cdfd591..5fd5fd54 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -121,6 +121,26 @@ def signal_handler(self, signum, frame): end_state_reason=f'Signal {signal_name} detected') sys.exit(0) + def add_tags(self, tags: List[str]): + if self._session is None: + return print("You must create a session before assigning tags") + + if self._tags is not None: + self._tags.extend(tags) + else: + self._tags = tags + + self._session.tags = self._tags + self._worker.update_session(self._session) + + def set_tags(self, tags: List[str]): + if self._session is None: + return print("You must create a session before assigning tags") + + self._tags = tags + self._session.tags = tags + self._worker.update_session(self._session) + def record(self, event: Event): """ Record an event with the AgentOps service. @@ -129,7 +149,7 @@ def record(self, event: Event): event (Event): The event to record. """ - if not self._session is None: + if not self._session is None and not self._session.has_ended: self._worker.add_event( {'session_id': self._session.session_id, **event.__dict__}) else: @@ -162,26 +182,6 @@ def sync_wrapper(*args, **kwargs): return decorator - def add_tags(self, tags: List[str]): - if self._session is None: - return print("You must create a session before assigning tags") - - if self._tags is not None: - self._tags.extend(tags) - else: - self._tags = tags - - self._session.tags = self._tags - self._worker.update_session(self._session) - - def set_tags(self, tags: List[str]): - if self._session is None: - return print("You must create a session before assigning tags") - - self._tags = tags - self._session.tags = tags - self._worker.update_session(self._session) - def _record_event_sync(self, func, event_name, tags, *args, **kwargs): init_time = get_ISO_time() func_args = inspect.signature(func).parameters @@ -299,7 +299,7 @@ def end_session(self, end_state: str = Field("Indeterminate", end_state_reason (str, optional): The reason for ending the session. video (str, optional): The video screen recording of the session """ - if self._session is None: + if self._session is None or self._session.has_ended: return print("Session has already been ended.") self._session.video = video diff --git a/agentops/session.py b/agentops/session.py index 0c79b681..c23958ca 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -60,4 +60,4 @@ def has_ended(self) -> bool: Returns: bool: Whether the session has been ended """ - return hasattr(self, "end_state") + return self.end_state is not None