Skip to content

Commit

Permalink
pull out shared socrata functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
fvankrieken committed Dec 9, 2024
1 parent 340ec91 commit 0f43063
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
23 changes: 23 additions & 0 deletions dcpy/connectors/socrata/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
import requests
from typing import Literal

SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")


def _simple_auth():
return (SOCRATA_USER, SOCRATA_PASSWORD)


def _socrata_request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> dict:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp = request_fn(url, auth=_simple_auth(), **kwargs)
if not resp.ok:
raise Exception(resp.text)
return resp.json()
38 changes: 12 additions & 26 deletions dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import copy
from dataclasses import dataclass
import json
import os
from pathlib import Path
from pydantic import BaseModel
import requests
from socrata.authorization import Authorization
from socrata import Socrata as SocrataPy
from socrata.revisions import Revision as SocrataPyRevision
Expand All @@ -25,9 +23,8 @@
from dcpy.utils.logging import logger

import dcpy.models.product.dataset.metadata_v2 as md
from . import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request

SOCRATA_USER = os.getenv("SOCRATA_USER")
SOCRATA_PASSWORD = os.getenv("SOCRATA_PASSWORD")
SOCRATA_REVISION_APPLY_TIMEOUT_SECS = 10 * 60 # Ten Mins

SOCRATA_DOMAIN = "data.cityofnewyork.us"
Expand All @@ -38,25 +35,16 @@
DISTRIBUTIONS_BUCKET = "edm-distributions"


def _simple_auth():
return (SOCRATA_USER, SOCRATA_PASSWORD)


def _socratapy_client():
return SocrataPy(Authorization(SOCRATA_DOMAIN, SOCRATA_USER, SOCRATA_PASSWORD))


def _socrata_request(
def _request(
url,
method: Literal["GET", "PUT", "POST", "PATCH", "DELETE"],
**kwargs,
) -> dict:
"""Request wrapper to add auth, and raise exceptions on error."""
request_fn = getattr(requests, method.lower())
resp = request_fn(url, auth=_simple_auth(), **kwargs)
if not resp.ok:
raise Exception(resp.text)
return resp.json()
return _socrata_request(url, method, **kwargs).json()


# There are required publishing frequency fields in two different sections of
Expand Down Expand Up @@ -321,7 +309,7 @@ def calculate_pushed_col_metadata(self, our_columns: list[md.DatasetColumn]):

def push_socrata_column_metadata(self, our_cols: list[md.DatasetColumn]):
cols = self.calculate_pushed_col_metadata(our_cols)
return _socrata_request(
return _request(
self.column_update_endpoint,
"POST",
json={"output_columns": cols},
Expand All @@ -338,11 +326,11 @@ def revisions_endpoint(self):

def fetch_metadata(self):
"""Fetch metadata (e.g. dataset name, tags) for a dataset."""
return _socrata_request(f"{_views_root}/{self.four_four}.json", "GET")
return _request(f"{_views_root}/{self.four_four}.json", "GET")

def fetch_open_revisions(self) -> list[Revision]:
"""Fetch all open revisions for a given dataset."""
revs = _socrata_request(
revs = _request(
self.revisions_endpoint,
"GET",
params={"open": "true"},
Expand All @@ -360,7 +348,7 @@ def discard_open_revisions(self):
def create_replace_revision(self) -> Revision:
"""Create a revision that will replace/overwrite the existing dataset, rather than upserting."""
logger.info(f"Creating a revision at: {self.revisions_endpoint}")
resp = _socrata_request(
resp = _request(
self.revisions_endpoint, "POST", json={"action": {"type": "replace"}}
)["resource"]
revision = Revision(revision_num=resp["revision_seq"], four_four=self.four_four)
Expand Down Expand Up @@ -388,11 +376,11 @@ def page_url(self):

def apply(self):
"""Apply the revision to the dataset, closing the revision."""
return _socrata_request(f"{self.revision_endpoint}/apply", "put")
return _request(f"{self.revision_endpoint}/apply", "put")

def discard(self):
"""Discard this revision."""
return _socrata_request(self.revision_endpoint, "delete")
return _request(self.revision_endpoint, "delete")

def fetch_default_metadata(self) -> Socrata.Responses.Revision:
"""Fetch default metadata for a revision.
Expand All @@ -401,17 +389,15 @@ def fetch_default_metadata(self) -> Socrata.Responses.Revision:
e.g. If you patch an update to `contact email` for this revision, that change will not
be reflected in the revision you fetch here, nor on the Socrata revision page.
"""
return Socrata.Responses.Revision(
_socrata_request(self.revision_endpoint, "GET")
)
return Socrata.Responses.Revision(_request(self.revision_endpoint, "GET"))

def patch_metadata(
self,
metadata: Socrata.Inputs.DatasetMetadata,
attachments: list[Socrata.Inputs.Attachment],
):
return Socrata.Responses.Revision(
_socrata_request(
_request(
self.revision_endpoint,
"PATCH",
# TODO: should this header be a default?
Expand Down Expand Up @@ -526,7 +512,7 @@ def upload_attachment(
}

with open(path, "rb") as f:
attachment_md_raw = _socrata_request(
attachment_md_raw = _request(
f"{self.revision_endpoint}/attachment",
"POST",
headers={
Expand Down

0 comments on commit 0f43063

Please sign in to comment.