From f5189002cf2e0d4062e093c58f01f2fb85877e5f Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Mon, 9 Dec 2024 13:52:11 -0500 Subject: [PATCH] use socrata connector, auth for ingest/library calls --- dcpy/connectors/socrata/__init__.py | 23 ++++++++++++++++ dcpy/connectors/socrata/extract.py | 7 +++-- dcpy/connectors/socrata/publish.py | 24 ++--------------- dcpy/library/config.py | 41 ++++++++++++++++++----------- 4 files changed, 55 insertions(+), 40 deletions(-) diff --git a/dcpy/connectors/socrata/__init__.py b/dcpy/connectors/socrata/__init__.py index e69de29bb..083f5c121 100644 --- a/dcpy/connectors/socrata/__init__.py +++ b/dcpy/connectors/socrata/__init__.py @@ -0,0 +1,23 @@ +import os +import requests +from typing import Literal + + +SOCRATA_USER = os.getenv("SOCRATA_USER") +SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD") + +def _simple_auth(): + return (SOCRATA_USER, SOCRATA_PASSWORD) + + +def _socrata_request( + url, + method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"], + **kwargs, +) -> dict: + """Request wrapper to add auth, and raise exceptions on error.""" + request_fn = getattr(requests, method.lower()) + resp = request_fn(url, auth=_simple_auth(), **kwargs) + if not resp.ok: + raise Exception(resp.text) + return resp.json() diff --git a/dcpy/connectors/socrata/extract.py b/dcpy/connectors/socrata/extract.py index 5957bbbe0..2eda0aad9 100644 --- a/dcpy/connectors/socrata/extract.py +++ b/dcpy/connectors/socrata/extract.py @@ -4,6 +4,7 @@ from dcpy.models.connectors.socrata import Source from dcpy.connectors import web +from . import _socrata_request def get_base_url(source: Source): @@ -39,10 +40,12 @@ def get_version(source: Source): """For given socrata dataset, get date last updated formatted as a string. 
This is used as a proxy for a 'version' of the dataset.""" url = get_metadata_url(source) - resp = requests.get(url) - resp.raise_for_status() - return _get_version_from_resp(resp.json()) + resp = _socrata_request(url, "GET") + return _get_version_from_resp(resp) def download(source: Source, path: Path): - web.download_file(get_download_url(source), path) + from . import _simple_auth + response = requests.get(get_download_url(source), auth=_simple_auth()) + response.raise_for_status() + path.write_bytes(response.content) diff --git a/dcpy/connectors/socrata/publish.py b/dcpy/connectors/socrata/publish.py index a7999155c..327de44c7 100644 --- a/dcpy/connectors/socrata/publish.py +++ b/dcpy/connectors/socrata/publish.py @@ -12,22 +12,19 @@ import copy from dataclasses import dataclass import json -import os from pathlib import Path from pydantic import BaseModel -import requests from socrata.authorization import Authorization from socrata import Socrata as SocrataPy from socrata.revisions import Revision as SocrataPyRevision import time -from typing import TypedDict, Literal, NotRequired, Any +from typing import TypedDict, NotRequired, Any from dcpy.utils.logging import logger import dcpy.models.product.dataset.metadata_v2 as md +from . 
import SOCRATA_USER, SOCRATA_PASSWORD -SOCRATA_USER = os.getenv("SOCRATA_USER") -SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD") SOCRATA_REVISION_APPLY_TIMEOUT_SECS = 10 * 60 # Ten Mins SOCRATA_DOMAIN = "data.cityofnewyork.us" @@ -38,27 +35,10 @@ DISTRIBUTIONS_BUCKET = "edm-distributions" -def _simple_auth(): - return (SOCRATA_USER, SOCRATA_PASSWORD) - - def _socratapy_client(): return SocrataPy(Authorization(SOCRATA_DOMAIN, SOCRATA_USER, SOCRATA_PASSWORD)) -def _socrata_request( - url, - method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"], - **kwargs, -) -> dict: - """Request wrapper to add auth, and raise exceptions on error.""" - request_fn = getattr(requests, method.lower()) - resp = request_fn(url, auth=_simple_auth(), **kwargs) - if not resp.ok: - raise Exception(resp.text) - return resp.json() - - # There are required publishing frequency fields in two different sections of # the required metadata, and they're different. Below are the shared fields UPDATE_SECTION_FREQUENCIES = { diff --git a/dcpy/library/config.py b/dcpy/library/config.py index 9ebe8b446..f2b25f41b 100644 --- a/dcpy/library/config.py +++ b/dcpy/library/config.py @@ -10,6 +10,7 @@ from dcpy.models.library import DatasetDefinition from dcpy.connectors.esri import arcgis_feature_service +from dcpy.connectors.socrata import extract as socrata from .utils import format_url from .validator import Validator, Dataset @@ -51,11 +52,9 @@ def parsed_rendered_template(self, version: str) -> Dataset: def version_socrata(self, uid: str) -> str: """using the socrata API, collect the 'data last update' date""" - metadata = requests.get( - f"https://data.cityofnewyork.us/api/views/{uid}.json" - ).json() - version = datetime.fromtimestamp(metadata["rowsUpdatedAt"]).strftime("%Y%m%d") - return version + return socrata.get_version( + socrata.Source(type="socrata", org="nyc", uid=uid, format="csv") + ) # @property # def version_bytes(self) -> str: @@ -80,7 +79,13 @@ def compute(self) -> DatasetDefinition: 
_config = self.parsed_unrendered_template if _config.source.socrata: - version = self.version_socrata(_config.source.socrata.uid) + source = socrata.Source( + type="socrata", + org="nyc", + uid=_config.source.socrata.uid, + format=_config.source.socrata.format, + ) + version = socrata.get_version(source) elif _config.source.arcgis_feature_server: fs = _config.source.arcgis_feature_server feature_server_layer = arcgis_feature_service.resolve_layer( @@ -121,21 +126,27 @@ def compute(self) -> DatasetDefinition: elif config.source.socrata: if self.source_path_override: raise ValueError("Cannot override path of socrata dataset") - socrata = config.source.socrata - if socrata.format == "csv": - path = f"https://data.cityofnewyork.us/api/views/{socrata.uid}/rows.csv" - elif socrata.format == "geojson": - path = f"https://nycopendata.socrata.com/api/geospatial/{socrata.uid}?method=export&format=GeoJSON" - elif socrata.format == "shapefile": + source = config.source.socrata + if source.format == "csv": + path = f"library/tmp/{config.name}.csv" + elif source.format == "geojson": + path = f"library/tmp/{config.name}.geojson" + elif source.format == "shapefile": path = f"library/tmp/{config.name}.zip" - url = f"https://data.cityofnewyork.us/api/geospatial/{socrata.uid}?method=export&format=Shapefile" - os.system("mkdir -p library/tmp") - os.system(f'curl -o {path} "{url}"') else: raise Exception( "Socrata source format must be 'csv', 'geojson', or 'shapefile'." ) - config.source.gdalpath = format_url(path) + socrata_source = socrata.Source( + type="socrata", + org="nyc", + uid=source.uid, + format=source.format, + ) + # socrata.download writes a local file, so library/tmp must exist (replaces the removed `mkdir -p`) + Path(path).parent.mkdir(parents=True, exist_ok=True) + socrata.download(socrata_source, Path(path)) + config.source.gdalpath = path elif config.source.arcgis_feature_server: if self.source_path_override: