feat(platform): Introduced Agent Execution Block #8533

Merged

Commits (44)
1f8a0fe  feat(backend): RedisEventQueue into Pub/Sub  (majdyz, Oct 21, 2024)
64b97e5  agent block  (majdyz, Oct 22, 2024)
c080094  Latest dev  (majdyz, Oct 22, 2024)
aca3761  Make agent block work  (majdyz, Oct 22, 2024)
ba83587  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Oct 23, 2024)
81329d5  merge  (majdyz, Oct 23, 2024)
8b84180  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Oct 28, 2024)
5c72e4f  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Oct 30, 2024)
6f9e303  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Nov 1, 2024)
85370c8  refactor(backend): Merge GraphMeta & Graph, remove subgraph  (majdyz, Nov 3, 2024)
08fc257  Cleanup  (majdyz, Nov 4, 2024)
fa8de9a  Merge branch 'master' of github.com:Significant-Gravitas/AutoGPT into…  (majdyz, Nov 4, 2024)
35b003c  Merge graph api improvement  (majdyz, Nov 4, 2024)
e6d3368  Add input & output schema on agent executor block  (majdyz, Nov 4, 2024)
f0aac84  UI for agent executor block  (majdyz, Nov 4, 2024)
17c5352  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Nov 4, 2024)
9ce41d5  Merge branch 'zamilmajdy/unifygraphmeta-and-graph' of github.com:Sign…  (majdyz, Nov 5, 2024)
747608f  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Nov 5, 2024)
629d200  Make it work 🙌  (majdyz, Nov 5, 2024)
c450520  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Nov 5, 2024)
76b5817  Merge conflict  (majdyz, Nov 5, 2024)
57b2373  Cleanup  (majdyz, Nov 5, 2024)
2a96c23  Cleanup  (majdyz, Nov 5, 2024)
9d68e30  Resolve conflict  (majdyz, Nov 5, 2024)
3d86165  Resolve conflict  (majdyz, Nov 5, 2024)
1569e23  Merge input & output schema  (majdyz, Nov 5, 2024)
4167977  Merge branch 'zamilmajdy/unifygraphmeta-and-graph' of github.com:Sign…  (majdyz, Nov 5, 2024)
cb48637  Remove duplicated code & text  (majdyz, Nov 6, 2024)
2027d3e  fix(frontend): Broken UI caused by lack of error propagation & input …  (majdyz, Nov 6, 2024)
a52fed0  Merge branch 'dev' into zamilmajdy/unifygraphmeta-and-graph  (majdyz, Nov 6, 2024)
8047de8  Merge branch 'zamilmajdy/unifygraphmeta-and-graph' of github.com:Sign…  (majdyz, Nov 6, 2024)
6acc061  Merge branch 'zamilmajdy/fix-broken-ui' of github.com:Significant-Gra…  (majdyz, Nov 6, 2024)
a7fa589  Add migration to convert stats from list into object  (majdyz, Nov 6, 2024)
fb8dc64  Merge branch 'zamilmajdy/unifygraphmeta-and-graph' of github.com:Sign…  (majdyz, Nov 6, 2024)
dd1945f  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Nov 7, 2024)
802da95  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Nov 7, 2024)
a8fcb12  Merge branch 'dev' into zamilmajdy/secrt-956-integrate-connection-obj…  (majdyz, Nov 7, 2024)
7edf118  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into za…  (majdyz, Nov 7, 2024)
0158638  Cosmetic changes  (majdyz, Nov 7, 2024)
8bc5c50  Merge remote-tracking branch 'origin/zamilmajdy/secrt-956-integrate-c…  (majdyz, Nov 7, 2024)
9fe65d0  Merge branch 'dev' into zamilmajdy/secrt-956-integrate-connection-obj…  (majdyz, Nov 8, 2024)
b95d3cc  Merge branch 'dev' into zamilmajdy/secrt-956-integrate-connection-obj…  (majdyz, Nov 11, 2024)
bacad00  Merge branch 'dev' into zamilmajdy/secrt-956-integrate-connection-obj…  (majdyz, Nov 12, 2024)
cb9abff  Merge branch 'dev' into zamilmajdy/secrt-956-integrate-connection-obj…  (majdyz, Nov 12, 2024)
100 changes: 100 additions & 0 deletions autogpt_platform/backend/backend/blocks/agent.py
@@ -0,0 +1,100 @@
import logging

from autogpt_libs.utils.cache import thread_cached

from backend.data.block import (
    Block,
    BlockCategory,
    BlockInput,
    BlockOutput,
    BlockSchema,
    BlockType,
    get_block,
)
from backend.data.execution import ExecutionStatus
from backend.data.model import SchemaField

logger = logging.getLogger(__name__)


@thread_cached
def get_executor_manager_client():
    from backend.executor import ExecutionManager
    from backend.util.service import get_service_client

    return get_service_client(ExecutionManager)


@thread_cached
def get_event_bus():
    from backend.data.queue import RedisExecutionEventBus

    return RedisExecutionEventBus()


class AgentExecutorBlock(Block):
    class Input(BlockSchema):
        user_id: str = SchemaField(description="User ID")
        graph_id: str = SchemaField(description="Graph ID")
        graph_version: int = SchemaField(description="Graph Version")

        data: BlockInput = SchemaField(description="Input data for the graph")
        input_schema: dict = SchemaField(description="Input schema for the graph")
        output_schema: dict = SchemaField(description="Output schema for the graph")

    class Output(BlockSchema):
        pass

    def __init__(self):
        super().__init__(
            id="e189baac-8c20-45a1-94a7-55177ea42565",
            description="Executes an existing agent inside your agent",
            input_schema=AgentExecutorBlock.Input,
            output_schema=AgentExecutorBlock.Output,
            block_type=BlockType.AGENT,
            categories={BlockCategory.AGENT},
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        executor_manager = get_executor_manager_client()
        event_bus = get_event_bus()

        graph_exec = executor_manager.add_execution(
            graph_id=input_data.graph_id,
            graph_version=input_data.graph_version,
            user_id=input_data.user_id,
            data=input_data.data,
        )
        log_id = f"Graph #{input_data.graph_id}-V{input_data.graph_version}, exec-id: {graph_exec.graph_exec_id}"
        logger.info(f"Starting execution of {log_id}")

        for event in event_bus.listen(
            graph_id=graph_exec.graph_id, graph_exec_id=graph_exec.graph_exec_id
        ):
            logger.info(
                f"Execution {log_id} produced input {event.input_data} output {event.output_data}"
            )

            if not event.node_id:
                if event.status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED]:
                    logger.info(f"Execution {log_id} ended with status {event.status}")
                    break
                else:
                    continue

            if not event.block_id:
                logger.warning(f"{log_id} received event without block_id {event}")
                continue

            block = get_block(event.block_id)
            if not block or block.block_type != BlockType.OUTPUT:
                continue

            output_name = event.input_data.get("name")
            if not output_name:
                logger.warning(f"{log_id} produced an output with no name {event}")
                continue

            for output_data in event.output_data.get("output", []):
                logger.info(f"Execution {log_id} produced {output_name}: {output_data}")
                yield output_name, output_data
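
For orientation, here is a minimal sketch (not part of the diff) of the input_default a graph node pointing at this block might carry. Only the key names come from the Input schema above; every value, including the child graph's field names, is a placeholder.

# Placeholder values for illustration; only the keys mirror AgentExecutorBlock.Input.
agent_node_input_default = {
    "graph_id": "d9f1e0c2-0000-0000-0000-000000000000",  # the stored agent (graph) to run
    "graph_version": 2,
    "user_id": "",                                       # injected by Graph.reassign_ids() (see data/graph.py below)
    "data": {"topic": "space"},                          # forwarded as the child graph's input
    "input_schema": {"properties": {"topic": {"type": "string"}}},
    "output_schema": {"properties": {"result": {"type": "string"}}},
}

run() then streams back (output_name, value) pairs that the child graph emits through its AgentOutputBlock nodes, keyed by each output node's name input.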
6 changes: 4 additions & 2 deletions autogpt_platform/backend/backend/blocks/basic.py
@@ -233,7 +233,9 @@ class Input(BlockSchema):
         )
         name: str = SchemaField(description="The name of the output.")
         title: str | None = SchemaField(
-            description="The title of the input.", default=None, advanced=True
+            description="The title of the output.",
+            default=None,
+            advanced=True,
         )
         description: str | None = SchemaField(
             description="The description of the output.",
@@ -262,7 +264,7 @@ class Output(BlockSchema):
     def __init__(self):
         super().__init__(
             id="363ae599-353e-4804-937e-b2ee3cef3da4",
-            description=("Stores the output of the graph for users to see."),
+            description="Stores the output of the graph for users to see.",
             input_schema=AgentOutputBlock.Input,
             output_schema=AgentOutputBlock.Output,
             test_input=[
6 changes: 5 additions & 1 deletion autogpt_platform/backend/backend/data/block.py
@@ -34,6 +34,7 @@ class BlockType(Enum):
     INPUT = "Input"
     OUTPUT = "Output"
     NOTE = "Note"
+    AGENT = "Agent"


 class BlockCategory(Enum):
@@ -48,6 +49,7 @@ class BlockCategory(Enum):
     COMMUNICATION = "Block that interacts with communication platforms."
     DEVELOPER_TOOLS = "Developer tools such as GitHub blocks."
     DATA = "Block that interacts with structured data."
+    AGENT = "Block that interacts with other agents."

     def dict(self) -> dict[str, str]:
         return {"category": self.name, "description": self.value}
@@ -299,7 +301,9 @@ def execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
         ):
             if output_name == "error":
                 raise RuntimeError(output_data)
-            if error := self.output_schema.validate_field(output_name, output_data):
+            if self.block_type == BlockType.STANDARD and (
+                error := self.output_schema.validate_field(output_name, output_data)
+            ):
                 raise ValueError(f"Block produced an invalid output data: {error}")
             yield output_name, output_data

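
To restate the guard above (a sketch, not code from the PR): output validation now applies only to standard blocks, because an AGENT block's outputs are named by the child graph's dynamic output_schema while AgentExecutorBlock.Output is deliberately empty.

from backend.data.block import BlockType

def should_validate_output(block_type: BlockType) -> bool:
    # Mirrors the new condition in Block.execute(): only STANDARD blocks have their
    # yielded (name, value) pairs checked against output_schema; AGENT blocks pass
    # the child graph's dynamically named outputs straight through.
    return block_type == BlockType.STANDARD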
33 changes: 30 additions & 3 deletions autogpt_platform/backend/backend/data/execution.py
@@ -64,6 +64,7 @@ class ExecutionResult(BaseModel):
     graph_exec_id: str
     node_exec_id: str
     node_id: str
+    block_id: str
     status: ExecutionStatus
     input_data: BlockInput
     output_data: CompletedBlockOutput
@@ -72,6 +73,26 @@
     start_time: datetime | None
     end_time: datetime | None

+    @staticmethod
+    def from_graph(graph: AgentGraphExecution):
+        return ExecutionResult(
+            graph_id=graph.agentGraphId,
+            graph_version=graph.agentGraphVersion,
+            graph_exec_id=graph.id,
+            node_exec_id="",
+            node_id="",
+            block_id="",
+            status=graph.executionStatus,
+            # TODO: Populate input_data & output_data from AgentNodeExecutions
+            # Input & Output comes AgentInputBlock & AgentOutputBlock.
+            input_data={},
+            output_data={},
+            add_time=graph.createdAt,
+            queue_time=graph.createdAt,
+            start_time=graph.startedAt,
+            end_time=graph.updatedAt,
+        )
+
     @staticmethod
     def from_db(execution: AgentNodeExecution):
         if execution.executionData:
@@ -93,9 +114,10 @@ def from_db(execution: AgentNodeExecution):
             graph_id=graph_execution.agentGraphId if graph_execution else "",
             graph_version=graph_execution.agentGraphVersion if graph_execution else 0,
             graph_exec_id=execution.agentGraphExecutionId,
+            block_id=execution.AgentNode.agentBlockId if execution.AgentNode else "",
             node_exec_id=execution.id,
             node_id=execution.agentNodeId,
-            status=ExecutionStatus(execution.executionStatus),
+            status=execution.executionStatus,
             input_data=input_data,
             output_data=output_data,
             add_time=execution.addedTime,
@@ -248,15 +270,20 @@ async def update_graph_execution_start_time(graph_exec_id: str):
 async def update_graph_execution_stats(
     graph_exec_id: str,
     stats: dict[str, Any],
-):
+) -> ExecutionResult:
+
     status = ExecutionStatus.FAILED if stats.get("error") else ExecutionStatus.COMPLETED
-    await AgentGraphExecution.prisma().update(
+    res = await AgentGraphExecution.prisma().update(
         where={"id": graph_exec_id},
         data={
             "executionStatus": status,
             "stats": json.dumps(stats),
         },
     )
+    if not res:
+        raise ValueError(f"Execution {graph_exec_id} not found.")
+
+    return ExecutionResult.from_graph(res)


 async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
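
A small helper sketch (assumed wiring, not part of the diff) showing how a consumer can tell the new graph-level events built by ExecutionResult.from_graph() apart from ordinary node events; this is the condition AgentExecutorBlock.run() relies on to stop listening.

from backend.data.execution import ExecutionResult, ExecutionStatus

def is_terminal_graph_event(event: ExecutionResult) -> bool:
    # from_graph() leaves node_exec_id, node_id, and block_id empty, so an empty
    # node_id plus a terminal status marks the end of the whole graph execution.
    return not event.node_id and event.status in (
        ExecutionStatus.COMPLETED,
        ExecutionStatus.FAILED,
    )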
35 changes: 28 additions & 7 deletions autogpt_platform/backend/backend/data/graph.py
@@ -9,6 +9,7 @@
 from prisma.types import AgentGraphWhereInput
 from pydantic.fields import computed_field

+from backend.blocks.agent import AgentExecutorBlock
 from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
 from backend.data.block import BlockInput, BlockType, get_block, get_blocks
 from backend.data.db import BaseDbModel, transaction
@@ -174,24 +175,35 @@ def starting_nodes(self) -> list[Node]:
             if node.id not in outbound_nodes or node.id in input_nodes
         ]

-    def reassign_ids(self, reassign_graph_id: bool = False):
+    def reassign_ids(self, user_id: str, reassign_graph_id: bool = False):
         """
         Reassigns all IDs in the graph to new UUIDs.
         This method can be used before storing a new graph to the database.
         """
+        self.validate_graph()
+
+        # Reassign Graph ID
         id_map = {node.id: str(uuid.uuid4()) for node in self.nodes}
         if reassign_graph_id:
             self.id = str(uuid.uuid4())

+        # Reassign Node IDs
         for node in self.nodes:
             node.id = id_map[node.id]

+        # Reassign Link IDs
         for link in self.links:
             link.source_id = id_map[link.source_id]
             link.sink_id = id_map[link.sink_id]

+        # Reassign User IDs for agent blocks
+        for node in self.nodes:
+            if node.block_id != AgentExecutorBlock().id:
+                continue
+            node.input_default["user_id"] = user_id
+            node.input_default.setdefault("data", {})
+
+        self.validate_graph()
+
     def validate_graph(self, for_run: bool = False):
         def sanitize(name):
             return name.split("_#_")[0].split("_@_")[0].split("_$_")[0]
@@ -215,6 +227,7 @@ def sanitize(name):
                     for_run  # Skip input completion validation, unless when executing.
                     or block.block_type == BlockType.INPUT
                     or block.block_type == BlockType.OUTPUT
+                    or block.block_type == BlockType.AGENT
                 ):
                     raise ValueError(
                         f"Node {block.name} #{node.id} required input missing: `{name}`"
@@ -248,18 +261,26 @@ def is_static_output_block(nid: str) -> bool:
                     )

                 sanitized_name = sanitize(name)
+                vals = node.input_default
                 if i == 0:
-                    fields = f"Valid output fields: {block.output_schema.get_fields()}"
+                    fields = (
+                        block.output_schema.get_fields()
+                        if block.block_type != BlockType.AGENT
+                        else vals.get("output_schema", {}).get("properties", {}).keys()
+                    )
                 else:
-                    fields = f"Valid input fields: {block.input_schema.get_fields()}"
+                    fields = (
+                        block.input_schema.get_fields()
+                        if block.block_type != BlockType.AGENT
+                        else vals.get("input_schema", {}).get("properties", {}).keys()
+                    )
                 if sanitized_name not in fields:
-                    raise ValueError(f"{suffix}, `{name}` invalid, {fields}")
+                    fields_msg = f"Allowed fields: {fields}"
+                    raise ValueError(f"{suffix}, `{name}` invalid, {fields_msg}")

             if is_static_output_block(link.source_id):
                 link.is_static = True  # Each value block output should be static.

         # TODO: Add type compatibility check here.

     @staticmethod
     def from_db(graph: AgentGraph, hide_credentials: bool = False):
         executions = [
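
A hedged illustration of the validate_graph() change above: for an AGENT node, the allowed link fields are read from the schemas stored in the node's input_default rather than from the block's static Input/Output schemas. The schema contents here are invented; only the lookup shape matches the code.

vals = {  # node.input_default of an AgentExecutorBlock node (example values)
    "input_schema": {"properties": {"topic": {"type": "string"}}},
    "output_schema": {"properties": {"result": {"type": "string"}}},
}
allowed_inputs = vals.get("input_schema", {}).get("properties", {}).keys()    # dict_keys(['topic'])
allowed_outputs = vals.get("output_schema", {}).get("properties", {}).keys()  # dict_keys(['result'])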
4 changes: 2 additions & 2 deletions autogpt_platform/backend/backend/executor/database.py
@@ -41,8 +41,8 @@ def get_port(cls) -> int:
         return Config().database_api_port

     @expose
-    def send_execution_update(self, execution_result_dict: dict[Any, Any]):
-        self.event_queue.publish(ExecutionResult(**execution_result_dict))
+    def send_execution_update(self, execution_result: ExecutionResult):
+        self.event_queue.publish(execution_result)

     @staticmethod
     def exposed_run_and_wait(
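
A caller-side sketch of this signature change; the client handle and call site below are assumptions, not shown in this PR.

from backend.data.execution import ExecutionResult

def notify_update(db_client, execution_result: ExecutionResult) -> None:
    # Before this change the RPC accepted a dict[Any, Any] and rebuilt the model
    # itself; now the typed ExecutionResult is passed and published as-is.
    db_client.send_execution_update(execution_result)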