diff --git a/python/docs/source/reference/sources.md b/python/docs/source/reference/sources.md index 9da8e9aff..59a66a1be 100644 --- a/python/docs/source/reference/sources.md +++ b/python/docs/source/reference/sources.md @@ -9,6 +9,7 @@ Source CsvString + JsonlFile JsonlString Pandas Parquet diff --git a/python/pysrc/kaskada/sources/__init__.py b/python/pysrc/kaskada/sources/__init__.py index 6dac88ecf..5fc38fb41 100644 --- a/python/pysrc/kaskada/sources/__init__.py +++ b/python/pysrc/kaskada/sources/__init__.py @@ -1,6 +1,6 @@ """Sources of data for Kaskada queries.""" -from .arrow import CsvString, JsonlString, Pandas, Parquet, PyDict +from .arrow import CsvString, JsonlFile, JsonlString, Pandas, Parquet, PyDict from .source import Source -__all__ = ["Source", "CsvString", "Pandas", "JsonlString", "PyDict", "Parquet"] +__all__ = ["Source", "CsvString", "Pandas", "JsonlFile", "JsonlString", "PyDict", "Parquet"] diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index 18d2111c7..ae72f74c6 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -1,7 +1,6 @@ """Provide sources based on PyArrow, including Pandas and CSV.""" from __future__ import annotations -import os from io import BytesIO from typing import Optional @@ -310,6 +309,100 @@ async def add_string(self, csv_string: str | BytesIO) -> None: await self._ffi_table.add_pyarrow(batch) +class JsonlFile(Source): + """Source reading data from line-delimited JSON files using PyArrow.""" + + def __init__( + self, + *, + time_column: str, + key_column: str, + schema: pa.Schema, + subsort_column: Optional[str] = None, + grouping_name: Optional[str] = None, + time_unit: Optional[TimeUnit] = None, + ) -> None: + """Create a line-delimited JSON File Source. + + Args: + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + schema: The schema to use. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. + """ + super().__init__( + schema=schema, + time_column=time_column, + key_column=key_column, + subsort_column=subsort_column, + grouping_name=grouping_name, + time_unit=time_unit, + ) + self._parse_options = pyarrow.json.ParseOptions(explicit_schema=schema) + + @staticmethod + async def create( + path: Optional[str] = None, + *, + time_column: str, + key_column: str, + subsort_column: Optional[str] = None, + schema: Optional[pa.Schema] = None, + grouping_name: Optional[str] = None, + time_unit: Optional[TimeUnit] = None, + ) -> JsonlFile: + """Create a source reading a line-delimited JSON file. + + Args: + path: The path to the line-delimited JSON file to add. This can be relative to + the current working directory or an absolute path (prefixed by '/'). + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + schema: The schema to use. If not provided, it will be inferred from the input. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. + """ + path = Source._get_absolute_path(path) + + if schema is None: + if path is None: + raise ValueError("Must provide schema or path to jsonl file") + schema = pa.json.read_json(path).schema + + source = JsonlFile( + time_column=time_column, + key_column=key_column, + subsort_column=subsort_column, + schema=schema, + grouping_name=grouping_name, + time_unit=time_unit, + ) + + if path: + await source.add_file(path) + return source + + async def add_file(self, path: str) -> None: + """Add data to the source.""" + batches = pa.json.read_json( + Source._get_absolute_path(path), + parse_options=self._parse_options + ) + for batch in batches.to_batches(): + await self._ffi_table.add_pyarrow(batch) + + class JsonlString(Source): """Source reading data from line-delimited JSON strings using PyArrow.""" @@ -467,16 +560,18 @@ async def create( time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. If not specified (and not specified in the data), nanosecond will be assumed. """ + path = Source._get_absolute_path(path) + if schema is None: if path is None: raise ValueError("Must provide schema or path to parquet file") schema = pa.parquet.read_schema(path) source = Parquet( - schema=schema, time_column=time_column, key_column=key_column, subsort_column=subsort_column, + schema=schema, grouping_name=grouping_name, time_unit=time_unit, ) @@ -487,7 +582,4 @@ async def create( async def add_file(self, path: str) -> None: """Add data to the source.""" - if not path.startswith("/"): - path = os.getcwd() + "/" + path - - await self._ffi_table.add_parquet(path) + await self._ffi_table.add_parquet(str(Source._get_absolute_path(path))) diff --git a/python/pysrc/kaskada/sources/source.py b/python/pysrc/kaskada/sources/source.py index 539413d9c..175d774e6 100644 --- a/python/pysrc/kaskada/sources/source.py +++ b/python/pysrc/kaskada/sources/source.py @@ -1,5 +1,6 @@ """Provide the base-class for Kaskada sources.""" from typing import Literal, Optional +import os import kaskada._ffi as _ffi import pyarrow as pa @@ -89,3 +90,9 @@ def _validate_column(field_name: Optional[str], schema: pa.Schema) -> None: raise KeyError(f"Column {field_name!r} does not exist") if field.nullable: raise ValueError(f"Column: {field_name!r} must be non-nullable") + + @staticmethod + def _get_absolute_path(path: Optional[str]) -> Optional[str]: + if path is None or path.startswith("/"): + return path + return os.getcwd() + "/" + path diff --git a/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file.jsonl b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file.jsonl new file mode 100644 index 000000000..deb214f21 --- /dev/null +++ b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file.jsonl @@ -0,0 +1,10 @@ +{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0} +{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1} +{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2} +{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3} +{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4} +{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5} +{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6} +{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7} +{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8} +{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9} diff --git a/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_1.jsonl b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_1.jsonl new file mode 100644 index 000000000..ff94d5a94 --- /dev/null +++ b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_1.jsonl @@ -0,0 +1,15 @@ +{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0} +{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1} +{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2} +{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3} +{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4} +{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5} +{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6} +{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7} +{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8} +{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9} +{"_time":"1970-01-01T00:26:18.268800000","_key":"patrick","id":"kk_005","purchase_time":1578268800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":2,"subsort_id":0} +{"_time":"1970-01-01T00:26:18.268800000","_key":"spongebob","id":"wh_001","purchase_time":1578268800000,"customer_id":"spongebob","vendor_id":"weenie_hut","amount":7,"subsort_id":1} +{"_time":"1970-01-01T00:26:18.355200000","_key":"spongebob","id":"cb_007","purchase_time":1578355200000,"customer_id":"spongebob","vendor_id":"chum_bucket","amount":34,"subsort_id":2} +{"_time":"1970-01-01T00:26:18.441600000","_key":"karen","id":"wh_002","purchase_time":1578441600000,"customer_id":"karen","vendor_id":"weenie_hut","amount":8,"subsort_id":3} +{"_time":"1970-01-01T00:26:18.441600000","_key":"patrick","id":"kk_006","purchase_time":1578441600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":4} diff --git a/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_with_subsort.jsonl b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_with_subsort.jsonl new file mode 100644 index 000000000..deb214f21 --- /dev/null +++ b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_with_subsort.jsonl @@ -0,0 +1,10 @@ +{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0} +{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1} +{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2} +{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3} +{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4} +{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5} +{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6} +{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7} +{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8} +{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9} diff --git a/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_with_subsort_1.jsonl b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_with_subsort_1.jsonl new file mode 100644 index 000000000..ff94d5a94 --- /dev/null +++ b/python/pytests/golden/jsonl_file_source_test/test_read_jsonl_file_with_subsort_1.jsonl @@ -0,0 +1,15 @@ +{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0} +{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1} +{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2} +{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3} +{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4} +{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5} +{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6} +{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7} +{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8} +{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9} +{"_time":"1970-01-01T00:26:18.268800000","_key":"patrick","id":"kk_005","purchase_time":1578268800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":2,"subsort_id":0} +{"_time":"1970-01-01T00:26:18.268800000","_key":"spongebob","id":"wh_001","purchase_time":1578268800000,"customer_id":"spongebob","vendor_id":"weenie_hut","amount":7,"subsort_id":1} +{"_time":"1970-01-01T00:26:18.355200000","_key":"spongebob","id":"cb_007","purchase_time":1578355200000,"customer_id":"spongebob","vendor_id":"chum_bucket","amount":34,"subsort_id":2} +{"_time":"1970-01-01T00:26:18.441600000","_key":"karen","id":"wh_002","purchase_time":1578441600000,"customer_id":"karen","vendor_id":"weenie_hut","amount":8,"subsort_id":3} +{"_time":"1970-01-01T00:26:18.441600000","_key":"patrick","id":"kk_006","purchase_time":1578441600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":4} diff --git a/python/pytests/jsonl_file_source_test.py b/python/pytests/jsonl_file_source_test.py new file mode 100644 index 000000000..44ea5e1b3 --- /dev/null +++ b/python/pytests/jsonl_file_source_test.py @@ -0,0 +1,26 @@ +import kaskada as kd + + +async def test_read_jsonl_file(golden) -> None: + source = await kd.sources.JsonlFile.create( + "../testdata/purchases/purchases_part1.jsonl", + time_column="purchase_time", + key_column="customer_id", + ) + golden.jsonl(source) + + await source.add_file("../testdata/purchases/purchases_part2.jsonl") + golden.jsonl(source) + + +async def test_read_jsonl_file_with_subsort(golden) -> None: + source = await kd.sources.JsonlFile.create( + "../testdata/purchases/purchases_part1.jsonl", + time_column="purchase_time", + key_column="customer_id", + subsort_column="subsort_id", + ) + golden.jsonl(source) + + await source.add_file("../testdata/purchases/purchases_part2.jsonl") + golden.jsonl(source) diff --git a/testdata/purchases/purchases_part1.jsonl b/testdata/purchases/purchases_part1.jsonl new file mode 100644 index 000000000..dff460715 --- /dev/null +++ b/testdata/purchases/purchases_part1.jsonl @@ -0,0 +1,10 @@ +{"id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0} +{"id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1} +{"id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2} +{"id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3} +{"id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4} +{"id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5} +{"id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6} +{"id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7} +{"id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8} +{"id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9} diff --git a/testdata/purchases/purchases_part2.jsonl b/testdata/purchases/purchases_part2.jsonl new file mode 100644 index 000000000..9f318efc0 --- /dev/null +++ b/testdata/purchases/purchases_part2.jsonl @@ -0,0 +1,5 @@ +{"id":"kk_005","purchase_time":1578268800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":2,"subsort_id":0} +{"id":"wh_001","purchase_time":1578268800000,"customer_id":"spongebob","vendor_id":"weenie_hut","amount":7,"subsort_id":1} +{"id":"cb_007","purchase_time":1578355200000,"customer_id":"spongebob","vendor_id":"chum_bucket","amount":34,"subsort_id":2} +{"id":"wh_002","purchase_time":1578441600000,"customer_id":"karen","vendor_id":"weenie_hut","amount":8,"subsort_id":3} +{"id":"kk_006","purchase_time":1578441600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":4}