diff --git a/api/src/adapters/search/__init__.py b/api/src/adapters/search/__init__.py
index 166441e1d..6b2607a04 100644
--- a/api/src/adapters/search/__init__.py
+++ b/api/src/adapters/search/__init__.py
@@ -1,4 +1,4 @@
-from src.adapters.search.opensearch_client import SearchClient, get_opensearch_client
+from src.adapters.search.opensearch_client import SearchClient
 from src.adapters.search.opensearch_config import get_opensearch_config
 
-__all__ = ["SearchClient", "get_opensearch_client", "get_opensearch_config"]
+__all__ = ["SearchClient", "get_opensearch_config"]
diff --git a/api/src/adapters/search/opensearch_client.py b/api/src/adapters/search/opensearch_client.py
index dadcfd7c4..b93d33917 100644
--- a/api/src/adapters/search/opensearch_client.py
+++ b/api/src/adapters/search/opensearch_client.py
@@ -1,25 +1,114 @@
-from typing import Any
+import logging
+from typing import Any, Sequence
 
 import opensearchpy
 
 from src.adapters.search.opensearch_config import OpensearchConfig, get_opensearch_config
 
-# More configuration/setup coming in:
-# TODO - https://github.com/navapbc/simpler-grants-gov/issues/13
+logger = logging.getLogger(__name__)
 
-# Alias the OpenSearch client so that it doesn't need to be imported everywhere
-# and to make it clear it's a client
-SearchClient = opensearchpy.OpenSearch
+class SearchClient:
+    def __init__(self, opensearch_config: OpensearchConfig | None = None) -> None:
+        if opensearch_config is None:
+            opensearch_config = get_opensearch_config()
 
-def get_opensearch_client(
-    opensearch_config: OpensearchConfig | None = None,
-) -> SearchClient:
-    if opensearch_config is None:
-        opensearch_config = get_opensearch_config()
+        # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details
+        self._client = opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config))
 
-    # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details
-    return opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config))
+    def create_index(
+        self, index_name: str, *, shard_count: int = 1, replica_count: int = 1
+    ) -> None:
+        """
+        Create an empty search index
+        """
+        body = {
+            "settings": {
+                "index": {"number_of_shards": shard_count, "number_of_replicas": replica_count}
+            }
+        }
+
+        logger.info("Creating search index %s", index_name, extra={"index_name": index_name})
+        self._client.indices.create(index_name, body=body)
+
+    def delete_index(self, index_name: str) -> None:
+        """
+        Delete an index. Can also delete all indexes via a prefix.
+        """
+        logger.info("Deleting search index %s", index_name, extra={"index_name": index_name})
+        self._client.indices.delete(index=index_name)
+
+    def bulk_upsert(
+        self,
+        index_name: str,
+        records: Sequence[dict[str, Any]],
+        primary_key_field: str,
+        *,
+        refresh: bool = True
+    ) -> None:
+        """
+        Bulk upsert records to an index
+
+        See: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/ for details
+        In this method we only use the "index" operation which creates or updates a record
+        based on the id value.
+        """
+
+        bulk_operations = []
+
+        for record in records:
+            # For each record, we create two entries in the bulk operation list
+            # which include the unique ID + the actual record on separate lines
+            # When this is sent to the search index, it will send two lines like:
+            #
+            # {"index": {"_id": 123}}
+            # {"opportunity_id": 123, "opportunity_title": "example title", ...}
+            bulk_operations.append({"index": {"_id": record[primary_key_field]}})
+            bulk_operations.append(record)
+
+        logger.info(
+            "Upserting records to %s",
+            index_name,
+            extra={"index_name": index_name, "record_count": int(len(bulk_operations) / 2)},
+        )
+        self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)
+
+    def swap_alias_index(
+        self, index_name: str, alias_name: str, *, delete_prior_indexes: bool = False
+    ) -> None:
+        """
+        Set the given index as the one backing the given alias. If any existing index(es) are
+        attached to the alias, remove them from the alias.
+
+        This operation is done atomically.
+        """
+        extra = {"index_name": index_name, "index_alias": alias_name}
+        logger.info("Swapping index that backs alias %s", alias_name, extra=extra)
+
+        existing_index_mapping = self._client.cat.aliases(alias_name, format="json")
+        existing_indexes = [i["index"] for i in existing_index_mapping]
+
+        logger.info(
+            "Found existing indexes", extra=extra | {"existing_indexes": ",".join(existing_indexes)}
+        )
+
+        actions = [{"add": {"index": index_name, "alias": alias_name}}]
+
+        for index in existing_indexes:
+            actions.append({"remove": {"index": index, "alias": alias_name}})
+
+        self._client.indices.update_aliases({"actions": actions})
+
+        # Clean up old indexes now that they aren't connected to the alias
+        if delete_prior_indexes:
+            for index in existing_indexes:
+                self.delete_index(index)
+
+    def search(self, index_name: str, search_query: dict) -> dict:
+        # TODO - add more when we build out the request/response parsing logic;
+        # we may use something like Pydantic to help reorganize the response
+        # object into something easier to parse.
+        return self._client.search(index=index_name, body=search_query)
 
 
 def _get_connection_parameters(opensearch_config: OpensearchConfig) -> dict[str, Any]:
diff --git a/api/src/app.py b/api/src/app.py
index 0d584a683..e9604157b 100644
--- a/api/src/app.py
+++ b/api/src/app.py
@@ -18,6 +18,7 @@
 from src.api.schemas import response_schema
 from src.auth.api_key_auth import get_app_security_scheme
 from src.data_migration.data_migration_blueprint import data_migration_blueprint
+from src.search.backend.load_search_data_blueprint import load_search_data_blueprint
 from src.task import task_blueprint
 
 logger = logging.getLogger(__name__)
@@ -103,8 +104,11 @@ def register_blueprints(app: APIFlask) -> None:
     app.register_blueprint(opportunities_v0_blueprint)
     app.register_blueprint(opportunities_v0_1_blueprint)
     app.register_blueprint(opportunities_v1_blueprint)
+
+    # Non-API blueprints
     app.register_blueprint(data_migration_blueprint)
     app.register_blueprint(task_blueprint)
+    app.register_blueprint(load_search_data_blueprint)
 
 
 def get_project_root_dir() -> str:
diff --git a/api/src/search/__init__.py b/api/src/search/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/api/src/search/backend/__init__.py b/api/src/search/backend/__init__.py
new file mode 100644
index 000000000..00a43e108
--- /dev/null
+++ b/api/src/search/backend/__init__.py
@@ -0,0 +1,2 @@
+# import all files so they get initialized and attached to the blueprint
+from . import load_search_data  # noqa: F401
diff --git a/api/src/search/backend/load_opportunities_to_index.py b/api/src/search/backend/load_opportunities_to_index.py
new file mode 100644
index 000000000..a01357a96
--- /dev/null
+++ b/api/src/search/backend/load_opportunities_to_index.py
@@ -0,0 +1,112 @@
+import logging
+from enum import StrEnum
+from typing import Iterator, Sequence
+
+from pydantic import Field
+from pydantic_settings import SettingsConfigDict
+from sqlalchemy import select
+from sqlalchemy.orm import noload, selectinload
+
+import src.adapters.db as db
+import src.adapters.search as search
+from src.api.opportunities_v0_1.opportunity_schemas import OpportunityV01Schema
+from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
+from src.task.task import Task
+from src.util.datetime_util import get_now_us_eastern_datetime
+from src.util.env_config import PydanticBaseEnvConfig
+
+logger = logging.getLogger(__name__)
+
+
+class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
+    model_config = SettingsConfigDict(env_prefix="LOAD_OPP_SEARCH_")
+
+    shard_count: int = Field(default=1)  # LOAD_OPP_SEARCH_SHARD_COUNT
+    replica_count: int = Field(default=1)  # LOAD_OPP_SEARCH_REPLICA_COUNT
+
+    # TODO - these might make sense to come from some sort of opportunity-search-index-config?
+    # look into this a bit more when we set up the search endpoint itself.
+    alias_name: str = Field(default="opportunity-index-alias")  # LOAD_OPP_SEARCH_ALIAS_NAME
+    index_prefix: str = Field(default="opportunity-index")  # LOAD_OPP_SEARCH_INDEX_PREFIX
+
+
+class LoadOpportunitiesToIndex(Task):
+    class Metrics(StrEnum):
+        RECORDS_LOADED = "records_loaded"
+
+    def __init__(
+        self,
+        db_session: db.Session,
+        search_client: search.SearchClient,
+        config: LoadOpportunitiesToIndexConfig | None = None,
+    ) -> None:
+        super().__init__(db_session)
+
+        self.search_client = search_client
+
+        if config is None:
+            config = LoadOpportunitiesToIndexConfig()
+        self.config = config
+
+        current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
+        self.index_name = f"{self.config.index_prefix}-{current_timestamp}"
+        self.set_metrics({"index_name": self.index_name})
+
+    def run_task(self) -> None:
+        # create the index
+        self.search_client.create_index(
+            self.index_name,
+            shard_count=self.config.shard_count,
+            replica_count=self.config.replica_count,
+        )
+
+        # load the records
+        for opp_batch in self.fetch_opportunities():
+            self.load_records(opp_batch)
+
+        # swap the alias over to the newly built index
+        self.search_client.swap_alias_index(
+            self.index_name, self.config.alias_name, delete_prior_indexes=True
+        )
+
+    def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
+        """
+        Fetch the opportunities in batches. The iterator returned
+        will give you each individual batch to be processed.
+
+        Fetches all opportunities where:
+          * is_draft = False
+          * current_opportunity_summary is not None
+        """
+        return (
+            self.db_session.execute(
+                select(Opportunity)
+                .join(CurrentOpportunitySummary)
+                .where(
+                    Opportunity.is_draft.is_(False),
+                    CurrentOpportunitySummary.opportunity_status.isnot(None),
+                )
+                .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
+                .execution_options(yield_per=5000)
+            )
+            .scalars()
+            .partitions()
+        )
+
+    def load_records(self, records: Sequence[Opportunity]) -> None:
+        logger.info("Loading batch of opportunities...")
+        schema = OpportunityV01Schema()
+        json_records = []
+
+        for record in records:
+            logger.info(
+                "Preparing opportunity for upload to search index",
+                extra={
+                    "opportunity_id": record.opportunity_id,
+                    "opportunity_status": record.opportunity_status,
+                },
+            )
+            json_records.append(schema.dump(record))
+            self.increment(self.Metrics.RECORDS_LOADED)
+
+        self.search_client.bulk_upsert(self.index_name, json_records, "opportunity_id")
diff --git a/api/src/search/backend/load_search_data.py b/api/src/search/backend/load_search_data.py
new file mode 100644
index 000000000..cf6f0445f
--- /dev/null
+++ b/api/src/search/backend/load_search_data.py
@@ -0,0 +1,15 @@
+import src.adapters.db as db
+import src.adapters.search as search
+from src.adapters.db import flask_db
+from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndex
+from src.search.backend.load_search_data_blueprint import load_search_data_blueprint
+
+
+@load_search_data_blueprint.cli.command(
+    "load-opportunity-data", help="Load opportunity data from our database to the search index"
+)
+@flask_db.with_db_session()
+def load_opportunity_data(db_session: db.Session) -> None:
+    search_client = search.SearchClient()
+
+    LoadOpportunitiesToIndex(db_session, search_client).run()
diff --git a/api/src/search/backend/load_search_data_blueprint.py b/api/src/search/backend/load_search_data_blueprint.py
new file mode 100644
index 000000000..fffd9f915
--- /dev/null
+++ b/api/src/search/backend/load_search_data_blueprint.py
@@ -0,0 +1,5 @@
+from apiflask import APIBlueprint
+
+load_search_data_blueprint = APIBlueprint(
+    "load-search-data", __name__, enable_openapi=False, cli_group="load-search-data"
+)
diff --git a/api/tests/conftest.py b/api/tests/conftest.py
index 97173e9a7..4b45c4f2c 100644
--- a/api/tests/conftest.py
+++ b/api/tests/conftest.py
@@ -151,25 +151,35 @@ def test_foreign_schema(db_schema_prefix):
 
 
 @pytest.fixture(scope="session")
 def search_client() -> search.SearchClient:
-    return search.get_opensearch_client()
+    client = search.SearchClient()
+    try:
+        yield client
+    finally:
+        # Just in case a test set up an index
+        # in a way that didn't clean it up, delete
+        # all indexes starting with "test" at the end of the run
+        client.delete_index("test-*")
 
 
 @pytest.fixture(scope="session")
 def opportunity_index(search_client):
-    # TODO - will adjust this in the future to use utils we'll build
-    # for setting up / aliasing indexes. For now, keep it simple
     # create a random index name just to make sure it won't ever conflict
     # with an actual one, similar to how we create schemas for database tests
-    index_name = f"test_{uuid.uuid4().int}_opportunity"
+    index_name = f"test-opportunity-index-{uuid.uuid4().int}"
 
-    search_client.indices.create(index_name, body={})
+    search_client.create_index(index_name)
 
     try:
         yield index_name
     finally:
         # Try to clean up the index at the end
-        search_client.indices.delete(index_name)
+        search_client.delete_index(index_name)
+
+
+@pytest.fixture(scope="session")
+def opportunity_index_alias(search_client):
+    # Note we don't actually create anything, this is just a random name
+    return f"test-opportunity-index-alias-{uuid.uuid4().int}"
 
 
 ####################
diff --git a/api/tests/src/adapters/search/test_opensearch.py b/api/tests/src/adapters/search/test_opensearch.py
deleted file mode 100644
index 490ffcb3b..000000000
--- a/api/tests/src/adapters/search/test_opensearch.py
+++ /dev/null
@@ -1,58 +0,0 @@
-########################################
-# This is a placeholder set of tests,
-# we'll evolve / change the structure
-# as we continue developing this
-#
-# Just wanted something simple so I can verify
-# the early steps of this setup are working
-# before we actually have code to use
-########################################
-
-
-def test_index_is_running(search_client, opportunity_index):
-    # Very simple test, will rewrite / remove later once we have something
-    # more meaningful to test.
-
-    existing_indexes = search_client.cat.indices(format="json")
-
-    found_opportunity_index = False
-    for index in existing_indexes:
-        if index["index"] == opportunity_index:
-            found_opportunity_index = True
-            break
-
-    assert found_opportunity_index is True
-
-    # Add a few records to the index
-
-    record1 = {
-        "opportunity_id": 1,
-        "opportunity_title": "Research into how to make a search engine",
-        "opportunity_status": "posted",
-    }
-    record2 = {
-        "opportunity_id": 2,
-        "opportunity_title": "Research about words, and more words!",
-        "opportunity_status": "forecasted",
-    }
-
-    search_client.index(index=opportunity_index, body=record1, id=1, refresh=True)
-    search_client.index(index=opportunity_index, body=record2, id=2, refresh=True)
-
-    search_request = {
-        "query": {
-            "bool": {
-                "must": {
-                    "simple_query_string": {"query": "research", "fields": ["opportunity_title"]}
-                }
-            }
-        }
-    }
-    response = search_client.search(index=opportunity_index, body=search_request)
-    assert response["hits"]["total"]["value"] == 2
-
-    filter_request = {
-        "query": {"bool": {"filter": [{"terms": {"opportunity_status": ["forecasted"]}}]}}
-    }
-    response = search_client.search(index=opportunity_index, body=filter_request)
-    assert response["hits"]["total"]["value"] == 1
diff --git a/api/tests/src/adapters/search/test_opensearch_client.py b/api/tests/src/adapters/search/test_opensearch_client.py
new file mode 100644
index 000000000..d9ba22194
--- /dev/null
+++ b/api/tests/src/adapters/search/test_opensearch_client.py
@@ -0,0 +1,105 @@
+import uuid
+
+import pytest
+
+########################################################################
+# These tests are primarily looking to validate
+# that our wrappers around the OpenSearch client
+# work correctly / account for error cases correctly.
+#
+# We are not validating all the intricacies of OpenSearch itself.
+########################################################################
+
+
+@pytest.fixture
+def generic_index(search_client):
+    # This is very similar to the opportunity_index fixture, but
+    # is created per unit test rather than being a global value
+    index_name = f"test-index-{uuid.uuid4().int}"
+
+    search_client.create_index(index_name)
+
+    try:
+        yield index_name
+    finally:
+        # Try to clean up the index at the end
+        search_client.delete_index(index_name)
+
+
+def test_create_and_delete_index_duplicate(search_client):
+    index_name = f"test-index-{uuid.uuid4().int}"
+
+    search_client.create_index(index_name)
+    with pytest.raises(Exception, match="already exists"):
+        search_client.create_index(index_name)
+
+    # Cleanup the index
+    search_client.delete_index(index_name)
+    with pytest.raises(Exception, match="no such index"):
+        search_client.delete_index(index_name)
+
+
+def test_bulk_upsert(search_client, generic_index):
+    records = [
+        {"id": 1, "title": "Green Eggs & Ham", "notes": "why are the eggs green?"},
+        {"id": 2, "title": "The Cat in the Hat", "notes": "silly cat wears a hat"},
+        {"id": 3, "title": "One Fish, Two Fish, Red Fish, Blue Fish", "notes": "fish"},
+    ]
+
+    search_client.bulk_upsert(generic_index, records, primary_key_field="id")
+
+    # Verify the records are in the index
+    for record in records:
+        assert search_client._client.get(generic_index, record["id"])["_source"] == record
+
+    # Can update + add more
+    records = [
+        {"id": 1, "title": "Green Eggs & Ham", "notes": "Sam, eat the eggs"},
+        {"id": 2, "title": "The Cat in the Hat", "notes": "watch the movie"},
+        {"id": 3, "title": "One Fish, Two Fish, Red Fish, Blue Fish", "notes": "colors & numbers"},
+        {"id": 4, "title": "How the Grinch Stole Christmas", "notes": "who"},
+    ]
+    search_client.bulk_upsert(generic_index, records, primary_key_field="id")
+
+    for record in records:
+        assert search_client._client.get(generic_index, record["id"])["_source"] == record
+
+
+def test_swap_alias_index(search_client, generic_index):
+    alias_name = f"tmp-alias-{uuid.uuid4().int}"
+
+    # Populate the generic index; we won't immediately use this one
+    records = [
+        {"id": 1, "data": "abc123"},
+        {"id": 2, "data": "def456"},
+        {"id": 3, "data": "xyz789"},
+    ]
+    search_client.bulk_upsert(generic_index, records, primary_key_field="id")
+
+    # Create a different index that we'll attach to the alias first.
+    tmp_index = f"test-tmp-index-{uuid.uuid4().int}"
+    search_client.create_index(tmp_index)
+    # Add a few records
+    tmp_index_records = [
+        {"id": 1, "data": "abc123"},
+        {"id": 2, "data": "xyz789"},
+    ]
+    search_client.bulk_upsert(tmp_index, tmp_index_records, primary_key_field="id")
+
+    # Set the alias
+    search_client.swap_alias_index(tmp_index, alias_name, delete_prior_indexes=True)
+
+    # Can search by this alias and get records from the tmp index
+    resp = search_client.search(alias_name, {})
+    resp_records = [record["_source"] for record in resp["hits"]["hits"]]
+    assert resp_records == tmp_index_records
+
+    # Swap the index to the generic one + delete the tmp one
+    search_client.swap_alias_index(generic_index, alias_name, delete_prior_indexes=True)
+
+    resp = search_client.search(alias_name, {})
+    resp_records = [record["_source"] for record in resp["hits"]["hits"]]
+    assert resp_records == records
+
+    # Verify the tmp one was deleted
+    assert search_client._client.indices.exists(tmp_index) is False
diff --git a/api/tests/src/search/__init__.py b/api/tests/src/search/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/api/tests/src/search/backend/__init__.py b/api/tests/src/search/backend/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/api/tests/src/search/backend/test_load_opportunities_to_index.py b/api/tests/src/search/backend/test_load_opportunities_to_index.py
new file mode 100644
index 000000000..a079b83c8
--- /dev/null
+++ b/api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -0,0 +1,91 @@
+import pytest
+
+from src.search.backend.load_opportunities_to_index import (
+    LoadOpportunitiesToIndex,
+    LoadOpportunitiesToIndexConfig,
+)
+from tests.conftest import BaseTestClass
+from tests.src.db.models.factories import OpportunityFactory
+
+
+class TestLoadOpportunitiesToIndex(BaseTestClass):
+    @pytest.fixture(scope="class")
+    def load_opportunities_to_index(self, db_session, search_client, opportunity_index_alias):
+        config = LoadOpportunitiesToIndexConfig(
+            alias_name=opportunity_index_alias, index_prefix="test-load-opps"
+        )
+        return LoadOpportunitiesToIndex(db_session, search_client, config)
+
+    def test_load_opportunities_to_index(
+        self,
+        truncate_opportunities,
+        enable_factory_create,
+        search_client,
+        opportunity_index_alias,
+        load_opportunities_to_index,
+    ):
+        # Create 25 opportunities we will load into the search index
+        opportunities = []
+        opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True))
+        opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
+        opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
+        opportunities.extend(
+            OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
+        )
+        opportunities.extend(
+            OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
+        )
+
+        # Create some opportunities that won't get fetched / loaded into search
+        OpportunityFactory.create_batch(size=3, is_draft=True)
+        OpportunityFactory.create_batch(size=4, no_current_summary=True)
+
+        load_opportunities_to_index.run()
+        # Verify some metrics first
+        assert (
+            len(opportunities)
+            == load_opportunities_to_index.metrics[
+                load_opportunities_to_index.Metrics.RECORDS_LOADED
+            ]
+        )
+
+        # Just do some rough validation that the data is present
+        resp = search_client.search(opportunity_index_alias, {"size": 100})
+
+        total_records = resp["hits"]["total"]["value"]
+        assert total_records == len(opportunities)
+
+        records = [record["_source"] for record in resp["hits"]["hits"]]
+        assert set([opp.opportunity_id for opp in opportunities]) == set(
+            [record["opportunity_id"] for record in records]
+        )
+
+        # Rerunning without changing the data in the DB should not meaningfully change the results
+        load_opportunities_to_index.index_name = load_opportunities_to_index.index_name + "-another"
+        load_opportunities_to_index.run()
+        resp = search_client.search(opportunity_index_alias, {"size": 100})
+
+        total_records = resp["hits"]["total"]["value"]
+        assert total_records == len(opportunities)
+
+        records = [record["_source"] for record in resp["hits"]["hits"]]
+        assert set([opp.opportunity_id for opp in opportunities]) == set(
+            [record["opportunity_id"] for record in records]
+        )
+
+        # Rerun, but first add a few more opportunities that should show up
+        opportunities.extend(OpportunityFactory.create_batch(size=3))
+        load_opportunities_to_index.index_name = (
+            load_opportunities_to_index.index_name + "-new-data"
+        )
+        load_opportunities_to_index.run()
+
+        resp = search_client.search(opportunity_index_alias, {"size": 100})
+
+        total_records = resp["hits"]["total"]["value"]
+        assert total_records == len(opportunities)
+
+        records = [record["_source"] for record in resp["hits"]["hits"]]
+        assert set([opp.opportunity_id for opp in opportunities]) == set(
+            [record["opportunity_id"] for record in records]
+        )
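
Reviewer note: the sketch below is illustrative only and is not part of the patch. It strings together the new SearchClient methods introduced above; the dated index name is hypothetical (the real loader derives it from LoadOpportunitiesToIndexConfig plus a timestamp), and it assumes a reachable OpenSearch cluster configured through the usual OpensearchConfig environment variables.

# Illustrative usage of the new SearchClient wrapper (not part of the diff)
import src.adapters.search as search

client = search.SearchClient()

# Hypothetical dated index name; the loader task generates one per run
index_name = "opportunity-index-2024-01-01_00-00-00"
client.create_index(index_name, shard_count=1, replica_count=1)

# Records are keyed by their primary key field, so re-loading the same id updates it
client.bulk_upsert(
    index_name,
    [{"opportunity_id": 123, "opportunity_title": "example title"}],
    primary_key_field="opportunity_id",
)

# Atomically point the alias at the new index (and drop whatever backed it before),
# then query through the alias so callers never see the dated index name
client.swap_alias_index(index_name, "opportunity-index-alias", delete_prior_indexes=True)
response = client.search("opportunity-index-alias", {"query": {"match_all": {}}})
print(response["hits"]["total"]["value"])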
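
Assuming the standard Flask CLI entrypoint used by the other task blueprints, the loader command registered on the new blueprint (cli_group "load-search-data", command "load-opportunity-data") would be invoked roughly as:

flask load-search-data load-opportunity-data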