Skip to content

Commit

Permalink
feat: implement __getitem__ (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBernstorff committed Jan 1, 2024
2 parents bfa28de + f120b34 commit 1dd6588
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 53 deletions.
95 changes: 60 additions & 35 deletions functionalpy/_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,78 +6,103 @@
Iterator,
Sequence,
)
from copy import deepcopy
from functools import reduce
from itertools import islice
from typing import Generic, TypeVar

_T = TypeVar("_T")
_S = TypeVar("_S")


class Seq(Generic[_T]):
def __init__(self, iterable: Iterable[_T]):
self._seq = iterable
T = TypeVar("T")
S = TypeVar("S")


class Seq(Generic[T]):
def __init__(self, iterable: Iterable[T]) -> None:
self._iterator: Iterator[T] = iter(iterable)

@property
def _non_consumable(self) -> Iterator[T]:
return deepcopy(self._iterator)

def __getitem__(self, index: int | slice) -> T | "Seq[T]":
if isinstance(index, int) and index >= 0:
try:
return list(self._non_consumable)[index]
except StopIteration:
raise IndexError("Index out of range") from None
elif isinstance(index, slice):
return Seq(
islice(
self._non_consumable,
index.start,
index.stop,
index.step,
)
)
else:
raise KeyError(
f"Key must be non-negative integer or slice, not {index}"
)

### Reductions
def count(self) -> int:
return sum(1 for _ in self._seq)
return sum(1 for _ in self._iterator)

### Output
def to_list(self) -> list[_T]:
return list(self._seq)
def to_list(self) -> list[T]:
return list(self._iterator)

def to_tuple(self) -> tuple[_T, ...]:
return tuple(self._seq) # pragma: no cover
def to_tuple(self) -> tuple[T, ...]:
return tuple(self._iterator) # pragma: no cover

def to_iter(self) -> Iterator[_T]:
return iter(self._seq) # pragma: no cover
def to_iter(self) -> Iterator[T]:
return iter(self._iterator) # pragma: no cover

def to_set(self) -> set[_T]:
return set(self._seq) # pragma: no cover
def to_set(self) -> set[T]:
return set(self._iterator) # pragma: no cover

### Transformations
def map( # noqa: A003 # Ignore that it's shadowing a python built-in
self,
func: Callable[[_T], _S],
) -> "Seq[_S]":
return Seq(map(func, self._seq))
func: Callable[[T], S],
) -> "Seq[S]":
return Seq(map(func, self._iterator))

def pmap(
self,
func: Callable[[_T], _S],
) -> "Seq[_S]":
func: Callable[[T], S],
) -> "Seq[S]":
"""Parallel map using multiprocessing.Pool
Not that lambdas are not supported by multiprocessing.Pool.map.
"""
with multiprocessing.Pool() as pool:
return Seq(pool.map(func, self._seq))
return Seq(pool.map(func, self._iterator))

def filter(self, func: Callable[[_T], bool]) -> "Seq[_T]": # noqa: A003
return Seq(filter(func, self._seq))
def filter(self, func: Callable[[T], bool]) -> "Seq[T]": # noqa: A003
return Seq(filter(func, self._iterator))

def reduce(self, func: Callable[[_T, _T], _T]) -> _T:
return reduce(func, self._seq)
def reduce(self, func: Callable[[T, T], T]) -> T:
return reduce(func, self._iterator)

def groupby(
self, func: Callable[[_T], str]
) -> "Seq[tuple[str, list[_T]]]":
groups_with_values: defaultdict[str, list[_T]] = defaultdict(
self, func: Callable[[T], str]
) -> "Seq[tuple[str, list[T]]]":
groups_with_values: defaultdict[str, list[T]] = defaultdict(
list
)

for value in self._seq:
for value in self._iterator:
value_key = func(value)
groups_with_values[value_key].append(value)

tuples = list(groups_with_values.items())
return Seq(tuples)

def flatten(self) -> "Seq[_T]":
values: list[_T] = []

for i in self._seq:
def flatten(self) -> "Seq[T]":
values: list[T] = []
for i in self._iterator:
if isinstance(i, Sequence) and not isinstance(i, str):
values.extend(i) # type: ignore
values.extend(i)
else:
values.append(i)

Expand Down
40 changes: 22 additions & 18 deletions functionalpy/_sequence.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,34 @@ from collections.abc import (
)
from typing import Generic, TypeVar, overload

_T = TypeVar("_T")
_S = TypeVar("_S")
T = TypeVar("T")
S = TypeVar("S")

class Seq(Generic[_T]):
def __init__(self, iterable: Iterable[_T]) -> None: ...
class Seq(Generic[T]):
def __init__(self, iterable: Iterable[T]) -> None: ...
@overload
def __getitem__(self, index: int) -> T: ...
@overload
def __getitem__(self, index: slice) -> Seq[T]: ...
def count(self) -> int: ...
def to_list(self) -> list[_T]: ...
def to_tuple(self) -> tuple[_T, ...]: ...
def to_iter(self) -> Iterator[_T]: ...
def to_set(self) -> set[_T]: ...
def map(self, func: Callable[[_T], _S]) -> Seq[_S]: # noqa: A003
def to_list(self) -> list[T]: ...
def to_tuple(self) -> tuple[T, ...]: ...
def to_iter(self) -> Iterator[T]: ...
def to_set(self) -> set[T]: ...
def map(self, func: Callable[[T], S]) -> Seq[S]: # noqa: A003
...
def pmap(self, func: Callable[[_T], _S]) -> Seq[_S]: ...
def filter(self, func: Callable[[_T], bool]) -> Seq[_T]: # noqa: A003
def pmap(self, func: Callable[[T], S]) -> Seq[S]: ...
def filter(self, func: Callable[[T], bool]) -> Seq[T]: # noqa: A003
...
def reduce(self, func: Callable[[_T, _T], _T]) -> _T: ...
def reduce(self, func: Callable[[T, T], T]) -> T: ...
def groupby(
self, func: Callable[[_T], str]
) -> Seq[tuple[str, list[_T]]]: ...
self, func: Callable[[T], str]
) -> Seq[tuple[str, list[T]]]: ...
@overload
def flatten(self: Seq[list[_S]]) -> Seq[_S]: ...
def flatten(self: Seq[list[S]]) -> Seq[S]: ...
@overload
def flatten(self: Seq[list[_S] | _S]) -> Seq[_S]: ...
def flatten(self: Seq[list[S] | S]) -> Seq[S]: ...
@overload
def flatten(self: Seq[tuple[_S, ...]]) -> Seq[_S]: ...
def flatten(self: Seq[tuple[S, ...]]) -> Seq[S]: ...
@overload
def flatten(self: Seq[_S]) -> Seq[_S]: ...
def flatten(self: Seq[S]) -> Seq[S]: ...
7 changes: 7 additions & 0 deletions functionalpy/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ def is_even(num: int) -> str:
]


def test_getitem():
test_input = [1, 2, 3]
test_sequence = Seq(test_input)
assert test_sequence[0] == 1
assert test_sequence[0:2].to_list() == [1, 2]


def test_flatten():
test_input: list[list[int]] = [[1, 2], [3, 4]]
sequence = Seq(test_input)
Expand Down

0 comments on commit 1dd6588

Please sign in to comment.