Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make operators a singleton object instead of a class #106

Merged
merged 3 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 141 additions & 117 deletions aiostream/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
Protocol,
Union,
TypeVar,
cast,
AsyncIterable,
Awaitable,
)
Expand Down Expand Up @@ -258,23 +257,26 @@ def streamcontext(aiterable: AsyncIterable[T]) -> Streamer[T]:
# Operator type protocol


class OperatorType(Protocol[P, T]):
class Operator(Protocol[P, T]):
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]: ...

def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: ...
@staticmethod
def raw(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: ...


class PipableOperatorType(Protocol[A, P, T]):
class PipableOperator(Protocol[A, P, T]):
def __call__(
self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
) -> Stream[T]: ...

@staticmethod
def raw(
self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
) -> AsyncIterator[T]: ...

@staticmethod
def pipe(
self, *args: P.args, **kwargs: P.kwargs
*args: P.args, **kwargs: P.kwargs
) -> Callable[[AsyncIterable[A]], Stream[T]]: ...


Expand All @@ -284,7 +286,7 @@ def pipe(
def operator(
func: Callable[P, AsyncIterator[T]] | None = None,
pipable: bool | None = None,
) -> OperatorType[P, T]:
) -> Operator[P, T]:
"""Create a stream operator from an asynchronous generator
(or any function returning an asynchronous iterable).

Expand Down Expand Up @@ -330,7 +332,6 @@ async def random(offset=0., width=1.):
)

# Gather data
bases = (Stream,)
name = func.__name__
module = func.__module__
extra_doc = func.__doc__
Expand All @@ -339,55 +340,73 @@ async def random(offset=0., width=1.):
# Extract signature
signature = inspect.signature(func)
parameters = list(signature.parameters.values())
return_annotation = signature.return_annotation
if parameters and parameters[0].name in ("self", "cls"):
raise ValueError(
"An operator cannot be created from a method, "
"since the decorated function becomes an operator class"
)

# Injected parameters
self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)

# Wrapped static method
original = func
original.__qualname__ = name + ".original"
original_func = func
original_func.__qualname__ = name + ".original"

# Raw static method
raw = func
raw.__qualname__ = name + ".raw"
raw_func = func
raw_func.__qualname__ = name + ".raw"

# Init method
def init(self: BaseStream[T], *args: P.args, **kwargs: P.kwargs) -> None:
factory = functools.partial(raw, *args, **kwargs)
return BaseStream.__init__(self, factory)
# Gather attributes
class OperatorImplementation:
__qualname__ = name
__module__ = module
__doc__ = doc
__signature__ = signature

# Customize init signature
new_parameters = [self_parameter] + parameters
init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined]
original = staticmethod(original_func)

# Customize init method
init.__qualname__ = name + ".__init__"
init.__name__ = "__init__"
init.__module__ = module
init.__doc__ = f"Initialize the {name} stream."
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]:
factory = functools.partial(raw_func, *args, **kwargs)
return Stream(factory)

# Gather attributes
attrs = {
"__init__": init,
"__module__": module,
"__doc__": doc,
"raw": staticmethod(raw),
"original": staticmethod(original),
}
@staticmethod
def raw(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
return raw_func(*args, **kwargs)

def __repr__(self) -> str:
return f"{module}.{name}"

def __str__(self) -> str:
return f"{module}.{name}"

# Customize raw method
OperatorImplementation.raw.__signature__ = signature # type: ignore[attr-defined]
OperatorImplementation.raw.__qualname__ = name + ".raw"
OperatorImplementation.raw.__module__ = module
OperatorImplementation.raw.__doc__ = doc

# Customize call method
self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
new_parameters = [self_parameter] + parameters
new_return_annotation = (
return_annotation.replace("AsyncIterator", "Stream")
if isinstance(return_annotation, str)
else return_annotation
)
OperatorImplementation.__call__.__signature__ = signature.replace( # type: ignore[attr-defined]
parameters=new_parameters, return_annotation=new_return_annotation
)
OperatorImplementation.__call__.__qualname__ = name + ".__call__"
OperatorImplementation.__call__.__name__ = "__call__"
OperatorImplementation.__call__.__module__ = module
OperatorImplementation.__call__.__doc__ = doc

# Create operator class
return cast("OperatorType[P, T]", type(name, bases, attrs))
return OperatorImplementation()


def pipable_operator(
func: Callable[Concatenate[AsyncIterable[X], P], AsyncIterator[T]],
) -> PipableOperatorType[X, P, T]:
) -> PipableOperator[X, P, T]:
"""Create a pipable stream operator from an asynchronous generator
(or any function returning an asynchronous iterable).

Expand Down Expand Up @@ -441,7 +460,6 @@ def double(source):
)

# Gather data
bases = (Stream,)
name = func.__name__
module = func.__module__
extra_doc = func.__doc__
Expand All @@ -450,12 +468,20 @@ def double(source):
# Extract signature
signature = inspect.signature(func)
parameters = list(signature.parameters.values())
return_annotation = signature.return_annotation
if parameters and parameters[0].name in ("self", "cls"):
raise ValueError(
"An operator cannot be created from a method, "
"since the decorated function becomes an operator class"
)

# Check for positional first parameter
if not parameters or parameters[0].kind not in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
):
raise ValueError("The first parameter of the operator must be positional")

# Look for "more_sources"
for i, p in enumerate(parameters):
if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
Expand All @@ -464,89 +490,87 @@ def double(source):
else:
more_sources_index = None

