Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added substring, json, len, is_valid methods #767

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/docs/source/reference/timestream/string.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
.. autosummary::
:toctree: ../apidocs/

Timestream.len
Timestream.lower
Timestream.substring
Timestream.upper
```
28 changes: 28 additions & 0 deletions python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@ def hash(self) -> Timestream:
"""
return Timestream._call("hash", self)

def len(self) -> Timestream:
"""Return a Timestream with the length of input string.
epinzur marked this conversation as resolved.
Show resolved Hide resolved

Notes:
Returns `0` for an empty string and `null` for `null`
"""
return Timestream._call("len", self)

def lower(self) -> Timestream:
"""Return a Timestream with all values converted to lower case."""
return Timestream._call("lower", self)
Expand Down Expand Up @@ -557,6 +565,26 @@ def select(self, *args: str) -> Timestream:
"""
return Timestream._call("select_fields", self, *args)

def substring(self, start: Optional[int] = None, end: Optional[int] = None) -> Timestream:
"""Return a Timestream with a substring between the start and end indices.

Args:
start: The inclusive index to start at. `None` indicates the beginning
of the string. Negative indices count backwards from the end of
the string.
end: The exclusive index to end at. `None` indicates the length of
the string. Negative indices count backwards from the end of
the string.

Notes:
Returns the substring starting at `start` (inclusive) up to but not
including the `end`.

If the input is `null`, returns `null`. If `end` > `start` an empty
string is returned.
"""
return Timestream._call("substring", self, start, end)

def remove(self, *args: str) -> Timestream:
"""Return a Timestream removing the given fields from `self`.

Expand Down
12 changes: 10 additions & 2 deletions python/pysrc/kaskada/sources/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ def __init__(
strings_can_be_null=True,
)

_parse_options = pyarrow.csv.ParseOptions(
escape_char="\\",
)

epinzur marked this conversation as resolved.
Show resolved Hide resolved
@staticmethod
async def create(
csv_string: Optional[str | BytesIO] = None,
Expand Down Expand Up @@ -277,7 +281,7 @@ async def create(
if schema is None:
if csv_string is None:
raise ValueError("Must provide schema or csv_string")
schema = pa.csv.read_csv(csv_string).schema
schema = pa.csv.read_csv(csv_string, parse_options=CsvString._parse_options).schema
csv_string.seek(0)

source = CsvString(
Expand All @@ -297,7 +301,11 @@ async def add_string(self, csv_string: str | BytesIO) -> None:
"""Add data to the source."""
if isinstance(csv_string, str):
csv_string = BytesIO(csv_string.encode("utf-8"))
content = pa.csv.read_csv(csv_string, convert_options=self._convert_options)
content = pa.csv.read_csv(
csv_string,
convert_options=self._convert_options,
parse_options=CsvString._parse_options
)
for batch in content.to_batches():
await self._ffi_table.add_pyarrow(batch)

Expand Down
6 changes: 6 additions & 0 deletions python/pytests/golden/len_test/test_len.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","len":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","len":5.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"B","len":11.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"B","len":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"B","len":null}
{"_time":"1996-12-19T16:40:02.000000000","_key":"B","len":7.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/substring_test/test_substring.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","substring_0_2":"hE","substring_1":"Ello","substring_0_i":"","substring_i":"hEllo"}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","substring_0_2":"Wo","substring_1":"orld","substring_0_i":"World","substring_i":""}
{"_time":"1996-12-19T16:39:59.000000000","_key":"B","substring_0_2":"he","substring_1":"ello world","substring_0_i":"hello wor","substring_i":"ld"}
{"_time":"1996-12-19T16:40:00.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null}
{"_time":"1996-12-19T16:40:02.000000000","_key":"B","substring_0_2":"go","substring_1":"oodbye","substring_0_i":"goodbye","substring_i":"goodbye"}
31 changes: 31 additions & 0 deletions python/pytests/len_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import kaskada as kd
import pytest


@pytest.fixture(scope="module")
async def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,s,n,t",
'1996-12-19T16:39:57,A,"hEllo",0,"hEllo"',
'1996-12-19T16:39:58,B,"World",5,"world"',
'1996-12-19T16:39:59,B,"hello world",-2,"hello world"',
'1996-12-19T16:40:00,B,,-2,"greetings"',
'1996-12-19T16:40:01,B,,2,"salutations"',
'1996-12-19T16:40:02,B,"goodbye",,',
]
)
return await kd.sources.CsvString.create(
content, time_column="time", key_column="key"
)


async def test_len(source, golden) -> None:
s = source.col("s")
golden.jsonl(
kd.record(
{
"len": s.len()
}
)
)
35 changes: 35 additions & 0 deletions python/pytests/substring_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import kaskada as kd
import pytest


@pytest.fixture(scope="module")
async def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,s,n,t",
'1996-12-19T16:39:57,A,"hEllo",0,"hEllo"',
'1996-12-19T16:39:58,B,"World",5,"world"',
'1996-12-19T16:39:59,B,"hello world",-2,"hello world"',
'1996-12-19T16:40:00,B,,-2,"greetings"',
'1996-12-19T16:40:01,B,,2,"salutations"',
'1996-12-19T16:40:02,B,"goodbye",,',
]
)
return await kd.sources.CsvString.create(
content, time_column="time", key_column="key"
)


async def test_substring(source, golden) -> None:
bjchambers marked this conversation as resolved.
Show resolved Hide resolved
s = source.col("s")
n = source.col("n")
golden.jsonl(
kd.record(
{
"substring_0_2": s.substring(start=0, end=2),
"substring_1": s.substring(start=1),
"substring_0_i": s.substring(end=n),
"substring_i": s.substring(start=n),
}
)
)
Loading