From ba36bfa4d79175ea7ccb6fcf24edc0c84505b756 Mon Sep 17 00:00:00 2001 From: Eric Pinzur Date: Thu, 21 Sep 2023 09:23:07 +0200 Subject: [PATCH 1/4] added substring, json, len, is_valid methods --- .../docs/source/reference/timestream/misc.md | 1 + .../source/reference/timestream/string.md | 3 ++ python/pysrc/kaskada/_timestream.py | 47 +++++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/python/docs/source/reference/timestream/misc.md b/python/docs/source/reference/timestream/misc.md index 609c29f4a..0df58f6cc 100644 --- a/python/docs/source/reference/timestream/misc.md +++ b/python/docs/source/reference/timestream/misc.md @@ -12,6 +12,7 @@ Timestream.filter Timestream.hash Timestream.if_ + Timestream.is_valid Timestream.lag Timestream.null_if Timestream.pipe diff --git a/python/docs/source/reference/timestream/string.md b/python/docs/source/reference/timestream/string.md index 4af609ed3..8014668ad 100644 --- a/python/docs/source/reference/timestream/string.md +++ b/python/docs/source/reference/timestream/string.md @@ -6,6 +6,9 @@ .. autosummary:: :toctree: ../apidocs/ + Timestream.json + Timestream.len Timestream.lower + Timestream.substring Timestream.upper ``` \ No newline at end of file diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index c3ac0d0fb..8946b572e 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -321,6 +321,33 @@ def hash(self) -> Timestream: """ return Timestream._call("hash", self) + def is_valid(self) -> Timestream: + """Returns a Timestream containing `true` if input is `non-null`, otherwise `false`. + + Notes: + Unlike many functions which return `null` if any of their + arguments are `null`, *is_valid()* will never return `null`. + """ + return Timestream._call("is_valid", self) + + def json(self) -> Timestream: + """Returns a Timestream containing an object deserialized from a json string. + + Notes: + Warning `json()` is experimental functionality. You should expect the + behavior to potentially change in the future. Certain functionality, + such as nested types, are not yet supported. + """ + return Timestream._call("json", self) + + def len(self) -> Timestream: + """Return a Timestream with the length of input string. + + Notes: + Returns `0` for an empty string and `null` for `null` + """ + return Timestream._call("len", self) + def lower(self) -> Timestream: """Return a Timestream with all values converted to lower case.""" return Timestream._call("lower", self) @@ -557,6 +584,26 @@ def select(self, *args: str) -> Timestream: """ return Timestream._call("select_fields", self, *args) + def substring(self, start: Optional[int], end: Optional[int] = None) -> Timestream: + """Return a Timestream with a substring between the start and end indices. + + Args: + start: The inclusive index to start at. `None` indicates the beginning + of the string. Negative indices count backwards from the end of + the string. + end: (optional) The exclusive index to end at. `None` indicates the + length of the string. Negative indices count backwards from the + end of the string. + + Notes: + Returns the substring starting at `start` (inclusive) up to but not + including the `end`. + + If the input is `null`, returns `null`. If `end` > `start` an empty + string is returned. + """ + return Timestream._call("substring", self) + def remove(self, *args: str) -> Timestream: """Return a Timestream removing the given fields from `self`. From cd54e3388ff65bdbbfb89bbabb0cf0c88703615c Mon Sep 17 00:00:00 2001 From: Eric Pinzur Date: Thu, 21 Sep 2023 16:47:40 +0200 Subject: [PATCH 2/4] added tests --- python/pysrc/kaskada/_timestream.py | 10 +- python/pysrc/kaskada/sources/arrow.py | 8 +- .../is_valid_test/test_is_valid_boolean.jsonl | 7 + .../is_valid_test/test_is_valid_f64.jsonl | 6 + .../is_valid_test/test_is_valid_i64.jsonl | 6 + .../is_valid_test/test_is_valid_record.jsonl | 6 + .../is_valid_test/test_is_valid_string.jsonl | 6 + .../test_is_valid_timestamp_ns.jsonl | 6 + python/pytests/golden/len_test/test_len.jsonl | 6 + .../substring_test/test_substring.jsonl | 6 + python/pytests/is_valid_test.py | 152 ++++++++++++++++++ python/pytests/json_test.py | 148 +++++++++++++++++ python/pytests/len_test.py | 31 ++++ python/pytests/substring_test.py | 35 ++++ 14 files changed, 426 insertions(+), 7 deletions(-) create mode 100644 python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl create mode 100644 python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl create mode 100644 python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl create mode 100644 python/pytests/golden/is_valid_test/test_is_valid_record.jsonl create mode 100644 python/pytests/golden/is_valid_test/test_is_valid_string.jsonl create mode 100644 python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl create mode 100644 python/pytests/golden/len_test/test_len.jsonl create mode 100644 python/pytests/golden/substring_test/test_substring.jsonl create mode 100644 python/pytests/is_valid_test.py create mode 100644 python/pytests/json_test.py create mode 100644 python/pytests/len_test.py create mode 100644 python/pytests/substring_test.py diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index 8946b572e..401bcf0ea 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -584,16 +584,16 @@ def select(self, *args: str) -> Timestream: """ return Timestream._call("select_fields", self, *args) - def substring(self, start: Optional[int], end: Optional[int] = None) -> Timestream: + def substring(self, start: Optional[int] = None, end: Optional[int] = None) -> Timestream: """Return a Timestream with a substring between the start and end indices. Args: start: The inclusive index to start at. `None` indicates the beginning of the string. Negative indices count backwards from the end of the string. - end: (optional) The exclusive index to end at. `None` indicates the - length of the string. Negative indices count backwards from the - end of the string. + end: The exclusive index to end at. `None` indicates the length of + the string. Negative indices count backwards from the end of + the string. Notes: Returns the substring starting at `start` (inclusive) up to but not @@ -602,7 +602,7 @@ def substring(self, start: Optional[int], end: Optional[int] = None) -> Timestre If the input is `null`, returns `null`. If `end` > `start` an empty string is returned. """ - return Timestream._call("substring", self) + return Timestream._call("substring", self, start, end) def remove(self, *args: str) -> Timestream: """Return a Timestream removing the given fields from `self`. diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index 2480e63f6..b2e61b9ee 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -246,6 +246,10 @@ def __init__( strings_can_be_null=True, ) + _parse_options = pyarrow.csv.ParseOptions( + escape_char="\\", + ) + @staticmethod async def create( csv_string: Optional[str | BytesIO] = None, @@ -277,7 +281,7 @@ async def create( if schema is None: if csv_string is None: raise ValueError("Must provide schema or csv_string") - schema = pa.csv.read_csv(csv_string).schema + schema = pa.csv.read_csv(csv_string, parse_options=CsvString._parse_options).schema csv_string.seek(0) source = CsvString( @@ -297,7 +301,7 @@ async def add_string(self, csv_string: str | BytesIO) -> None: """Add data to the source.""" if isinstance(csv_string, str): csv_string = BytesIO(csv_string.encode("utf-8")) - content = pa.csv.read_csv(csv_string, convert_options=self._convert_options) + content = pa.csv.read_csv(csv_string, convert_options=self._convert_options, parse_options=CsvString._parse_options) for batch in content.to_batches(): await self._ffi_table.add_pyarrow(batch) diff --git a/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl new file mode 100644 index 000000000..0cc1ecab4 --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl @@ -0,0 +1,7 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","a":true,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","a":false,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","a":null,"is_valid":false} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","a":true,"is_valid":true} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","a":false,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","a":false,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","a":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl new file mode 100644 index 000000000..2de3c7561 --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.2,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":24.3,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":17.6,"is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":12.4,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl new file mode 100644 index 000000000..e3795c3bf --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.0,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":24.0,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":17.0,"is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":12.0,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl new file mode 100644 index 000000000..56409f828 --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","is_valid":true} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","is_valid":true} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl new file mode 100644 index 000000000..2fb59057c --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","s":"hEllo","is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","s":"World","is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","s":"hello world","is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","s":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","s":null,"is_valid":false} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","s":"goodbye","is_valid":true} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl new file mode 100644 index 000000000..177dd4664 --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","n":2.0,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","n":4.0,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","n":5.0,"is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","n":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","n":8.0,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","n":23.0,"is_valid":true} diff --git a/python/pytests/golden/len_test/test_len.jsonl b/python/pytests/golden/len_test/test_len.jsonl new file mode 100644 index 000000000..e6c369fc8 --- /dev/null +++ b/python/pytests/golden/len_test/test_len.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","len":5.0} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","len":5.0} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","len":11.0} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","len":null} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","len":null} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","len":7.0} diff --git a/python/pytests/golden/substring_test/test_substring.jsonl b/python/pytests/golden/substring_test/test_substring.jsonl new file mode 100644 index 000000000..e9d5ee346 --- /dev/null +++ b/python/pytests/golden/substring_test/test_substring.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","substring_0_2":"hE","substring_1":"Ello","substring_0_i":"","substring_i":"hEllo"} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","substring_0_2":"Wo","substring_1":"orld","substring_0_i":"World","substring_i":""} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","substring_0_2":"he","substring_1":"ello world","substring_0_i":"hello wor","substring_i":"ld"} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","substring_0_2":"go","substring_1":"oodbye","substring_0_i":"goodbye","substring_i":"goodbye"} diff --git a/python/pytests/is_valid_test.py b/python/pytests/is_valid_test.py new file mode 100644 index 000000000..9cb087dae --- /dev/null +++ b/python/pytests/is_valid_test.py @@ -0,0 +1,152 @@ +import kaskada as kd +import pytest + +@pytest.fixture(scope="module") +async def boolean_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,a,b", + '1996-12-19T16:39:57,A,true,true', + '1996-12-19T16:39:58,B,false,false', + '1996-12-19T16:39:59,B,,true', + '1996-12-19T16:40:00,B,true,false', + '1996-12-19T16:40:01,B,false,true', + '1996-12-19T16:40:02,B,false,', + '1996-12-19T16:40:02,B,,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def f64_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,m,n", + '1996-12-19T16:39:57,A,5.2,10', + '1996-12-19T16:39:58,B,24.3,3.9', + '1996-12-19T16:39:59,A,17.6,6.2', + '1996-12-19T16:40:00,A,,9.25', + '1996-12-19T16:40:01,A,12.4,', + '1996-12-19T16:40:02,A,,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def i64_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,m,n", + '1996-12-19T16:39:57,A,5,10', + '1996-12-19T16:39:58,B,24,3', + '1996-12-19T16:39:59,A,17,6', + '1996-12-19T16:40:00,A,,9', + '1996-12-19T16:40:01,A,12,', + '1996-12-19T16:40:02,A,,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def string_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def timestamp_ns_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,n,m,other_time,fruit", + '1996-12-19T16:39:57,A,2,4,2003-12-19T16:39:57,pear', + '1996-12-19T16:39:58,B,4,3,1994-11-19T16:39:57,watermelon', + '1996-12-19T16:39:59,B,5,,1998-12-19T16:39:57,mango', + '1996-12-19T16:40:00,B,,,1992-12-19T16:39:57,', + '1996-12-19T16:40:01,B,8,8,,', + '1996-12-19T16:40:02,B,23,11,1994-12-19T16:39:57,mango', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +async def test_is_valid_boolean(boolean_source, golden) -> None: + a = boolean_source.col("a") + golden.jsonl( + kd.record( + { + "a": a, + "is_valid": a.is_valid(), + } + ) + ) + +async def test_is_valid_f64(f64_source, golden) -> None: + m = f64_source.col("m") + golden.jsonl( + kd.record( + { + "m": m, + "is_valid": m.is_valid(), + } + ) + ) + +async def test_is_valid_i64(i64_source, golden) -> None: + m = i64_source.col("m") + golden.jsonl( + kd.record( + { + "m": m, + "is_valid": m.is_valid(), + } + ) + ) + +async def test_is_valid_string(string_source, golden) -> None: + s = string_source.col("s") + golden.jsonl( + kd.record( + { + "s": s, + "is_valid": s.is_valid(), + } + ) + ) + +async def test_is_valid_timestamp_ns(timestamp_ns_source, golden) -> None: + n = timestamp_ns_source.col("n") + golden.jsonl( + kd.record( + { + "n": n, + "is_valid": n.is_valid(), + } + ) + ) + +async def test_is_valid_record(timestamp_ns_source, golden) -> None: + golden.jsonl( + kd.record( + { + "is_valid": timestamp_ns_source.is_valid() + } + ) + ) diff --git a/python/pytests/json_test.py b/python/pytests/json_test.py new file mode 100644 index 000000000..7c75f2f2b --- /dev/null +++ b/python/pytests/json_test.py @@ -0,0 +1,148 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,json_str", + '1996-12-19T16:39:57,A,"{\"a\": 10\\, \"b\": \"dog\"}"', + '1996-12-19T16:39:58,B,"{\"a\": 4\\, \"b\": \"lizard\"}"', + '1996-12-19T16:39:59,B,"{\"a\": 1\\, \"c\": 3.3}"', + '1996-12-19T16:40:00,B,"{\"a\": 34}"', + '1996-12-19T16:40:01,B,"{\"a\": 34}"', + '1996-12-19T16:40:02,B,"{\"a\": 6\\, \"b\": \"dog\"}"', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def invalid_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,json_str", + '1996-12-19T16:39:57,A,"{a: 10\\, \"b\": \"dog\"}"', + '1996-12-19T16:39:58,B,"{\"a\": 4\\, \"b: lizard\"}"', + '1996-12-19T16:39:59,B,"{\"a\": 1\\, \"c\": 3.3}"', + '1996-12-19T16:40:00,B,"{\"a\": 12\\, \"b\": \"cat\"}"', + '1996-12-19T16:40:01,B,"{\"a\"\\, 34}"', + '1996-12-19T16:40:02,B,"{\"a\": 6\\, \"b\": \"dog\"}"', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +# "let json = json(Json.json) in { a_test: json.a as i64, b_test: json(Json.json).b }" +async def test_json_parse_field(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a_test": j.col("a"), + "b_test": json_str.json().b, + } + ) + ) +""" +# "let json = json(Json.json) in { string: json.b, len: len(json.b) }" +async def test_json_string_field_usable_in_string_functions(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "string": j.col("b"), + "len": j.col("b").len(), + } + ) + ) + +# "let json = json(Json.json) in { num_as_str: json.a as string, len: len(json.a as string) }" +async def test_json_field_number_as_string(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "num_as_str": j.col("a").cast("str"), + "len": j.col("a").cast("str").len(), + } + ) + ) + +# "let json = json(Json.json) in { a: json.a, plus_one: (json.a as i64) + 1 }" +async def test_json_field_as_number_with_addition(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a": j.col("a"), + "plus_one": j.col("a").cast("i64") + 1, + } + ) + ) + +# "let json = json(Json.json) in { a_test: json.a as i64, b_test: json(Json.json).b }" +# +# I guess this behavior is somewhat strange, in that creating a record with all +# nulls produces nothing, while one non-null field in a record causes us to +# print "null" in other fields. +async def test_incorrect_json_format_produces_null(invalid_source, golden) -> None: + json_str = invalid_source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a_test": j.col("a").cast("i64"), + "b_test": json_str.json().b, + } + ) + ) + +# "let json = json(Json.json) in { a: json(json) }" +async def test_json_of_json_object_errors(invalid_source, golden) -> None: + json_str = invalid_source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a": j.json(), + } + ) + ) + +# "{ out: json(Json.json).a.b }" +# +# There's a way we can probably produce a better error message, +# but, on the other hand, it's marked as an experimental feature, plus +# this returns an error rather than incorrect results :shrug: +# +# The `dfg` would need to check if it recursively encounters the pattern +# `(field_ref (json ?value ?op) ?field ?op)` +async def test_nested_json_produces_error(invalid_source, golden) -> None: + json_str = invalid_source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "out": j.json().col("a").col("b"), + } + ) + ) + +# \"{ out: json(Json.json) }" +async def test_json_as_output_field_produces_error(invalid_source, golden) -> None: + golden.jsonl( + kd.record( + { + "out": invalid_source.col("json_str").json(), + } + ) + ) + """ \ No newline at end of file diff --git a/python/pytests/len_test.py b/python/pytests/len_test.py new file mode 100644 index 000000000..4bdcf5257 --- /dev/null +++ b/python/pytests/len_test.py @@ -0,0 +1,31 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + + +async def test_len(source, golden) -> None: + s = source.col("s") + golden.jsonl( + kd.record( + { + "len": s.len() + } + ) + ) diff --git a/python/pytests/substring_test.py b/python/pytests/substring_test.py new file mode 100644 index 000000000..9a171c46c --- /dev/null +++ b/python/pytests/substring_test.py @@ -0,0 +1,35 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + + +async def test_substring(source, golden) -> None: + s = source.col("s") + n = source.col("n") + golden.jsonl( + kd.record( + { + "substring_0_2": s.substring(start=0, end=2), + "substring_1": s.substring(start=1), + "substring_0_i": s.substring(end=n), + "substring_i": s.substring(start=n), + } + ) + ) From 7f89eafd19f0458c8dc646816934d777e73711ea Mon Sep 17 00:00:00 2001 From: Eric Pinzur Date: Fri, 22 Sep 2023 12:35:56 +0200 Subject: [PATCH 3/4] removed json() and is_valid() --- .../docs/source/reference/timestream/misc.md | 1 - .../source/reference/timestream/string.md | 1 - python/pysrc/kaskada/_timestream.py | 19 --- .../is_valid_test/test_is_valid_boolean.jsonl | 7 - .../is_valid_test/test_is_valid_f64.jsonl | 6 - .../is_valid_test/test_is_valid_i64.jsonl | 6 - .../is_valid_test/test_is_valid_record.jsonl | 6 - .../is_valid_test/test_is_valid_string.jsonl | 6 - .../test_is_valid_timestamp_ns.jsonl | 6 - python/pytests/is_valid_test.py | 152 ------------------ python/pytests/json_test.py | 148 ----------------- 11 files changed, 358 deletions(-) delete mode 100644 python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl delete mode 100644 python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl delete mode 100644 python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl delete mode 100644 python/pytests/golden/is_valid_test/test_is_valid_record.jsonl delete mode 100644 python/pytests/golden/is_valid_test/test_is_valid_string.jsonl delete mode 100644 python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl delete mode 100644 python/pytests/is_valid_test.py delete mode 100644 python/pytests/json_test.py diff --git a/python/docs/source/reference/timestream/misc.md b/python/docs/source/reference/timestream/misc.md index 0df58f6cc..609c29f4a 100644 --- a/python/docs/source/reference/timestream/misc.md +++ b/python/docs/source/reference/timestream/misc.md @@ -12,7 +12,6 @@ Timestream.filter Timestream.hash Timestream.if_ - Timestream.is_valid Timestream.lag Timestream.null_if Timestream.pipe diff --git a/python/docs/source/reference/timestream/string.md b/python/docs/source/reference/timestream/string.md index 8014668ad..6649e8ed5 100644 --- a/python/docs/source/reference/timestream/string.md +++ b/python/docs/source/reference/timestream/string.md @@ -6,7 +6,6 @@ .. autosummary:: :toctree: ../apidocs/ - Timestream.json Timestream.len Timestream.lower Timestream.substring diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index 401bcf0ea..29a5b1a45 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -321,25 +321,6 @@ def hash(self) -> Timestream: """ return Timestream._call("hash", self) - def is_valid(self) -> Timestream: - """Returns a Timestream containing `true` if input is `non-null`, otherwise `false`. - - Notes: - Unlike many functions which return `null` if any of their - arguments are `null`, *is_valid()* will never return `null`. - """ - return Timestream._call("is_valid", self) - - def json(self) -> Timestream: - """Returns a Timestream containing an object deserialized from a json string. - - Notes: - Warning `json()` is experimental functionality. You should expect the - behavior to potentially change in the future. Certain functionality, - such as nested types, are not yet supported. - """ - return Timestream._call("json", self) - def len(self) -> Timestream: """Return a Timestream with the length of input string. diff --git a/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl deleted file mode 100644 index 0cc1ecab4..000000000 --- a/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl +++ /dev/null @@ -1,7 +0,0 @@ -{"_time":"1996-12-19T16:39:57.000000000","_key":"A","a":true,"is_valid":true} -{"_time":"1996-12-19T16:39:58.000000000","_key":"B","a":false,"is_valid":true} -{"_time":"1996-12-19T16:39:59.000000000","_key":"B","a":null,"is_valid":false} -{"_time":"1996-12-19T16:40:00.000000000","_key":"B","a":true,"is_valid":true} -{"_time":"1996-12-19T16:40:01.000000000","_key":"B","a":false,"is_valid":true} -{"_time":"1996-12-19T16:40:02.000000000","_key":"B","a":false,"is_valid":true} -{"_time":"1996-12-19T16:40:02.000000000","_key":"B","a":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl deleted file mode 100644 index 2de3c7561..000000000 --- a/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl +++ /dev/null @@ -1,6 +0,0 @@ -{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.2,"is_valid":true} -{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":24.3,"is_valid":true} -{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":17.6,"is_valid":true} -{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"is_valid":false} -{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":12.4,"is_valid":true} -{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl deleted file mode 100644 index e3795c3bf..000000000 --- a/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl +++ /dev/null @@ -1,6 +0,0 @@ -{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.0,"is_valid":true} -{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":24.0,"is_valid":true} -{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":17.0,"is_valid":true} -{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"is_valid":false} -{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":12.0,"is_valid":true} -{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl deleted file mode 100644 index 56409f828..000000000 --- a/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl +++ /dev/null @@ -1,6 +0,0 @@ -{"_time":"1996-12-19T16:39:57.000000000","_key":"A","is_valid":true} -{"_time":"1996-12-19T16:39:58.000000000","_key":"B","is_valid":true} -{"_time":"1996-12-19T16:39:59.000000000","_key":"B","is_valid":true} -{"_time":"1996-12-19T16:40:00.000000000","_key":"B","is_valid":true} -{"_time":"1996-12-19T16:40:01.000000000","_key":"B","is_valid":true} -{"_time":"1996-12-19T16:40:02.000000000","_key":"B","is_valid":true} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl deleted file mode 100644 index 2fb59057c..000000000 --- a/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl +++ /dev/null @@ -1,6 +0,0 @@ -{"_time":"1996-12-19T16:39:57.000000000","_key":"A","s":"hEllo","is_valid":true} -{"_time":"1996-12-19T16:39:58.000000000","_key":"B","s":"World","is_valid":true} -{"_time":"1996-12-19T16:39:59.000000000","_key":"B","s":"hello world","is_valid":true} -{"_time":"1996-12-19T16:40:00.000000000","_key":"B","s":null,"is_valid":false} -{"_time":"1996-12-19T16:40:01.000000000","_key":"B","s":null,"is_valid":false} -{"_time":"1996-12-19T16:40:02.000000000","_key":"B","s":"goodbye","is_valid":true} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl deleted file mode 100644 index 177dd4664..000000000 --- a/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl +++ /dev/null @@ -1,6 +0,0 @@ -{"_time":"1996-12-19T16:39:57.000000000","_key":"A","n":2.0,"is_valid":true} -{"_time":"1996-12-19T16:39:58.000000000","_key":"B","n":4.0,"is_valid":true} -{"_time":"1996-12-19T16:39:59.000000000","_key":"B","n":5.0,"is_valid":true} -{"_time":"1996-12-19T16:40:00.000000000","_key":"B","n":null,"is_valid":false} -{"_time":"1996-12-19T16:40:01.000000000","_key":"B","n":8.0,"is_valid":true} -{"_time":"1996-12-19T16:40:02.000000000","_key":"B","n":23.0,"is_valid":true} diff --git a/python/pytests/is_valid_test.py b/python/pytests/is_valid_test.py deleted file mode 100644 index 9cb087dae..000000000 --- a/python/pytests/is_valid_test.py +++ /dev/null @@ -1,152 +0,0 @@ -import kaskada as kd -import pytest - -@pytest.fixture(scope="module") -async def boolean_source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,a,b", - '1996-12-19T16:39:57,A,true,true', - '1996-12-19T16:39:58,B,false,false', - '1996-12-19T16:39:59,B,,true', - '1996-12-19T16:40:00,B,true,false', - '1996-12-19T16:40:01,B,false,true', - '1996-12-19T16:40:02,B,false,', - '1996-12-19T16:40:02,B,,', - ] - ) - return await kd.sources.CsvString.create( - content, time_column="time", key_column="key" - ) - -@pytest.fixture(scope="module") -async def f64_source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,m,n", - '1996-12-19T16:39:57,A,5.2,10', - '1996-12-19T16:39:58,B,24.3,3.9', - '1996-12-19T16:39:59,A,17.6,6.2', - '1996-12-19T16:40:00,A,,9.25', - '1996-12-19T16:40:01,A,12.4,', - '1996-12-19T16:40:02,A,,', - ] - ) - return await kd.sources.CsvString.create( - content, time_column="time", key_column="key" - ) - -@pytest.fixture(scope="module") -async def i64_source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,m,n", - '1996-12-19T16:39:57,A,5,10', - '1996-12-19T16:39:58,B,24,3', - '1996-12-19T16:39:59,A,17,6', - '1996-12-19T16:40:00,A,,9', - '1996-12-19T16:40:01,A,12,', - '1996-12-19T16:40:02,A,,', - ] - ) - return await kd.sources.CsvString.create( - content, time_column="time", key_column="key" - ) - -@pytest.fixture(scope="module") -async def string_source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,s,n,t", - '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', - '1996-12-19T16:39:58,B,"World",5,"world"', - '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', - '1996-12-19T16:40:00,B,,-2,"greetings"', - '1996-12-19T16:40:01,B,,2,"salutations"', - '1996-12-19T16:40:02,B,"goodbye",,', - ] - ) - return await kd.sources.CsvString.create( - content, time_column="time", key_column="key" - ) - -@pytest.fixture(scope="module") -async def timestamp_ns_source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,n,m,other_time,fruit", - '1996-12-19T16:39:57,A,2,4,2003-12-19T16:39:57,pear', - '1996-12-19T16:39:58,B,4,3,1994-11-19T16:39:57,watermelon', - '1996-12-19T16:39:59,B,5,,1998-12-19T16:39:57,mango', - '1996-12-19T16:40:00,B,,,1992-12-19T16:39:57,', - '1996-12-19T16:40:01,B,8,8,,', - '1996-12-19T16:40:02,B,23,11,1994-12-19T16:39:57,mango', - ] - ) - return await kd.sources.CsvString.create( - content, time_column="time", key_column="key" - ) - -async def test_is_valid_boolean(boolean_source, golden) -> None: - a = boolean_source.col("a") - golden.jsonl( - kd.record( - { - "a": a, - "is_valid": a.is_valid(), - } - ) - ) - -async def test_is_valid_f64(f64_source, golden) -> None: - m = f64_source.col("m") - golden.jsonl( - kd.record( - { - "m": m, - "is_valid": m.is_valid(), - } - ) - ) - -async def test_is_valid_i64(i64_source, golden) -> None: - m = i64_source.col("m") - golden.jsonl( - kd.record( - { - "m": m, - "is_valid": m.is_valid(), - } - ) - ) - -async def test_is_valid_string(string_source, golden) -> None: - s = string_source.col("s") - golden.jsonl( - kd.record( - { - "s": s, - "is_valid": s.is_valid(), - } - ) - ) - -async def test_is_valid_timestamp_ns(timestamp_ns_source, golden) -> None: - n = timestamp_ns_source.col("n") - golden.jsonl( - kd.record( - { - "n": n, - "is_valid": n.is_valid(), - } - ) - ) - -async def test_is_valid_record(timestamp_ns_source, golden) -> None: - golden.jsonl( - kd.record( - { - "is_valid": timestamp_ns_source.is_valid() - } - ) - ) diff --git a/python/pytests/json_test.py b/python/pytests/json_test.py deleted file mode 100644 index 7c75f2f2b..000000000 --- a/python/pytests/json_test.py +++ /dev/null @@ -1,148 +0,0 @@ -import kaskada as kd -import pytest - - -@pytest.fixture(scope="module") -async def source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,json_str", - '1996-12-19T16:39:57,A,"{\"a\": 10\\, \"b\": \"dog\"}"', - '1996-12-19T16:39:58,B,"{\"a\": 4\\, \"b\": \"lizard\"}"', - '1996-12-19T16:39:59,B,"{\"a\": 1\\, \"c\": 3.3}"', - '1996-12-19T16:40:00,B,"{\"a\": 34}"', - '1996-12-19T16:40:01,B,"{\"a\": 34}"', - '1996-12-19T16:40:02,B,"{\"a\": 6\\, \"b\": \"dog\"}"', - ] - ) - return await kd.sources.CsvString.create( - content, time_column="time", key_column="key" - ) - -@pytest.fixture(scope="module") -async def invalid_source() -> kd.sources.CsvString: - content = "\n".join( - [ - "time,key,json_str", - '1996-12-19T16:39:57,A,"{a: 10\\, \"b\": \"dog\"}"', - '1996-12-19T16:39:58,B,"{\"a\": 4\\, \"b: lizard\"}"', - '1996-12-19T16:39:59,B,"{\"a\": 1\\, \"c\": 3.3}"', - '1996-12-19T16:40:00,B,"{\"a\": 12\\, \"b\": \"cat\"}"', - '1996-12-19T16:40:01,B,"{\"a\"\\, 34}"', - '1996-12-19T16:40:02,B,"{\"a\": 6\\, \"b\": \"dog\"}"', - ] - ) - return await kd.sources.CsvString.create( - content, time_column="time", key_column="key" - ) - -# "let json = json(Json.json) in { a_test: json.a as i64, b_test: json(Json.json).b }" -async def test_json_parse_field(source, golden) -> None: - json_str = source.col("json_str") - j = json_str.json() - golden.jsonl( - kd.record( - { - "a_test": j.col("a"), - "b_test": json_str.json().b, - } - ) - ) -""" -# "let json = json(Json.json) in { string: json.b, len: len(json.b) }" -async def test_json_string_field_usable_in_string_functions(source, golden) -> None: - json_str = source.col("json_str") - j = json_str.json() - golden.jsonl( - kd.record( - { - "string": j.col("b"), - "len": j.col("b").len(), - } - ) - ) - -# "let json = json(Json.json) in { num_as_str: json.a as string, len: len(json.a as string) }" -async def test_json_field_number_as_string(source, golden) -> None: - json_str = source.col("json_str") - j = json_str.json() - golden.jsonl( - kd.record( - { - "num_as_str": j.col("a").cast("str"), - "len": j.col("a").cast("str").len(), - } - ) - ) - -# "let json = json(Json.json) in { a: json.a, plus_one: (json.a as i64) + 1 }" -async def test_json_field_as_number_with_addition(source, golden) -> None: - json_str = source.col("json_str") - j = json_str.json() - golden.jsonl( - kd.record( - { - "a": j.col("a"), - "plus_one": j.col("a").cast("i64") + 1, - } - ) - ) - -# "let json = json(Json.json) in { a_test: json.a as i64, b_test: json(Json.json).b }" -# -# I guess this behavior is somewhat strange, in that creating a record with all -# nulls produces nothing, while one non-null field in a record causes us to -# print "null" in other fields. -async def test_incorrect_json_format_produces_null(invalid_source, golden) -> None: - json_str = invalid_source.col("json_str") - j = json_str.json() - golden.jsonl( - kd.record( - { - "a_test": j.col("a").cast("i64"), - "b_test": json_str.json().b, - } - ) - ) - -# "let json = json(Json.json) in { a: json(json) }" -async def test_json_of_json_object_errors(invalid_source, golden) -> None: - json_str = invalid_source.col("json_str") - j = json_str.json() - golden.jsonl( - kd.record( - { - "a": j.json(), - } - ) - ) - -# "{ out: json(Json.json).a.b }" -# -# There's a way we can probably produce a better error message, -# but, on the other hand, it's marked as an experimental feature, plus -# this returns an error rather than incorrect results :shrug: -# -# The `dfg` would need to check if it recursively encounters the pattern -# `(field_ref (json ?value ?op) ?field ?op)` -async def test_nested_json_produces_error(invalid_source, golden) -> None: - json_str = invalid_source.col("json_str") - j = json_str.json() - golden.jsonl( - kd.record( - { - "out": j.json().col("a").col("b"), - } - ) - ) - -# \"{ out: json(Json.json) }" -async def test_json_as_output_field_produces_error(invalid_source, golden) -> None: - golden.jsonl( - kd.record( - { - "out": invalid_source.col("json_str").json(), - } - ) - ) - """ \ No newline at end of file From ce7d5c8f36f676f4dae3ed3d7de778bf5c8e1251 Mon Sep 17 00:00:00 2001 From: Eric Pinzur Date: Fri, 22 Sep 2023 22:56:02 +0200 Subject: [PATCH 4/4] fix lint --- python/pysrc/kaskada/sources/arrow.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index b2e61b9ee..18d2111c7 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -301,7 +301,11 @@ async def add_string(self, csv_string: str | BytesIO) -> None: """Add data to the source.""" if isinstance(csv_string, str): csv_string = BytesIO(csv_string.encode("utf-8")) - content = pa.csv.read_csv(csv_string, convert_options=self._convert_options, parse_options=CsvString._parse_options) + content = pa.csv.read_csv( + csv_string, + convert_options=self._convert_options, + parse_options=CsvString._parse_options + ) for batch in content.to_batches(): await self._ffi_table.add_pyarrow(batch)