# Injected parameters
self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)

# Wrapped static method
original = func
original.__qualname__ = name + ".original"
original_func = func
original_func.__qualname__ = name + ".original"

# Raw static method
def raw(
arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
) -> AsyncIterator[T]:
assert_async_iterable(arg)
if more_sources_index is not None:
for source in args[more_sources_index - 1 :]:
assert_async_iterable(source)
return func(arg, *args, **kwargs)

# Custonize raw method
raw.__signature__ = signature # type: ignore[attr-defined]
raw.__qualname__ = name + ".raw"
raw.__module__ = module
raw.__doc__ = doc

# Init method
def init(
self: BaseStream[T], arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
) -> None:
assert_async_iterable(arg)
if more_sources_index is not None:
for source in args[more_sources_index - 1 :]:
assert_async_iterable(source)
factory = functools.partial(raw, arg, *args, **kwargs)
return BaseStream.__init__(self, factory)

# Customize init signature
# Gather attributes
class PipableOperatorImplementation:
__qualname__ = name
__module__ = module
__doc__ = doc

original = staticmethod(original_func)

@staticmethod
def raw(
arg: AsyncIterable[X], /, *args: P.args, **kwargs: P.kwargs
) -> AsyncIterator[T]:
assert_async_iterable(arg)
if more_sources_index is not None:
for source in args[more_sources_index - 1 :]:
assert_async_iterable(source)
return func(arg, *args, **kwargs)

def __call__(
self, arg: AsyncIterable[X], /, *args: P.args, **kwargs: P.kwargs
) -> Stream[T]:
assert_async_iterable(arg)
if more_sources_index is not None:
for source in args[more_sources_index - 1 :]:
assert_async_iterable(source)
factory = functools.partial(self.raw, arg, *args, **kwargs)
return Stream(factory)

@staticmethod
def pipe(
*args: P.args,
**kwargs: P.kwargs,
) -> Callable[[AsyncIterable[X]], Stream[T]]:
return lambda source: operator_instance(source, *args, **kwargs)

def __repr__(self) -> str:
return f"{module}.{name}"

def __str__(self) -> str:
return f"{module}.{name}"

# Customize raw method
PipableOperatorImplementation.raw.__signature__ = signature # type: ignore[attr-defined]
PipableOperatorImplementation.raw.__qualname__ = name + ".raw"
PipableOperatorImplementation.raw.__module__ = module
PipableOperatorImplementation.raw.__doc__ = doc

# Customize call method
self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
new_parameters = [self_parameter] + parameters
init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined]

# Customize init method
init.__qualname__ = name + ".__init__"
init.__name__ = "__init__"
init.__module__ = module
init.__doc__ = f"Initialize the {name} stream."

# Pipe class method
def pipe(
cls: PipableOperatorType[X, P, T],
/,
*args: P.args,
**kwargs: P.kwargs,
) -> Callable[[AsyncIterable[X]], Stream[T]]:
return lambda source: cls(source, *args, **kwargs)

# Customize pipe signature
if parameters and parameters[0].kind in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
):
new_parameters = [cls_parameter] + parameters[1:]
else:
raise ValueError("The first parameter of the operator must be positional")
pipe.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined]
new_return_annotation = (
return_annotation.replace("AsyncIterator", "Stream")
if isinstance(return_annotation, str)
else return_annotation
)
PipableOperatorImplementation.__call__.__signature__ = signature.replace( # type: ignore[attr-defined]
parameters=new_parameters, return_annotation=new_return_annotation
)
PipableOperatorImplementation.__call__.__qualname__ = name + ".__call__"
PipableOperatorImplementation.__call__.__name__ = "__call__"
PipableOperatorImplementation.__call__.__module__ = module
PipableOperatorImplementation.__call__.__doc__ = doc

# Customize pipe method
pipe.__qualname__ = name + ".pipe"
pipe.__module__ = module
pipe.__doc__ = f'Pipable "{name}" stream operator.'
pipe_parameters = parameters[1:]
pipe_return_annotation = f"Callable[[AsyncIterable[X]], {new_return_annotation}]"
PipableOperatorImplementation.pipe.__signature__ = signature.replace( # type: ignore[attr-defined]
parameters=pipe_parameters, return_annotation=pipe_return_annotation
)
PipableOperatorImplementation.pipe.__qualname__ = name + ".pipe"
PipableOperatorImplementation.pipe.__module__ = module
PipableOperatorImplementation.pipe.__doc__ = (
f'Piped version of the "{name}" stream operator.'
)
if extra_doc:
pipe.__doc__ += "\n\n " + extra_doc

# Gather attributes
attrs = {
"__init__": init,
"__module__": module,
"__doc__": doc,
"raw": staticmethod(raw),
"original": staticmethod(original),
"pipe": classmethod(pipe), # type: ignore[arg-type]
}
PipableOperatorImplementation.pipe.__doc__ += "\n\n " + extra_doc

# Create operator class
return cast(
"PipableOperatorType[X, P, T]",
type(name, bases, attrs),
)
operator_instance = PipableOperatorImplementation()
return operator_instance
2 changes: 0 additions & 2 deletions aiostream/stream/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]

# map, amap and smap are also transform operators
map, amap, smap

T = TypeVar("T")
U = TypeVar("U")
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ strict = [
"aiostream/manager.py",
"aiostream/pipe.py",
"aiostream/test_utils.py",
"aiostream/core.py",
]

[tool.mypy]
Expand Down
Loading
Loading