Replay feat using db (#930)
* Cleaned up task execution to now have separate paths for async and sync execution. Updating all kickoff functions to return CrewOutput. WIP. Waiting for Joao feedback on async task execution with task_output

* Consistently storing async and sync output for context

* outline tests I need to create going forward

* Major rehaul of TaskOutput and CrewOutput. Updated all tests to work with new change. Need to add in a few final tricky async tests and add a few more to verify output types on TaskOutput and CrewOutput.

* Encountering issues with callback. Need to test on main. WIP

* working on tests. WIP

* WIP. Figuring out disconnect issue.

* Cleaned up logs now that I've isolated the issue to the LLM

* more wip.

* WIP. It looks like usage metrics has always been broken for async

* Update parent crew who is managing for_each loop

* Merge in main to bugfix/kickoff-for-each-usage-metrics

* Clean up code for review

* Add new tests

* Final cleanup. Ready for review.

* Moving copy functionality from Agent to BaseAgent

* Fix renaming issue

* Fix linting errors

* use BaseAgent instead of Agent where applicable

* Fixing missing function. Working on tests.

* WIP. Needing team to review change

* Fixing issues brought about by merge

* WIP: need to fix json encoder

* WIP need to fix encoder

* WIP

* WIP: replay working with async. need to add tests

* Implement major fixes from yesterday's group conversation. Now working on tests.

* The majority of tasks are working now. Need to fix converter class

* Fix final failing test

* Fix linting and type-checker issues

* Add more tests to fully test CrewOutput and TaskOutput changes

* Add validation so async tasks cannot depend on other async tasks.

* WIP: working replay feat fixing inputs, need tests

* WIP: core logic of seq and hier for executing tasks added into one

* Update validators and tests

* better logic for seq and hier

* replay working for both seq and hier just need tests

* fixed context

* added cli command + code cleanup TODO: need better refactoring

* refactoring for cleaner code

* added better tests

* removed todo comments and fixed some tests

* fix logging now all tests should pass

* cleaner code

* ensure replay is declared when replaying specific tasks

* ensure hierarchical works

* better typing for stored_outputs and separated task_output_handler

* added better tests

* added replay feature to crew docs

* easier cli command name

* fixing changes

* using SQLite instead of .json file for logging previous task_outputs

* tools fix

* added to docs and fixed tests

* fixed .db

* fixed docs and removed unneeded comments

* separating ltm and replay db

* fixed printing colors

* added how to doc

---------

Co-authored-by: Brandon Hancock <brandon@brandonhancock.io>
lorenzejay and bhancockio committed Jul 15, 2024
1 parent 7696b45 commit 67b04b3
Showing 25 changed files with 6,380 additions and 197 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -14,4 +14,5 @@ test.py
rc-tests/*
*.pkl
temp/*
.vscode/*
.vscode/*
crew_tasks_output.json
29 changes: 29 additions & 0 deletions docs/core-concepts/Crews.md
@@ -156,3 +156,32 @@ for async_result in async_results:
```

These methods provide flexibility in how you manage and execute tasks within your crew, allowing for both synchronous and asynchronous workflows tailored to your needs.


### Replaying from a Specific Task
You can now replay from a specific task using our CLI command `replay`.

The `replay_from_task` feature in CrewAI allows you to replay from a specific task using the command-line interface (CLI). By running the command `crewai replay -t <task_id>`, you specify the `task_id` for the replay process.

Kickoffs now save the latest kickoff's returned task outputs locally so that you can replay from them.


### Replaying from a Specific Task Using the CLI
To use the replay feature, follow these steps:

1. Open your terminal or command prompt.
2. Navigate to the directory where your CrewAI project is located.
3. Run the commands below.

To view the task_ids from the latest kickoff, use:

```shell
crewai log-tasks-outputs
```

Then, to replay from a specific task, use:

```shell
crewai replay -t <task_id>
```

These commands let you replay from your latest kickoff tasks while still retaining context from previously executed tasks.
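
Replay can also be triggered programmatically on a crew instance. The sketch below is illustrative: it assumes `my_crew` is an already-configured `Crew` whose latest kickoff has completed, and the task ID and inputs are placeholders.

```python
# Minimal sketch: replay the latest kickoff from a given task onward.
# `my_crew` is assumed to be a fully configured Crew object.
my_crew.replay_from_task(
    task_id="<task_id>",  # a task ID listed by `crewai log-tasks-outputs`
    inputs={"topic": "CrewAI Training"},  # optional; defaults to the previous kickoff's inputs
)
```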
49 changes: 49 additions & 0 deletions docs/how-to/Replay-tasks-from-latest-Crew-Kickoff.md
@@ -0,0 +1,49 @@
---
title: Replay Tasks from Latest Crew Kickoff
description: Replay tasks from the latest crew.kickoff(...)
---

## Introduction
CrewAI provides the ability to replay from a task specified in the latest crew kickoff. This feature is particularly useful when you've finished a kickoff and want to retry certain tasks or don't need to refetch data, since your agents already have the context saved from the kickoff execution and you only need to replay the tasks you want.

## Note:
You must run `crew.kickoff()` before you can replay a task. Currently, only the latest kickoff is supported, so if you use `kickoff_for_each`, it will only allow you to replay from the most recent crew run.

Here's an example of how to replay from a task:

### Replaying from a Specific Task Using the CLI
To use the replay feature, follow these steps:

1. Open your terminal or command prompt.
2. Navigate to the directory where your CrewAI project is located.
3. Run the commands below.

To view the task_ids from the latest kickoff, use:
```shell
crewai log-tasks-outputs
```

Once you have the task_id you want to replay from, use:
```shell
crewai replay -t <task_id>
```


### Replaying from a Task Programmatically
To replay from a task programmatically, follow these steps:

1. Specify the task_id and input parameters for the replay process.
2. Execute the replay command within a try-except block to handle potential errors.

```python
def replay_from_task():
    """
    Replay the crew execution from a specific task.
    """
    task_id = "<task_id>"
    # Optional: pass the inputs you want to replay with; otherwise the previous kickoff's inputs are used.
    inputs = {"topic": "CrewAI Training"}
    try:
        YourCrewName_Crew().crew().replay_from_task(task_id=task_id, inputs=inputs)

    except Exception as e:
        raise Exception(f"An error occurred while replaying the crew: {e}")
```
5 changes: 5 additions & 0 deletions docs/index.md
@@ -113,6 +113,11 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
        Kickoff a Crew for a List
      </a>
    </li>
    <li>
      <a href="./how-to/Replay-tasks-from-latest-Crew-Kickoff">
        Replay from a Task
      </a>
    </li>
    <li>
      <a href="./how-to/AgentOps-Observability">
        Agent Monitoring with AgentOps
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -145,6 +145,7 @@ nav:
- Human Input on Execution: 'how-to/Human-Input-on-Execution.md'
- Kickoff a Crew Asynchronously: 'how-to/Kickoff-async.md'
- Kickoff a Crew for a List: 'how-to/Kickoff-for-each.md'
- Replay from a specific task from a kickoff: 'how-to/Replay-tasks-from-latest-Crew-Kickoff.md'
- Agent Monitoring with AgentOps: 'how-to/AgentOps-Observability.md'
- Agent Monitoring with LangTrace: 'how-to/Langtrace-Observability.md'
- Tools Docs:
2 changes: 1 addition & 1 deletion src/crewai/agents/agent_builder/base_agent.py
@@ -180,7 +180,7 @@ def _parse_tools(self, tools: List[Any]) -> List[Any]:
        pass

    @abstractmethod
    def get_delegation_tools(self, agents: List["BaseAgent"]):
    def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[Any]:
        """Set the task tools that init BaseAgenTools class."""
        pass

51 changes: 51 additions & 0 deletions src/crewai/cli/cli.py
@@ -1,8 +1,14 @@
import click
import pkg_resources

from crewai.memory.storage.kickoff_task_outputs_storage import (
    KickoffTaskOutputsSQLiteStorage,
)


from .create_crew import create_crew
from .train_crew import train_crew
from .replay_from_task import replay_task_command


@click.group()
@@ -48,5 +54,50 @@ def train(n_iterations: int):
    train_crew(n_iterations)


@crewai.command()
@click.option(
    "-t",
    "--task_id",
    type=str,
    help="Replay the crew from this task ID, including all subsequent tasks.",
)
def replay(task_id: str) -> None:
    """
    Replay the crew execution from a specific task.
    Args:
        task_id (str): The ID of the task to replay from.
    """
    try:
        click.echo(f"Replaying the crew from task {task_id}")
        replay_task_command(task_id)
    except Exception as e:
        click.echo(f"An error occurred while replaying: {e}", err=True)


@crewai.command()
def log_tasks_outputs() -> None:
    """
    Retrieve your latest crew.kickoff() task outputs.
    """
    try:
        storage = KickoffTaskOutputsSQLiteStorage()
        tasks = storage.load()

        if not tasks:
            click.echo(
                "No task outputs found. Only crew kickoff task outputs are logged."
            )
            return

        for index, task in enumerate(tasks, 1):
            click.echo(f"Task {index}: {task['task_id']}")
            click.echo(f"Description: {task['expected_output']}")
            click.echo("------")

    except Exception as e:
        click.echo(f"An error occurred while logging task outputs: {e}", err=True)


if __name__ == "__main__":
    crewai()
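
Taken together, the two new commands support the intended workflow: list the task IDs stored from the latest kickoff, then replay from a chosen one. A sketch of typical usage, with a placeholder task ID:

```shell
crewai log-tasks-outputs      # list task_ids saved from the latest crew.kickoff()
crewai replay -t <task_id>    # replay the crew from that task, including all subsequent tasks
```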
24 changes: 24 additions & 0 deletions src/crewai/cli/replay_from_task.py
@@ -0,0 +1,24 @@
import subprocess
import click


def replay_task_command(task_id: str) -> None:
    """
    Replay the crew execution from a specific task.
    Args:
        task_id (str): The ID of the task to replay from.
    """
    command = ["poetry", "run", "replay", task_id]

    try:
        result = subprocess.run(command, capture_output=False, text=True, check=True)
        if result.stderr:
            click.echo(result.stderr, err=True)

    except subprocess.CalledProcessError as e:
        click.echo(f"An error occurred while replaying the task: {e}", err=True)
        click.echo(e.output, err=True)

    except Exception as e:
        click.echo(f"An unexpected error occurred: {e}", err=True)
10 changes: 10 additions & 0 deletions src/crewai/cli/templates/main.py
@@ -21,3 +21,13 @@ def train():

    except Exception as e:
        raise Exception(f"An error occurred while training the crew: {e}")

def replay_from_task():
    """
    Replay the crew execution from a specific task.
    """
    try:
        {{crew_name}}Crew().crew().replay_from_task(task_id=sys.argv[1])

    except Exception as e:
        raise Exception(f"An error occurred while replaying the crew: {e}")
1 change: 1 addition & 0 deletions src/crewai/cli/templates/pyproject.toml
@@ -11,6 +11,7 @@ crewai = { extras = ["tools"], version = "^0.35.8" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run"
train = "{{folder_name}}.main:train"
replay = "{{folder_name}}.main:replay_from_task"

[build-system]
requires = ["poetry-core"]

