Skip to content

Commit

Permalink
Introduce structure keys (#902)
Browse files Browse the repository at this point in the history
* Introduce structure keys

* Add agent key to tasks

* Rebasing is hard

* Rename task output telemetry

* Feedback
  • Loading branch information
gvieira committed Jul 15, 2024
1 parent 161c4a6 commit dd8a199
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 14 deletions.
6 changes: 6 additions & 0 deletions src/crewai/agents/agent_builder/base_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Dict, List, Optional, TypeVar

from pydantic import (
Expand Down Expand Up @@ -162,6 +163,11 @@ def set_private_attrs(self):
self._token_process = TokenProcess()
return self

@property
def key(self):
source = [self.role, self.goal, self.backstory]
return md5("|".join(source).encode()).hexdigest()

@abstractmethod
def execute_task(
self,
Expand Down
8 changes: 8 additions & 0 deletions src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import uuid
from concurrent.futures import Future
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Union

from langchain_core.callbacks import BaseCallbackHandler
Expand Down Expand Up @@ -330,6 +331,13 @@ def validate_context_no_future_tasks(self):
)
return self

@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
task.key for task in self.tasks
]
return md5("|".join(source).encode()).hexdigest()

def _setup_from_config(self):
assert self.config is not None, "Config should not be None."

Expand Down
12 changes: 11 additions & 1 deletion src/crewai/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import uuid
from concurrent.futures import Future
from copy import copy
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Type, Union


from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
Expand Down Expand Up @@ -173,6 +175,14 @@ def execute_sync(
"""Execute the task synchronously."""
return self._execute_core(agent, context, tools)

@property
def key(self) -> str:
description = self._original_description or self.description
expected_output = self._original_expected_output or self.expected_output
source = [description, expected_output]

return md5("|".join(source).encode()).hexdigest()

def execute_async(
self,
agent: BaseAgent | None = None,
Expand Down Expand Up @@ -238,7 +248,7 @@ def _execute_core(
self.callback(self.output)

if self._execution_span:
self._telemetry.task_ended(self._execution_span, self)
self._telemetry.task_ended(self._execution_span, self, agent.crew)
self._execution_span = None

if self.output_file:
Expand Down
34 changes: 24 additions & 10 deletions src/crewai/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,8 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))

if crew.share_crew:
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)

self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
Expand All @@ -109,6 +104,7 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
json.dumps(
[
{
"key": agent.key,
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
Expand All @@ -133,12 +129,14 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
json.dumps(
[
{
"key": task.key,
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
Expand All @@ -157,6 +155,12 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
self._add_attribute(span, "platform_system", platform.system())
self._add_attribute(span, "platform_version", platform.version())
self._add_attribute(span, "cpus", os.cpu_count())

if crew.share_crew:
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)

span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
Expand All @@ -170,7 +174,9 @@ def task_started(self, crew: Crew, task: Task) -> Span | None:

created_span = tracer.start_span("Task Created")

self._add_attribute(created_span, "crew_key", crew.key)
self._add_attribute(created_span, "crew_id", str(crew.id))
self._add_attribute(created_span, "task_key", task.key)
self._add_attribute(created_span, "task_id", str(task.id))

if crew.share_crew:
Expand All @@ -186,7 +192,9 @@ def task_started(self, crew: Crew, task: Task) -> Span | None:

span = tracer.start_span("Task Execution")

self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "task_key", task.key)
self._add_attribute(span, "task_id", str(task.id))

if crew.share_crew:
Expand All @@ -201,13 +209,16 @@ def task_started(self, crew: Crew, task: Task) -> Span | None:

return None

def task_ended(self, span: Span, task: Task):
def task_ended(self, span: Span, task: Task, crew: Crew):
"""Records task execution in a crew."""
if self.ready:
try:
self._add_attribute(
span, "output", task.output.raw_output if task.output else ""
)
if crew.share_crew:
self._add_attribute(
span,
"task_output",
task.output.raw if task.output else "",
)

span.set_status(Status(StatusCode.OK))
span.end()
Expand Down Expand Up @@ -293,6 +304,7 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
"crewai_version",
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
Expand All @@ -303,6 +315,7 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
json.dumps(
[
{
"key": agent.key,
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
Expand Down Expand Up @@ -333,6 +346,7 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
Expand Down
36 changes: 36 additions & 0 deletions tests/agents/agent_builder/base_agent_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import hashlib
from typing import Any, List, Optional

from crewai.agents.agent_builder.base_agent import BaseAgent
from pydantic import BaseModel


class TestAgent(BaseAgent):
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
return ""

def create_agent_executor(self, tools=None) -> None: ...

def _parse_tools(self, tools: List[Any]) -> List[Any]:
return []

def get_delegation_tools(self, agents: List["BaseAgent"]): ...

def get_output_converter(
self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
): ...


def test_key():
agent = TestAgent(
role="test role",
goal="test goal",
backstory="test backstory",
)
hash = hashlib.md5("test role|test goal|test backstory".encode()).hexdigest()
assert agent.key == hash
26 changes: 26 additions & 0 deletions tests/crew_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test Agent creation and execution basic functionality."""

import hashlib
import json
from concurrent.futures import Future
from unittest import mock
Expand Down Expand Up @@ -2234,3 +2235,28 @@ def test_replay_from_task_setup_context():
assert crew.tasks[0].output.output_format == OutputFormat.RAW

assert crew.tasks[1].prompt_context == "context raw output"


def test_key():
tasks = [
Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 important events.",
agent=researcher,
),
Task(
description="Write a 1 amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="A 4 paragraph article about AI.",
agent=writer,
),
]
crew = Crew(
agents=[researcher, writer],
process=Process.sequential,
tasks=tasks,
)
hash = hashlib.md5(
f"{researcher.key}|{writer.key}|{tasks[0].key}|{tasks[1].key}".encode()
).hexdigest()

assert crew.key == hash
25 changes: 22 additions & 3 deletions tests/task_test.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
"""Test Agent creation and execution basic functionality."""

import hashlib
import json
from unittest.mock import MagicMock, patch

import pytest
from pydantic import BaseModel
from pydantic_core import ValidationError

from crewai import Agent, Crew, Process, Task
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter
from pydantic import BaseModel
from pydantic_core import ValidationError


def test_task_tool_reflect_agent_tools():
Expand Down Expand Up @@ -791,3 +791,22 @@ def test_task_output_str_with_none():
)

assert str(task_output) == ""


def test_key():
original_description = "Give me a list of 5 interesting ideas about {topic} to explore for an article, what makes them unique and interesting."
original_expected_output = "Bullet point list of 5 interesting ideas about {topic}."
task = Task(
description=original_description,
expected_output=original_expected_output,
)
hash = hashlib.md5(
f"{original_description}|{original_expected_output}".encode()
).hexdigest()

assert task.key == hash, "The key should be the hash of the description."

task.interpolate_inputs(inputs={"topic": "AI"})
assert (
task.key == hash
), "The key should be the hash of the non-interpolated description."

0 comments on commit dd8a199

Please sign in to comment.