diff --git a/python/docs/source/reference/timestream/string.md b/python/docs/source/reference/timestream/string.md index 4af609ed3..6649e8ed5 100644 --- a/python/docs/source/reference/timestream/string.md +++ b/python/docs/source/reference/timestream/string.md @@ -6,6 +6,8 @@ .. autosummary:: :toctree: ../apidocs/ + Timestream.len Timestream.lower + Timestream.substring Timestream.upper ``` \ No newline at end of file diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index c3ac0d0fb..29a5b1a45 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -321,6 +321,14 @@ def hash(self) -> Timestream: """ return Timestream._call("hash", self) + def len(self) -> Timestream: + """Return a Timestream with the length of input string. + + Notes: + Returns `0` for an empty string and `null` for `null` + """ + return Timestream._call("len", self) + def lower(self) -> Timestream: """Return a Timestream with all values converted to lower case.""" return Timestream._call("lower", self) @@ -557,6 +565,26 @@ def select(self, *args: str) -> Timestream: """ return Timestream._call("select_fields", self, *args) + def substring(self, start: Optional[int] = None, end: Optional[int] = None) -> Timestream: + """Return a Timestream with a substring between the start and end indices. + + Args: + start: The inclusive index to start at. `None` indicates the beginning + of the string. Negative indices count backwards from the end of + the string. + end: The exclusive index to end at. `None` indicates the length of + the string. Negative indices count backwards from the end of + the string. + + Notes: + Returns the substring starting at `start` (inclusive) up to but not + including the `end`. + + If the input is `null`, returns `null`. If `end` <= `start` an empty + string is returned. 
+ """ + return Timestream._call("substring", self, start, end) + def remove(self, *args: str) -> Timestream: """Return a Timestream removing the given fields from `self`. diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index 2480e63f6..18d2111c7 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -246,6 +246,10 @@ def __init__( strings_can_be_null=True, ) + _parse_options = pyarrow.csv.ParseOptions( + escape_char="\\", + ) + @staticmethod async def create( csv_string: Optional[str | BytesIO] = None, @@ -277,7 +281,7 @@ async def create( if schema is None: if csv_string is None: raise ValueError("Must provide schema or csv_string") - schema = pa.csv.read_csv(csv_string).schema + schema = pa.csv.read_csv(csv_string, parse_options=CsvString._parse_options).schema csv_string.seek(0) source = CsvString( @@ -297,7 +301,11 @@ async def add_string(self, csv_string: str | BytesIO) -> None: """Add data to the source.""" if isinstance(csv_string, str): csv_string = BytesIO(csv_string.encode("utf-8")) - content = pa.csv.read_csv(csv_string, convert_options=self._convert_options) + content = pa.csv.read_csv( + csv_string, + convert_options=self._convert_options, + parse_options=CsvString._parse_options + ) for batch in content.to_batches(): await self._ffi_table.add_pyarrow(batch) diff --git a/python/pytests/golden/len_test/test_len.jsonl b/python/pytests/golden/len_test/test_len.jsonl new file mode 100644 index 000000000..e6c369fc8 --- /dev/null +++ b/python/pytests/golden/len_test/test_len.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","len":5.0} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","len":5.0} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","len":11.0} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","len":null} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","len":null} 
+{"_time":"1996-12-19T16:40:02.000000000","_key":"B","len":7.0} diff --git a/python/pytests/golden/substring_test/test_substring.jsonl b/python/pytests/golden/substring_test/test_substring.jsonl new file mode 100644 index 000000000..e9d5ee346 --- /dev/null +++ b/python/pytests/golden/substring_test/test_substring.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","substring_0_2":"hE","substring_1":"Ello","substring_0_i":"","substring_i":"hEllo"} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","substring_0_2":"Wo","substring_1":"orld","substring_0_i":"World","substring_i":""} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","substring_0_2":"he","substring_1":"ello world","substring_0_i":"hello wor","substring_i":"ld"} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","substring_0_2":"go","substring_1":"oodbye","substring_0_i":"goodbye","substring_i":"goodbye"} diff --git a/python/pytests/len_test.py b/python/pytests/len_test.py new file mode 100644 index 000000000..4bdcf5257 --- /dev/null +++ b/python/pytests/len_test.py @@ -0,0 +1,31 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + + +async def test_len(source, golden) -> None: + s = source.col("s") + golden.jsonl( + 
kd.record( + { + "len": s.len() + } + ) + ) diff --git a/python/pytests/substring_test.py b/python/pytests/substring_test.py new file mode 100644 index 000000000..9a171c46c --- /dev/null +++ b/python/pytests/substring_test.py @@ -0,0 +1,35 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + + +async def test_substring(source, golden) -> None: + s = source.col("s") + n = source.col("n") + golden.jsonl( + kd.record( + { + "substring_0_2": s.substring(start=0, end=2), + "substring_1": s.substring(start=1), + "substring_0_i": s.substring(end=n), + "substring_i": s.substring(start=n), + } + ) + )