Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use socrata connector, auth for ingest/library calls #1312

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ingest_open_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ jobs:
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
BUILD_ENGINE_SERVER: "op://Data Engineering/EDM_DATA/server_url"
SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username"
SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password"

- name: Finish container setup ...
run: ./bash/docker_container_setup.sh
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ingest_single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ jobs:
AWS_S3_ENDPOINT: "op://Data Engineering/DO_keys/AWS_S3_ENDPOINT"
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username"
SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password"

- name: Finish container setup ...
run: ./bash/docker_container_setup.sh
Expand Down
17 changes: 7 additions & 10 deletions dcpy/connectors/socrata/extract.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from datetime import datetime
from pathlib import Path
import requests

from dcpy.models.connectors.socrata import Source
from dcpy.connectors import web
from .utils import _socrata_request


def get_base_url(source: Source):
Expand Down Expand Up @@ -31,18 +30,16 @@ def get_download_url(source: Source):
return url


def _get_version_from_resp(resp: dict) -> str:
    """Return the ``rowsUpdatedAt`` value of a metadata payload as ``YYYYMMDD``.

    ``resp`` must contain a ``rowsUpdatedAt`` key holding a POSIX timestamp;
    a missing key raises ``KeyError``. The timestamp is converted with
    ``datetime.fromtimestamp`` and no ``tz`` argument, so the resulting date
    is expressed in the local timezone of the machine running the code.
    """
    rows_updated_at = resp["rowsUpdatedAt"]
    updated_on = datetime.fromtimestamp(rows_updated_at)
    return updated_on.strftime("%Y%m%d")


def get_version(source: Source):
"""For given socrata dataset, get date last updated formatted as a string.
This is used as a proxy for a 'version' of the dataset."""
url = get_metadata_url(source)
resp = requests.get(url)
resp.raise_for_status()
return _get_version_from_resp(resp.json())
resp = _socrata_request(url, "GET")
return datetime.fromtimestamp(resp.json()["rowsUpdatedAt"]).strftime("%Y%m%d")


def download(source: Source, path: Path):
web.download_file(get_download_url(source), path)
path.parent.mkdir(exist_ok=True, parents=True)
response = _socrata_request(get_download_url(source), "GET")
with open(path, "wb") as f:
f.write(response.content)
38 changes: 12 additions & 26 deletions dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import copy
from dataclasses import dataclass
import json
import os
from pathlib import Path
from pydantic import BaseModel
import requests
from socrata.authorization import Authorization
from socrata import Socrata as SocrataPy
from socrata.revisions import Revision as SocrataPyRevision
Expand All @@ -25,9 +23,8 @@
from dcpy.utils.logging import logger

import dcpy.models.product.dataset.metadata_v2 as md
from .utils import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request

SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")
SOCRATA_REVISION_APPLY_TIMEOUT_SECS = 10 * 60 # Ten Mins

SOCRATA_DOMAIN = "data.cityofnewyork.us"
Expand All @@ -38,25 +35,16 @@
DISTRIBUTIONS_BUCKET = "edm-distributions"


def _simple_auth():
return (SOCRATA_USER, SOCRATA_PASSWORD)


def _socratapy_client():
    """Build a socrata-py client for ``SOCRATA_DOMAIN``.

    Credentials come from the module-level ``SOCRATA_USER`` and
    ``SOCRATA_PASSWORD`` values, which are read at call time.
    """
    authorization = Authorization(SOCRATA_DOMAIN, SOCRATA_USER, SOCRATA_PASSWORD)
    return SocrataPy(authorization)


