diff --git a/camel/toolkits/search_toolkit.py b/camel/toolkits/search_toolkit.py index 772eafdff0..63de71ae62 100644 --- a/camel/toolkits/search_toolkit.py +++ b/camel/toolkits/search_toolkit.py @@ -12,10 +12,14 @@ # limitations under the License. # =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. =========== import os -from typing import Any, Dict, List +import xml.etree.ElementTree as ET +from typing import Any, Dict, List, Union + +import requests from camel.toolkits.base import BaseToolkit from camel.toolkits.function_tool import FunctionTool +from camel.utils import api_keys_required, dependencies_required class SearchToolkit(BaseToolkit): @@ -25,6 +29,7 @@ class SearchToolkit(BaseToolkit): search engines like Google, DuckDuckGo, Wikipedia and Wolfram Alpha. """ + @dependencies_required("wikipedia") def search_wiki(self, entity: str) -> str: r"""Search the entity in WikiPedia and return the summary of the required page, containing factual information about @@ -37,13 +42,7 @@ def search_wiki(self, entity: str) -> str: str: The search result. If the page corresponding to the entity exists, return the summary of this entity in a string. """ - try: - import wikipedia - except ImportError: - raise ImportError( - "Please install `wikipedia` first. You can install it " - "by running `pip install wikipedia`." - ) + import wikipedia result: str @@ -64,6 +63,7 @@ def search_wiki(self, entity: str) -> str: return result + @dependencies_required("duckduckgo_search") def search_duckduckgo( self, query: str, source: str = "text", max_results: int = 5 ) -> List[Dict[str, Any]]: @@ -151,6 +151,7 @@ def search_duckduckgo( # If no answer found, return an empty list return responses + @api_keys_required("GOOGLE_API_KEY", "SEARCH_ENGINE_ID") def search_google( self, query: str, num_result_pages: int = 5 ) -> List[Dict[str, Any]]: @@ -251,7 +252,10 @@ def search_google( # If no answer found, return an empty list return responses - def query_wolfram_alpha(self, query: str, is_detailed: bool) -> str: + @dependencies_required("wolframalpha") + def query_wolfram_alpha( + self, query: str, is_detailed: bool = False + ) -> Union[str, Dict[str, Any]]: r"""Queries Wolfram|Alpha and returns the result. Wolfram|Alpha is an answer engine developed by Wolfram Research. It is offered as an online service that answers factual queries by computing answers from @@ -259,19 +263,16 @@ def query_wolfram_alpha(self, query: str, is_detailed: bool) -> str: Args: query (str): The query to send to Wolfram Alpha. - is_detailed (bool): Whether to include additional details in the - result. + is_detailed (bool): Whether to include additional details + including step by step information in the result. + (default::obj:`False`) Returns: - str: The result from Wolfram Alpha, formatted as a string. + Union[str, Dict[str, Any]]: The result from Wolfram Alpha. + Returns a string if `is_detailed` is False, otherwise returns + a dictionary with detailed information. """ - try: - import wolframalpha - except ImportError: - raise ImportError( - "Please install `wolframalpha` first. You can install it by" - " running `pip install wolframalpha`." - ) + import wolframalpha WOLFRAMALPHA_APP_ID = os.environ.get('WOLFRAMALPHA_APP_ID') if not WOLFRAMALPHA_APP_ID: @@ -284,28 +285,105 @@ def query_wolfram_alpha(self, query: str, is_detailed: bool) -> str: try: client = wolframalpha.Client(WOLFRAMALPHA_APP_ID) res = client.query(query) - assumption = next(res.pods).text or "No assumption made." - answer = next(res.results).text or "No answer found." + except Exception as e: - if isinstance(e, StopIteration): - return "Wolfram Alpha wasn't able to answer it" - else: - error_message = ( - f"Wolfram Alpha wasn't able to answer it" f"{e!s}." - ) - return error_message + return f"Wolfram Alpha wasn't able to answer it. Error: {e}" - result = f"Assumption:\n{assumption}\n\nAnswer:\n{answer}" + pased_result = self._parse_wolfram_result(res) - # Add additional details in the result if is_detailed: - result += '\n' - for pod in res.pods: - result += '\n' + pod['@title'] + ':\n' - for sub in pod.subpods: - result += (sub.plaintext or "None") + '\n' + step_info = self._get_wolframalpha_step_by_step_solution( + WOLFRAMALPHA_APP_ID, query + ) + pased_result["steps"] = step_info + return pased_result + + return pased_result["final_answer"] + + def _parse_wolfram_result(self, result) -> Dict[str, Any]: + r"""Parses a Wolfram Alpha API result into a structured dictionary + format. + + Args: + result: The API result returned from a Wolfram Alpha + query, structured with multiple pods, each containing specific + information related to the query. + + Returns: + dict: A structured dictionary with the original query and the + final answer. + """ + + # Extract the original query + query = result.get('@inputstring', '') + + # Initialize a dictionary to hold structured output + output = {"query": query, "pod_info": [], "final_answer": None} + + # Loop through each pod to extract the details + for pod in result.get('pod', []): + pod_info = { + "title": pod.get('@title', ''), + "description": pod.get('subpod', {}).get('plaintext', ''), + "image_url": pod.get('subpod', {}) + .get('img', {}) + .get('@src', ''), + } + + # Add to steps list + output["pod_info"].append(pod_info) + + # Get final answer + if pod.get('@primary', False): + output["final_answer"] = pod_info["description"] - return result.rstrip() + return output + + def _get_wolframalpha_step_by_step_solution( + self, app_id: str, query: str + ) -> dict: + r"""Retrieve a step-by-step solution from the Wolfram Alpha API for a + given query. + + Args: + app_id (str): Your Wolfram Alpha API application ID. + query (str): The mathematical or computational query to solve. + + Returns: + dict: The step-by-step solution response text from the Wolfram + Alpha API. + """ + # Define the base URL + url = "https://api.wolframalpha.com/v2/query" + + # Set up the query parameters + params = { + 'appid': app_id, + 'input': query, + 'podstate': ['Result__Step-by-step solution', 'Show all steps'], + 'format': 'plaintext', + } + + # Send the request + response = requests.get(url, params=params) + root = ET.fromstring(response.text) + + # Extracting step-by-step hints and removing 'Hint: |' + steps = [] + for subpod in root.findall( + ".//pod[@title='Results']//subpod[stepbystepcontenttype='SBSHintStep']//plaintext" + ): + if subpod.text: + step_text = subpod.text.strip() + cleaned_step = step_text.replace('Hint: |', '').strip() + steps.append(cleaned_step) + + # Structuring the steps into a dictionary + structured_steps = {} + for i, step in enumerate(steps, start=1): + structured_steps[f"step{i}"] = step + + return structured_steps def tavily_search( self, query: str, num_results: int = 5, **kwargs diff --git a/test/toolkits/test_search_functions.py b/test/toolkits/test_search_functions.py index 71fb81b642..7405e5202c 100644 --- a/test/toolkits/test_search_functions.py +++ b/test/toolkits/test_search_functions.py @@ -13,7 +13,7 @@ # =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. =========== import os from unittest import mock -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch import pytest import requests @@ -184,57 +184,142 @@ def test_search_duckduckgo_images(): ) +@patch('requests.get') @patch('wolframalpha.Client') @patch('os.environ.get') -def test_query_wolfram_alpha(mock_get, mock_client, search_toolkit): - mock_get.return_value = 'FAKE_APP_ID' - - # Create mock subpods objects - mock_subpods1 = [ - MagicMock(plaintext="lim_(x->0) (sin^2(x))/x = 0"), - MagicMock(plaintext="lim_(x->-∞) (sin^2(x))/x = 0"), - MagicMock(plaintext="lim_(x->∞) (sin^2(x))/x = 0"), - ] - mock_subpods2 = [MagicMock(plaintext=None)] - - # Create mock pods objects - mock_pod1 = MagicMock() - mock_pod1.subpods = mock_subpods1 - mock_pod1.__getitem__.side_effect = lambda key: {'@title': 'Limit'}[key] - - mock_pod2 = MagicMock() - mock_pod2.subpods = mock_subpods2 - mock_pod2.__getitem__.side_effect = lambda key: {'@title': 'Plot'}[key] - - # Create mock results object - mock_results = MagicMock(text="lim_(x->0) (sin^2(x))/x = 0") - - # Create mock res object - mock_res = MagicMock() - mock_res.pods.__iter__.return_value = iter([mock_pod1, mock_pod2]) - mock_res.results.__iter__.return_value = iter([mock_results]) +def test_query_wolfram_alpha(mock_get_env, mock_client, mock_requests_get): + mock_get_env.return_value = 'FAKE_APP_ID' - # Configure the text attribute of the object returned by the next method - mock_res.pods.__next__.return_value.text = "lim_(x->0) (sin^2(x))/x = 0" - mock_res.results.__next__.return_value.text = "lim_(x->0) (sin^2(x))/x = 0" + mock_res = MagicMock() + mock_res.get.side_effect = lambda key, default: { + '@inputstring': 'calculate limit of sinx^2/x', + 'pod': [ + { + '@title': 'Limit', + 'subpod': {'plaintext': 'lim_(x->0) (sin^2(x))/x = 0'}, + }, + { + '@title': 'Plot', + 'subpod': {'plaintext': None}, + }, + ], + }[key] - # Configure the mock client instance to return the mock response mock_instance = MagicMock() mock_instance.query.return_value = mock_res mock_client.return_value = mock_instance - result = search_toolkit.query_wolfram_alpha( + mock_requests_get.return_value = MagicMock(status_code=200) + mock_requests_get.return_value.text = """ + + + + lim_(x->0) (sin^2(x))/x = 0</plaintext> + </subpod> + </pod> + <pod title="Plot"> + <subpod> + <plaintext></plaintext> + </subpod> + </pod> + </queryresult> + """ + + result = SearchToolkit().query_wolfram_alpha( "calculate limit of sinx^2/x", True ) - expected_output = ( - "Assumption:\n" - "lim_(x->0) (sin^2(x))/x = 0\n\n" - "Answer:\n" - "lim_(x->0) (sin^2(x))/x = 0\n\n" - "Limit:\n" - "lim_(x->0) (sin^2(x))/x = 0\n" - "lim_(x->-∞) (sin^2(x))/x = 0\n" - "lim_(x->∞) (sin^2(x))/x = 0\n\n" - "Plot:\nNone" - ) + + expected_output = { + "query": "calculate limit of sinx^2/x", + "pod_info": [ + { + "title": "Limit", + "description": "lim_(x->0) (sin^2(x))/x = 0", + "image_url": '', + }, + { + "title": "Plot", + "description": None, + "image_url": '', + }, + ], + "final_answer": None, + "steps": {}, + } + assert result == expected_output + + +def test_parse_wolfram_result(): + sample_wolfram_result = { + "@inputstring": "What is 2+2?", + "pod": [ + { + "@title": "Input", + "subpod": { + "plaintext": "2 + 2", + "img": {"@src": "http://example.com/image1.png"}, + }, + }, + { + "@title": "Result", + "subpod": { + "plaintext": "4", + "img": {"@src": "http://example.com/image2.png"}, + }, + "@primary": "true", + }, + ], + } + expected_output = { + "query": "What is 2+2?", + "pod_info": [ + { + "title": "Input", + "description": "2 + 2", + "image_url": "http://example.com/image1.png", + }, + { + "title": "Result", + "description": "4", + "image_url": "http://example.com/image2.png", + }, + ], + "final_answer": "4", + } + + result = SearchToolkit()._parse_wolfram_result(sample_wolfram_result) + + assert ( + result == expected_output + ), f"Expected {expected_output}, but got {result}" + + +@patch('requests.get') +def test_get_wolframalpha_step_by_step_solution(mock_get): + sample_response = """ + <queryresult> + <pod title="Results"> + <subpod> + <stepbystepcontenttype>SBSHintStep</stepbystepcontenttype> + <plaintext>Hint: | Step 1</plaintext> + </subpod> + <subpod> + <stepbystepcontenttype>SBSHintStep</stepbystepcontenttype> + <plaintext>Hint: | Step 2</plaintext> + </subpod> + </pod> + </queryresult> + """ + + mock_get.return_value = Mock(text=sample_response) + + expected_steps = {"step1": "Step 1", "step2": "Step 2"} + + result = SearchToolkit()._get_wolframalpha_step_by_step_solution( + "dummy_app_id", "dummy_query" + ) + + assert ( + result == expected_steps + ), f"Expected {expected_steps}, but got {result}"