diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c42e1b7..b908c38 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,7 +12,7 @@ repos: rev: 1.7.5 hooks: - id: bandit - exclude: '(tests|docs)/.*' + exclude: '(tests|docs|test)/.*' - repo: https://github.com/psf/black rev: 22.6.0 hooks: diff --git a/api/api/application/services/schema_infer_service.py b/api/api/application/services/schema_infer_service.py index 9ebdc69..3e45084 100644 --- a/api/api/application/services/schema_infer_service.py +++ b/api/api/application/services/schema_infer_service.py @@ -13,10 +13,12 @@ ) from api.common.value_transformers import clean_column_name -from api.domain.data_types import extract_athena_types +from api.domain.data_types import extract_athena_types, is_date_type from api.domain.schema import Schema, Column from api.domain.schema_metadata import Owner, SchemaMetadata +DEFAULT_DATE_FORMAT = "%Y-%m-%d" + class SchemaInferService: def infer_schema( @@ -67,7 +69,7 @@ def _infer_columns(self, dataframe: pd.DataFrame) -> List[Column]: partition_index=None, data_type=_type, allow_null=True, - format=None, + format=DEFAULT_DATE_FORMAT if is_date_type(_type) else None, ) for name, _type in extract_athena_types(dataframe).items() ] diff --git a/api/requirements.txt b/api/requirements.txt index 95c1fe0..440de54 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -7,6 +7,7 @@ httpx jinja2 pandas psutil +pyarrow pyjwt pydantic[email] python-multipart diff --git a/api/test/api/application/services/test_schema_infer_service.py b/api/test/api/application/services/test_schema_infer_service.py index c130de3..7c40fba 100644 --- a/api/test/api/application/services/test_schema_infer_service.py +++ b/api/test/api/application/services/test_schema_infer_service.py @@ -3,6 +3,9 @@ from pathlib import Path from unittest.mock import patch +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq import pytest from api.application.services.schema_infer_service import SchemaInferService @@ -67,6 +70,44 @@ def test_infer_schema(self): assert actual_schema == expected_schema os.remove(temp_out_path) + def test_infer_schema_with_date(self): + expected_schema = Schema( + metadata=SchemaMetadata( + layer="raw", + domain="mydomain", + dataset="mydataset", + sensitivity="PUBLIC", + owners=[Owner(name="change_me", email="change_me@email.com")], + ), + columns=[ + Column( + name="colname1", + partition_index=None, + data_type="string", + allow_null=True, + format=None, + ), + Column( + name="colname2", + partition_index=None, + data_type="date", + allow_null=True, + format="%Y-%m-%d", + ), + ], + ).dict(exclude={"metadata": {"version"}}) + df = pd.DataFrame(data={"colname1": ["something"], "colname2": ["2021-01-01"]}) + df["colname2"] = pd.to_datetime(df["colname2"]) + temp_out_path = tempfile.mkstemp(suffix=".parquet")[1] + path = Path(temp_out_path) + pq.write_table(pa.Table.from_pandas(df), path) + + actual_schema = self.infer_schema_service.infer_schema( + "raw", "mydomain", "mydataset", "PUBLIC", path + ) + assert actual_schema == expected_schema + os.remove(temp_out_path) + @patch("api.application.services.schema_infer_service.construct_chunked_dataframe") def test_raises_error_when_parsing_provided_file_fails( self, mock_construct_chunked_dataframe diff --git a/sdk/rapid/rapid.py b/sdk/rapid/rapid.py index c24d48a..77c8e06 100644 --- a/sdk/rapid/rapid.py +++ b/sdk/rapid/rapid.py @@ -245,8 +245,8 @@ def convert_dataframe_for_file_upload(self, df: DataFrame): """ return { "file": ( - f"rapid-sdk-{int(datetime.now().timestamp())}.csv", - df.to_csv(index=False), + f"rapid-sdk-{int(datetime.now().timestamp())}.parquet", + df.to_parquet(index=False), ) } diff --git a/sdk/requirements.txt b/sdk/requirements.txt index c086b21..98354f7 100644 --- a/sdk/requirements.txt +++ b/sdk/requirements.txt @@ -7,3 +7,4 @@ requests requests-mock twine pydantic +pyarrow diff --git a/sdk/setup.py b/sdk/setup.py index 2d3f388..43bb1b4 100644 --- a/sdk/setup.py +++ b/sdk/setup.py @@ -9,6 +9,6 @@ author_email="lcard@no10.gov.uk", license="MIT", packages=find_packages(include=["rapid", "rapid.*"], exclude=["tests"]), - install_requires=["pandas", "requests", "deepdiff"], + install_requires=["pandas", "requests", "deepdiff", "pyarrow", "pydantic"], include_package_data=True, ) diff --git a/sdk/tests/test_rapid.py b/sdk/tests/test_rapid.py index dd4c716..bd3a267 100644 --- a/sdk/tests/test_rapid.py +++ b/sdk/tests/test_rapid.py @@ -1,6 +1,7 @@ from mock import Mock, call -from pandas import DataFrame import pytest +import io +import pandas as pd from requests_mock import Mocker from rapid import Rapid @@ -154,7 +155,7 @@ def test_upload_dataframe_success_after_waiting( domain = "test_domain" dataset = "test_dataset" job_id = 1234 - df = DataFrame() + df = pd.DataFrame() requests_mock.post( f"{RAPID_URL}/datasets/{layer}/{domain}/{dataset}", json={"details": {"job_id": job_id}}, @@ -176,7 +177,7 @@ def test_upload_dataframe_success_no_waiting( domain = "test_domain" dataset = "test_dataset" job_id = 1234 - df = DataFrame() + df = pd.DataFrame() requests_mock.post( f"{RAPID_URL}/datasets/{layer}/{domain}/{dataset}", json={"details": {"job_id": job_id}}, @@ -194,7 +195,7 @@ def test_upload_dataframe_failure(self, requests_mock: Mocker, rapid: Rapid): domain = "test_domain" dataset = "test_dataset" job_id = 1234 - df = DataFrame() + df = pd.DataFrame() requests_mock.post( f"{RAPID_URL}/datasets/{layer}/{domain}/{dataset}", json={"details": {"job_id": job_id}}, @@ -238,12 +239,14 @@ def test_fetch_dataset_info_failure(self, requests_mock: Mocker, rapid: Rapid): @pytest.mark.usefixtures("rapid") def test_convert_dataframe_for_file_upload(self, rapid: Rapid): - df = DataFrame() + df = pd.DataFrame() res = rapid.convert_dataframe_for_file_upload(df) filename = res["file"][0] - data = res["file"][1] - assert filename.startswith("rapid-sdk") and filename.endswith(".csv") - assert data == "\n" + data = io.BytesIO(res["file"][1]) + df = pd.read_parquet(data) + + assert filename.startswith("rapid-sdk") and filename.endswith(".parquet") + assert len(df) == 0 @pytest.mark.usefixtures("requests_mock", "rapid") def test_generate_schema_success(self, requests_mock: Mocker, rapid: Rapid): @@ -251,7 +254,7 @@ def test_generate_schema_success(self, requests_mock: Mocker, rapid: Rapid): domain = "test_domain" dataset = "test_dataset" sensitivity = "PUBLIC" - df = DataFrame() + df = pd.DataFrame() mocked_response = { "metadata": { "layer": "raw", @@ -299,7 +302,7 @@ def test_generate_schema_failure(self, requests_mock: Mocker, rapid: Rapid): domain = "test_domain" dataset = "test_dataset" sensitivity = "PUBLIC" - df = DataFrame() + df = pd.DataFrame() mocked_response = {"data": "dummy"} requests_mock.post( f"{RAPID_URL}/schema/{layer}/{sensitivity}/{domain}/{dataset}/generate",