def _socrata_request(
def _request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> dict:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp = request_fn(url, auth=_simple_auth(), **kwargs)
if not resp.ok:
raise Exception(resp.text)
return resp.json()
return _socrata_request(url, method, **kwargs).json()

Check warning on line 47 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L47

Added line #L47 was not covered by tests


# There are required publishing frequency fields in two different sections of
Expand Down Expand Up @@ -321,7 +309,7 @@

def push_socrata_column_metadata(self, our_cols: list[md.DatasetColumn]):
cols = self.calculate_pushed_col_metadata(our_cols)
return _socrata_request(
return _request(

Check warning on line 312 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L312

Added line #L312 was not covered by tests
self.column_update_endpoint,
"POST",
json={"output_columns": cols},
Expand All @@ -338,11 +326,11 @@

def fetch_metadata(self):
"""Fetch metadata (e.g. dataset name, tags) for a dataset."""
return _socrata_request(f"{_views_root}/{self.four_four}.json", "GET")
return _request(f"{_views_root}/{self.four_four}.json", "GET")

Check warning on line 329 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L329

Added line #L329 was not covered by tests

def fetch_open_revisions(self) -> list[Revision]:
"""Fetch all open revisions for a given dataset."""
revs = _socrata_request(
revs = _request(

Check warning on line 333 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L333

Added line #L333 was not covered by tests
self.revisions_endpoint,
"GET",
params={"open": "true"},
Expand All @@ -360,7 +348,7 @@
def create_replace_revision(self) -> Revision:
"""Create a revision that will replace/overwrite the existing dataset, rather than upserting."""
logger.info(f"Creating a revision at: {self.revisions_endpoint}")
resp = _socrata_request(
resp = _request(

Check warning on line 351 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L351

Added line #L351 was not covered by tests
self.revisions_endpoint, "POST", json={"action": {"type": "replace"}}
)["resource"]
revision = Revision(revision_num=resp["revision_seq"], four_four=self.four_four)
Expand Down Expand Up @@ -388,11 +376,11 @@

def apply(self):
"""Apply the revision to the dataset, closing the revision."""
return _socrata_request(f"{self.revision_endpoint}/apply", "put")
return _request(f"{self.revision_endpoint}/apply", "put")

Check warning on line 379 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L379

Added line #L379 was not covered by tests

def discard(self):
"""Discard this revision."""
return _socrata_request(self.revision_endpoint, "delete")
return _request(self.revision_endpoint, "delete")

Check warning on line 383 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L383

Added line #L383 was not covered by tests

def fetch_default_metadata(self) -> Socrata.Responses.Revision:
"""Fetch default metadata for a revision.
Expand All @@ -401,17 +389,15 @@
e.g. If you patch an update to `contact email` for this revision, that change will not
be reflected in the revision you fetch here, nor on the Socrata revision page.
"""
return Socrata.Responses.Revision(
_socrata_request(self.revision_endpoint, "GET")
)
return Socrata.Responses.Revision(_request(self.revision_endpoint, "GET"))

Check warning on line 392 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L392

Added line #L392 was not covered by tests

def patch_metadata(
self,
metadata: Socrata.Inputs.DatasetMetadata,
attachments: list[Socrata.Inputs.Attachment],
):
return Socrata.Responses.Revision(
_socrata_request(
_request(
self.revision_endpoint,
"PATCH",
# TODO: should this header be a default?
Expand Down Expand Up @@ -526,7 +512,7 @@
}

with open(path, "rb") as f:
attachment_md_raw = _socrata_request(
attachment_md_raw = _request(

Check warning on line 515 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L515

Added line #L515 was not covered by tests
f"{self.revision_endpoint}/attachment",
"POST",
headers={
Expand Down
25 changes: 25 additions & 0 deletions dcpy/connectors/socrata/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import requests
from typing import Literal

SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")


def _simple_auth() -> tuple[str, str] | None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think we'd want to throw an exception if _simple_auth() is called without these vars present. Any reason not to? (IIRC the Socrata request would fail in that case, even for public resources)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going without auth is fine for GET requests, e.g. fetching metadata or pulling datasets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to try to use auth when user/pass is defined, and let failures occur when a resource isn't allowed for a given user (or None auth).

Arguably, then, maybe we should only call `_simple_auth` if those are defined. But I thought this approach was simple: get optional auth credentials if available.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, so you've tested that this works for public resources when there are no creds defined? (For some reason I remember that failing, but that was a long time ago.) Maybe it makes sense to just log a warning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup - just ran locally with no issue

if SOCRATA_USER and SOCRATA_PASSWORD:
return (SOCRATA_USER, SOCRATA_PASSWORD)

Check warning on line 11 in dcpy/connectors/socrata/utils.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/utils.py#L11

Added line #L11 was not covered by tests
else:
return None


def _socrata_request(
    url,
    method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
    **kwargs,
) -> requests.Response:
    """Send an HTTP request to ``url`` and return the raw response.

    The HTTP verb is lower-cased and looked up as a function on the
    ``requests`` module, so ``"GET"`` and ``"get"`` are treated the same.
    Whatever ``_simple_auth()`` returns is passed as ``auth``; any extra
    keyword arguments are forwarded unchanged to the ``requests`` call.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` when the response
            carries an error status code.
    """
    credentials = _simple_auth()
    send = getattr(requests, method.lower())
    response: requests.Response = send(url, auth=credentials, **kwargs)
    response.raise_for_status()
    return response
44 changes: 23 additions & 21 deletions dcpy/library/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
from jinja2 import Template
import json
import importlib
import os
from pathlib import Path
import requests
import yaml

from dcpy.models.library import DatasetDefinition
from dcpy.models.connectors.socrata import Org as SocrataOrg
from dcpy.connectors.esri import arcgis_feature_service
from dcpy.connectors.socrata import extract as socrata
from .utils import format_url
from .validator import Validator, Dataset

Expand Down Expand Up @@ -49,14 +49,6 @@
loaded["version"] = version
return Dataset(**loaded)

def version_socrata(self, uid: str) -> str:
    """Return the dataset's 'data last updated' date as a ``YYYYMMDD`` string.

    Fetches the view metadata for ``uid`` from data.cityofnewyork.us and
    formats its ``rowsUpdatedAt`` timestamp. The HTTP status is not checked
    before the body is parsed as JSON.
    """
    metadata_url = f"https://data.cityofnewyork.us/api/views/{uid}.json"
    payload = requests.get(metadata_url).json()
    rows_updated_at = datetime.fromtimestamp(payload["rowsUpdatedAt"])
    return rows_updated_at.strftime("%Y%m%d")

# @property
# def version_bytes(self) -> str:
# """parsing bytes of the big apple to get the latest bytes version"""
Expand All @@ -80,7 +72,13 @@

_config = self.parsed_unrendered_template
if _config.source.socrata:
version = self.version_socrata(_config.source.socrata.uid)
socrata_source = socrata.Source(
type="socrata",
org=SocrataOrg.nyc,
uid=_config.source.socrata.uid,
format=_config.source.socrata.format,
)
version = socrata.get_version(socrata_source)
elif _config.source.arcgis_feature_server:
fs = _config.source.arcgis_feature_server
feature_server_layer = arcgis_feature_service.resolve_layer(
Expand Down Expand Up @@ -121,21 +119,25 @@
elif config.source.socrata:
if self.source_path_override:
raise ValueError("Cannot override path of socrata dataset")
socrata = config.source.socrata
if socrata.format == "csv":
path = f"https://data.cityofnewyork.us/api/views/{socrata.uid}/rows.csv"
elif socrata.format == "geojson":
path = f"https://nycopendata.socrata.com/api/geospatial/{socrata.uid}?method=export&format=GeoJSON"
elif socrata.format == "shapefile":
source = config.source.socrata
if source.format == "csv":
path = f"library/tmp/{config.name}.csv"
elif source.format == "geojson":
path = f"library/tmp/{config.name}.geojson"

Check warning on line 126 in dcpy/library/config.py

View check run for this annotation

Codecov / codecov/patch

dcpy/library/config.py#L126

Added line #L126 was not covered by tests
elif source.format == "shapefile":
path = f"library/tmp/{config.name}.zip"
url = f"https://data.cityofnewyork.us/api/geospatial/{socrata.uid}?method=export&format=Shapefile"
os.system("mkdir -p library/tmp")
os.system(f'curl -o {path} "{url}"')
else:
raise Exception(
"Socrata source format must be 'csv', 'geojson', or 'shapefile'."
)
config.source.gdalpath = format_url(path)
socrata_source = socrata.Source(
type="socrata",
org=SocrataOrg.nyc,
uid=source.uid,
format=source.format,
)
socrata.download(socrata_source, Path(path))
config.source.gdalpath = path

elif config.source.arcgis_feature_server:
if self.source_path_override:
Expand Down
2 changes: 1 addition & 1 deletion dcpy/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def raise_for_status(self):


def mock_request_get(
url: str, headers=None, params: dict | None = None
url: str, headers=None, auth=None, params: dict | None = None
) -> MockResponse:
"""
Mocks calls to request.get
Expand Down
11 changes: 0 additions & 11 deletions dcpy/test/library/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,6 @@ def test_config_source_type():
assert c.source.url


@patch("requests.get", side_effect=mock_request_get)
def test_config_version_socrata(request_get):
c = Config(get_config_file("socrata"))
assert c.parsed_unrendered_template.source.socrata
uid = c.parsed_unrendered_template.source.socrata.uid
version = c.version_socrata(uid)
assert len(version) == 8 # format: YYYYMMDD
assert int(version[-2:]) <= 31 # check date
assert int(version[-4:-2]) <= 12 # check month


def test_config_version_today():
c = Config(get_config_file("socrata"))
version = c.version_today
Expand Down