Skip to content

Commit

Permalink
use socrata connector, auth for ingest/library calls
Browse files Browse the repository at this point in the history
  • Loading branch information
fvankrieken committed Dec 9, 2024
1 parent 531b328 commit 73bc340
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ingest_open_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ jobs:
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
BUILD_ENGINE_SERVER: "op://Data Engineering/EDM_DATA/server_url"
SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username"
SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password"

- name: Finish container setup ...
run: ./bash/docker_container_setup.sh
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ingest_single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ jobs:
AWS_S3_ENDPOINT: "op://Data Engineering/DO_keys/AWS_S3_ENDPOINT"
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username"
SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password"

- name: Finish container setup ...
run: ./bash/docker_container_setup.sh
Expand Down
16 changes: 9 additions & 7 deletions dcpy/connectors/socrata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")


def _simple_auth():
return (SOCRATA_USER, SOCRATA_PASSWORD)
def _simple_auth() -> tuple[str, str] | None:
if SOCRATA_USER and SOCRATA_PASSWORD:
return (SOCRATA_USER, SOCRATA_PASSWORD)
else:
return None


def _socrata_request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> dict:
) -> requests.Response:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp = request_fn(url, auth=_simple_auth(), **kwargs)
if not resp.ok:
raise Exception(resp.text)
return resp.json()
resp: requests.Response = request_fn(url, auth=_simple_auth(), **kwargs)
resp.raise_for_status()
return resp
17 changes: 7 additions & 10 deletions dcpy/connectors/socrata/extract.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from datetime import datetime
from pathlib import Path
import requests

from dcpy.models.connectors.socrata import Source
from dcpy.connectors import web
from . import _socrata_request


def get_base_url(source: Source):
Expand Down Expand Up @@ -31,18 +30,16 @@ def get_download_url(source: Source):
return url


def _get_version_from_resp(resp: dict) -> str:
return datetime.fromtimestamp(resp["rowsUpdatedAt"]).strftime("%Y%m%d")


def get_version(source: Source):
"""For given socrata dataset, get date last updated formatted as a string.
This is used as a proxy for a 'version' of the dataset."""
url = get_metadata_url(source)
resp = requests.get(url)
resp.raise_for_status()
return _get_version_from_resp(resp.json())
resp = _socrata_request(url, "GET")
return datetime.fromtimestamp(resp.json()["rowsUpdatedAt"]).strftime("%Y%m%d")


def download(source: Source, path: Path):
web.download_file(get_download_url(source), path)
path.parent.mkdir(exist_ok=True, parents=True)
response = _socrata_request(get_download_url(source), "GET")
with open(path, "wb") as f:
f.write(response.content)
44 changes: 23 additions & 21 deletions dcpy/library/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
from jinja2 import Template
import json
import importlib
import os
from pathlib import Path
import requests
import yaml

from dcpy.models.library import DatasetDefinition
from dcpy.models.connectors.socrata import Org as SocrataOrg
from dcpy.connectors.esri import arcgis_feature_service
from dcpy.connectors.socrata import extract as socrata
from .utils import format_url
from .validator import Validator, Dataset

Expand Down Expand Up @@ -49,14 +49,6 @@ def parsed_rendered_template(self, version: str) -> Dataset:
loaded["version"] = version
return Dataset(**loaded)

def version_socrata(self, uid: str) -> str:
"""using the socrata API, collect the 'data last update' date"""
metadata = requests.get(
f"https://data.cityofnewyork.us/api/views/{uid}.json"
).json()
version = datetime.fromtimestamp(metadata["rowsUpdatedAt"]).strftime("%Y%m%d")
return version

# @property
# def version_bytes(self) -> str:
# """parsing bytes of the big apple to get the latest bytes version"""
Expand All @@ -80,7 +72,13 @@ def compute(self) -> DatasetDefinition:

_config = self.parsed_unrendered_template
if _config.source.socrata:
version = self.version_socrata(_config.source.socrata.uid)
socrata_source = socrata.Source(
type="socrata",
org=SocrataOrg.nyc,
uid=_config.source.socrata.uid,
format=_config.source.socrata.format,
)
version = socrata.get_version(socrata_source)
elif _config.source.arcgis_feature_server:
fs = _config.source.arcgis_feature_server
feature_server_layer = arcgis_feature_service.resolve_layer(
Expand Down Expand Up @@ -121,21 +119,25 @@ def compute(self) -> DatasetDefinition:
elif config.source.socrata:
if self.source_path_override:
raise ValueError("Cannot override path of socrata dataset")
socrata = config.source.socrata
if socrata.format == "csv":
path = f"https://data.cityofnewyork.us/api/views/{socrata.uid}/rows.csv"
elif socrata.format == "geojson":
path = f"https://nycopendata.socrata.com/api/geospatial/{socrata.uid}?method=export&format=GeoJSON"
elif socrata.format == "shapefile":
source = config.source.socrata
if source.format == "csv":
path = f"library/tmp/{config.name}.csv"
elif source.format == "geojson":
path = f"library/tmp/{config.name}.geojson"
elif source.format == "shapefile":
path = f"library/tmp/{config.name}.zip"
url = f"https://data.cityofnewyork.us/api/geospatial/{socrata.uid}?method=export&format=Shapefile"
os.system("mkdir -p library/tmp")
os.system(f'curl -o {path} "{url}"')
else:
raise Exception(
"Socrata source format must be 'csv', 'geojson', or 'shapefile'."
)
config.source.gdalpath = format_url(path)
socrata_source = socrata.Source(
type="socrata",
org=SocrataOrg.nyc,
uid=source.uid,
format=source.format,
)
socrata.download(socrata_source, Path(path))
config.source.gdalpath = path

elif config.source.arcgis_feature_server:
if self.source_path_override:
Expand Down
2 changes: 1 addition & 1 deletion dcpy/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def raise_for_status(self):


def mock_request_get(
url: str, headers=None, params: dict | None = None
url: str, headers=None, auth=None, params: dict | None = None
) -> MockResponse:
"""
Mocks calls to request.get
Expand Down

0 comments on commit 73bc340

Please sign in to comment.