Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ready] support API retry #479

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions data_juicer/ops/mapper/calibrate_qa_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self,
api_model: str = 'gpt-4o',
*,
api_url: Optional[str] = None,
api_key: Optional[str] = None,
response_path: Optional[str] = None,
system_prompt: Optional[str] = None,
input_template: Optional[str] = None,
Expand All @@ -45,16 +44,15 @@ def __init__(self,
Initialization method.

:param api_model: API model name.
:param api_url: API URL. Defaults to DJ_API_URL environment variable.
:param api_key: API key. Defaults to DJ_API_KEY environment variable.
:param api_url: URL endpoint for the API.
:param response_path: Path to extract content from the API response.
Defaults to 'choices.0.message.content'.
:param system_prompt: System prompt for the calibration task.
:param input_template: Template for building the model input.
:param reference_template: Template for formatting the reference text.
:param qa_pair_template: Template for formatting question-answer pairs.
:param output_pattern: Regular expression for parsing model output.
:param model_params: Parameters for initializing the model.
:param model_params: Parameters for initializing the API model.
:param sampling_params: Extra parameters passed to the API call.
:param kwargs: Extra keyword arguments.
"""
Expand All @@ -67,13 +65,12 @@ def __init__(self,
self.qa_pair_template = qa_pair_template or \
self.DEFAULT_QA_PAIR_TEMPLATE
self.output_pattern = output_pattern or self.DEFAULT_OUTPUT_PATTERN

self.model_params = model_params or {}
self.sampling_params = sampling_params or {}

model_params = model_params or {}
self.model_key = prepare_model(model_type='api',
api_model=api_model,
api_url=api_url,
api_key=api_key,
model=api_model,
url=api_url,
response_path=response_path,
**model_params)

Expand Down
190 changes: 104 additions & 86 deletions data_juicer/utils/model_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import fnmatch
import inspect
import os
from functools import partial
from pickle import UnpicklingError
from typing import Optional, Union

import httpx
import multiprocess as mp
import requests
import wget
from loguru import logger

Expand All @@ -26,6 +27,7 @@
diffusers = LazyLoader('diffusers', 'diffusers')
ram = LazyLoader('ram', 'ram.models')
cv2 = LazyLoader('cv2', 'cv2')
openai = LazyLoader('openai', 'openai')

MODEL_ZOO = {}

Expand Down Expand Up @@ -107,67 +109,69 @@ def check_model(model_name, force=False):

class APIModel:

def __init__(self,
api_model,
*,
api_url=None,
api_key=None,
response_path=None):
self.api_model = api_model

if api_url is None:
api_url = os.getenv('DJ_API_URL')
if api_url is None:
base_url = os.getenv('OPENAI_BASE_URL',
'https://api.openai.com/v1')
api_url = base_url.rstrip('/') + '/chat/completions'
self.api_url = api_url

if api_key is None:
api_key = os.getenv('DJ_API_KEY') or os.getenv('OPENAI_API_KEY')
self.api_key = api_key

if response_path is None:
response_path = 'choices.0.message.content'
self.response_path = response_path

self.headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}'
}

def __call__(self, messages, **kwargs):
"""Sends messages to the configured API model and returns the parsed response.
def __init__(self, model, url=None, response_path=None, **kwargs):
"""
Initializes an instance of the APIModel class.

:param model: The model name to use for the API.
:param url: URL endpoint for the API. If relative, it will be joined
with base_url or the OPENAI_BASE_URL environment variable. Defaults
to '/chat/completions' for OpenAI compatibility.
:param response_path: Dot-separated path to extract the desired
response content. Defaults to 'choices.0.message.content' for
OpenAI compatibility.
:param kwargs: Additional arguments to configure the OpenAI client.
"""
self.model = model
self.url = url or '/chat/completions'
self.response_path = response_path or 'choices.0.message.content'

:param messages: The messages to send to the API.
:param model: The model to be used for generating responses.
:param kwargs: Additional parameters for the API request.
client_args = self._filter_arguments(openai.OpenAI, kwargs)
self.client = openai.OpenAI(**client_args)

:return: The parsed response from the API, or None if an error occurs.
def __call__(self, messages, **kwargs):
"""
Sends messages to the configured API model and returns the parsed
response content.

:param messages: A list of message dictionaries to send to the API.
Each message should have a 'role' (e.g., 'user',
'assistant') and 'content' (the message text).
:param kwargs: Additional parameters for the API call.
:return: The parsed response content from the API call, or an empty
string if an error occurs.
"""
payload = {
'model': self.api_model,
body = {
'messages': messages,
**kwargs,
'model': self.model,
}
body.update(kwargs)
stream = kwargs.get('stream', False)
stream_cls = openai.Stream[openai.types.chat.ChatCompletionChunk]

try:
response = requests.post(self.api_url,
json=payload,
headers=self.headers)
response.raise_for_status()
response = self.client.post(self.url,
body=body,
cast_to=httpx.Response,
stream=stream,
stream_cls=stream_cls)
result = response.json()
return self.nested_access(result, self.response_path)
return self._nested_access(result, self.response_path)
except Exception as e:
logger.exception(e)
return ''

@staticmethod
def nested_access(data, path):
"""Access nested data using a dot-separated path.
def _nested_access(data, path):
"""
Access nested data using a dot-separated path.

:param data: The data structure to access.
:param data: A dictionary or a list to access the nested data from.
:param path: A dot-separated string representing the path to access.
:return: The value at the specified path, if it exists.
This can include numeric indices when accessing list
elements.
:return: The value located at the specified path, or raises a KeyError
or IndexError if the path does not exist.
"""
keys = path.split('.')
for key in keys:
Expand All @@ -176,72 +180,86 @@ def nested_access(data, path):
data = data[key]
return data

@staticmethod
def _filter_arguments(func, args_dict):
"""
Filters and returns only the valid arguments for a given function
signature.

def prepare_api_model(api_model,
:param func: The function or callable to inspect.
:param args_dict: A dictionary of argument names and values to filter.
:return: A dictionary containing only the arguments that match the
function's signature, preserving any **kwargs if applicable.
"""
params = inspect.signature(func).parameters
filtered_args = {}
for name, param in params.items():
# If **kwargs is found, return without change
if param.kind == inspect.Parameter.VAR_KEYWORD:
return args_dict
# Collect valid parameters
if name not in {'self', 'cls'} and name in args_dict:
filtered_args[name] = args_dict[name]
return filtered_args


def prepare_api_model(model,
*,
api_url=None,
api_key=None,
url=None,
response_path=None,
return_processor=False,
processor_name=None,
processor_config=None,
**model_params):
"""Creates a callable API model for interacting with OpenAI-compatible API.
The callable supports custom response parsing and works with proxy servers
that may be incompatible.

This callable object supports custom result parsing and is suitable for use
with incompatible proxy servers.

:param api_url: The URL of the API. If not provided, it will fallback to
the environment variables DJ_API_URL or OPENAI_BASE_URL.
:param api_key: The API key for authorization. If not provided, it will
fallback to the environment variables DJ_API_KEY or OPENAI_API_KEY.
:param response_path: The path to extract content from the API response.
Defaults to 'choices.0.message.content'. This can be customized
based on the API's response structure.
:param model: The name of the model to interact with.
:param url: URL endpoint for the API.
:param response_path: The dot-separated path to extract desired content
from the API response. Defaults to 'choices.0.message.content'.
:param return_processor: A boolean flag indicating whether to return a
processor along with the model. The processor is used for tasks like
tokenization or encoding. Defaults to False.
:param processor_name: The name of a specific processor from Hugging Face
to be used. This is only necessary if a custom processor is required.
:param model_params: Extra parameters to be passed to the processor.
processor along with the model. The processor can be used for tasks
like tokenization or encoding. Defaults to False.
:param processor_config: A dictionary containing configuration settings
for a specific processor from Hugging Face. It should include all
necessary parameters for initializing the processor. This parameter is
used only if `return_processor` is True.
:param model_params: Additional parameters to configure the API model.
:return: A tuple containing the callable API model object and optionally a
processor if `return_processor` is True.
"""
model_params = model_params or {}
model = APIModel(model=model,
url=url,
response_path=response_path,
**model_params)

model = APIModel(api_model=api_model,
api_url=api_url,
api_key=api_key,
response_path=response_path)
if not return_processor:
return model

def get_processor():
try:
import tiktoken
return tiktoken.encoding_for_model(api_model)
return tiktoken.encoding_for_model(model)
except Exception:
pass

try:
import dashscope
return dashscope.get_tokenizer(api_model)
return dashscope.get_tokenizer(model)
except Exception:
pass

try:
return transformers.AutoProcessor.from_pretrained(
api_model, **model_params)
except Exception:
raise ValueError(
'Failed to initialize the processor. Please check the following:\n' # noqa: E501
"- For OpenAI models: Install 'tiktoken' via `pip install tiktoken`.\n" # noqa: E501
"- For DashScope models: Install both 'dashscope' and 'tiktoken' via `pip install dashscope tiktoken`.\n" # noqa: E501
"- For custom models: Provide a valid Hugging Face name via the 'processor_name' parameter.\n" # noqa: E501
'If the issue persists, check the provided `api_model`.')
raise ValueError(
'Failed to initialize the processor. Please check the following:\n' # noqa: E501
"- For OpenAI models: Install 'tiktoken' via `pip install tiktoken`.\n" # noqa: E501
"- For DashScope models: Install both 'dashscope' and 'tiktoken' via `pip install dashscope tiktoken`.\n" # noqa: E501
"- For custom models: Use the 'processor_config' parameter to configure a Hugging Face processor." # noqa: E501
)

if processor_name is not None:
if processor_config is not None:
processor = transformers.AutoProcessor.from_pretrained(
processor_name, **model_params)
**processor_config)
else:
processor = get_processor()
return (model, processor)
Expand Down
43 changes: 34 additions & 9 deletions tests/ops/mapper/test_calibrate_qa_mapper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import os
import unittest
from unittest.mock import Mock, patch

import httpx
from loguru import logger

from data_juicer.ops.mapper.calibrate_qa_mapper import CalibrateQAMapper
Expand All @@ -12,11 +15,7 @@
@SKIPPED_TESTS.register_module()
class CalibrateQAMapperTest(DataJuicerTestCaseBase):

def _run_op(self, api_model, response_path=None):

op = CalibrateQAMapper(api_model=api_model,
response_path=response_path)

def _run_op(self, op):
reference = """# 角色语言风格
1. 下面是李莲花的问答样例,你必须贴合他的语言风格:

Expand Down Expand Up @@ -69,10 +68,36 @@ def _run_op(self, api_model, response_path=None):

def test(self):
# before runing this test, set below environment variables:
# export DJ_API_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
# export DJ_API_KEY=your_key
self._run_op('qwen2.5-72b-instruct')

# export OPENAI_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/
# export OPENAI_API_KEY=your_dashscope_key
op = CalibrateQAMapper(api_model='qwen2.5-72b-instruct')
self._run_op(op)

def test_args(self):
op = CalibrateQAMapper(
api_model='qwen2.5-72b-instruct',
api_url=
'https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions',
response_path='choices.0.message.content')
self._run_op(op)

@patch('httpx.Client.send')
def test_retry(self, mock_send):
mock_response = Mock()
mock_response.status_code = 408
mock_response.headers = {}
mock_request = Mock()
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
'408 Client Error: Request Timeout',
request=mock_request,
response=mock_response)
mock_send.return_value = mock_response

with self.assertLogs(level='DEBUG') as cm:
op = CalibrateQAMapper(api_model='test',
model_params={'max_retries': 3})
op.process({'text': '', 'query': '', 'response': ''})
self.assertIn('3 retries left', '\n'.join(cm.output))

if __name__ == '__main__':
unittest.main()
Loading