Added New Retriever: BingRM #1994

Open: wants to merge 5 commits into base: main
58 changes: 58 additions & 0 deletions dspy/retrieve/bing_rm/README.md
@@ -0,0 +1,58 @@
# Bing Retriever
The Bing Retriever uses the Bing API to perform web-based searches as a DSPy-compatible retriever.

Various APIs exist within Azure's Bing Search, although only the following are (currently) supported:
- `search` (a.k.a. `web_search`); default
- `news`
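
As a rough illustration of how these API names map onto Bing endpoints, the sketch below mirrors the endpoint table and default base URI from `config.py` in this PR. `resolve_endpoint` is a hypothetical helper written for this example, not a function the retriever exposes.

```python
from typing import Optional

# Endpoint paths and default base URI as they appear in config.py
BASE_URI = "https://api.bing.microsoft.com"
ENDPOINTS = {
    "search": "v7.0/search",
    "web_search": "v7.0/search",  # alias for "search"
    "news": "v7.0/news/search",
}


def resolve_endpoint(api: str, base_uri: str = BASE_URI) -> Optional[str]:
    """Hypothetical helper: return the full URL for an API name, or None."""
    path = ENDPOINTS.get(api)
    if path is None:
        return None
    # Join with "/" explicitly so the result is a valid URL on every platform
    return f"{base_uri.rstrip('/')}/{path}"


print(resolve_endpoint("news"))    # https://api.bing.microsoft.com/v7.0/news/search
print(resolve_endpoint("images"))  # None
```

Unsupported names resolve to `None` here, which matches how the config class in this PR signals an unknown endpoint.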

## Setup
The Bing Retriever requires an API key from Azure. Various tiers for the API exist, including a free tier.

1. Get an [API key](https://portal.azure.com/#create/Microsoft.BingSearch)
2. Set the API key as an environment variable: `BING_API_KEY`
```bash
export BING_API_KEY='your_api_key_here'
```
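
If it helps to fail early with a clear message, the key can be checked from Python before the retriever is created. This is a minimal sketch; `require_bing_key` is a hypothetical helper and not part of this PR.

```python
import os


def require_bing_key(env_var: str = "BING_API_KEY") -> str:
    """Hypothetical helper: return the key, or raise a clear error if unset."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"{env_var} is not set; export it before creating BingRM")
    return key
```

Calling `require_bing_key()` once at startup surfaces a missing key before any search is attempted.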

## Example Usage
1. Retrieve via settings (recommended)
```python
import dspy
from dspy.retrieve.bing_rm import BingRM

dspy.settings.configure(
    rm=BingRM()
)

bing = dspy.Retrieve(k=3)
results = bing("Current interest rates in the USA")
# Returns a list of strings
```
2. Simple news retrieval (top 3 results)
```python
import dspy
from dspy.retrieve.bing_rm import BingRM

bing = BingRM(api="news")
results = bing("OpenAI o3 model", k=3)
# Returns a list of BingSource objects, top 3; easily parsable
```

### Parsing BingSource Objects
`BingSource` objects hold the results returned by Bing, including structured metadata specific to each retrieved article. Retrieving through `dspy.Retrieve` returns strings, with that metadata formatted into each string; retrieving directly from `BingRM` returns `BingSource` objects.
A `BingSource` object can be cast to a string with `str()`.
For example (continuing from example 2):
```python
str_results = [
    str(result)  # casts and formats the BingSource as a string
    for result in results
]
```
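
`BingSource` itself lives in the retriever module, which is not reproduced here, so the following is only a stand-in showing the idea of a result object that carries metadata and formats itself as a string. The class name `ExampleSource` and its string layout are assumptions; the field names follow the result dictionaries built in `base.py`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExampleSource:
    """Hypothetical stand-in for BingSource; not the PR's actual class."""
    headline: str
    url: str
    text: str
    published: Optional[str] = None
    provider: Optional[str] = None

    def __str__(self) -> str:
        # Assumed layout: metadata lines first, then the snippet text
        lines = [f"Headline: {self.headline}", f"URL: {self.url}"]
        if self.published:
            lines.append(f"Published: {self.published}")
        if self.provider:
            lines.append(f"Provider: {self.provider}")
        lines.append(self.text)
        return "\n".join(lines)


source = ExampleSource(
    headline="Example headline",
    url="https://example.com/article",
    text="Example snippet.",
    provider="Example News",
)
print(str(source))
```

Keeping the metadata as attributes and deferring formatting to `__str__` is what lets the same object serve both structured access and plain-string retrieval.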

## Additional Notes
- Reranking is done by default by the Bing API itself.
- For more information on the underlying API, please see its [documentation](https://learn.microsoft.com/en-us/bing/search-apis/bing-web-search/).
- For more information on the internals of the client used here, please see [base.py](./base.py) and [config.py](./config.py).
- An SDK does exist for Bing Search; however, it is no longer maintained [according to PyPI](https://pypi.org/project/azure-cognitiveservices-search-websearch/). For this reason, the (custom) modules referenced above are used to interact with the API.
- The underlying classes used to query Bing do support async, but because DSPy does not currently support async retrieval via its retriever modules, this has not been integrated into the BingRM class.
- For more information regarding async search with Bing, see the methods `async_search` and `search_all` within `BingClient`.
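
The pattern behind `search_all`, running several async searches concurrently from synchronous code, can be sketched without touching the network. In the example below `fake_search` is a placeholder coroutine standing in for `BingClient.async_search`, and the function names are chosen for illustration only.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


async def fake_search(query: str) -> str:
    """Placeholder coroutine standing in for BingClient.async_search."""
    await asyncio.sleep(0.01)  # simulate network latency
    return f"results for {query!r}"


def search_all(queries: List[str]) -> Dict[str, str]:
    """Run all queries concurrently and return results keyed by query."""

    async def gather_all() -> List[str]:
        # gather preserves the order of its inputs in the returned list
        return await asyncio.gather(*(fake_search(q) for q in queries))

    # asyncio.run creates a fresh event loop; running it in a worker thread
    # keeps this callable from synchronous code even when the calling thread
    # already has a running loop.
    with ThreadPoolExecutor(max_workers=1) as executor:
        results = executor.submit(asyncio.run, gather_all()).result()
    return dict(zip(queries, results))
```

Because every query is sent at once, the same caution about rate limits applies as for the real `search_all`.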
5 changes: 5 additions & 0 deletions dspy/retrieve/bing_rm/__init__.py
@@ -0,0 +1,5 @@
from .retriever import BingRM

__all__ = [
    "BingRM"
]
273 changes: 273 additions & 0 deletions dspy/retrieve/bing_rm/base.py
@@ -0,0 +1,273 @@
import requests
import aiohttp
import asyncio
from dspy.retrieve.bing_rm.config import BingConfig
from concurrent.futures import ThreadPoolExecutor
from pydantic import Field, BaseModel
from typing import (
    Optional
    , List
    , Dict
)
import logging

class BingResponse(BaseModel):
    """
    A response class for the Bing API.
    Contains many class methods to handle different types of responses.
    """
    status: int = Field(..., description="HTTP status code")
    data: list = Field(..., description="The results of the search")
    err_message: Optional[str] = Field(None, description="Any error message")

    @classmethod
    def from_search_json(
        cls
        , results: dict
        , status:int=200
    ) -> "BingResponse":
        pages = [ # kinda ugly but I promise it makes sense
            {
                "headline": page.get("name")
                , "url": page.get("url")
                , "published": page.get("datePublished")
                , "text": page.get("snippet")
                , "provider": None  # web results carry no provider; keeps keys uniform
            } for page in
            results.get("webPages", {}).get("value", [])
        ]

        return cls(
            status=status
            , data=pages
        )

    @classmethod
    def from_news_json(
        cls
        , results: dict
        , status:int=200
    ) -> "BingResponse":
        pages = [
            {
                "headline": page.get("name")
                , "url": page.get("url")
                , "published": page.get("datePublished")
                , "text": page.get("description")
                # Tolerate a missing or empty provider list
                , "provider": (page.get("provider") or [{}])[0].get("name")
            } for page in
            results.get("value", [])
        ]

        return cls(
            status=status
            , data=pages
        )

    @classmethod
    def from_search(cls, response: requests.Response) -> "BingResponse":
        response.raise_for_status()
        status = response.status_code
        results = response.json()
        pages = [ # kinda ugly but I promise it makes sense
            {
                "headline": page.get("name")
                , "url": page.get("url")
                , "published": page.get("datePublished")
                , "text": page.get("snippet")
                , "provider": None
            } for page in
            results.get("webPages", {}).get("value", [])
        ]

        return cls(
            status=status
            , data=pages
        )

    @classmethod
    def from_news(
        cls
        , response: requests.Response
    ) -> "BingResponse":
        response.raise_for_status()
        status = response.status_code
        results = response.json()
        pages = [
            {
                "headline": page.get("name")
                , "url": page.get("url")
                , "published": page.get("datePublished")
                , "text": page.get("description")
                # Tolerate a missing or empty provider list
                , "provider": (page.get("provider") or [{}])[0].get("name")
            } for page in
            results.get("value", [])
        ]

        return cls(
            status=status
            , data=pages
        )

    @classmethod
    def from_error(cls, status, err_message: Optional[str]) -> "BingResponse":
        return cls(
            status=status
            , data=[]
            , err_message=err_message
        )

    @property
    def warnings(self) -> bool:
        return self.status != 200

class BingClient(BaseModel):
    """
    A client for the Bing API.
    Supports searching for web pages and news articles.
    Current API support:
    - News ("news")
    - Web ("search")

    Async support is also available, although it is not the default.
    Likewise, async may not be currently compatible with DSPy.
    """
    # default_factory defers reading the environment until a client is
    # created, so importing this module does not require BING_API_KEY.
    config: BingConfig = Field(
        default_factory=BingConfig.from_env
        , description="The configuration for the bing api"
    )
    method_matrix: dict = {
        "search": BingResponse.from_search
        , "web_search": BingResponse.from_search
        , "news": BingResponse.from_news
    }

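    def generate_reqs(
        self
        , api: str
        , query: str
        , mkt: str="en-US"
        , pargs: Optional[dict]=None
    ) -> "dict | BingResponse":
        """
        Builds the url, params and headers for a request.
        Returns an error BingResponse if the api is unsupported.
        """
        if pargs is None:
            # Avoid a shared mutable default argument
            pargs = {"sortBy": "Relevance"}
        params = {
            "q": query
            , "mkt": mkt
            , **pargs
        }
        headers = {
            'Ocp-Apim-Subscription-Key': self.config.key
        }
        url = self.config.endpoint(api)
        if not url:
            # from_error supplies the required (empty) data field
            return BingResponse.from_error(
                404
                , f"Invalid API / API not found: {api}"
            )

        return {
            "url": url
            , "params": params
            , "headers": headers
        }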

    def search(self, api: str, query: str) -> BingResponse:
        """
        Searches Bing for a query using the specified api.

        Args:
            api (str): The api to use
            query (str): The query to search for
        Returns:
            BingResponse: The response from the search
        """
        reqs = self.generate_reqs(api, query)
        if isinstance(reqs, BingResponse):
            return reqs
        params = reqs.get("params")
        headers = reqs.get("headers")
        url = reqs.get("url")

        logging.info(f"GET @ {url}")
        response = requests.get(
            url
            , headers=headers
            , params=params
        )
        parser = self.method_matrix.get(api)

        try:
            return parser(
                response
            )
        except requests.exceptions.HTTPError as err:
            return BingResponse.from_error(
                err.response.status_code, str(err)
            )

    async def async_search(self, api: str, query: str) -> BingResponse:
        """
        Asynchronous version of search
        """
        reqs = self.generate_reqs(api, query)
        if isinstance(reqs, BingResponse):
            return reqs
        params = reqs.get("params")
        headers = reqs.get("headers")
        url = reqs.get("url")

        logging.info(f"GET @ {url}")

        # Pick the JSON parser that matches the requested api
        parser = (
            BingResponse.from_news_json
            if api == "news"
            else BingResponse.from_search_json
        )

        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers, params=params) as response:
                try:
                    status = response.status
                    if status != 200:
                        return BingResponse.from_error(
                            status=status
                            , err_message=f"Error {status}"
                        )
                    json = await response.json()
                    return parser(
                        json
                        , status=status
                    )
                except aiohttp.ClientResponseError as err:
                    # err is the caught exception, which carries the status
                    return BingResponse.from_error(
                        err.status, str(err)
                    )

    def search_all(
        self
        , api: str
        , queries: List[str]
    ) -> Optional[Dict[str, BingResponse]]:
        """
        Searches for multiple queries in parallel.
        Be wary of rate limiting when using this.

        Args:
            api (str): The api to use
            queries (List[str]): The queries to search for
        Returns:
            Dict[str, BingResponse]: A dictionary of responses; keys are queries.
            None is returned if the searches fail.
        """

        async def async_search_all():
            tasks = [self.async_search(api, query) for query in queries]
            return await asyncio.gather(*tasks)

        def run_in_thread():
            # A dedicated loop in a worker thread avoids clashing with any
            # event loop already running in the calling thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(async_search_all())
            finally:
                loop.close()

        try:
            with ThreadPoolExecutor() as executor:
                results = executor.submit(run_in_thread).result()
                return dict(zip(queries, results))
        except Exception as e:
            logging.error(f"Error in search_all: {str(e)}")
            return None
51 changes: 51 additions & 0 deletions dspy/retrieve/bing_rm/config.py
@@ -0,0 +1,51 @@
from pydantic import BaseModel, Field
from typing import Optional
import logging
import os

class BingConfig(BaseModel):
    """
    Config setup for Bing API.
    This class will automatically pull the API key and base URI from the environment
    or they can be passed manually. For most use cases, the base_uri is not necessary
    to specify as it should not change for most users.

    Args:
        key (str): The Bing API key.
        base_uri (str, Optional): The base URI for the Bing API.
        endpoints (dict, Optional): A dictionary of Bing API endpoints.
    """
    key: str = Field(..., description="BING_API_KEY")
    base_uri: str = Field(..., description="BING_BASE_URI")
    endpoints: dict = {
        "web_search": "v7.0/search", # Alias for search
        "search": "v7.0/search",
        "news": "v7.0/news/search",
    }

    @classmethod
    def from_env(cls) -> "BingConfig":
        key = os.getenv("BING_API_KEY")
        base_uri = os.getenv("BING_BASE_URI", "https://api.bing.microsoft.com")

        # Validate explicitly: asserts are stripped under `python -O`,
        # and the error message should not echo the key itself.
        if not key:
            raise ValueError("BING_API_KEY is not set")
        if not base_uri:
            raise ValueError("BING_BASE_URI is set but empty")

        return cls(
            key=key
            , base_uri=base_uri
        )

    def endpoint(self, endpoint: str) -> Optional[str]:
        if endpoint not in self.endpoints:
            wrn = f"Endpoint {endpoint} is currently unsupported"
            logging.warning(wrn)
            return None

        # Join with "/" explicitly; os.path.join would insert "\" on Windows
        path = f"{self.base_uri.rstrip('/')}/{self.endpoints[endpoint]}"

        return path
