Skip to content

Commit

Permalink
Fix threading
Browse files Browse the repository at this point in the history
  • Loading branch information
gvieira committed Nov 21, 2024
1 parent f8ca49d commit 8f5f67d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
35 changes: 22 additions & 13 deletions src/crewai/llm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io
import logging
import sys
import threading
import warnings
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union
Expand All @@ -13,16 +13,25 @@
)


class FilteredStream(io.StringIO):
def write(self, s):
if (
"Give Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new"
in s
or "LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True`"
in s
):
return
super().write(s)
class FilteredStream:
def __init__(self, original_stream):
self._original_stream = original_stream
self._lock = threading.Lock()

def write(self, s) -> int:
with self._lock:
if (
"Give Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new"
in s
or "LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True`"
in s
):
return 0
return self._original_stream.write(s)

def flush(self):
with self._lock:
return self._original_stream.flush()


LLM_CONTEXT_WINDOW_SIZES = {
Expand Down Expand Up @@ -60,8 +69,8 @@ def suppress_warnings():
# Redirect stdout and stderr
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = FilteredStream()
sys.stderr = FilteredStream()
sys.stdout = FilteredStream(old_stdout)
sys.stderr = FilteredStream(old_stderr)

try:
yield
Expand Down
10 changes: 7 additions & 3 deletions src/crewai/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
from pydantic_core import PydanticCustomError

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.i18n import I18N
Expand Down Expand Up @@ -208,7 +208,9 @@ def execute_async(
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
threading.Thread(
target=self._execute_task_async, args=(agent, context, tools, future)
daemon=True,
target=self._execute_task_async,
args=(agent, context, tools, future),
).start()
return future

Expand Down Expand Up @@ -277,7 +279,9 @@ def _execute_core(
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)

Expand Down

0 comments on commit 8f5f67d

Please sign in to comment.