Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sources operator #108

Merged
merged 6 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ repos:
additional_dependencies: [pytest, typing-extensions]
types: [python]
- repo: https://github.com/RobertCraigie/pyright-python
rev: v1.1.361
rev: v1.1.362
hooks:
- id: pyright
additional_dependencies: [pytest, typing-extensions]
Expand Down
240 changes: 215 additions & 25 deletions aiostream/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,19 @@ def pipe(
) -> Callable[[AsyncIterable[A]], Stream[T]]: ...


# Operator decorator
class SourcesOperator(Protocol[P, T]):
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]: ...

@staticmethod
def raw(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: ...

@staticmethod
def pipe(
*args: P.args, **kwargs: P.kwargs
) -> Callable[[AsyncIterable[Any]], Stream[T]]: ...


# Operator decorators


def operator(
Expand All @@ -297,18 +309,18 @@ async def random(offset=0., width=1.):
while True:
yield offset + width * random.random()

The return value is a dynamically created class.
It has the same name, module and doc as the original function.
The return value is a dynamically created callable.
It has the same name, module and documentation as the original function.

A new stream is created by simply instanciating the operator::
A new stream is created by simply calling the operator::

xs = random()

The original function is called at instanciation to check that
signature match. Other methods are available:
The original function is called right away to check that the
signatures match. Other methods are available:

- `original`: the original function as a static method
- `raw`: same as original but add extra checking
- `raw`: same as original with extra checking

The `pipable` argument is deprecated, use `pipable_operator` instead.
"""
Expand Down Expand Up @@ -357,9 +369,6 @@ async def random(offset=0., width=1.):

# Gather attributes
class OperatorImplementation:
__qualname__ = name
__module__ = module
__doc__ = doc

original = staticmethod(original_func)

Expand Down Expand Up @@ -399,8 +408,18 @@ def __str__(self) -> str:
OperatorImplementation.__call__.__module__ = module
OperatorImplementation.__call__.__doc__ = doc

# Create operator class
return OperatorImplementation()
# Create operator singleton
properly_named_class = type(
name,
(OperatorImplementation,),
{
"__qualname__": name,
"__module__": module,
"__doc__": doc,
},
)
operator_instance = properly_named_class()
return operator_instance


def pipable_operator(
Expand All @@ -420,19 +439,19 @@ async def multiply(source, factor):
The first argument is expected to be the asynchronous iteratable used
for piping.

The return value is a dynamically created class.
It has the same name, module and doc as the original function.
The return value is a dynamically created callable.
It has the same name, module and documentation as the original function.

A new stream is created by simply instanciating the operator::
A new stream is created by simply calling the operator::

xs = random()
ys = multiply(xs, 2)

The original function is called at instanciation to check that
signature match. The source is also checked for asynchronous iteration.
The original function is called right away (but not awaited) to check that
signatures match. The sources are also checked for asynchronous iteration.

The operator also have a pipe class method that can be used along
with the piping synthax::
The operator also have a `pipe` method that can be used with the pipe
synthax::

xs = random()
ys = xs | multiply.pipe(2)
Expand All @@ -442,7 +461,7 @@ async def multiply(source, factor):
Other methods are available:

- `original`: the original function as a static method
- `raw`: same as original but add extra checking
- `raw`: same as original with extra checking

The raw method is useful to create new operators from existing ones::

Expand Down Expand Up @@ -495,9 +514,6 @@ def double(source):

# Gather attributes
class PipableOperatorImplementation:
__qualname__ = name
__module__ = module
__doc__ = doc

original = staticmethod(original_func)

Expand Down Expand Up @@ -570,6 +586,180 @@ def __str__(self) -> str:
if extra_doc:
PipableOperatorImplementation.pipe.__doc__ += "\n\n " + extra_doc

# Create operator class
operator_instance = PipableOperatorImplementation()
# Create operator singleton
properly_named_class = type(
name,
(PipableOperatorImplementation,),
{
"__qualname__": name,
"__module__": module,
"__doc__": doc,
},
)
operator_instance = properly_named_class()
return operator_instance


def sources_operator(
func: Callable[P, AsyncIterator[T]],
) -> SourcesOperator[P, T]:
"""Create a pipable stream operator from an asynchronous generator
(or any function returning an asynchronous iterable) that takes
a variadic ``*args`` of sources as argument.

Decorator usage::

@sources_operator
async def chain(*sources, repeat=1):
for source in (sources * repeat):
async with streamcontext(source) as streamer:
async for item in streamer:
yield item

Positional arguments are expected to be asynchronous iterables.

When used in a pipable context, the asynchronous iterable injected by
the pipe operator is used as the first argument.

The return value is a dynamically created callable.
It has the same name, module and documentation as the original function.

A new stream is created by simply calling the operator::

xs = chain()
ys = chain(random())
zs = chain(stream.just(0.0), stream.just(1.0), random())

The original function is called right away (but not awaited) to check that
signatures match. The sources are also checked for asynchronous iteration.

The operator also have a `pipe` method that can be used with the pipe
synthax::

just_zero = stream.just(0.0)
zs = just_zero | chain.pipe(stream.just(1.0), random())

This is strictly equivalent to the previous ``zs`` example.

Other methods are available:

- `original`: the original function as a static method
- `raw`: same as original with extra checking

The raw method is useful to create new operators from existing ones::

@sources_operator
def chain_twice(*sources):
return chain.raw(*sources, repeat=2)
"""
# First check for classmethod instance, to avoid more confusing errors later on
if isinstance(func, classmethod):
raise ValueError(
"An operator cannot be created from a class method, "
"since the decorated function becomes an operator class"
)

# Gather data
name = func.__name__
module = func.__module__
extra_doc = func.__doc__
doc = extra_doc or f"Regular {name} stream operator."

# Extract signature
signature = inspect.signature(func)
parameters = list(signature.parameters.values())
return_annotation = signature.return_annotation
if parameters and parameters[0].name in ("self", "cls"):
raise ValueError(
"An operator cannot be created from a method, "
"since the decorated function becomes an operator class"
)

# Check for positional first parameter
if not parameters or parameters[0].kind != inspect.Parameter.VAR_POSITIONAL:
raise ValueError(
"The first parameter of the sources operator must be var-positional"
)

# Wrapped static method
original_func = func
original_func.__qualname__ = name + ".original"

# Gather attributes
class SourcesOperatorImplementation:

original = staticmethod(original_func)

@staticmethod
def raw(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
for source in args:
assert_async_iterable(source)
return func(*args, **kwargs)

def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]:
for source in args:
assert_async_iterable(source)
factory = functools.partial(self.raw, *args, **kwargs)
return Stream(factory)

@staticmethod
def pipe(
*args: P.args,
**kwargs: P.kwargs,
) -> Callable[[AsyncIterable[Any]], Stream[T]]:
return lambda source: operator_instance(source, *args, **kwargs) # type: ignore

def __repr__(self) -> str:
return f"{module}.{name}"

def __str__(self) -> str:
return f"{module}.{name}"

# Customize raw method
SourcesOperatorImplementation.raw.__signature__ = signature # type: ignore[attr-defined]
SourcesOperatorImplementation.raw.__qualname__ = name + ".raw"
SourcesOperatorImplementation.raw.__module__ = module
SourcesOperatorImplementation.raw.__doc__ = doc

# Customize call method
self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
new_parameters = [self_parameter] + parameters
new_return_annotation = (
return_annotation.replace("AsyncIterator", "Stream")
if isinstance(return_annotation, str)
else return_annotation
)
SourcesOperatorImplementation.__call__.__signature__ = signature.replace( # type: ignore[attr-defined]
parameters=new_parameters, return_annotation=new_return_annotation
)
SourcesOperatorImplementation.__call__.__qualname__ = name + ".__call__"
SourcesOperatorImplementation.__call__.__name__ = "__call__"
SourcesOperatorImplementation.__call__.__module__ = module
SourcesOperatorImplementation.__call__.__doc__ = doc

# Customize pipe method
pipe_parameters = parameters
pipe_return_annotation = f"Callable[[AsyncIterable[Any]], {new_return_annotation}]"
SourcesOperatorImplementation.pipe.__signature__ = signature.replace( # type: ignore[attr-defined]
parameters=pipe_parameters, return_annotation=pipe_return_annotation
)
SourcesOperatorImplementation.pipe.__qualname__ = name + ".pipe"
SourcesOperatorImplementation.pipe.__module__ = module
SourcesOperatorImplementation.pipe.__doc__ = (
f'Piped version of the "{name}" stream operator.'
)
if extra_doc:
SourcesOperatorImplementation.pipe.__doc__ += "\n\n " + extra_doc

# Create operator singleton
properly_named_class = type(
name,
(SourcesOperatorImplementation,),
{
"__qualname__": name,
"__module__": module,
"__doc__": doc,
},
)
operator_instance = properly_named_class()
return operator_instance
Loading
Loading