Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optionally bypass create new session on client init #86

Merged
merged 9 commits into from
Feb 22, 2024
Merged
65 changes: 48 additions & 17 deletions agentops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Client:
max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100.
override (bool): Whether to override and LLM calls to emit as events.
Attributes:
session (Session, optional): A Session is a grouping of events (e.g. a run of your agent).
_session (Session, optional): A Session is a grouping of events (e.g. a run of your agent).
"""

def __init__(self, api_key: Optional[str] = None,
Expand All @@ -47,7 +47,9 @@ def __init__(self, api_key: Optional[str] = None,
endpoint: Optional[str] = 'https://agentops-server-v2.fly.dev',
max_wait_time: Optional[int] = 1000,
max_queue_size: Optional[int] = 100,
override=True):
override=True,
bypass_new_session=False
):

# Get API key from env
if api_key is None:
Expand All @@ -74,7 +76,12 @@ def __init__(self, api_key: Optional[str] = None,
# Override sys.excepthook
sys.excepthook = self.handle_exception

self._start_session(tags)
self._session = None
if not bypass_new_session:
self.start_session(tags)
else:
self._worker = None
self._tags = tags

if override:
if 'openai' in sys.modules:
Expand Down Expand Up @@ -114,6 +121,26 @@ def signal_handler(self, signum, frame):
end_state_reason=f'Signal {signal_name} detected')
sys.exit(0)

def add_tags(self, tags: List[str]):
if self._session is None:
return print("You must create a session before assigning tags")

if self._tags is not None:
self._tags.extend(tags)
else:
self._tags = tags

self._session.tags = self._tags
self._worker.update_session(self._session)

def set_tags(self, tags: List[str]):
if self._session is None:
return print("You must create a session before assigning tags")

self._tags = tags
self._session.tags = tags
self._worker.update_session(self._session)

def record(self, event: Event):
"""
Record an event with the AgentOps service.
Expand All @@ -122,9 +149,9 @@ def record(self, event: Event):
event (Event): The event to record.
"""

if not self.session.has_ended:
self.worker.add_event(
{'session_id': self.session.session_id, **event.__dict__})
if not self._session is None and not self._session.has_ended:
self._worker.add_event(
{'session_id': self._session.session_id, **event.__dict__})
else:
logging.info("This event was not recorded because the previous session has been ended" +
" Start a new session to record again.")
Expand Down Expand Up @@ -242,17 +269,20 @@ async def _record_event_async(self, func, event_name, tags, *args, **kwargs):

return returns

def _start_session(self, tags: Optional[List[str]] = None):
def start_session(self, tags: Optional[List[str]] = None):
"""
Start a new session for recording events.

Args:
tags (List[str], optional): Tags that can be used for grouping or sorting later.
e.g. ["test_run"].
"""
self.session = Session(str(uuid4()), tags)
self.worker = Worker(self.config)
self.worker.start_session(self.session)
if self._session is not None:
return print("Session already started. End this session before starting a new one.")

self._session = Session(str(uuid4()), tags or self._tags)
self._worker = Worker(self.config)
self._worker.start_session(self._session)

def end_session(self, end_state: str = Field("Indeterminate",
description="End state of the session",
Expand All @@ -269,15 +299,16 @@ def end_session(self, end_state: str = Field("Indeterminate",
end_state_reason (str, optional): The reason for ending the session.
video (str, optional): The video screen recording of the session
"""
if not self.session.has_ended:
self.session.end_session(end_state, rating, end_state_reason)
self.worker.end_session(self.session)
self.session.video = video
else:
logging.info("Warning: The session has already been ended.")
if self._session is None or self._session.has_ended:
return print("Session has already been ended.")

self._session.video = video
self._session.end_session(end_state, rating, end_state_reason)
self._worker.end_session(self._session)
# self._session = None

def cleanup(self, end_state_reason: Optional[str] = None):
# Only run cleanup function if session is created
if hasattr(self, "session") and not self.session.has_ended:
if hasattr(self, "session"):
self.end_session(end_state='Fail',
end_state_reason=end_state_reason)
4 changes: 2 additions & 2 deletions agentops/langchain_callback_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def on_retry(

@property
def session_id(self):
return self.ao_client.session.session_id
return self.ao_client._session.session_id


class AsyncLangchainCallbackHandler(AsyncCallbackHandler):
Expand Down Expand Up @@ -596,4 +596,4 @@ async def on_retry(

@property
def session_id(self):
return self.ao_client.session.session_id
return self.ao_client._session.session_id
12 changes: 4 additions & 8 deletions agentops/session.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
from .helpers import get_ISO_time
from pydantic import BaseModel, Field
from typing import Optional, List


class SessionState(BaseModel):
end_state: str = Field(..., pattern="^(Success|Fail|Indeterminate)$")
end_state_reason: Optional[str] = None


class Session:
"""
Represents a session of events, with a start and end state.
Expand All @@ -26,6 +20,9 @@ class Session:
"""

def __init__(self, session_id: str, tags: Optional[List[str]] = None):
self.end_timestamp = None
self.rating = None
self.end_state = None
self.session_id = session_id
self.init_timestamp = get_ISO_time()
self.tags = tags
Expand All @@ -50,7 +47,6 @@ def end_session(self, end_state: str = "Indeterminate", rating: Optional[str] =
rating (str, optional): The rating for the session.
end_state_reason (str, optional): The reason for ending the session. Provides context for why the session ended.
"""
SessionState(end_state=end_state, end_state_reason=end_state_reason)
self.end_state = end_state
self.rating = rating
self.end_state_reason = end_state_reason
Expand All @@ -64,4 +60,4 @@ def has_ended(self) -> bool:
Returns:
bool: Whether the session has been ended
"""
return hasattr(self, "end_state")
return self.end_state is not None
12 changes: 12 additions & 0 deletions agentops/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ def end_session(self, session: Session) -> None:
self.config.api_key,
self.config.org_key)

def update_session(self, session: Session) -> None:
with self.lock:
payload = {
"session": session.__dict__
}

HttpClient.post(f'{self.config.endpoint}/sessions',
json.dumps(filter_unjsonable(
payload)).encode("utf-8"),
self.config.api_key,
self.config.org_key)

def run(self) -> None:
while not self.stop_flag.is_set():
time.sleep(self.config.max_wait_time / 1000)
Expand Down