# [Issue #10] Populate the search index from the opportunity tables (#47)
## Summary
Fixes #10

### Time to review: __10 mins__

## Changes proposed
Set up a script that populates the search index by loading opportunities
from the DB, serializing them to JSON, loading them into a new index, and then
aliasing that index.

Several utilities were created to simplify working with the
OpenSearch client (a wrapper that handles configuration and common usage
patterns); a short usage sketch follows below.
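
As a rough sketch of how the wrapper is meant to be used end-to-end (the index
name below is illustrative; the real script derives one from a timestamp, and
this assumes the local OpenSearch config is already in place, e.g. after `make init`):

```python
import src.adapters.search as search

client = search.SearchClient()

# Create a fresh index, load documents into it, then atomically
# repoint the alias and clean up whatever index it pointed at before.
index_name = "opportunity-index-2024-05-22_10-00-00"  # illustrative name
client.create_index(index_name, shard_count=1, replica_count=1)
client.bulk_upsert(
    index_name,
    [{"opportunity_id": 123, "opportunity_title": "example title"}],
    primary_key_field="opportunity_id",
)
client.swap_alias_index(index_name, "opportunity-index-alias", delete_prior_indexes=True)
```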

## Context for reviewers
Iterating over the opportunities and doing something with them is a
common pattern in several of our scripts, so nothing is really different
there.

The meaningful part of the implementation is how we create and alias the
index. In OpenSearch you can give any index an alias (including putting
multiple indexes behind the same alias). The approach is pretty simple:
* Create an index
* Load opportunities into the index
* Atomically swap the index backing the `opportunity-index-alias`
* Delete the old indexes if they exist

This approach means that our search endpoint only needs to query the
alias, and we can keep building new indexes and swapping them out behind
the scenes. Because the index can be rebuilt every few minutes, if we ever
need to change the number of shards or any other index-creation setting,
we just update this script and wait for it to run again.
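
Under the hood the swap is a single `_aliases` call (the client's
`indices.update_aliases`). As a rough sketch of what that atomic request looks
like in the Dev Tools console (the timestamped index names here are made up
for illustration):

```json
POST _aliases
{
  "actions": [
    { "add": { "index": "opportunity-index-2024-05-22_10-00-00", "alias": "opportunity-index-alias" } },
    { "remove": { "index": "opportunity-index-2024-05-21_10-00-00", "alias": "opportunity-index-alias" } }
  ]
}
```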

## Additional information
I ran this locally after loading `83250` records, and it took about 61s.

You can run this locally yourself by doing:
```sh
make init
make db-seed-local
poetry run flask load-search-data load-opportunity-data
```
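
If you want to double-check that the alias was created, a quick sanity check
(a sketch, assuming OpenSearch is exposed locally on the default port 9200) is:

```sh
curl "http://localhost:9200/_cat/aliases/opportunity-index-alias?v"
```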

If you'd like to see the data, you can test it out at
http://localhost:5601/app/dev_tools#/console - here is an example query
that searches for the word `research` across a few fields and filters to
just forecasted/posted opportunities.

```json
GET opportunity-index-alias/_search
{
  "size": 25,
  "from": 0,
  "query": {
    "bool": {
      "must": [
        {
          "simple_query_string": {
            "query": "research",
            "default_operator": "AND", 
            "fields": ["agency.keyword^16", "opportunity_title^2", "opportunity_number^12", "summary.summary_description", "opportunity_assistance_listings.assistance_listing_number^10", "opportunity_assistance_listings.program_title^4"]
          }
        }
      ],
      "filter": [
        {
          "terms": {
            "opportunity_status": [
              "forecasted",
              "posted"
            ]
          }
        }
      ]
    }
  }
}

```
chouinar committed May 22, 2024
1 parent b40344d commit 879e743
Showing 14 changed files with 455 additions and 80 deletions.
4 changes: 2 additions & 2 deletions api/src/adapters/search/__init__.py
@@ -1,4 +1,4 @@
-from src.adapters.search.opensearch_client import SearchClient, get_opensearch_client
+from src.adapters.search.opensearch_client import SearchClient
 from src.adapters.search.opensearch_config import get_opensearch_config

-__all__ = ["SearchClient", "get_opensearch_client", "get_opensearch_config"]
+__all__ = ["SearchClient", "get_opensearch_config"]
115 changes: 102 additions & 13 deletions api/src/adapters/search/opensearch_client.py
@@ -1,25 +1,114 @@
-from typing import Any
+import logging
+from typing import Any, Sequence

 import opensearchpy

 from src.adapters.search.opensearch_config import OpensearchConfig, get_opensearch_config

-# More configuration/setup coming in:
-# TODO - https://github.com/navapbc/simpler-grants-gov/issues/13
+logger = logging.getLogger(__name__)

-# Alias the OpenSearch client so that it doesn't need to be imported everywhere
-# and to make it clear it's a client
-SearchClient = opensearchpy.OpenSearch

+class SearchClient:
+    def __init__(self, opensearch_config: OpensearchConfig | None = None) -> None:
+        if opensearch_config is None:
+            opensearch_config = get_opensearch_config()

-def get_opensearch_client(
-    opensearch_config: OpensearchConfig | None = None,
-) -> SearchClient:
-    if opensearch_config is None:
-        opensearch_config = get_opensearch_config()
+        # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details
+        self._client = opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config))

-    # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details
-    return opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config))
+    def create_index(
+        self, index_name: str, *, shard_count: int = 1, replica_count: int = 1
+    ) -> None:
+        """
+        Create an empty search index
+        """
+        body = {
+            "settings": {
+                "index": {"number_of_shards": shard_count, "number_of_replicas": replica_count}
+            }
+        }
+
+        logger.info("Creating search index %s", index_name, extra={"index_name": index_name})
+        self._client.indices.create(index_name, body=body)
+
+    def delete_index(self, index_name: str) -> None:
+        """
+        Delete an index. Can also delete all indexes via a prefix.
+        """
+        logger.info("Deleting search index %s", index_name, extra={"index_name": index_name})
+        self._client.indices.delete(index=index_name)
+
+    def bulk_upsert(
+        self,
+        index_name: str,
+        records: Sequence[dict[str, Any]],
+        primary_key_field: str,
+        *,
+        refresh: bool = True
+    ) -> None:
+        """
+        Bulk upsert records to an index
+        See: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/ for details
+        In this method we only use the "index" operation which creates or updates a record
+        based on the id value.
+        """
+
+        bulk_operations = []
+
+        for record in records:
+            # For each record, we create two entries in the bulk operation list
+            # which include the unique ID + the actual record on separate lines
+            # When this is sent to the search index, this will send two lines like:
+            #
+            # {"index": {"_id": 123}}
+            # {"opportunity_id": 123, "opportunity_title": "example title", ...}
+            bulk_operations.append({"index": {"_id": record[primary_key_field]}})
+            bulk_operations.append(record)
+
+        logger.info(
+            "Upserting records to %s",
+            index_name,
+            extra={"index_name": index_name, "record_count": int(len(bulk_operations) / 2)},
+        )
+        self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)
+
+    def swap_alias_index(
+        self, index_name: str, alias_name: str, *, delete_prior_indexes: bool = False
+    ) -> None:
+        """
+        For a given index, set it to the given alias. If any existing index(es) are
+        attached to the alias, remove them from the alias.
+        This operation is done atomically.
+        """
+        extra = {"index_name": index_name, "index_alias": alias_name}
+        logger.info("Swapping index that backs alias %s", alias_name, extra=extra)
+
+        existing_index_mapping = self._client.cat.aliases(alias_name, format="json")
+        existing_indexes = [i["index"] for i in existing_index_mapping]
+
+        logger.info(
+            "Found existing indexes", extra=extra | {"existing_indexes": ",".join(existing_indexes)}
+        )
+
+        actions = [{"add": {"index": index_name, "alias": alias_name}}]
+
+        for index in existing_indexes:
+            actions.append({"remove": {"index": index, "alias": alias_name}})
+
+        self._client.indices.update_aliases({"actions": actions})
+
+        # Cleanup old indexes now that they aren't connected to the alias
+        if delete_prior_indexes:
+            for index in existing_indexes:
+                self.delete_index(index)
+
+    def search(self, index_name: str, search_query: dict) -> dict:
+        # TODO - add more when we build out the request/response parsing logic
+        # we use something like Pydantic to help reorganize the response
+        # object into something easier to parse.
+        return self._client.search(index=index_name, body=search_query)


 def _get_connection_parameters(opensearch_config: OpensearchConfig) -> dict[str, Any]:
4 changes: 4 additions & 0 deletions api/src/app.py
@@ -18,6 +18,7 @@
 from src.api.schemas import response_schema
 from src.auth.api_key_auth import get_app_security_scheme
 from src.data_migration.data_migration_blueprint import data_migration_blueprint
+from src.search.backend.load_search_data_blueprint import load_search_data_blueprint
 from src.task import task_blueprint

 logger = logging.getLogger(__name__)
@@ -103,8 +104,11 @@ def register_blueprints(app: APIFlask) -> None:
     app.register_blueprint(opportunities_v0_blueprint)
     app.register_blueprint(opportunities_v0_1_blueprint)
     app.register_blueprint(opportunities_v1_blueprint)
+
+    # Non-api blueprints
     app.register_blueprint(data_migration_blueprint)
     app.register_blueprint(task_blueprint)
+    app.register_blueprint(load_search_data_blueprint)


 def get_project_root_dir() -> str:
Empty file added api/src/search/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions api/src/search/backend/__init__.py
@@ -0,0 +1,2 @@
# import all files so they get initialized and attached to the blueprint
from . import load_search_data # noqa: F401
112 changes: 112 additions & 0 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -0,0 +1,112 @@
import logging
from enum import StrEnum
from typing import Iterator, Sequence

from pydantic import Field
from pydantic_settings import SettingsConfigDict
from sqlalchemy import select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.search as search
from src.api.opportunities_v0_1.opportunity_schemas import OpportunityV01Schema
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
from src.task.task import Task
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)


class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
    model_config = SettingsConfigDict(env_prefix="LOAD_OPP_SEARCH_")

    shard_count: int = Field(default=1)  # LOAD_OPP_SEARCH_SHARD_COUNT
    replica_count: int = Field(default=1)  # LOAD_OPP_SEARCH_REPLICA_COUNT

    # TODO - these might make sense to come from some sort of opportunity-search-index-config?
    # look into this a bit more when we setup the search endpoint itself.
    alias_name: str = Field(default="opportunity-index-alias")  # LOAD_OPP_SEARCH_ALIAS_NAME
    index_prefix: str = Field(default="opportunity-index")  # LOAD_OPP_INDEX_PREFIX


class LoadOpportunitiesToIndex(Task):
    class Metrics(StrEnum):
        RECORDS_LOADED = "records_loaded"

    def __init__(
        self,
        db_session: db.Session,
        search_client: search.SearchClient,
        config: LoadOpportunitiesToIndexConfig | None = None,
    ) -> None:
        super().__init__(db_session)

        self.search_client = search_client

        if config is None:
            config = LoadOpportunitiesToIndexConfig()
        self.config = config

        current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
        self.index_name = f"{self.config.index_prefix}-{current_timestamp}"
        self.set_metrics({"index_name": self.index_name})

    def run_task(self) -> None:
        # create the index
        self.search_client.create_index(
            self.index_name,
            shard_count=self.config.shard_count,
            replica_count=self.config.replica_count,
        )

        # load the records
        for opp_batch in self.fetch_opportunities():
            self.load_records(opp_batch)

        # handle aliasing of endpoints
        self.search_client.swap_alias_index(
            self.index_name, self.config.alias_name, delete_prior_indexes=True
        )

    def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
        """
        Fetch the opportunities in batches. The iterator returned
        will give you each individual batch to be processed.

        Fetches all opportunities where:
            * is_draft = False
            * current_opportunity_summary is not None
        """
        return (
            self.db_session.execute(
                select(Opportunity)
                .join(CurrentOpportunitySummary)
                .where(
                    Opportunity.is_draft.is_(False),
                    CurrentOpportunitySummary.opportunity_status.isnot(None),
                )
                .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
                .execution_options(yield_per=5000)
            )
            .scalars()
            .partitions()
        )

    def load_records(self, records: Sequence[Opportunity]) -> None:
        logger.info("Loading batch of opportunities...")
        schema = OpportunityV01Schema()
        json_records = []

        for record in records:
            logger.info(
                "Preparing opportunity for upload to search index",
                extra={
                    "opportunity_id": record.opportunity_id,
                    "opportunity_status": record.opportunity_status,
                },
            )
            json_records.append(schema.dump(record))
            self.increment(self.Metrics.RECORDS_LOADED)

        self.search_client.bulk_upsert(self.index_name, json_records, "opportunity_id")
15 changes: 15 additions & 0 deletions api/src/search/backend/load_search_data.py
@@ -0,0 +1,15 @@
import src.adapters.db as db
import src.adapters.search as search
from src.adapters.db import flask_db
from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndex
from src.search.backend.load_search_data_blueprint import load_search_data_blueprint


@load_search_data_blueprint.cli.command(
    "load-opportunity-data", help="Load opportunity data from our database to the search index"
)
@flask_db.with_db_session()
def load_opportunity_data(db_session: db.Session) -> None:
    search_client = search.SearchClient()

    LoadOpportunitiesToIndex(db_session, search_client).run()
5 changes: 5 additions & 0 deletions api/src/search/backend/load_search_data_blueprint.py
@@ -0,0 +1,5 @@
from apiflask import APIBlueprint

load_search_data_blueprint = APIBlueprint(
    "load-search-data", __name__, enable_openapi=False, cli_group="load-search-data"
)
24 changes: 17 additions & 7 deletions api/tests/conftest.py
@@ -151,25 +151,35 @@ def test_foreign_schema(db_schema_prefix):

@pytest.fixture(scope="session")
def search_client() -> search.SearchClient:
return search.get_opensearch_client()
client = search.SearchClient()
try:
yield client
finally:
# Just in case a test setup an index
# in a way that didn't clean it up, delete
# all indexes at the end of a run that start with test
client.delete_index("test-*")


@pytest.fixture(scope="session")
def opportunity_index(search_client):
# TODO - will adjust this in the future to use utils we'll build
# for setting up / aliasing indexes. For now, keep it simple

# create a random index name just to make sure it won't ever conflict
# with an actual one, similar to how we create schemas for database tests
index_name = f"test_{uuid.uuid4().int}_opportunity"
index_name = f"test-opportunity-index-{uuid.uuid4().int}"

search_client.indices.create(index_name, body={})
search_client.create_index(index_name)

try:
yield index_name
finally:
# Try to clean up the index at the end
search_client.indices.delete(index_name)
search_client.delete_index(index_name)


@pytest.fixture(scope="session")
def opportunity_index_alias(search_client):
# Note we don't actually create anything, this is just a random name
return f"test-opportunity-index-alias-{uuid.uuid4().int}"


####################
58 changes: 0 additions & 58 deletions api/tests/src/adapters/search/test_opensearch.py

This file was deleted.

