From 251524d14caad2da22d5105ed82d65060d05860f Mon Sep 17 00:00:00 2001 From: Michael Chouinard <46358556+chouinar@users.noreply.github.com> Date: Wed, 22 May 2024 16:15:25 -0400 Subject: [PATCH] [Issue #2092] Populate the search index from the opportunity tables (navapbc/simpler-grants-gov#47) Fixes #2092 Setup a script to populate the search index by loading opportunities from the DB, jsonify'ing them, loading them into a new index, and then aliasing that index. Several utilities were created for simplifying working with the OpenSearch client (a wrapper for setting up configuration / patterns) Iterating over the opportunities and doing something with them is a common pattern in several of our scripts, so nothing is really different there. The meaningful implementation is how we handle creating and aliasing the index. In OpenSearch you can give any index an alias (including putting multiple indexes behind the same alias). The approach is pretty simple: * Create an index * Load opportunities into the index * Atomically swap the index backing the `opportunity-index-alias` * Delete the old index if they exist This approach means that our search endpoint just needs to query the alias, and we can keep making new indexes and swapping them out behind the scenes. Because we could remake the index every few minutes, if we ever need to re-configure things like the number of shards, or any other index-creation configuration, we just update that in this script and wait for it to run again. I ran this locally after loading `83250` records, and it took about 61s. You can run this locally yourself by doing: ```sh make init make db-seed-local poetry run flask load-search-data load-opportunity-data ``` If you'd like to see the data, you can test it out on http://localhost:5601/app/dev_tools#/console - here is an example query that filters by the word `research` across a few fields and filters to just forecasted/posted. ```json GET opportunity-index-alias/_search { "size": 25, "from": 0, "query": { "bool": { "must": [ { "simple_query_string": { "query": "research", "default_operator": "AND", "fields": ["agency.keyword^16", "opportunity_title^2", "opportunity_number^12", "summary.summary_description", "opportunity_assistance_listings.assistance_listing_number^10", "opportunity_assistance_listings.program_title^4"] } } ], "filter": [ { "terms": { "opportunity_status": [ "forecasted", "posted" ] } } ] } } } ``` --- api/src/adapters/search/__init__.py | 4 +- api/src/adapters/search/opensearch_client.py | 115 ++++++++++++++++-- api/src/app.py | 4 + api/src/search/__init__.py | 0 api/src/search/backend/__init__.py | 2 + .../backend/load_opportunities_to_index.py | 112 +++++++++++++++++ api/src/search/backend/load_search_data.py | 15 +++ .../backend/load_search_data_blueprint.py | 5 + api/tests/conftest.py | 24 ++-- .../src/adapters/search/test_opensearch.py | 58 --------- .../adapters/search/test_opensearch_client.py | 105 ++++++++++++++++ api/tests/src/search/__init__.py | 0 api/tests/src/search/backend/__init__.py | 0 .../test_load_opportunities_to_index.py | 91 ++++++++++++++ 14 files changed, 455 insertions(+), 80 deletions(-) create mode 100644 api/src/search/__init__.py create mode 100644 api/src/search/backend/__init__.py create mode 100644 api/src/search/backend/load_opportunities_to_index.py create mode 100644 api/src/search/backend/load_search_data.py create mode 100644 api/src/search/backend/load_search_data_blueprint.py delete mode 100644 api/tests/src/adapters/search/test_opensearch.py create mode 100644 api/tests/src/adapters/search/test_opensearch_client.py create mode 100644 api/tests/src/search/__init__.py create mode 100644 api/tests/src/search/backend/__init__.py create mode 100644 api/tests/src/search/backend/test_load_opportunities_to_index.py diff --git a/api/src/adapters/search/__init__.py b/api/src/adapters/search/__init__.py index 166441e1d..6b2607a04 100644 --- a/api/src/adapters/search/__init__.py +++ b/api/src/adapters/search/__init__.py @@ -1,4 +1,4 @@ -from src.adapters.search.opensearch_client import SearchClient, get_opensearch_client +from src.adapters.search.opensearch_client import SearchClient from src.adapters.search.opensearch_config import get_opensearch_config -__all__ = ["SearchClient", "get_opensearch_client", "get_opensearch_config"] +__all__ = ["SearchClient", "get_opensearch_config"] diff --git a/api/src/adapters/search/opensearch_client.py b/api/src/adapters/search/opensearch_client.py index dadcfd7c4..b93d33917 100644 --- a/api/src/adapters/search/opensearch_client.py +++ b/api/src/adapters/search/opensearch_client.py @@ -1,25 +1,114 @@ -from typing import Any +import logging +from typing import Any, Sequence import opensearchpy from src.adapters.search.opensearch_config import OpensearchConfig, get_opensearch_config -# More configuration/setup coming in: -# TODO - https://github.com/navapbc/simpler-grants-gov/issues/13 +logger = logging.getLogger(__name__) -# Alias the OpenSearch client so that it doesn't need to be imported everywhere -# and to make it clear it's a client -SearchClient = opensearchpy.OpenSearch +class SearchClient: + def __init__(self, opensearch_config: OpensearchConfig | None = None) -> None: + if opensearch_config is None: + opensearch_config = get_opensearch_config() -def get_opensearch_client( - opensearch_config: OpensearchConfig | None = None, -) -> SearchClient: - if opensearch_config is None: - opensearch_config = get_opensearch_config() + # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details + self._client = opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config)) - # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details - return opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config)) + def create_index( + self, index_name: str, *, shard_count: int = 1, replica_count: int = 1 + ) -> None: + """ + Create an empty search index + """ + body = { + "settings": { + "index": {"number_of_shards": shard_count, "number_of_replicas": replica_count} + } + } + + logger.info("Creating search index %s", index_name, extra={"index_name": index_name}) + self._client.indices.create(index_name, body=body) + + def delete_index(self, index_name: str) -> None: + """ + Delete an index. Can also delete all indexes via a prefix. + """ + logger.info("Deleting search index %s", index_name, extra={"index_name": index_name}) + self._client.indices.delete(index=index_name) + + def bulk_upsert( + self, + index_name: str, + records: Sequence[dict[str, Any]], + primary_key_field: str, + *, + refresh: bool = True + ) -> None: + """ + Bulk upsert records to an index + + See: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/ for details + In this method we only use the "index" operation which creates or updates a record + based on the id value. + """ + + bulk_operations = [] + + for record in records: + # For each record, we create two entries in the bulk operation list + # which include the unique ID + the actual record on separate lines + # When this is sent to the search index, this will send two lines like: + # + # {"index": {"_id": 123}} + # {"opportunity_id": 123, "opportunity_title": "example title", ...} + bulk_operations.append({"index": {"_id": record[primary_key_field]}}) + bulk_operations.append(record) + + logger.info( + "Upserting records to %s", + index_name, + extra={"index_name": index_name, "record_count": int(len(bulk_operations) / 2)}, + ) + self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh) + + def swap_alias_index( + self, index_name: str, alias_name: str, *, delete_prior_indexes: bool = False + ) -> None: + """ + For a given index, set it to the given alias. If any existing index(es) are + attached to the alias, remove them from the alias. + + This operation is done atomically. + """ + extra = {"index_name": index_name, "index_alias": alias_name} + logger.info("Swapping index that backs alias %s", alias_name, extra=extra) + + existing_index_mapping = self._client.cat.aliases(alias_name, format="json") + existing_indexes = [i["index"] for i in existing_index_mapping] + + logger.info( + "Found existing indexes", extra=extra | {"existing_indexes": ",".join(existing_indexes)} + ) + + actions = [{"add": {"index": index_name, "alias": alias_name}}] + + for index in existing_indexes: + actions.append({"remove": {"index": index, "alias": alias_name}}) + + self._client.indices.update_aliases({"actions": actions}) + + # Cleanup old indexes now that they aren't connected to the alias + if delete_prior_indexes: + for index in existing_indexes: + self.delete_index(index) + + def search(self, index_name: str, search_query: dict) -> dict: + # TODO - add more when we build out the request/response parsing logic + # we use something like Pydantic to help reorganize the response + # object into something easier to parse. + return self._client.search(index=index_name, body=search_query) def _get_connection_parameters(opensearch_config: OpensearchConfig) -> dict[str, Any]: diff --git a/api/src/app.py b/api/src/app.py index 0d584a683..e9604157b 100644 --- a/api/src/app.py +++ b/api/src/app.py @@ -18,6 +18,7 @@ from src.api.schemas import response_schema from src.auth.api_key_auth import get_app_security_scheme from src.data_migration.data_migration_blueprint import data_migration_blueprint +from src.search.backend.load_search_data_blueprint import load_search_data_blueprint from src.task import task_blueprint logger = logging.getLogger(__name__) @@ -103,8 +104,11 @@ def register_blueprints(app: APIFlask) -> None: app.register_blueprint(opportunities_v0_blueprint) app.register_blueprint(opportunities_v0_1_blueprint) app.register_blueprint(opportunities_v1_blueprint) + + # Non-api blueprints app.register_blueprint(data_migration_blueprint) app.register_blueprint(task_blueprint) + app.register_blueprint(load_search_data_blueprint) def get_project_root_dir() -> str: diff --git a/api/src/search/__init__.py b/api/src/search/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/api/src/search/backend/__init__.py b/api/src/search/backend/__init__.py new file mode 100644 index 000000000..00a43e108 --- /dev/null +++ b/api/src/search/backend/__init__.py @@ -0,0 +1,2 @@ +# import all files so they get initialized and attached to the blueprint +from . import load_search_data # noqa: F401 diff --git a/api/src/search/backend/load_opportunities_to_index.py b/api/src/search/backend/load_opportunities_to_index.py new file mode 100644 index 000000000..a01357a96 --- /dev/null +++ b/api/src/search/backend/load_opportunities_to_index.py @@ -0,0 +1,112 @@ +import logging +from enum import StrEnum +from typing import Iterator, Sequence + +from pydantic import Field +from pydantic_settings import SettingsConfigDict +from sqlalchemy import select +from sqlalchemy.orm import noload, selectinload + +import src.adapters.db as db +import src.adapters.search as search +from src.api.opportunities_v0_1.opportunity_schemas import OpportunityV01Schema +from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity +from src.task.task import Task +from src.util.datetime_util import get_now_us_eastern_datetime +from src.util.env_config import PydanticBaseEnvConfig + +logger = logging.getLogger(__name__) + + +class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig): + model_config = SettingsConfigDict(env_prefix="LOAD_OPP_SEARCH_") + + shard_count: int = Field(default=1) # LOAD_OPP_SEARCH_SHARD_COUNT + replica_count: int = Field(default=1) # LOAD_OPP_SEARCH_REPLICA_COUNT + + # TODO - these might make sense to come from some sort of opportunity-search-index-config? + # look into this a bit more when we setup the search endpoint itself. + alias_name: str = Field(default="opportunity-index-alias") # LOAD_OPP_SEARCH_ALIAS_NAME + index_prefix: str = Field(default="opportunity-index") # LOAD_OPP_INDEX_PREFIX + + +class LoadOpportunitiesToIndex(Task): + class Metrics(StrEnum): + RECORDS_LOADED = "records_loaded" + + def __init__( + self, + db_session: db.Session, + search_client: search.SearchClient, + config: LoadOpportunitiesToIndexConfig | None = None, + ) -> None: + super().__init__(db_session) + + self.search_client = search_client + + if config is None: + config = LoadOpportunitiesToIndexConfig() + self.config = config + + current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S") + self.index_name = f"{self.config.index_prefix}-{current_timestamp}" + self.set_metrics({"index_name": self.index_name}) + + def run_task(self) -> None: + # create the index + self.search_client.create_index( + self.index_name, + shard_count=self.config.shard_count, + replica_count=self.config.replica_count, + ) + + # load the records + for opp_batch in self.fetch_opportunities(): + self.load_records(opp_batch) + + # handle aliasing of endpoints + self.search_client.swap_alias_index( + self.index_name, self.config.alias_name, delete_prior_indexes=True + ) + + def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]: + """ + Fetch the opportunities in batches. The iterator returned + will give you each individual batch to be processed. + + Fetches all opportunities where: + * is_draft = False + * current_opportunity_summary is not None + """ + return ( + self.db_session.execute( + select(Opportunity) + .join(CurrentOpportunitySummary) + .where( + Opportunity.is_draft.is_(False), + CurrentOpportunitySummary.opportunity_status.isnot(None), + ) + .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries)) + .execution_options(yield_per=5000) + ) + .scalars() + .partitions() + ) + + def load_records(self, records: Sequence[Opportunity]) -> None: + logger.info("Loading batch of opportunities...") + schema = OpportunityV01Schema() + json_records = [] + + for record in records: + logger.info( + "Preparing opportunity for upload to search index", + extra={ + "opportunity_id": record.opportunity_id, + "opportunity_status": record.opportunity_status, + }, + ) + json_records.append(schema.dump(record)) + self.increment(self.Metrics.RECORDS_LOADED) + + self.search_client.bulk_upsert(self.index_name, json_records, "opportunity_id") diff --git a/api/src/search/backend/load_search_data.py b/api/src/search/backend/load_search_data.py new file mode 100644 index 000000000..cf6f0445f --- /dev/null +++ b/api/src/search/backend/load_search_data.py @@ -0,0 +1,15 @@ +import src.adapters.db as db +import src.adapters.search as search +from src.adapters.db import flask_db +from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndex +from src.search.backend.load_search_data_blueprint import load_search_data_blueprint + + +@load_search_data_blueprint.cli.command( + "load-opportunity-data", help="Load opportunity data from our database to the search index" +) +@flask_db.with_db_session() +def load_opportunity_data(db_session: db.Session) -> None: + search_client = search.SearchClient() + + LoadOpportunitiesToIndex(db_session, search_client).run() diff --git a/api/src/search/backend/load_search_data_blueprint.py b/api/src/search/backend/load_search_data_blueprint.py new file mode 100644 index 000000000..fffd9f915 --- /dev/null +++ b/api/src/search/backend/load_search_data_blueprint.py @@ -0,0 +1,5 @@ +from apiflask import APIBlueprint + +load_search_data_blueprint = APIBlueprint( + "load-search-data", __name__, enable_openapi=False, cli_group="load-search-data" +) diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 97173e9a7..4b45c4f2c 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -151,25 +151,35 @@ def test_foreign_schema(db_schema_prefix): @pytest.fixture(scope="session") def search_client() -> search.SearchClient: - return search.get_opensearch_client() + client = search.SearchClient() + try: + yield client + finally: + # Just in case a test setup an index + # in a way that didn't clean it up, delete + # all indexes at the end of a run that start with test + client.delete_index("test-*") @pytest.fixture(scope="session") def opportunity_index(search_client): - # TODO - will adjust this in the future to use utils we'll build - # for setting up / aliasing indexes. For now, keep it simple - # create a random index name just to make sure it won't ever conflict # with an actual one, similar to how we create schemas for database tests - index_name = f"test_{uuid.uuid4().int}_opportunity" + index_name = f"test-opportunity-index-{uuid.uuid4().int}" - search_client.indices.create(index_name, body={}) + search_client.create_index(index_name) try: yield index_name finally: # Try to clean up the index at the end - search_client.indices.delete(index_name) + search_client.delete_index(index_name) + + +@pytest.fixture(scope="session") +def opportunity_index_alias(search_client): + # Note we don't actually create anything, this is just a random name + return f"test-opportunity-index-alias-{uuid.uuid4().int}" #################### diff --git a/api/tests/src/adapters/search/test_opensearch.py b/api/tests/src/adapters/search/test_opensearch.py deleted file mode 100644 index 490ffcb3b..000000000 --- a/api/tests/src/adapters/search/test_opensearch.py +++ /dev/null @@ -1,58 +0,0 @@ -######################################## -# This is a placeholder set of tests, -# we'll evolve / change the structure -# as we continue developing this -# -# Just wanted something simple so I can verify -# the early steps of this setup are working -# before we actually have code to use -######################################## - - -def test_index_is_running(search_client, opportunity_index): - # Very simple test, will rewrite / remove later once we have something - # more meaningful to test. - - existing_indexes = search_client.cat.indices(format="json") - - found_opportunity_index = False - for index in existing_indexes: - if index["index"] == opportunity_index: - found_opportunity_index = True - break - - assert found_opportunity_index is True - - # Add a few records to the index - - record1 = { - "opportunity_id": 1, - "opportunity_title": "Research into how to make a search engine", - "opportunity_status": "posted", - } - record2 = { - "opportunity_id": 2, - "opportunity_title": "Research about words, and more words!", - "opportunity_status": "forecasted", - } - - search_client.index(index=opportunity_index, body=record1, id=1, refresh=True) - search_client.index(index=opportunity_index, body=record2, id=2, refresh=True) - - search_request = { - "query": { - "bool": { - "must": { - "simple_query_string": {"query": "research", "fields": ["opportunity_title"]} - } - } - } - } - response = search_client.search(index=opportunity_index, body=search_request) - assert response["hits"]["total"]["value"] == 2 - - filter_request = { - "query": {"bool": {"filter": [{"terms": {"opportunity_status": ["forecasted"]}}]}} - } - response = search_client.search(index=opportunity_index, body=filter_request) - assert response["hits"]["total"]["value"] == 1 diff --git a/api/tests/src/adapters/search/test_opensearch_client.py b/api/tests/src/adapters/search/test_opensearch_client.py new file mode 100644 index 000000000..d9ba22194 --- /dev/null +++ b/api/tests/src/adapters/search/test_opensearch_client.py @@ -0,0 +1,105 @@ +import uuid + +import pytest + +######################################################################## +# These tests are primarily looking to validate +# that our wrappers around the OpenSearch client +# are being used correctly / account for error cases correctly. +# +# We are not validating all the intricacies of OpenSearch itself. +######################################################################## + + +@pytest.fixture +def generic_index(search_client): + # This is very similar to the opportunity_index fixture, but + # is reused per unit test rather than a global value + index_name = f"test-index-{uuid.uuid4().int}" + + search_client.create_index(index_name) + + try: + yield index_name + finally: + # Try to clean up the index at the end + search_client.delete_index(index_name) + + +def test_create_and_delete_index_duplicate(search_client): + index_name = f"test-index-{uuid.uuid4().int}" + + search_client.create_index(index_name) + with pytest.raises(Exception, match="already exists"): + search_client.create_index(index_name) + + # Cleanup the index + search_client.delete_index(index_name) + with pytest.raises(Exception, match="no such index"): + search_client.delete_index(index_name) + + +def test_bulk_upsert(search_client, generic_index): + records = [ + {"id": 1, "title": "Green Eggs & Ham", "notes": "why are the eggs green?"}, + {"id": 2, "title": "The Cat in the Hat", "notes": "silly cat wears a hat"}, + {"id": 3, "title": "One Fish, Two Fish, Red Fish, Blue Fish", "notes": "fish"}, + ] + + search_client.bulk_upsert(generic_index, records, primary_key_field="id") + + # Verify the records are in the index + for record in records: + assert search_client._client.get(generic_index, record["id"])["_source"] == record + + # Can update + add more + records = [ + {"id": 1, "title": "Green Eggs & Ham", "notes": "Sam, eat the eggs"}, + {"id": 2, "title": "The Cat in the Hat", "notes": "watch the movie"}, + {"id": 3, "title": "One Fish, Two Fish, Red Fish, Blue Fish", "notes": "colors & numbers"}, + {"id": 4, "title": "How the Grinch Stole Christmas", "notes": "who"}, + ] + search_client.bulk_upsert(generic_index, records, primary_key_field="id") + + for record in records: + assert search_client._client.get(generic_index, record["id"])["_source"] == record + + +def test_swap_alias_index(search_client, generic_index): + alias_name = f"tmp-alias-{uuid.uuid4().int}" + + # Populate the generic index, we won't immediately use this one + records = [ + {"id": 1, "data": "abc123"}, + {"id": 2, "data": "def456"}, + {"id": 3, "data": "xyz789"}, + ] + search_client.bulk_upsert(generic_index, records, primary_key_field="id") + + # Create a different index that we'll attach to the alias first. + tmp_index = f"test-tmp-index-{uuid.uuid4().int}" + search_client.create_index(tmp_index) + # Add a few records + tmp_index_records = [ + {"id": 1, "data": "abc123"}, + {"id": 2, "data": "xyz789"}, + ] + search_client.bulk_upsert(tmp_index, tmp_index_records, primary_key_field="id") + + # Set the alias + search_client.swap_alias_index(tmp_index, alias_name, delete_prior_indexes=True) + + # Can search by this alias and get records from the tmp index + resp = search_client.search(alias_name, {}) + resp_records = [record["_source"] for record in resp["hits"]["hits"]] + assert resp_records == tmp_index_records + + # Swap the index to the generic one + delete the tmp one + search_client.swap_alias_index(generic_index, alias_name, delete_prior_indexes=True) + + resp = search_client.search(alias_name, {}) + resp_records = [record["_source"] for record in resp["hits"]["hits"]] + assert resp_records == records + + # Verify the tmp one was deleted + assert search_client._client.indices.exists(tmp_index) is False diff --git a/api/tests/src/search/__init__.py b/api/tests/src/search/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/api/tests/src/search/backend/__init__.py b/api/tests/src/search/backend/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/api/tests/src/search/backend/test_load_opportunities_to_index.py b/api/tests/src/search/backend/test_load_opportunities_to_index.py new file mode 100644 index 000000000..a079b83c8 --- /dev/null +++ b/api/tests/src/search/backend/test_load_opportunities_to_index.py @@ -0,0 +1,91 @@ +import pytest + +from src.search.backend.load_opportunities_to_index import ( + LoadOpportunitiesToIndex, + LoadOpportunitiesToIndexConfig, +) +from tests.conftest import BaseTestClass +from tests.src.db.models.factories import OpportunityFactory + + +class TestLoadOpportunitiesToIndex(BaseTestClass): + @pytest.fixture(scope="class") + def load_opportunities_to_index(self, db_session, search_client, opportunity_index_alias): + config = LoadOpportunitiesToIndexConfig( + alias_name=opportunity_index_alias, index_prefix="test-load-opps" + ) + return LoadOpportunitiesToIndex(db_session, search_client, config) + + def test_load_opportunities_to_index( + self, + truncate_opportunities, + enable_factory_create, + search_client, + opportunity_index_alias, + load_opportunities_to_index, + ): + # Create 25 opportunities we will load into the search index + opportunities = [] + opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True)) + opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True)) + opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True)) + opportunities.extend( + OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True) + ) + opportunities.extend( + OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True) + ) + + # Create some opportunities that won't get fetched / loaded into search + OpportunityFactory.create_batch(size=3, is_draft=True) + OpportunityFactory.create_batch(size=4, no_current_summary=True) + + load_opportunities_to_index.run() + # Verify some metrics first + assert ( + len(opportunities) + == load_opportunities_to_index.metrics[ + load_opportunities_to_index.Metrics.RECORDS_LOADED + ] + ) + + # Just do some rough validation that the data is present + resp = search_client.search(opportunity_index_alias, {"size": 100}) + + total_records = resp["hits"]["total"]["value"] + assert total_records == len(opportunities) + + records = [record["_source"] for record in resp["hits"]["hits"]] + assert set([opp.opportunity_id for opp in opportunities]) == set( + [record["opportunity_id"] for record in records] + ) + + # Rerunning without changing anything about the data in the DB doesn't meaningfully change anything + load_opportunities_to_index.index_name = load_opportunities_to_index.index_name + "-another" + load_opportunities_to_index.run() + resp = search_client.search(opportunity_index_alias, {"size": 100}) + + total_records = resp["hits"]["total"]["value"] + assert total_records == len(opportunities) + + records = [record["_source"] for record in resp["hits"]["hits"]] + assert set([opp.opportunity_id for opp in opportunities]) == set( + [record["opportunity_id"] for record in records] + ) + + # Rerunning but first add a few more opportunities to show up + opportunities.extend(OpportunityFactory.create_batch(size=3)) + load_opportunities_to_index.index_name = ( + load_opportunities_to_index.index_name + "-new-data" + ) + load_opportunities_to_index.run() + + resp = search_client.search(opportunity_index_alias, {"size": 100}) + + total_records = resp["hits"]["total"]["value"] + assert total_records == len(opportunities) + + records = [record["_source"] for record in resp["hits"]["hits"]] + assert set([opp.opportunity_id for opp in opportunities]) == set( + [record["opportunity_id"] for record in records] + )