Skip to content

Commit

Permalink
Json Task Output Truncation with Escape Characters (#1009)
Browse files Browse the repository at this point in the history
* Fixed special character issue when converting json to models. Added numerous tests to ensure thigns work properly.

* Fix linting error and cleaned up tests

* Fix customer_converter_cls test failure

* Fixed tests. Thank you lorenze for pointing that out. added a few more to ensure converter creation works properly

* Address lorenze feedback

* Fix linting issues
  • Loading branch information
bhancockio authored Jul 26, 2024
1 parent 88bffaa commit da7d825
Show file tree
Hide file tree
Showing 4 changed files with 437 additions and 104 deletions.
115 changes: 14 additions & 101 deletions src/crewai/task.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import json
import os
import re
import threading
import uuid
from concurrent.futures import Future
from copy import copy
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Type, Union

from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
Expand All @@ -17,10 +15,8 @@
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser


class Task(BaseModel):
Expand Down Expand Up @@ -254,9 +250,7 @@ def _execute_core(
content = (
json_output
if json_output
else pydantic_output.model_dump_json()
if pydantic_output
else result
else pydantic_output.model_dump_json() if pydantic_output else result
)
self._save_file(content)

Expand Down Expand Up @@ -326,121 +320,40 @@ def get_agent_by_role(role: str) -> Union["BaseAgent", None]:

return copied_task

def _create_converter(self, *args, **kwargs) -> Converter:
"""Create a converter instance."""
if self.agent and not self.converter_cls:
converter = self.agent.get_output_converter(*args, **kwargs)
elif self.converter_cls:
converter = self.converter_cls(*args, **kwargs)

if not converter:
raise Exception("No output converter found or set.")

return converter

def _export_output(
self, result: str
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
pydantic_output: Optional[BaseModel] = None
json_output: Optional[Dict[str, Any]] = None

if self.output_pydantic or self.output_json:
model_output = self._convert_to_model(result)
pydantic_output = (
model_output if isinstance(model_output, BaseModel) else None
model_output = convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
if isinstance(model_output, str):

if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
else:
json_output = model_output if isinstance(model_output, dict) else None

return pydantic_output, json_output

def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]:
model = self.output_pydantic or self.output_json
if model is None:
return result

try:
return self._validate_model(result, model)
except Exception:
return self._handle_partial_json(result, model)

def _validate_model(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel]:
exported_result = model.model_validate_json(result)
if self.output_json:
return exported_result.model_dump()
return exported_result

def _handle_partial_json(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
exported_result = model.model_validate_json(match.group(0))
if self.output_json:
return exported_result.model_dump()
return exported_result
except Exception:
pass

return self._convert_with_instructions(result, model)

def _convert_with_instructions(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
llm = self.agent.function_calling_llm or self.agent.llm # type: ignore # Item "None" of "BaseAgent | None" has no attribute "function_calling_llm"
instructions = self._get_conversion_instructions(model, llm)

converter = self._create_converter(
llm=llm, text=result, model=model, instructions=instructions
)
exported_result = (
converter.to_pydantic() if self.output_pydantic else converter.to_json()
)

if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
return result

return exported_result

def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
if self.output_pydantic:
return OutputFormat.PYDANTIC
return OutputFormat.RAW

def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str:
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
return instructions

def _save_output(self, content: str) -> None:
if not self.output_file:
raise Exception("Output file path is not set.")

directory = os.path.dirname(self.output_file)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(self.output_file, "w", encoding="utf-8") as file:
file.write(content)

def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None

def _save_file(self, result: Any) -> None:
directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"

Expand Down
155 changes: 155 additions & 0 deletions src/crewai/utilities/converter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import json
import re
from typing import Any, Optional, Type, Union

from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, ValidationError

from crewai.agents.agent_builder.utilities.base_output_converter import OutputConverter
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser


class ConverterError(Exception):
Expand Down Expand Up @@ -72,3 +77,153 @@ def _create_chain(self):
def is_gpt(self) -> bool:
"""Return if llm provided is of gpt from openai."""
return isinstance(self.llm, ChatOpenAI) and self.llm.openai_api_base is None


def convert_to_model(
result: str,
output_pydantic: Optional[Type[BaseModel]],
output_json: Optional[Type[BaseModel]],
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
model = output_pydantic or output_json
if model is None:
return result

try:
escaped_result = json.dumps(json.loads(result, strict=False))
return validate_model(escaped_result, model, bool(output_json))
except json.JSONDecodeError as e:
Printer().print(
content=f"Error parsing JSON: {e}. Attempting to handle partial JSON.",
color="yellow",
)
return handle_partial_json(
result, model, bool(output_json), agent, converter_cls
)
except ValidationError as e:
Printer().print(
content=f"Pydantic validation error: {e}. Attempting to handle partial JSON.",
color="yellow",
)
return handle_partial_json(
result, model, bool(output_json), agent, converter_cls
)
except Exception as e:
Printer().print(
content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
color="red",
)
return result


def validate_model(
result: str, model: Type[BaseModel], is_json_output: bool
) -> Union[dict, BaseModel]:
exported_result = model.model_validate_json(result)
if is_json_output:
return exported_result.model_dump()
return exported_result


def handle_partial_json(
result: str,
model: Type[BaseModel],
is_json_output: bool,
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
exported_result = model.model_validate_json(match.group(0))
if is_json_output:
return exported_result.model_dump()
return exported_result
except json.JSONDecodeError as e:
Printer().print(
content=f"Error parsing JSON: {e}. The extracted JSON-like string is not valid JSON. Attempting alternative conversion method.",
color="yellow",
)
except ValidationError as e:
Printer().print(
content=f"Pydantic validation error: {e}. The JSON structure doesn't match the expected model. Attempting alternative conversion method.",
color="yellow",
)
except Exception as e:
Printer().print(
content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
color="red",
)

return convert_with_instructions(
result, model, is_json_output, agent, converter_cls
)


def convert_with_instructions(
result: str,
model: Type[BaseModel],
is_json_output: bool,
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
llm = agent.function_calling_llm or agent.llm
instructions = get_conversion_instructions(model, llm)

converter = create_converter(
agent=agent,
converter_cls=converter_cls,
llm=llm,
text=result,
model=model,
instructions=instructions,
)
exported_result = (
converter.to_pydantic() if not is_json_output else converter.to_json()
)

if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
return result

return exported_result


def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
instructions = "I'm gonna convert this raw text into valid JSON."
if not is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
return instructions


def is_gpt(llm: Any) -> bool:
from langchain_openai import ChatOpenAI

return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None


def create_converter(
agent: Optional[Any] = None,
converter_cls: Optional[Type[Converter]] = None,
*args,
**kwargs,
) -> Converter:
if agent and not converter_cls:
if hasattr(agent, "get_output_converter"):
converter = agent.get_output_converter(*args, **kwargs)
else:
raise AttributeError("Agent does not have a 'get_output_converter' method")
elif converter_cls:
converter = converter_cls(*args, **kwargs)
else:
raise ValueError("Either agent or converter_cls must be provided")

if not converter:
raise Exception("No output converter found or set.")

return converter
5 changes: 2 additions & 3 deletions tests/task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
from unittest.mock import MagicMock, patch

import pytest
from pydantic import BaseModel
from pydantic_core import ValidationError

from crewai import Agent, Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter
from pydantic import BaseModel
from pydantic_core import ValidationError


def test_task_tool_reflect_agent_tools():
Expand Down
Loading

0 comments on commit da7d825

Please sign in to comment.