Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix asyncify and relax (warning) on settings read from unrecognized thread #1813

Merged
merged 1 commit into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion dsp/utils/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from contextlib import contextmanager
from dsp.utils.utils import dotdict
from functools import lru_cache

DEFAULT_CONFIG = dotdict(
lm=None,
Expand All @@ -27,6 +28,12 @@
async_max_workers=8,
)

@lru_cache(maxsize=None)
def warn_once(msg: str):
import logging
logger = logging.getLogger(__name__)
logger.warning(msg)


class Settings:
"""DSP configuration settings."""
Expand Down Expand Up @@ -59,7 +66,11 @@ def config(self):
thread_id = threading.get_ident()
# if thread_id not in self.stack_by_thread:
# self.stack_by_thread[thread_id] = [self.main_stack[-1].copy()]
return self.stack_by_thread[thread_id][-1]
try:
return self.stack_by_thread[thread_id][-1]
except Exception:
warn_once("Warning: You seem to be creating DSPy threads in an unsupported way.")
return self.main_stack[-1]

def __getattr__(self, name):
if hasattr(self.config, name):
Expand All @@ -74,6 +85,8 @@ def __append(self, config):
thread_id = threading.get_ident()
# if thread_id not in self.stack_by_thread:
# self.stack_by_thread[thread_id] = [self.main_stack[-1].copy()]

assert thread_id in self.stack_by_thread, "Error: You seem to be creating DSPy threads in an unsupported way."
self.stack_by_thread[thread_id].append(config)

def __pop(self):
Expand Down
20 changes: 19 additions & 1 deletion dspy/utils/asyncify.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,22 @@ def get_limiter():


def asyncify(program):
return asyncer.asyncify(program, abandon_on_cancel=True, limiter=get_limiter())
import dspy
import threading

assert threading.get_ident() == dspy.settings.main_tid, "asyncify can only be called from the main thread"

def wrapped(*args, **kwargs):
thread_stacks = dspy.settings.stack_by_thread
current_thread_id = threading.get_ident()
creating_new_thread = current_thread_id not in thread_stacks

assert creating_new_thread
thread_stacks[current_thread_id] = list(dspy.settings.main_stack)

try:
return program(*args, **kwargs)
finally:
del thread_stacks[threading.get_ident()]

return asyncer.asyncify(wrapped, abandon_on_cancel=True, limiter=get_limiter())
Loading