Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
More docs
Browse files Browse the repository at this point in the history
  • Loading branch information
chouinar committed Aug 27, 2024
1 parent 0f25373 commit 898e85d
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions api/src/adapters/search/opensearch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ def bulk_upsert(
self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)

def bulk_delete(self, index_name: str, ids: Iterable[Any], *, refresh: bool = True) -> None:
"""
Bulk delete records from an index
See: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/ for details.
In this method, we delete records based on the IDs passed in.
"""
bulk_operations = []

for _id in ids:
Expand All @@ -130,11 +136,15 @@ def bulk_delete(self, index_name: str, ids: Iterable[Any], *, refresh: bool = Tr
self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)

def index_exists(self, index_name: str) -> bool:
# Check if an index OR alias exists by a given name
"""
Check if an index OR alias exists by a given name
"""
return self._client.indices.exists(index_name)

def alias_exists(self, alias_name: str) -> bool:
# Check if an alias exists
"""
Check if an alias exists
"""
existing_index_mapping = self._client.cat.aliases(alias_name, format="json")
return len(existing_index_mapping) > 0

Expand Down Expand Up @@ -194,6 +204,24 @@ def scroll(
include_scores: bool = True,
duration: str = "10m",
) -> Generator[SearchResponse, None, None]:
"""
Scroll (iterate) over a large result set a given search query.
This query uses additional resources to keep the response open, but
keeps a consistent set of results and is useful for backend processes
that need to fetch a large amount of search data. After processing the results,
the scroll lock is closed for you.
This method is setup as a generator method and the results can be iterated over::
for response in search_client.scroll("my_index", {"size": 10000}):
for record in response.records:
process_record(record)
See: https://opensearch.org/docs/latest/api-reference/scroll/
"""

# start scroll
response = self.search(
index_name=index_name,
Expand All @@ -209,6 +237,9 @@ def scroll(
while True:
raw_response = self._client.scroll({"scroll_id": scroll_id, "scroll": duration})
response = SearchResponse.from_opensearch_response(raw_response, include_scores)

# The scroll ID can change between queries according to the docs, so we
# keep updating the value while iterating in case they change.
scroll_id = response.scroll_id

if len(response.records) == 0:
Expand Down

0 comments on commit 898e85d

Please sign in to comment.