From 299fb8e1a14e19e84cc28e88c69d32ea1670f550 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Mon, 26 Aug 2024 16:21:55 -0400 Subject: [PATCH 01/11] Use perplexity logic to check cerebras streaming end indication --- log10/_httpx_utils.py | 41 ++++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 5fa05620..a8f0dd3f 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -462,6 +462,8 @@ def patch_streaming_log(self, duration: int, full_response: str): "\r\n\r\n" if self.llm_client == LLM_CLIENTS.OPENAI and "perplexity" in self.host_header else "\n\n" ) responses = full_response.split(separator) + filter_responses = [r for r in responses if r] + response_json = self.parse_response_data(filter_responses) response_json = self.parse_response_data(responses) self.log_row["response"] = json.dumps(response_json) @@ -507,10 +509,10 @@ def is_response_end_reached(self, text: str) -> bool: if self.llm_client == LLM_CLIENTS.ANTHROPIC: return self.is_anthropic_response_end_reached(text) elif self.llm_client == LLM_CLIENTS.OPENAI: - if "perplexity" in self.host_header: - return self.is_perplexity_response_end_reached(text) - else: + if "openai" in self.host_header: return self.is_openai_response_end_reached(text) + else: + return self.is_openai_compatible_response_end_reached(text) else: logger.debug("Currently logging is only available for async openai and anthropic.") return False @@ -518,19 +520,28 @@ def is_response_end_reached(self, text: str) -> bool: def is_anthropic_response_end_reached(self, text: str): return "event: message_stop" in text - def is_perplexity_response_end_reached(self, text: str): + def is_openai_compatible_response_end_reached(self, text: str, check_content: bool = False): json_strings = text.split("data: ")[1:] # Parse the last JSON string last_json_str = json_strings[-1].strip() - last_object = json.loads(last_json_str) - return 
last_object.get("choices", [{}])[0].get("finish_reason", "") == "stop" + try: + last_object = json.loads(last_json_str) + except json.JSONDecodeError: + logger.debug(f"Full response: {repr(text)}") + logger.debug(f"Failed to parse the last JSON string: {last_json_str}") + return False + + choice = last_object.get("choices", [{}])[0] + finish_reason = choice.get("finish_reason", "") + content = choice.get("delta", {}).get("content", "") + + if finish_reason == "stop": + return not content if check_content else True + return False def is_openai_response_end_reached(self, text: str): - """ - In Perplexity, the last item in the responses is empty. - In OpenAI and Mistral, the last item in the responses is "data: [DONE]". - """ - return not text or "data: [DONE]" in text + logger.debug(f"Full response: {repr(text)}") + return "data: [DONE]" in text def parse_anthropic_responses(self, responses: list[str]): message_id = "" @@ -628,8 +639,12 @@ def parse_openai_responses(self, responses: list[str]): finish_reason = "" for r in responses: - if self.is_openai_response_end_reached(r): - break + if "openai" in self.host_header: + if self.is_openai_response_end_reached(r): + break + else: + if self.is_openai_compatible_response_end_reached(r, check_content=True): + break # loading the substring of response text after 'data: '. 
# example: 'data: {"choices":[{"text":"Hello, how can I help you today?"}]}' From f4f86cbc51bbd12b2223949ea530ec3b02bb7760 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Mon, 26 Aug 2024 16:52:33 -0400 Subject: [PATCH 02/11] Check openai data contains [DONE] first --- log10/_httpx_utils.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index a8f0dd3f..6063f0e4 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -509,10 +509,7 @@ def is_response_end_reached(self, text: str) -> bool: if self.llm_client == LLM_CLIENTS.ANTHROPIC: return self.is_anthropic_response_end_reached(text) elif self.llm_client == LLM_CLIENTS.OPENAI: - if "openai" in self.host_header: - return self.is_openai_response_end_reached(text) - else: - return self.is_openai_compatible_response_end_reached(text) + return self.is_openai_response_end_reached(text) else: logger.debug("Currently logging is only available for async openai and anthropic.") return False @@ -539,9 +536,15 @@ def is_openai_compatible_response_end_reached(self, text: str, check_content: bo return not content if check_content else True return False - def is_openai_response_end_reached(self, text: str): - logger.debug(f"Full response: {repr(text)}") - return "data: [DONE]" in text + def is_openai_response_end_reached(self, text: str, check_content: bool = False): + """ + OpenAI, Mistral response end is reached when the data contains "data: [DONE]". + Perplexity, Cerebras response end is reached when the last JSON object contains finish_reason == stop. 
+ """ + if "data: [DONE]" in text: + return True + else: + return self.is_openai_compatible_response_end_reached(text, check_content) def parse_anthropic_responses(self, responses: list[str]): message_id = "" @@ -639,12 +642,8 @@ def parse_openai_responses(self, responses: list[str]): finish_reason = "" for r in responses: - if "openai" in self.host_header: - if self.is_openai_response_end_reached(r): - break - else: - if self.is_openai_compatible_response_end_reached(r, check_content=True): - break + if self.is_openai_response_end_reached(r, check_content=True): + break # loading the substring of response text after 'data: '. # example: 'data: {"choices":[{"text":"Hello, how can I help you today?"}]}' From 37a5873d6592d1da45bee378ed2523cda1b7c6a6 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Mon, 26 Aug 2024 17:06:57 -0400 Subject: [PATCH 03/11] Check response choices before accessing choices objects --- log10/_httpx_utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 6063f0e4..52a45280 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -528,7 +528,11 @@ def is_openai_compatible_response_end_reached(self, text: str, check_content: bo logger.debug(f"Failed to parse the last JSON string: {last_json_str}") return False - choice = last_object.get("choices", [{}])[0] + if choices := last_object.get("choices", []): + choice = choices[0] + else: + return False + finish_reason = choice.get("finish_reason", "") content = choice.get("delta", {}).get("content", "") From 8f1c1a274d1c96943ac3a4fa40470b83a0d8d4ba Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Mon, 26 Aug 2024 17:40:04 -0400 Subject: [PATCH 04/11] Ensure reaching end stream in the last object --- log10/_httpx_utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 52a45280..78728899 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py 
@@ -464,7 +464,6 @@ def patch_streaming_log(self, duration: int, full_response: str): responses = full_response.split(separator) filter_responses = [r for r in responses if r] response_json = self.parse_response_data(filter_responses) - response_json = self.parse_response_data(responses) self.log_row["response"] = json.dumps(response_json) self.log_row["status"] = "finished" @@ -645,8 +644,8 @@ def parse_openai_responses(self, responses: list[str]): full_argument = "" finish_reason = "" - for r in responses: - if self.is_openai_response_end_reached(r, check_content=True): + for index, r in enumerate(responses): + if (index == len(responses) - 1) and self.is_openai_response_end_reached(r, check_content=True): break # loading the substring of response text after 'data: '. From ccf0dec9946301ab4a815be79974081a6e4d83e5 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 27 Aug 2024 00:12:37 -0400 Subject: [PATCH 05/11] Specific conditions for openai and mistral --- log10/_httpx_utils.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 78728899..5e3a07f9 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -516,7 +516,7 @@ def is_response_end_reached(self, text: str) -> bool: def is_anthropic_response_end_reached(self, text: str): return "event: message_stop" in text - def is_openai_compatible_response_end_reached(self, text: str, check_content: bool = False): + def has_response_finished_with_stop_reason(self, text: str, parse_single_data_entry: bool = False): json_strings = text.split("data: ")[1:] # Parse the last JSON string last_json_str = json_strings[-1].strip() @@ -536,18 +536,22 @@ def is_openai_compatible_response_end_reached(self, text: str, check_content: bo content = choice.get("delta", {}).get("content", "") if finish_reason == "stop": - return not content if check_content else True + return not content if parse_single_data_entry else True return 
False - def is_openai_response_end_reached(self, text: str, check_content: bool = False): + def is_openai_response_end_reached(self, text: str, parse_single_data_entry: bool = False): """ - OpenAI, Mistral response end is reached when the data contains "data: [DONE]". + OpenAI, Mistral response end is reached when the data contains "data: [DONE]\n\n". Perplexity, Cerebras response end is reached when the last JSON object contains finish_reason == stop. """ - if "data: [DONE]" in text: - return True - else: - return self.is_openai_compatible_response_end_reached(text, check_content) + hosts = ["openai", "mistral"] + + if any(p in self.host_header for p in hosts): + suffix = f"data: [DONE]{'' if parse_single_data_entry else '\n\n'}" + if text.endswith(suffix): + return True + + return self.has_response_finished_with_stop_reason(text, parse_single_data_entry) def parse_anthropic_responses(self, responses: list[str]): message_id = "" @@ -644,8 +648,8 @@ def parse_openai_responses(self, responses: list[str]): full_argument = "" finish_reason = "" - for index, r in enumerate(responses): - if (index == len(responses) - 1) and self.is_openai_response_end_reached(r, check_content=True): + for r in responses: + if self.is_openai_response_end_reached(r, parse_single_data_entry=True): break # loading the substring of response text after 'data: '. 
From 899eeed9c6bf9aaec601e78620004f8c6795948b Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 27 Aug 2024 00:13:26 -0400 Subject: [PATCH 06/11] Add parameterized for openai compatible tests --- tests/test_openai_compatibility.py | 84 +++++++++++++++++++----------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/tests/test_openai_compatibility.py b/tests/test_openai_compatibility.py index 447db81d..46f447ea 100644 --- a/tests/test_openai_compatibility.py +++ b/tests/test_openai_compatibility.py @@ -11,19 +11,38 @@ log10(openai) -model_name = "llama-3.1-sonar-small-128k-chat" -if "PERPLEXITYAI_API_KEY" not in os.environ: - raise ValueError("Please set the PERPLEXITYAI_API_KEY environment variable.") - -compatibility_config = { - "base_url": "https://api.perplexity.ai", - "api_key": os.environ.get("PERPLEXITYAI_API_KEY"), -} +# Define a fixture that provides parameterized api_key and base_url +@pytest.fixture( + params=[ + { + "model_name": "llama-3.1-sonar-small-128k-chat", + "api_key": "PERPLEXITYAI_API_KEY", + "base_url": "https://api.perplexity.ai", + }, + {"model_name": "open-mistral-nemo", "api_key": "MISTRAL_API_KEY", "base_url": "https://api.mistral.ai/v1"}, + ] +) +def config(request): + api_environment_variable = request.param["api_key"] + if api_environment_variable not in os.environ: + raise ValueError(f"Please set the {api_environment_variable} environment variable.") + + return { + "base_url": request.param["base_url"], + "api_key": request.param["api_key"], + "model_name": request.param["model_name"], + } @pytest.mark.chat -def test_chat(session): +def test_chat(session, config): + compatibility_config = { + "base_url": config["base_url"], + "api_key": os.environ.get(config["api_key"]), + } + model_name = config["model_name"] + client = openai.OpenAI(**compatibility_config) completion = client.chat.completions.create( model=model_name, @@ -46,7 +65,13 @@ def test_chat(session): @pytest.mark.chat -def test_chat_not_given(session): +def 
test_chat_not_given(session, config): + compatibility_config = { + "base_url": config["base_url"], + "api_key": os.environ.get(config["api_key"]), + } + model_name = config["model_name"] + client = openai.OpenAI(**compatibility_config) completion = client.chat.completions.create( model=model_name, @@ -69,23 +94,13 @@ def test_chat_not_given(session): @pytest.mark.chat @pytest.mark.async_client @pytest.mark.asyncio(scope="module") -async def test_chat_async(session): - client = AsyncOpenAI(**compatibility_config) - completion = await client.chat.completions.create( - model=model_name, - messages=[{"role": "user", "content": "Say this is a test"}], - ) - - content = completion.choices[0].message.content - assert isinstance(content, str) - await finalize() - _LogAssertion(completion_id=session.last_completion_id(), message_content=content).assert_chat_response() +async def test_chat_async(session, config): + compatibility_config = { + "base_url": config["base_url"], + "api_key": os.environ.get(config["api_key"]), + } + model_name = config["model_name"] - -@pytest.mark.chat -@pytest.mark.async_client -@pytest.mark.asyncio(scope="module") -async def test_perplexity_chat_async(session): client = AsyncOpenAI(**compatibility_config) completion = await client.chat.completions.create( model=model_name, @@ -100,7 +115,13 @@ async def test_perplexity_chat_async(session): @pytest.mark.chat @pytest.mark.stream -def test_chat_stream(session): +def test_chat_stream(session, config): + compatibility_config = { + "base_url": config["base_url"], + "api_key": os.environ.get(config["api_key"]), + } + model_name = config["model_name"] + client = openai.OpenAI(**compatibility_config) response = client.chat.completions.create( model=model_name, @@ -119,9 +140,14 @@ def test_chat_stream(session): @pytest.mark.async_client @pytest.mark.stream @pytest.mark.asyncio(scope="module") -async def test_chat_async_stream(session): - client = AsyncOpenAI(**compatibility_config) +async def 
test_chat_async_stream(session, config): + compatibility_config = { + "base_url": config["base_url"], + "api_key": os.environ.get(config["api_key"]), + } + model_name = config["model_name"] + client = AsyncOpenAI(**compatibility_config) output = "" stream = await client.chat.completions.create( model=model_name, From 1445fbb91706d5acda989f4597bef4c74e4ba987 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 27 Aug 2024 00:17:17 -0400 Subject: [PATCH 07/11] Include the newline characters without using a backslash in the f-string --- log10/_httpx_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 5e3a07f9..b6752601 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -547,7 +547,7 @@ def is_openai_response_end_reached(self, text: str, parse_single_data_entry: boo hosts = ["openai", "mistral"] if any(p in self.host_header for p in hosts): - suffix = f"data: [DONE]{'' if parse_single_data_entry else '\n\n'}" + suffix = "data: [DONE]" + ("" if parse_single_data_entry else "\n\n") if text.endswith(suffix): return True From 02096abd8c01e1357411d54bacf78db397be0f61 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 27 Aug 2024 20:16:11 -0400 Subject: [PATCH 08/11] Check for usage to determine reaching the end of cerebras stream --- log10/_httpx_utils.py | 5 ++++- tests/test_openai_compatibility.py | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index b6752601..8f6eb146 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -534,15 +534,18 @@ def has_response_finished_with_stop_reason(self, text: str, parse_single_data_en finish_reason = choice.get("finish_reason", "") content = choice.get("delta", {}).get("content", "") + usage = last_object.get("usage", {}) if finish_reason == "stop": - return not content if parse_single_data_entry else True + return not content and not usage if parse_single_data_entry else 
True return False def is_openai_response_end_reached(self, text: str, parse_single_data_entry: bool = False): """ OpenAI, Mistral response end is reached when the data contains "data: [DONE]\n\n". Perplexity, Cerebras response end is reached when the last JSON object contains finish_reason == stop. + The parse_single_data_entry argument is used to distinguish between a single data entry and multiple data entries. + The function is called in two contexts: first, to assess whether the entire accumulated response has completed when processing streaming data, and second, to verify if a single response object has finished processing during individual response handling. """ hosts = ["openai", "mistral"] diff --git a/tests/test_openai_compatibility.py b/tests/test_openai_compatibility.py index 46f447ea..ac08c099 100644 --- a/tests/test_openai_compatibility.py +++ b/tests/test_openai_compatibility.py @@ -21,6 +21,7 @@ "base_url": "https://api.perplexity.ai", }, {"model_name": "open-mistral-nemo", "api_key": "MISTRAL_API_KEY", "base_url": "https://api.mistral.ai/v1"}, + {"model_name": "llama3.1-8b", "api_key": "CEREBRAS_API_KEY", "base_url": "https://api.cerebras.ai/v1"}, ] ) def config(request): @@ -132,7 +133,7 @@ def test_chat_stream(session, config): output = "" for chunk in response: - output += chunk.choices[0].delta.content + output += chunk.choices[0].delta.content or "" _LogAssertion(completion_id=session.last_completion_id(), message_content=output).assert_chat_response() From 62b0c188962e37cfc928b862b591d60b78644262 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 27 Aug 2024 20:16:42 -0400 Subject: [PATCH 09/11] Update using gpt4-o-mini for openai tests --- tests/pytest.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pytest.ini b/tests/pytest.ini index d614c2de..06e3a7a2 100644 --- a/tests/pytest.ini +++ b/tests/pytest.ini @@ -1,7 +1,7 @@ [pytest] addopts = - --openai_model=gpt-3.5-turbo - --openai_vision_model=gpt-4o + 
--openai_model=gpt-4o-mini + --openai_vision_model=gpt-4o-mini --anthropic_model=claude-3-haiku-20240307 --anthropic_legacy_model=claude-2.1 --google_model=gemini-1.5-pro-latest From 98da7ae73dd0c017fe02de4f099b508893ac0904 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 27 Aug 2024 20:43:44 -0400 Subject: [PATCH 10/11] Add cerebras secret to ci test --- .github/workflows/test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 978ce338..edc5e16a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -51,6 +51,7 @@ jobs: LAMINI_API_KEY: ${{ secrets.LAMINI_API_KEY }} GOOGLE_API_KEY : ${{ secrets.GOOGLE_API_KEY }} PERPLEXITYAI_API_KEY: ${{ secrets.PERPLEXITYAI_API_KEY }} + CEREBRAS_API_KEY: ${{ secrets.CEREBRAS_API_KEY }} steps: - uses: actions/checkout@v4 - name: Install poetry From eda5b26d964a2887c9ac4201213ee86a35ada28d Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Wed, 28 Aug 2024 17:32:05 -0400 Subject: [PATCH 11/11] Remove usage condition --- log10/_httpx_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 8f6eb146..2068540c 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -534,10 +534,9 @@ def has_response_finished_with_stop_reason(self, text: str, parse_single_data_en finish_reason = choice.get("finish_reason", "") content = choice.get("delta", {}).get("content", "") - usage = last_object.get("usage", {}) if finish_reason == "stop": - return not content and not usage if parse_single_data_entry else True + return not content if parse_single_data_entry else True return False def is_openai_response_end_reached(self, text: str, parse_single_data_entry: bool = False):