Skip to content

Commit

Permalink
use socrata connector, auth for ingest/library calls (#1312)
Browse files Browse the repository at this point in the history
  • Loading branch information
fvankrieken authored Dec 12, 2024
1 parent e2f01fc commit 31cce05
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 69 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ingest_open_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ jobs:
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
BUILD_ENGINE_SERVER: "op://Data Engineering/EDM_DATA/server_url"
SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username"
SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password"

- name: Finish container setup ...
run: ./bash/docker_container_setup.sh
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ingest_single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ jobs:
AWS_S3_ENDPOINT: "op://Data Engineering/DO_keys/AWS_S3_ENDPOINT"
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username"
SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password"

- name: Finish container setup ...
run: ./bash/docker_container_setup.sh
Expand Down
17 changes: 7 additions & 10 deletions dcpy/connectors/socrata/extract.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from datetime import datetime
from pathlib import Path
import requests

from dcpy.models.connectors.socrata import Source
from dcpy.connectors import web
from .utils import _socrata_request


def get_base_url(source: Source):
Expand Down Expand Up @@ -31,18 +30,16 @@ def get_download_url(source: Source):
return url


def _get_version_from_resp(resp: dict) -> str:
return datetime.fromtimestamp(resp["rowsUpdatedAt"]).strftime("%Y%m%d")


def get_version(source: Source):
"""For given socrata dataset, get date last updated formatted as a string.
This is used as a proxy for a 'version' of the dataset."""
url = get_metadata_url(source)
resp = requests.get(url)
resp.raise_for_status()
return _get_version_from_resp(resp.json())
resp = _socrata_request(url, "GET")
return datetime.fromtimestamp(resp.json()["rowsUpdatedAt"]).strftime("%Y%m%d")


def download(source: Source, path: Path):
web.download_file(get_download_url(source), path)
path.parent.mkdir(exist_ok=True, parents=True)
response = _socrata_request(get_download_url(source), "GET")
with open(path, "wb") as f:
f.write(response.content)
38 changes: 12 additions & 26 deletions dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import copy
from dataclasses import dataclass
import json
import os
from pathlib import Path
from pydantic import BaseModel
import requests
from socrata.authorization import Authorization
from socrata import Socrata as SocrataPy
from socrata.revisions import Revision as SocrataPyRevision
Expand All @@ -25,9 +23,8 @@
from dcpy.utils.logging import logger

import dcpy.models.product.dataset.metadata_v2 as md
from .utils import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request

SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")
SOCRATA_REVISION_APPLY_TIMEOUT_SECS = 10 * 60 # Ten Mins

SOCRATA_DOMAIN = "data.cityofnewyork.us"
Expand All @@ -38,25 +35,16 @@
DISTRIBUTIONS_BUCKET = "edm-distributions"


def _simple_auth():
return (SOCRATA_USER, SOCRATA_PASSWORD)


def _socratapy_client():
return SocrataPy(Authorization(SOCRATA_DOMAIN, SOCRATA_USER, SOCRATA_PASSWORD))


def _socrata_request(
def _request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> dict:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp = request_fn(url, auth=_simple_auth(), **kwargs)
if not resp.ok:
raise Exception(resp.text)
return resp.json()
return _socrata_request(url, method, **kwargs).json()


# There are required publishing frequency fields in two different sections of
Expand Down Expand Up @@ -321,7 +309,7 @@ def calculate_pushed_col_metadata(self, our_columns: list[md.DatasetColumn]):

def push_socrata_column_metadata(self, our_cols: list[md.DatasetColumn]):
cols = self.calculate_pushed_col_metadata(our_cols)
return _socrata_request(
return _request(
self.column_update_endpoint,
"POST",
json={"output_columns": cols},
Expand All @@ -338,11 +326,11 @@ def revisions_endpoint(self):

def fetch_metadata(self):
"""Fetch metadata (e.g. dataset name, tags) for a dataset."""
return _socrata_request(f"{_views_root}/{self.four_four}.json", "GET")
return _request(f"{_views_root}/{self.four_four}.json", "GET")

def fetch_open_revisions(self) -> list[Revision]:
"""Fetch all open revisions for a given dataset."""
revs = _socrata_request(
revs = _request(
self.revisions_endpoint,
"GET",
params={"open": "true"},
Expand All @@ -360,7 +348,7 @@ def discard_open_revisions(self):
def create_replace_revision(self) -> Revision:
"""Create a revision that will replace/overwrite the existing dataset, rather than upserting."""
logger.info(f"Creating a revision at: {self.revisions_endpoint}")
resp = _socrata_request(
resp = _request(
self.revisions_endpoint, "POST", json={"action": {"type": "replace"}}
)["resource"]
revision = Revision(revision_num=resp["revision_seq"], four_four=self.four_four)
Expand Down Expand Up @@ -388,11 +376,11 @@ def page_url(self):

def apply(self):
"""Apply the revision to the dataset, closing the revision."""
return _socrata_request(f"{self.revision_endpoint}/apply", "put")
return _request(f"{self.revision_endpoint}/apply", "put")

def discard(self):
"""Discard this revision."""
return _socrata_request(self.revision_endpoint, "delete")
return _request(self.revision_endpoint, "delete")

def fetch_default_metadata(self) -> Socrata.Responses.Revision:
"""Fetch default metadata for a revision.
Expand All @@ -401,17 +389,15 @@ def fetch_default_metadata(self) -> Socrata.Responses.Revision:
e.g. If you patch an update to `contact email` for this revision, that change will not
be reflected in the revision you fetch here, nor on the Socrata revision page.
"""
return Socrata.Responses.Revision(
_socrata_request(self.revision_endpoint, "GET")
)
return Socrata.Responses.Revision(_request(self.revision_endpoint, "GET"))

def patch_metadata(
self,
metadata: Socrata.Inputs.DatasetMetadata,
attachments: list[Socrata.Inputs.Attachment],
):
return Socrata.Responses.Revision(
_socrata_request(
_request(
self.revision_endpoint,
"PATCH",
# TODO: should this header be a default?
Expand Down Expand Up @@ -526,7 +512,7 @@ def upload_attachment(
}

with open(path, "rb") as f:
attachment_md_raw = _socrata_request(
attachment_md_raw = _request(
f"{self.revision_endpoint}/attachment",
"POST",
headers={
Expand Down
25 changes: 25 additions & 0 deletions dcpy/connectors/socrata/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import requests
from typing import Literal

SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")


def _simple_auth() -> tuple[str, str] | None:
if SOCRATA_USER and SOCRATA_PASSWORD:
return (SOCRATA_USER, SOCRATA_PASSWORD)
else:
return None


def _socrata_request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> requests.Response:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp: requests.Response = request_fn(url, auth=_simple_auth(), **kwargs)
resp.raise_for_status()
return resp
44 changes: 23 additions & 21 deletions dcpy/library/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
from jinja2 import Template
import json
import importlib
import os
from pathlib import Path
import requests
import yaml

from dcpy.models.library import DatasetDefinition
from dcpy.models.connectors.socrata import Org as SocrataOrg
from dcpy.connectors.esri import arcgis_feature_service
from dcpy.connectors.socrata import extract as socrata
from .utils import format_url
from .validator import Validator, Dataset

Expand Down Expand Up @@ -49,14 +49,6 @@ def parsed_rendered_template(self, version: str) -> Dataset:
loaded["version"] = version
return Dataset(**loaded)

def version_socrata(self, uid: str) -> str:
"""using the socrata API, collect the 'data last update' date"""
metadata = requests.get(
f"https://data.cityofnewyork.us/api/views/{uid}.json"
).json()
version = datetime.fromtimestamp(metadata["rowsUpdatedAt"]).strftime("%Y%m%d")
return version

# @property
# def version_bytes(self) -> str:
# """parsing bytes of the big apple to get the latest bytes version"""
Expand All @@ -80,7 +72,13 @@ def compute(self) -> DatasetDefinition:

_config = self.parsed_unrendered_template
if _config.source.socrata:
version = self.version_socrata(_config.source.socrata.uid)
socrata_source = socrata.Source(
type="socrata",
org=SocrataOrg.nyc,
uid=_config.source.socrata.uid,
format=_config.source.socrata.format,
)
version = socrata.get_version(socrata_source)
elif _config.source.arcgis_feature_server:
fs = _config.source.arcgis_feature_server
feature_server_layer = arcgis_feature_service.resolve_layer(
Expand Down Expand Up @@ -121,21 +119,25 @@ def compute(self) -> DatasetDefinition:
elif config.source.socrata:
if self.source_path_override:
raise ValueError("Cannot override path of socrata dataset")
socrata = config.source.socrata
if socrata.format == "csv":
path = f"https://data.cityofnewyork.us/api/views/{socrata.uid}/rows.csv"
elif socrata.format == "geojson":
path = f"https://nycopendata.socrata.com/api/geospatial/{socrata.uid}?method=export&format=GeoJSON"
elif socrata.format == "shapefile":
source = config.source.socrata
if source.format == "csv":
path = f"library/tmp/{config.name}.csv"
elif source.format == "geojson":
path = f"library/tmp/{config.name}.geojson"
elif source.format == "shapefile":
path = f"library/tmp/{config.name}.zip"
url = f"https://data.cityofnewyork.us/api/geospatial/{socrata.uid}?method=export&format=Shapefile"
os.system("mkdir -p library/tmp")
os.system(f'curl -o {path} "{url}"')
else:
raise Exception(
"Socrata source format must be 'csv', 'geojson', or 'shapefile'."
)
config.source.gdalpath = format_url(path)
socrata_source = socrata.Source(
type="socrata",
org=SocrataOrg.nyc,
uid=source.uid,
format=source.format,
)
socrata.download(socrata_source, Path(path))
config.source.gdalpath = path

elif config.source.arcgis_feature_server:
if self.source_path_override:
Expand Down
2 changes: 1 addition & 1 deletion dcpy/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def raise_for_status(self):


def mock_request_get(
url: str, headers=None, params: dict | None = None
url: str, headers=None, auth=None, params: dict | None = None
) -> MockResponse:
"""
Mocks calls to request.get
Expand Down
11 changes: 0 additions & 11 deletions dcpy/test/library/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,6 @@ def test_config_source_type():
assert c.source.url


@patch("requests.get", side_effect=mock_request_get)
def test_config_version_socrata(request_get):
c = Config(get_config_file("socrata"))
assert c.parsed_unrendered_template.source.socrata
uid = c.parsed_unrendered_template.source.socrata.uid
version = c.version_socrata(uid)
assert len(version) == 8 # format: YYYYMMDD
assert int(version[-2:]) <= 31 # check date
assert int(version[-4:-2]) <= 12 # check month


def test_config_version_today():
c = Config(get_config_file("socrata"))
version = c.version_today
Expand Down

0 comments on commit 31cce05

Please sign in to comment.