From 587c43c0928a8f7248908c365b21675e3010be8e Mon Sep 17 00:00:00 2001
From: Keyur Shah
Date: Fri, 23 Aug 2024 09:10:44 -0700
Subject: [PATCH 1/3] Add support for importing keys into apigee. (#4568)

---
 tools/apigee/README.md   |  12 ++
 tools/apigee/apigee.py   | 236 +++++++++++++++++++++++++++++++++++----
 tools/apigee/prod.env    |   1 +
 tools/apigee/run.sh      |   3 +-
 tools/apigee/staging.env |   1 +
 5 files changed, 228 insertions(+), 25 deletions(-)

diff --git a/tools/apigee/README.md b/tools/apigee/README.md
index d19def30e4..ee21893bce 100644
--- a/tools/apigee/README.md
+++ b/tools/apigee/README.md
@@ -14,6 +14,18 @@ To migrate keys in prod env:
 
 To migrate keys in staging env:
 
+```bash
+./run.sh staging.env
+```
+
+To only export keys (i.e. fetch DC api keys and write them to the spreadsheet) in staging env:
+
+```bash
+./run.sh staging.env --mode=export
+```
+
+To only import keys (i.e. read keys from the spreadsheet and import them into apigee) in staging env:
+
+```bash
+./run.sh staging.env --mode=import
+```
\ No newline at end of file
diff --git a/tools/apigee/apigee.py b/tools/apigee/apigee.py
index ba9e440297..d07a69bfa4 100644
--- a/tools/apigee/apigee.py
+++ b/tools/apigee/apigee.py
@@ -13,11 +13,18 @@
 # limitations under the License.
 
 import asyncio
+from collections import defaultdict
+from enum import StrEnum
+from functools import wraps
 import json
 import logging
 import os
+import random
+import re
+import string
 
 from absl import app
+from absl import flags
 from google.auth import default
 from google.auth.transport.requests import Request
 import gspread
@@ -25,11 +32,23 @@
 from httpx import HTTPStatusError
 from httpx import Limits
 
+FLAGS = flags.FLAGS
+
+
+class Mode(StrEnum):
+  MIGRATE = "migrate"
+  IMPORT = "import"
+  EXPORT = "export"
+
+
+flags.DEFINE_enum_class("mode", Mode.MIGRATE, Mode, "The mode of operation")
+
 _BILLING_PROJECT_ID = "datcom-204919"
 _API_KEYS_BASE_URL = "https://apikeys.googleapis.com"
 _APIGEE_BASE_URL = "https://apigee.googleapis.com"
 _HTTPX_LIMITS = Limits(max_keepalive_connections=5, max_connections=10)
 _HTTP_RESOURCE_EXISTS_CODE = 409
+_SECRET_CHARS = string.ascii_letters + string.digits
 
 # The DC API target of the keys being migrated.
 # e.g. api.datacommons.org
 _DC_API_TARGET = os.environ.get("DC_API_TARGET")
 # The apigee organization to migrate the keys to.
 # e.g. datcom-apigee
 _APIGEE_ORGANIZATION = os.environ.get("APIGEE_ORGANIZATION")
+# The apigee api product that the migrated keys should be associated with.
+# e.g. api, api-staging
+_APIGEE_API_PRODUCT = os.environ.get("APIGEE_API_PRODUCT")
 # The URL to the Google Sheet with details of partners to be migrated.
 _SHEETS_URL = os.environ.get("SHEETS_URL")
 # The name of the worksheet at the SHEETS_URL with details of partners to be migrated.
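A minimal sketch (not part of this patch) of how the new `--mode` flag is parsed and consumed; it assumes absl-py and Python 3.11+ for `StrEnum`, and `demo.py` is a hypothetical file name:

```python
# demo.py - hypothetical, illustrative sketch of the --mode flag wiring.
from enum import StrEnum

from absl import app
from absl import flags

FLAGS = flags.FLAGS


class Mode(StrEnum):
  MIGRATE = "migrate"
  IMPORT = "import"
  EXPORT = "export"


# DEFINE_enum_class validates the flag value against the Mode members,
# so `--mode=export` parses to Mode.EXPORT and anything else is rejected.
flags.DEFINE_enum_class("mode", Mode.MIGRATE, Mode, "The mode of operation")


def main(_):
  # `python3 demo.py --mode=export` prints "export";
  # with no flag, the default prints "migrate".
  print(FLAGS.mode.value)


if __name__ == "__main__":
  app.run(main)
```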
@@ -44,17 +66,30 @@ # project_id, developer_email, developer_first_name, developer_last_name, org_name, keys
 _WORKSHEET_NAME = os.environ.get("WORKSHEET_NAME")
 
+_DEFAULT_DEVELOPER_EMAIL = "datcom-core@google.com"
+_DEFAULT_DEVELOPER_FIRST_NAME = "Data Commons"
+_DEFAULT_ORG_NAME = "Unknown Org"
+_APP_NAME_SUFFIX = " - Data Commons"
+
 assert _DC_API_TARGET, "'DC_API_TARGET' env variable not specified"
 assert _APIGEE_ORGANIZATION, "'APIGEE_ORGANIZATION' env variable not specified"
+assert _APIGEE_API_PRODUCT, "'APIGEE_API_PRODUCT' env variable not specified"
 assert _SHEETS_URL, "'SHEETS_URL' env variable not specified"
 assert _WORKSHEET_NAME, "'WORKSHEET_NAME' env variable not specified"
 
 
+def _generate_secret() -> str:
+  """Generates a random 16-character secret string."""
+  return ''.join(random.choices(_SECRET_CHARS, k=16))
+
+
 class CloudApiClient:
 
-  def __init__(self, dc_api_target: str, apigee_organization: str) -> None:
+  def __init__(self, dc_api_target: str, apigee_organization: str,
+               apigee_api_product: str) -> None:
     self.dc_api_target = dc_api_target
     self.apigee_organization = apigee_organization
+    self.apigee_api_product = apigee_api_product
     self.credentials, self.project_id = default()
     self.credentials.refresh(Request())
     logging.info("Obtained cloud credentials, project id = %s", self.project_id)
@@ -64,15 +99,83 @@ def __init__(self, dc_api_target: str, apigee_organization: str) -> None:
         "x-goog-user-project": _BILLING_PROJECT_ID,
         "Content-Type": "application/json"
     }
+    self._concurrent_requests_limit = asyncio.Semaphore(
+        _HTTPX_LIMITS.max_connections)
+    # Used to serialize calls to the same URL to prevent concurrent calls from mutating the same resource.
+    self._in_flight_urls = defaultdict(asyncio.Lock)
+
+  async def bulk_import_keys(self,
+                             key_2_data: dict[str, dict[str, str]]) -> set[str]:
+    futures = [self.import_key(key, data) for key, data in key_2_data.items()]
+    keys = set(filter(lambda x: x, await asyncio.gather(*futures)))
+    return keys
+
+  async def import_key(self, key: str, data: dict[str, str]) -> str:
+    """
+    Imports the key to apigee by creating the developer, app and key in apigee
+    and associating the key with the configured api product.
+
+    The data needed is expected in the form of a dict with the following fields:
+    developer_email, developer_first_name, developer_last_name and org_name.
 
-  async def import_key(self, developer_email: str, app_name: str,
+    Defaults are assigned if these fields don't exist or are empty.
+
+    If the import was successful, it returns the imported key.
+    In case of a failure, it logs the failure and returns an empty string.
+    """
+
+    developer_email = data.get("developer_email") or _DEFAULT_DEVELOPER_EMAIL
+    developer_first_name = data.get(
+        "developer_first_name") or _DEFAULT_DEVELOPER_FIRST_NAME
+    developer_last_name = data.get(
+        "developer_last_name") or developer_first_name
+    org_name = data.get("org_name") or _DEFAULT_ORG_NAME
+    app_name = f"{org_name}{_APP_NAME_SUFFIX}"
+
+    try:
+      await self.create_developer(developer_email, developer_first_name,
+                                  developer_last_name)
+      logging.info("Created / found developer: %s", developer_email)
+
+      await self.create_app(developer_email, app_name)
+      logging.info("Created / found app: %s", app_name)
+
+      await self.create_key(developer_email, app_name, key)
+      logging.info("Created / found key: %s", key)
+
+      await self.associate_key(developer_email, app_name, key)
+      logging.info("Associated key with api product: %s",
+                   self.apigee_api_product)
+
+      # If successful, set the migrated_* fields so they are reflected in the sheet.
+      data["migrated_app_name"] = app_name
+      data["migrated_developer_first_name"] = developer_first_name
+      data["migrated_developer_last_name"] = developer_last_name
+      data["migrated_api_product"] = self.apigee_api_product
+
+      return key
+    except HTTPStatusError as hse:
+      logging.error(
+          "Error importing key to apigee: %s (%s),\nRequest:\n%s,\nResponse:\n%s",
+          key, str(hse), hse.request.content, hse.response.content)
+      return ""
+
+  async def associate_key(self, developer_email: str, app_name: str,
+                          key: str) -> str:
+    """Associates the key with the configured api product."""
+    request = {"apiProducts": [self.apigee_api_product]}
+    response = await self.http_put(
+        f"{_APIGEE_BASE_URL}/v1/organizations/{self.apigee_organization}/developers/{developer_email}/apps/{app_name}/keys/{key}",
+        request)
+    return response.get("consumerKey", "")
+
+  async def create_key(self, developer_email: str, app_name: str,
                        key: str) -> str:
-    request = {"consumerKey": key, "consumerSecret": key}
+    request = {"consumerKey": key, "consumerSecret": _generate_secret()}
     try:
       response = await self.http_post(
           f"{_APIGEE_BASE_URL}/v1/organizations/{self.apigee_organization}/developers/{developer_email}/apps/{app_name}/keys",
           request)
-      logging.info(json.dumps(response, indent=1))
       return response.get("consumerKey", "")
     except HTTPStatusError as hse:
       # 409 status = Key already exists, log and return key.
@@ -87,7 +190,6 @@ async def create_app(self, developer_email: str, app_name: str) -> str:
       response = await self.http_post(
           f"{_APIGEE_BASE_URL}/v1/organizations/{self.apigee_organization}/developers/{developer_email}/apps",
           request)
-      logging.info(json.dumps(response, indent=1))
       return response.get("name", "")
     except HTTPStatusError as hse:
       # 409 status = App already exists, log and return app name.
@@ -108,7 +210,6 @@ async def create_developer(self, email: str, first_name: str,
       response = await self.http_post(
           f"{_APIGEE_BASE_URL}/v1/organizations/{self.apigee_organization}/developers",
           request)
-      logging.info(json.dumps(response, indent=1))
       return response.get("email", "")
     except HTTPStatusError as hse:
       # 409 status = Developer already exists, log and return email.
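The `create_developer`, `create_app`, and `create_key` helpers above share one create-or-reuse shape: POST the resource and treat HTTP 409 as "already exists". A reduced sketch of that pattern under the same assumptions (an httpx client talking to a JSON API); the `create_or_get` helper is hypothetical, not part of the patch:

```python
# Illustrative sketch only: create-or-get against a REST API,
# treating HTTP 409 as "resource already exists".
from httpx import AsyncClient, HTTPStatusError

_HTTP_RESOURCE_EXISTS_CODE = 409


async def create_or_get(client: AsyncClient, url: str, payload: dict,
                         existing_id: str) -> str:
  try:
    response = await client.post(url, json=payload)
    response.raise_for_status()  # raises HTTPStatusError on 4xx/5xx
    return response.json().get("name", "")
  except HTTPStatusError as hse:
    if hse.response.status_code == _HTTP_RESOURCE_EXISTS_CODE:
      # Resource already exists: reuse the identifier we tried to create.
      return existing_id
    raise
```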
@@ -117,13 +218,13 @@ async def create_developer(self, email: str, first_name: str,
         return email
       raise hse
 
-  async def batch_get_dc_api_keys(
-      self, project_ids: list[str]) -> dict[str, list[str]]:
-    futures = [self.get_dc_api_keys(project_id) for project_id in project_ids]
+  async def bulk_export_keys(self,
+                             project_ids: list[str]) -> dict[str, list[str]]:
+    futures = [self.export_keys(project_id) for project_id in project_ids]
     keys = dict(await asyncio.gather(*futures))
     return keys
 
-  async def get_dc_api_keys(self, project_id: str) -> tuple[str, list[str]]:
+  async def export_keys(self, project_id: str) -> tuple[str, list[str]]:
     key_names = await self.fetch_dc_api_key_names(project_id)
     futures = [self.fetch_dc_api_key_string(key_name) for key_name in key_names]
     keys = list(filter(lambda x: x, await asyncio.gather(*futures)))
@@ -154,8 +255,22 @@ async def fetch_dc_api_key_names(self, project_id: str) -> list[str]:
     logging.warning("No DC API key name found for project: %s", project_id)
     return key_names
 
+  def _serialize_in_flight_urls(func):
+
+    @wraps(func)
+    async def wrapper(self, url, *args, **kwargs):
+      async with self._in_flight_urls[url]:
+        try:
+          return await func(self, url, *args, **kwargs)
+        finally:
+          if url in self._in_flight_urls:
+            del self._in_flight_urls[url]
+
+    return wrapper
+
+  @_serialize_in_flight_urls
   async def http_get(self, url: str, params: dict = None) -> dict:
-    async with asyncio.Semaphore(_HTTPX_LIMITS.max_connections):
+    async with self._concurrent_requests_limit:
       response = await self.http_client.get(url,
                                             params=params,
                                             headers=self.http_headers)
@@ -164,8 +279,9 @@ async def http_get(self, url: str, params: dict = None) -> dict:
     logging.debug("Response: %s", json.dumps(result, indent=1))
     return result
 
+  @_serialize_in_flight_urls
   async def http_post(self, url: str, data: dict = None) -> dict:
-    async with asyncio.Semaphore(_HTTPX_LIMITS.max_connections):
+    async with self._concurrent_requests_limit:
       response = await self.http_client.post(url,
                                              json=data,
                                              headers=self.http_headers)
@@ -174,6 +290,17 @@ async def http_post(self, url: str, data: dict = None) -> dict:
     logging.debug("Response: %s", json.dumps(result, indent=1))
     return result
 
+  @_serialize_in_flight_urls
+  async def http_put(self, url: str, data: dict = None) -> dict:
+    async with self._concurrent_requests_limit:
+      response = await self.http_client.put(url,
+                                            json=data,
+                                            headers=self.http_headers)
+      response.raise_for_status()
+      result = response.json()
+      logging.debug("Response: %s", json.dumps(result, indent=1))
+      return result
+
 
 class SheetsClient:
 
@@ -188,20 +315,21 @@ def __init__(self, cloud_client: CloudApiClient, sheets_url: str,
     logging.info("Connected to worksheet '%s' at: %s", self.worksheet_name,
                  self.sheets_url)
 
-  async def write_dc_api_keys(self):
+  async def export_keys(self):
     """
+    Exports DC API keys from the API Keys API.
+
     Reads the sheet, fetches DC api keys for each project
     and writes them back to the sheet.
     """
     # Read sheet data.
-    data = self._read_sheet_data()
+    data = self._read_sheet_data(clear_keys=True)
 
     # Extract project ids from the data.
     project_ids: list[str] = list(
         filter(lambda x: x, map(lambda row: row.get("project_id"), data)))
 
     # Fetch API keys for each project.
-    project_id_2_keys = await self.cloud_client.batch_get_dc_api_keys(
-        project_ids)
+    project_id_2_keys = await self.cloud_client.bulk_export_keys(project_ids)
 
     # Insert fetched keys into the in-memory data.
     self._insert_project_keys(data, project_id_2_keys)
 
@@ -209,6 +337,46 @@ async def write_dc_api_keys(self):
     # Write data with keys back to the sheet.
     self._write_sheet_data(data)
 
+  async def import_keys(self):
+    """
+    Imports DC API keys into apigee.
+
+    Reads the sheet, which should already include the keys,
+    imports the keys to apigee and
+    writes the keys that were migrated back to the sheet.
+    """
+    # Read sheet data.
+    data = self._read_sheet_data(clear_migrated=True)
+
+    # Create a reverse lookup dict from key to row data to prepare for bulk import.
+    key_2_data: dict[str, dict[str, str]] = {}
+    for row in data:
+      keys_value = row.get("keys", "")
+      if not keys_value:
+        logging.warning("No keys found in row; skipping: %s",
+                        ", ".join(row.values()))
+        continue
+      keys = re.split(r"\s*,\s*", keys_value)
+      for key in keys:
+        key_2_data[key] = row
+
+    # Migrate the keys.
+    migrated_keys = await self.cloud_client.bulk_import_keys(key_2_data)
+
+    if migrated_keys:
+      # Insert migrated keys into the in-memory data.
+      self._insert_migrated_keys(key_2_data, migrated_keys)
+      # Write data with migrated keys back to the sheet.
+      self._write_sheet_data(data)
+
+  def _insert_migrated_keys(self, key_2_data: dict[str, dict[str, str]],
+                            migrated_keys: set[str]):
+    for key, data in key_2_data.items():
+      if key in migrated_keys:
+        migrated_keys_value = data.get("migrated_keys", "")
+        migrated_keys_value = f"{migrated_keys_value}, {key}" if migrated_keys_value else key
+        data["migrated_keys"] = migrated_keys_value
+
   def _insert_project_keys(self, data: list[dict[str, str]],
                            project_id_2_keys: dict[str, list[str]]):
     for row in data:
@@ -220,13 +388,25 @@ def _insert_project_keys(self, data: list[dict[str, str]],
       # Write keys as comma separated values.
       row["keys"] = ", ".join(keys)
 
-  def _read_sheet_data(self) -> list[dict[str, str]]:
+  def _read_sheet_data(self,
+                       clear_keys: bool = False,
+                       clear_migrated: bool = False) -> list[dict[str, str]]:
     # Get all values from the sheet.
     sheet_data = self.worksheet.get_all_values()
     # Extract headers (first row)
    headers = sheet_data.pop(0)
     # Create a list of dictionaries, where each dictionary represents a row
-    data = [dict(zip(headers, row)) for row in sheet_data]
+    data: list[dict[str, str]] = []
+    for row_values in sheet_data:
+      row: dict[str, str] = dict(zip(headers, row_values))
+      # Clear all migrated_* fields when clearing keys or migrated values.
+      if clear_keys or clear_migrated:
+        for field in row.keys():
+          if field.startswith("migrated_"):
+            row[field] = ""
+      if clear_keys:
+        row["keys"] = ""
+      data.append(row)
     return data
 
   def _write_sheet_data(self, data: list[dict[str, str]]):
@@ -236,23 +416,31 @@ def _write_sheet_data(self, data: list[dict[str, str]]):
     rows = [list(row.values()) for row in data]
     # Combine headers and rows to write to sheet.
     sheet_data = [headers] + rows
-    # Clear existing sheet content.
-    self.worksheet.clear()
     # Write updated data back to the sheet.
     self.worksheet.update(range_name="A1", values=sheet_data)
     logging.info("Wrote keys back to worksheet '%s' at: %s", self.worksheet_name,
                  self.sheets_url)
 
 
-async def async_main():
+async def async_main(mode: Mode):
   cloud_client = CloudApiClient(dc_api_target=_DC_API_TARGET,
-                                apigee_organization=_APIGEE_ORGANIZATION)
+                                apigee_organization=_APIGEE_ORGANIZATION,
+                                apigee_api_product=_APIGEE_API_PRODUCT)
   sheets_client = SheetsClient(cloud_client, _SHEETS_URL, _WORKSHEET_NAME)
-  await sheets_client.write_dc_api_keys()
+
+  if mode == Mode.IMPORT:
+    await sheets_client.import_keys()
+  elif mode == Mode.EXPORT:
+    await sheets_client.export_keys()
+  elif mode == Mode.MIGRATE:
+    await sheets_client.export_keys()
+    await sheets_client.import_keys()
+  else:
+    raise ValueError(f"Invalid mode: {mode}")
 
 
 def main(_):
-  asyncio.run(async_main())
+  asyncio.run(async_main(FLAGS.mode))
 
 
 if __name__ == "__main__":
diff --git a/tools/apigee/prod.env b/tools/apigee/prod.env
index f00e351b3b..ecc86ffddc 100644
--- a/tools/apigee/prod.env
+++ b/tools/apigee/prod.env
@@ -1,4 +1,5 @@
 DC_API_TARGET=api.datacommons.org
 APIGEE_ORGANIZATION=datcom-apigee
+APIGEE_API_PRODUCT=datacommons-api
 SHEETS_URL=https://docs.google.com/spreadsheets/d/1hxOmST5kLtil8RFfZcwYgeRvSATvvPa3EkduPpvNosc
 WORKSHEET_NAME=prod
\ No newline at end of file
diff --git a/tools/apigee/run.sh b/tools/apigee/run.sh
index aa44343376..85543c48b2 100755
--- a/tools/apigee/run.sh
+++ b/tools/apigee/run.sh
@@ -25,4 +25,5 @@ set -a
 source "$ENV_FILE"
 set +a
 
-python3 apigee.py
\ No newline at end of file
+# Pass all args except the first one to the python script.
+python3 apigee.py "${@:2}"
\ No newline at end of file
diff --git a/tools/apigee/staging.env b/tools/apigee/staging.env
index d55ea28e6d..fc6f7fe0ca 100644
--- a/tools/apigee/staging.env
+++ b/tools/apigee/staging.env
@@ -1,4 +1,5 @@
 DC_API_TARGET=staging.api.datacommons.org
 APIGEE_ORGANIZATION=datcom-apigee-dev
+APIGEE_API_PRODUCT=datacommons-api-staging
 SHEETS_URL=https://docs.google.com/spreadsheets/d/1hxOmST5kLtil8RFfZcwYgeRvSATvvPa3EkduPpvNosc
 WORKSHEET_NAME=staging
\ No newline at end of file

From 2c7230d67a78ae2b0a16737708bcc9a3eda7a0d3 Mon Sep 17 00:00:00 2001
From: Keyur Shah
Date: Fri, 23 Aug 2024 12:48:51 -0700
Subject: [PATCH 2/3] Set default value for FLASK_ENV. (#4576)

---
 build/cdc_services/Dockerfile | 1 +
 1 file changed, 1 insertion(+)

diff --git a/build/cdc_services/Dockerfile b/build/cdc_services/Dockerfile
index 6bb5e89350..e8973fd018 100644
--- a/build/cdc_services/Dockerfile
+++ b/build/cdc_services/Dockerfile
@@ -127,6 +127,7 @@ FROM gcr.io/datcom-ci/datacommons-services-runtime:2024-08-08 as runner
 # Env defaults.
 ENV WEBSITE_MIXER_API_ROOT=http://127.0.0.1:8081 \
     ENV_PREFIX=Compose \
+    FLASK_ENV=custom \
     ENABLE_ADMIN=false \
     DEBUG=false \
     USE_SQLITE=false \

From 0ac63c11943aeaecbc83b2bba012cef34e2787a4 Mon Sep 17 00:00:00 2001
From: Dan Noble
Date: Sat, 24 Aug 2024 13:49:32 -0400
Subject: [PATCH 3/3] updated custom dc docker dev image to pull down models
 in container build (#4575)

---
 build/cdc/dev/Dockerfile.nl-python.cdc.dev | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/build/cdc/dev/Dockerfile.nl-python.cdc.dev b/build/cdc/dev/Dockerfile.nl-python.cdc.dev
index 45ada83ea2..635005913c 100644
--- a/build/cdc/dev/Dockerfile.nl-python.cdc.dev
+++ b/build/cdc/dev/Dockerfile.nl-python.cdc.dev
@@ -13,8 +13,15 @@
 # limitations under the License.
 
 # Development Dockerfile for Custom Data Commons NL server
+# Stage 1: Download base DC model from GCS.
+FROM google/cloud-sdk:slim as model-downloader
 
-# Use official Python runtime as a parent image
+# Download NL embeddings model
+RUN mkdir -p /tmp/datcom-nl-models \
+  && gsutil -m cp -R gs://datcom-nl-models/ft_final_v20230717230459.all-MiniLM-L6-v2/ /tmp/datcom-nl-models/ \
+  && gsutil cp -R gs://datcom-nl-models/embeddings_medium_2024_05_09_18_01_32.ft_final_v20230717230459.all-MiniLM-L6-v2.csv /tmp/datcom-nl-models/
+
+# Stage 2: Use official Python runtime image
 FROM python:3.11.4-slim
 
 # Set the working directory in the container
@@ -41,6 +48,9 @@ COPY deploy/. /app/deploy/
 # Copy the NL application start script into the container
 COPY run_nl_server.sh /app/run_nl_server.sh
 
+# Copy NL embeddings model from model-downloader
+COPY --from=model-downloader /tmp/datcom-nl-models /tmp/datcom-nl-models
+
 # Copy the NL application entry point into the container
 COPY nl_app.py /app/nl_app.py
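Patch 3 follows the standard multi-stage build pattern: artifacts are fetched in a throwaway stage and copied into the final image with `COPY --from`, so the Cloud SDK layers never ship in the runtime image. A generic sketch of the pattern; the bucket and paths below are placeholders, not from this repo:

```dockerfile
# Illustrative sketch only: multi-stage build with placeholder names.
# Stage 1: fetch artifacts using tooling we don't want in the final image.
FROM google/cloud-sdk:slim as downloader
RUN mkdir -p /artifacts \
  && gsutil cp gs://example-bucket/model.bin /artifacts/

# Stage 2: slim runtime; only the downloaded artifacts are carried over.
FROM python:3.11-slim
COPY --from=downloader /artifacts /artifacts
```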