Skip to content

Commit

Permalink
tweak api to default a date format and fix sdk to send data to parquet
Browse files — browse the repository at this point in the history
  • Loading branch information
TobyDrane committed Nov 15, 2023
1 parent 3595f5a commit ee46ffa
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repos:
rev: 1.7.5
hooks:
- id: bandit
exclude: '(tests|docs)/.*'
exclude: '(tests|docs|test)/.*'
- repo: https://github.com/psf/black
rev: 22.6.0
hooks:
Expand Down
6 changes: 4 additions & 2 deletions api/api/application/services/schema_infer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
)
from api.common.value_transformers import clean_column_name

from api.domain.data_types import extract_athena_types
from api.domain.data_types import extract_athena_types, is_date_type
from api.domain.schema import Schema, Column
from api.domain.schema_metadata import Owner, SchemaMetadata

DEFAULT_DATE_FORMAT = "%Y-%m-%d"


class SchemaInferService:
def infer_schema(
Expand Down Expand Up @@ -67,7 +69,7 @@ def _infer_columns(self, dataframe: pd.DataFrame) -> List[Column]:
partition_index=None,
data_type=_type,
allow_null=True,
format=None,
format=DEFAULT_DATE_FORMAT if is_date_type(_type) else None,
)
for name, _type in extract_athena_types(dataframe).items()
]
1 change: 1 addition & 0 deletions api/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ httpx
jinja2
pandas
psutil
pyarrow
pyjwt
pydantic[email]
python-multipart
Expand Down
41 changes: 41 additions & 0 deletions api/test/api/application/services/test_schema_infer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from pathlib import Path
from unittest.mock import patch

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytest

from api.application.services.schema_infer_service import SchemaInferService
Expand Down Expand Up @@ -67,6 +70,44 @@ def test_infer_schema(self):
assert actual_schema == expected_schema
os.remove(temp_out_path)

def test_infer_schema_with_date(self):
expected_schema = Schema(
metadata=SchemaMetadata(
layer="raw",
domain="mydomain",
dataset="mydataset",
sensitivity="PUBLIC",
owners=[Owner(name="change_me", email="change_me@email.com")],
),
columns=[
Column(
name="colname1",
partition_index=None,
data_type="string",
allow_null=True,
format=None,
),
Column(
name="colname2",
partition_index=None,
data_type="date",
allow_null=True,
format="%Y-%m-%d",
),
],
).dict(exclude={"metadata": {"version"}})
df = pd.DataFrame(data={"colname1": ["something"], "colname2": ["2021-01-01"]})
df["colname2"] = pd.to_datetime(df["colname2"])
temp_out_path = tempfile.mkstemp(suffix=".parquet")[1]
path = Path(temp_out_path)
pq.write_table(pa.Table.from_pandas(df), path)

actual_schema = self.infer_schema_service.infer_schema(
"raw", "mydomain", "mydataset", "PUBLIC", path
)
assert actual_schema == expected_schema
os.remove(temp_out_path)

@patch("api.application.services.schema_infer_service.construct_chunked_dataframe")
def test_raises_error_when_parsing_provided_file_fails(
self, mock_construct_chunked_dataframe
Expand Down
4 changes: 2 additions & 2 deletions sdk/rapid/rapid.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ def convert_dataframe_for_file_upload(self, df: DataFrame):
"""
return {
"file": (
f"rapid-sdk-{int(datetime.now().timestamp())}.csv",
df.to_csv(index=False),
f"rapid-sdk-{int(datetime.now().timestamp())}.parquet",
df.to_parquet(index=False),
)
}

Expand Down
1 change: 1 addition & 0 deletions sdk/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ requests
requests-mock
twine
pydantic
pyarrow
2 changes: 1 addition & 1 deletion sdk/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
author_email="lcard@no10.gov.uk",
license="MIT",
packages=find_packages(include=["rapid", "rapid.*"], exclude=["tests"]),
install_requires=["pandas", "requests", "deepdiff"],
install_requires=["pandas", "requests", "deepdiff", "pyarrow", "pydantic"],
include_package_data=True,
)
23 changes: 13 additions & 10 deletions sdk/tests/test_rapid.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from mock import Mock, call
from pandas import DataFrame
import pytest
import io
import pandas as pd
from requests_mock import Mocker

from rapid import Rapid
Expand Down Expand Up @@ -154,7 +155,7 @@ def test_upload_dataframe_success_after_waiting(
domain = "test_domain"
dataset = "test_dataset"
job_id = 1234
df = DataFrame()
df = pd.DataFrame()
requests_mock.post(
f"{RAPID_URL}/datasets/{layer}/{domain}/{dataset}",
json={"details": {"job_id": job_id}},
Expand All @@ -176,7 +177,7 @@ def test_upload_dataframe_success_no_waiting(
domain = "test_domain"
dataset = "test_dataset"
job_id = 1234
df = DataFrame()
df = pd.DataFrame()
requests_mock.post(
f"{RAPID_URL}/datasets/{layer}/{domain}/{dataset}",
json={"details": {"job_id": job_id}},
Expand All @@ -194,7 +195,7 @@ def test_upload_dataframe_failure(self, requests_mock: Mocker, rapid: Rapid):
domain = "test_domain"
dataset = "test_dataset"
job_id = 1234
df = DataFrame()
df = pd.DataFrame()
requests_mock.post(
f"{RAPID_URL}/datasets/{layer}/{domain}/{dataset}",
json={"details": {"job_id": job_id}},
Expand Down Expand Up @@ -238,20 +239,22 @@ def test_fetch_dataset_info_failure(self, requests_mock: Mocker, rapid: Rapid):

@pytest.mark.usefixtures("rapid")
def test_convert_dataframe_for_file_upload(self, rapid: Rapid):
df = DataFrame()
df = pd.DataFrame()
res = rapid.convert_dataframe_for_file_upload(df)
filename = res["file"][0]
data = res["file"][1]
assert filename.startswith("rapid-sdk") and filename.endswith(".csv")
assert data == "\n"
data = io.BytesIO(res["file"][1])
df = pd.read_parquet(data)

assert filename.startswith("rapid-sdk") and filename.endswith(".parquet")
assert len(df) == 0

@pytest.mark.usefixtures("requests_mock", "rapid")
def test_generate_schema_success(self, requests_mock: Mocker, rapid: Rapid):
layer = "raw"
domain = "test_domain"
dataset = "test_dataset"
sensitivity = "PUBLIC"
df = DataFrame()
df = pd.DataFrame()
mocked_response = {
"metadata": {
"layer": "raw",
Expand Down Expand Up @@ -299,7 +302,7 @@ def test_generate_schema_failure(self, requests_mock: Mocker, rapid: Rapid):
domain = "test_domain"
dataset = "test_dataset"
sensitivity = "PUBLIC"
df = DataFrame()
df = pd.DataFrame()
mocked_response = {"data": "dummy"}
requests_mock.post(
f"{RAPID_URL}/schema/{layer}/{sensitivity}/{domain}/{dataset}/generate",
Expand Down

0 comments on commit ee46ffa

Please sign in to comment.