diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py index 394e10cda9..90f886fd52 100644 --- a/src/crewai/agents/agent_builder/base_agent.py +++ b/src/crewai/agents/agent_builder/base_agent.py @@ -1,6 +1,7 @@ import uuid from abc import ABC, abstractmethod from copy import copy as shallow_copy +from hashlib import md5 from typing import Any, Dict, List, Optional, TypeVar from pydantic import ( @@ -162,6 +163,11 @@ def set_private_attrs(self): self._token_process = TokenProcess() return self + @property + def key(self): + source = [self.role, self.goal, self.backstory] + return md5("|".join(source).encode()).hexdigest() + @abstractmethod def execute_task( self, diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 0132c6941c..e540a2ad9a 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -2,6 +2,7 @@ import json import uuid from concurrent.futures import Future +from hashlib import md5 from typing import Any, Dict, List, Optional, Tuple, Union from langchain_core.callbacks import BaseCallbackHandler @@ -330,6 +331,13 @@ def validate_context_no_future_tasks(self): ) return self + @property + def key(self) -> str: + source = [agent.key for agent in self.agents] + [ + task.key for task in self.tasks + ] + return md5("|".join(source).encode()).hexdigest() + def _setup_from_config(self): assert self.config is not None, "Config should not be None." diff --git a/src/crewai/task.py b/src/crewai/task.py index 439b1df135..23c6f71518 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -5,8 +5,10 @@ import uuid from concurrent.futures import Future from copy import copy +from hashlib import md5 from typing import Any, Dict, List, Optional, Tuple, Type, Union + from langchain_openai import ChatOpenAI from opentelemetry.trace import Span from pydantic import UUID4, BaseModel, Field, field_validator, model_validator @@ -173,6 +175,14 @@ def execute_sync( """Execute the task synchronously.""" return self._execute_core(agent, context, tools) + @property + def key(self) -> str: + description = self._original_description or self.description + expected_output = self._original_expected_output or self.expected_output + source = [description, expected_output] + + return md5("|".join(source).encode()).hexdigest() + def execute_async( self, agent: BaseAgent | None = None, @@ -238,7 +248,7 @@ def _execute_core( self.callback(self.output) if self._execution_span: - self._telemetry.task_ended(self._execution_span, self) + self._telemetry.task_ended(self._execution_span, self, agent.crew) self._execution_span = None if self.output_file: diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index 534b663bf7..3983de0ecd 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -92,13 +92,8 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None): pkg_resources.get_distribution("crewai").version, ) self._add_attribute(span, "python_version", platform.python_version()) + self._add_attribute(span, "crew_key", crew.key) self._add_attribute(span, "crew_id", str(crew.id)) - - if crew.share_crew: - self._add_attribute( - span, "crew_inputs", json.dumps(inputs) if inputs else None - ) - self._add_attribute(span, "crew_process", crew.process) self._add_attribute(span, "crew_memory", crew.memory) self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) @@ -109,6 +104,7 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None): json.dumps( [ { + "key": agent.key, "id": str(agent.id), "role": agent.role, "goal": agent.goal, @@ -133,12 +129,14 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None): json.dumps( [ { + "key": task.key, "id": str(task.id), "description": task.description, "expected_output": task.expected_output, "async_execution?": task.async_execution, "human_input?": task.human_input, "agent_role": task.agent.role if task.agent else "None", + "agent_key": task.agent.key if task.agent else None, "context": ( [task.description for task in task.context] if task.context @@ -157,6 +155,12 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None): self._add_attribute(span, "platform_system", platform.system()) self._add_attribute(span, "platform_version", platform.version()) self._add_attribute(span, "cpus", os.cpu_count()) + + if crew.share_crew: + self._add_attribute( + span, "crew_inputs", json.dumps(inputs) if inputs else None + ) + span.set_status(Status(StatusCode.OK)) span.end() except Exception: @@ -170,7 +174,9 @@ def task_started(self, crew: Crew, task: Task) -> Span | None: created_span = tracer.start_span("Task Created") + self._add_attribute(created_span, "crew_key", crew.key) self._add_attribute(created_span, "crew_id", str(crew.id)) + self._add_attribute(created_span, "task_key", task.key) self._add_attribute(created_span, "task_id", str(task.id)) if crew.share_crew: @@ -186,7 +192,9 @@ def task_started(self, crew: Crew, task: Task) -> Span | None: span = tracer.start_span("Task Execution") + self._add_attribute(span, "crew_key", crew.key) self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute(span, "task_key", task.key) self._add_attribute(span, "task_id", str(task.id)) if crew.share_crew: @@ -201,13 +209,16 @@ def task_started(self, crew: Crew, task: Task) -> Span | None: return None - def task_ended(self, span: Span, task: Task): + def task_ended(self, span: Span, task: Task, crew: Crew): """Records task execution in a crew.""" if self.ready: try: - self._add_attribute( - span, "output", task.output.raw_output if task.output else "" - ) + if crew.share_crew: + self._add_attribute( + span, + "task_output", + task.output.raw if task.output else "", + ) span.set_status(Status(StatusCode.OK)) span.end() @@ -293,6 +304,7 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): "crewai_version", pkg_resources.get_distribution("crewai").version, ) + self._add_attribute(span, "crew_key", crew.key) self._add_attribute(span, "crew_id", str(crew.id)) self._add_attribute( span, "crew_inputs", json.dumps(inputs) if inputs else None @@ -303,6 +315,7 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): json.dumps( [ { + "key": agent.key, "id": str(agent.id), "role": agent.role, "goal": agent.goal, @@ -333,6 +346,7 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): "async_execution?": task.async_execution, "human_input?": task.human_input, "agent_role": task.agent.role if task.agent else "None", + "agent_key": task.agent.key if task.agent else None, "context": ( [task.description for task in task.context] if task.context diff --git a/tests/agents/agent_builder/base_agent_test.py b/tests/agents/agent_builder/base_agent_test.py new file mode 100644 index 0000000000..4e47f2271e --- /dev/null +++ b/tests/agents/agent_builder/base_agent_test.py @@ -0,0 +1,36 @@ +import hashlib +from typing import Any, List, Optional + +from crewai.agents.agent_builder.base_agent import BaseAgent +from pydantic import BaseModel + + +class TestAgent(BaseAgent): + def execute_task( + self, + task: Any, + context: Optional[str] = None, + tools: Optional[List[Any]] = None, + ) -> str: + return "" + + def create_agent_executor(self, tools=None) -> None: ... + + def _parse_tools(self, tools: List[Any]) -> List[Any]: + return [] + + def get_delegation_tools(self, agents: List["BaseAgent"]): ... + + def get_output_converter( + self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str + ): ... + + +def test_key(): + agent = TestAgent( + role="test role", + goal="test goal", + backstory="test backstory", + ) + hash = hashlib.md5("test role|test goal|test backstory".encode()).hexdigest() + assert agent.key == hash diff --git a/tests/crew_test.py b/tests/crew_test.py index 543a165096..b9f38789e2 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -1,5 +1,6 @@ """Test Agent creation and execution basic functionality.""" +import hashlib import json from concurrent.futures import Future from unittest import mock @@ -2234,3 +2235,28 @@ def test_replay_from_task_setup_context(): assert crew.tasks[0].output.output_format == OutputFormat.RAW assert crew.tasks[1].prompt_context == "context raw output" + + +def test_key(): + tasks = [ + Task( + description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.", + expected_output="Bullet point list of 5 important events.", + agent=researcher, + ), + Task( + description="Write a 1 amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.", + expected_output="A 4 paragraph article about AI.", + agent=writer, + ), + ] + crew = Crew( + agents=[researcher, writer], + process=Process.sequential, + tasks=tasks, + ) + hash = hashlib.md5( + f"{researcher.key}|{writer.key}|{tasks[0].key}|{tasks[1].key}".encode() + ).hexdigest() + + assert crew.key == hash diff --git a/tests/task_test.py b/tests/task_test.py index 95f201c7c9..9e98ecbad7 100644 --- a/tests/task_test.py +++ b/tests/task_test.py @@ -1,15 +1,15 @@ """Test Agent creation and execution basic functionality.""" +import hashlib import json from unittest.mock import MagicMock, patch import pytest -from pydantic import BaseModel -from pydantic_core import ValidationError - from crewai import Agent, Crew, Process, Task from crewai.tasks.task_output import TaskOutput from crewai.utilities.converter import Converter +from pydantic import BaseModel +from pydantic_core import ValidationError def test_task_tool_reflect_agent_tools(): @@ -791,3 +791,22 @@ def test_task_output_str_with_none(): ) assert str(task_output) == "" + + +def test_key(): + original_description = "Give me a list of 5 interesting ideas about {topic} to explore for an article, what makes them unique and interesting." + original_expected_output = "Bullet point list of 5 interesting ideas about {topic}." + task = Task( + description=original_description, + expected_output=original_expected_output, + ) + hash = hashlib.md5( + f"{original_description}|{original_expected_output}".encode() + ).hexdigest() + + assert task.key == hash, "The key should be the hash of the description." + + task.interpolate_inputs(inputs={"topic": "AI"}) + assert ( + task.key == hash + ), "The key should be the hash of the non-interpolated description."