Skip to content

Commit

Permalink
use socrata connector, auth for ingest/library calls
Browse files Browse the repository at this point in the history
  • Loading branch information
fvankrieken committed Dec 9, 2024
1 parent e7bdf5a commit f518900
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 40 deletions.
23 changes: 23 additions & 0 deletions dcpy/connectors/socrata/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
import requests
from typing import Literal


SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")

def _simple_auth():
return (SOCRATA_USER, SOCRATA_PASSWORD)


def _socrata_request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> dict:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp = request_fn(url, auth=_simple_auth(), **kwargs)
if not resp.ok:
raise Exception(resp.text)
return resp.json()
7 changes: 5 additions & 2 deletions dcpy/connectors/socrata/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from dcpy.models.connectors.socrata import Source
from dcpy.connectors import web
from . import _socrata_request


def get_base_url(source: Source):
Expand Down Expand Up @@ -39,10 +40,12 @@ def get_version(source: Source):
"""For given socrata dataset, get date last updated formatted as a string.
This is used as a proxy for a 'version' of the dataset."""
url = get_metadata_url(source)
resp = requests.get(url)
resp = _socrata_request(url, "GET")
resp.raise_for_status()
return _get_version_from_resp(resp.json())


def download(source: Source, path: Path):
web.download_file(get_download_url(source), path)
response = _socrata_request(get_download_url(source), "GET")
with open(path, "wb") as f:
f.write(response.content)
24 changes: 2 additions & 22 deletions dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,19 @@
import copy
from dataclasses import dataclass
import json
import os
from pathlib import Path
from pydantic import BaseModel
import requests
from socrata.authorization import Authorization
from socrata import Socrata as SocrataPy
from socrata.revisions import Revision as SocrataPyRevision
import time
from typing import TypedDict, Literal, NotRequired, Any
from typing import TypedDict, NotRequired, Any

from dcpy.utils.logging import logger

import dcpy.models.product.dataset.metadata_v2 as md
from . import SOCRATA_USER, SOCRATA_PASSWORD

SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")
SOCRATA_REVISION_APPLY_TIMEOUT_SECS = 10 * 60 # Ten Mins

SOCRATA_DOMAIN = "data.cityofnewyork.us"
Expand All @@ -38,27 +35,10 @@
DISTRIBUTIONS_BUCKET = "edm-distributions"


def _simple_auth():
return (SOCRATA_USER, SOCRATA_PASSWORD)


def _socratapy_client():
return SocrataPy(Authorization(SOCRATA_DOMAIN, SOCRATA_USER, SOCRATA_PASSWORD))


def _socrata_request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> dict:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp = request_fn(url, auth=_simple_auth(), **kwargs)
if not resp.ok:
raise Exception(resp.text)
return resp.json()


# There are required publishing frequency fields in two different sections of
# the required metadata, and they're different. Below are the shared fields
UPDATE_SECTION_FREQUENCIES = {
Expand Down
41 changes: 25 additions & 16 deletions dcpy/library/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from dcpy.models.library import DatasetDefinition
from dcpy.connectors.esri import arcgis_feature_service
from dcpy.connectors.socrata import extract as socrata
from .utils import format_url
from .validator import Validator, Dataset

Expand Down Expand Up @@ -51,11 +52,9 @@ def parsed_rendered_template(self, version: str) -> Dataset:

def version_socrata(self, uid: str) -> str:
"""using the socrata API, collect the 'data last update' date"""
metadata = requests.get(
f"https://data.cityofnewyork.us/api/views/{uid}.json"
).json()
version = datetime.fromtimestamp(metadata["rowsUpdatedAt"]).strftime("%Y%m%d")
return version
return socrata.get_version(
socrata.Source(type="socrata", org="nyc", uid=uid, format="csv")
)

# @property
# def version_bytes(self) -> str:
Expand All @@ -80,7 +79,13 @@ def compute(self) -> DatasetDefinition:

_config = self.parsed_unrendered_template
if _config.source.socrata:
version = self.version_socrata(_config.source.socrata.uid)
source = socrata.Source(
type="socrata",
org="nyc",
uid=_config.source.socrata.uid,
format=_config.source.socrata.format,
)
version = source.get_version(source)
elif _config.source.arcgis_feature_server:
fs = _config.source.arcgis_feature_server
feature_server_layer = arcgis_feature_service.resolve_layer(
Expand Down Expand Up @@ -121,21 +126,25 @@ def compute(self) -> DatasetDefinition:
elif config.source.socrata:
if self.source_path_override:
raise ValueError("Cannot override path of socrata dataset")
socrata = config.source.socrata
if socrata.format == "csv":
path = f"https://data.cityofnewyork.us/api/views/{socrata.uid}/rows.csv"
elif socrata.format == "geojson":
path = f"https://nycopendata.socrata.com/api/geospatial/{socrata.uid}?method=export&format=GeoJSON"
elif socrata.format == "shapefile":
source = config.source.socrata
if source.format == "csv":
path = f"library/tmp/{config.name}.csv"
elif source.format == "geojson":
path = f"library/tmp/{config.name}.geojson"
elif source.format == "shapefile":
path = f"library/tmp/{config.name}.zip"
url = f"https://data.cityofnewyork.us/api/geospatial/{socrata.uid}?method=export&format=Shapefile"
os.system("mkdir -p library/tmp")
os.system(f'curl -o {path} "{url}"')
else:
raise Exception(
"Socrata source format must be 'csv', 'geojson', or 'shapefile'."
)
config.source.gdalpath = format_url(path)
socrata_source = socrata.Source(
type="socrata",
org="nyc",
uid=source.uid,
format=source.format,
)
socrata.download(socrata_source, Path(path))
config.source.gdalpath = path

elif config.source.arcgis_feature_server:
if self.source_path_override:
Expand Down

0 comments on commit f518900

Please sign in to comment.