Async class methods in pipeline #69
Comments
Hi @cje, thanks for the report! This limitation is here because it's quite tricky to bind the operator class (i.e.
diff --git a/aiostream/core.py b/aiostream/core.py
index bdaedd9..f46d892 100644
--- a/aiostream/core.py
+++ b/aiostream/core.py
@@ -279,9 +279,9 @@ def operator(func=None, *, pipable=False):
         signature = inspect.signature(func)
         parameters = list(signature.parameters.values())
         if parameters and parameters[0].name in ("self", "cls"):
-            raise ValueError(
-                "An operator cannot be created from a method, "
-                "since the decorated function becomes an operator class"
+            parameters[0] = inspect.Parameter(
+                "original_" + parameters[0].name,
+                inspect.Parameter.POSITIONAL_OR_KEYWORD
             )

         # Injected parameters
@@ -296,10 +296,6 @@ def operator(func=None, *, pipable=False):
         original = func
         original.__qualname__ = name + ".original"

-        # Raw static method
-        raw = func
-        raw.__qualname__ = name + ".raw"
-
         # Init method
         def init(self, *args, **kwargs):
             if pipable and args:
@@ -317,15 +313,27 @@ def operator(func=None, *, pipable=False):
         init.__module__ = module
         init.__doc__ = f"Initialize the {name} stream."

-        if pipable:
+        if not pipable:
+
+            # Raw static method
+            def raw(cls, *args, **kwargs):
+                return cls.original(*args, **kwargs)
+
+            # Customize raw method
+            raw.__signature__ = signature
+            raw.__qualname__ = name + ".raw"
+            raw.__module__ = module
+            raw.__doc__ = doc
+
+        else:

             # Raw static method
-            def raw(*args, **kwargs):
-                if args:
-                    assert_async_iterable(args[0])
-                return func(*args, **kwargs)
+            def raw(cls, *args, **kwargs):
+                assert args
+                assert_async_iterable(args[0])
+                return cls.original(*args, **kwargs)

-            # Custonize raw method
+            # Customize raw method
             raw.__signature__ = signature
             raw.__qualname__ = name + ".raw"
             raw.__module__ = module
@@ -352,17 +360,28 @@ def operator(func=None, *, pipable=False):
             if extra_doc:
                 pipe.__doc__ += "\n\n " + extra_doc

+        # Metaclass
+        class MetaDescriptor(type(Stream)):
+
+            def __get__(self, instance, owner=None):
+                owner = owner or type(instance)
+                attrs = {
+                    "original": func.__get__(instance, owner),
+                    "__module__": f"{owner.__module__}.{owner.__name__}",
+                }
+                return MetaDescriptor(self.__name__, (self,), attrs)
+
         # Gather attributes
         attrs = {
             "__init__": init,
             "__module__": module,
             "__doc__": doc,
-            "raw": staticmethod(raw),
+            "raw": classmethod(raw),
             "original": staticmethod(original),
             "pipe": classmethod(pipe) if pipable else None,
         }

         # Create operator class
-        return type(name, bases, attrs)
+        return MetaDescriptor(name, bases, attrs)

     return decorator if func is None else decorator(func)
I'll let it sink in for a few days and let you know whether or not I think this fix is worth the trouble of maintaining this use case. In the meantime, could you elaborate a bit more about your use case? Do you use |
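For readers unfamiliar with the trick used in the patch, here is a minimal, aiostream-free sketch of a metaclass that defines `__get__`, so that a class stored as a class attribute gets re-created as a bound subclass when looked up on an instance. The names `BindableMeta`, `Operator`, `Holder` and `bound_to` are hypothetical and only illustrate the mechanism; this is not the code from the patch.

```python
class BindableMeta(type):
    """Metaclass whose classes behave as descriptors (hypothetical name)."""

    def __get__(cls, instance, owner=None):
        # Invoked when a class created by this metaclass is looked up as an
        # attribute of another class or of one of its instances.
        if instance is None:
            # Accessed on the owner class itself: nothing to bind.
            return cls
        # Build a subclass that remembers the instance it was accessed from.
        return BindableMeta(cls.__name__, (cls,), {"bound_to": instance})


class Operator(metaclass=BindableMeta):
    bound_to = None

    @classmethod
    def describe(cls):
        return f"{cls.__name__} bound to {cls.bound_to!r}"


class Holder:
    op = Operator  # a class stored as a class attribute

    def __repr__(self):
        return "Holder()"


holder = Holder()
bound = holder.op  # triggers BindableMeta.__get__
print(bound.describe())
```

Note that every instance-level access creates a new subclass, which is part of why this approach can be awkward to maintain and to type.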
I hit this recently. Imo, it's probably more likely to be a problem when you are refactoring existing code. In my case I had implemented the chainable/builder style of
If I was starting from scratch I might do this in a more aiostream "native" way. But I needed to be able to gradually replace the old API, so I ended up making a function that adapted foo - e.g.
I don't like the idea that the filters need access to |
The idea of providing a Rust-style method-based chaining mechanism is also on my mind, although it's not obvious how this thing should work (pipable vs non-pipable, vanilla vs custom operators, etc.).
I'm probably missing something, but couldn't you simply omit the
import asyncio
import random as random_module

from aiostream import operator, pipe, streamcontext


class MyOperators:
    @operator
    async def random(offset=0.0, width=1.0, interval=0.1):
        """Generate a stream of random numbers."""
        while True:
            await asyncio.sleep(interval)
            yield offset + width * random_module.random()

    @operator(pipable=True)
    async def power(source, exponent):
        """Raise the elements of an asynchronous sequence to the given power."""
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item ** exponent

    @operator(pipable=True)
    def square(source):
        """Square the elements of an asynchronous sequence."""
        return MyOperators.power.raw(source, 2)

    async def main(self):
        xs = (
            self.random()  # Stream random numbers
            | self.square.pipe()  # Square the values
            | pipe.accumulate()
            | pipe.take(3)
            | pipe.list()
        )  # Sum the values
        print(await xs)
The chaining pattern I had going was pre-aiostream; I hadn't thought about applying it to aiostream. I actually like the pipe stuff (though I wish I could make the .pipe() constructor the default for custom operators). I think for aiostream, auto-generating a chaining interface would require some fairly awful-looking dynamic code that might not gel well with typing and probably isn't worth it?
Ah, I think I haven't been very clear here. I wasn't looking for a solution; I was just weighing in on the original problem and saying I probably didn't find the use case for pipable operators being on a stateful class very compelling (as you seemed unsure as to whether this change was something you wanted to apply and have to maintain). Personally, I prefer the idea of my pipable operators being "pure", standalone and easy to test. I would try to avoid putting them on a class, and I'd do namespacing with modules. |
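To make the "standalone and easy to test" point concrete, here is a rough sketch of a module-level step written as a plain async generator and exercised without any class or shared state. It deliberately uses only the standard library; `count_up_to`, `collect` and `test_power` are hypothetical helpers, and with aiostream the `power` function would presumably carry the `@operator(pipable=True)` decorator instead.

```python
import asyncio


async def power(source, exponent):
    """Raise each item of an async iterable to the given power."""
    async for item in source:
        yield item ** exponent


async def count_up_to(n):
    """Tiny finite source used only for testing (hypothetical helper)."""
    for i in range(n):
        yield i


async def collect(source):
    """Drain an async iterable into a list."""
    return [item async for item in source]


def test_power():
    # No fixtures and no instance to set up: the step depends only on its inputs.
    result = asyncio.run(collect(power(count_up_to(4), 2)))
    assert result == [0, 1, 4, 9]


test_power()
```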
My thoughts exactly 😁 (although the operator classes are already generated dynamically and causing some linting issues).
Oh got it, thanks for your feedback!
Hmm right, it's probably something the lib should promote too.
Well namespacing with classes is not a problem either as long as you don't use |
@vxgmichel Hey, I'm pretty late to this conversation, and can't speak to the difficulty of implementing these pipable operators as methods. I'm a big fan of this library, but I can't help but wonder if perhaps async Python is the wrong domain to argue for functional purity, as @Jc2k was doing. I can definitely see the appeal in the argument, and I wholeheartedly agree that in many cases, pure functions are more testable, more maintainable, and just generally more elegant. With that in mind, I hope you'll forgive me if I offer a good-faith argument for why functional purity is an unreasonable expectation in asynchronous settings, and I hope you'll further forgive me for erring on the side of over-explaining my point.

Pure functions are inextricably side-effect free. They usually have no internal state (except, perhaps some value cache or something). Indeed, this is usually what makes them so attractive to work with, since it makes their behavior very predictable. However, as the name would suggest, the asyncio package is principally designed with IO operations in mind. IO operations are included in a program with the sole purpose of allowing side-effects.

In many practical situations, we require data to flow in and out of our program through "side-channels" (typically files, databases, network calls, etc.), with the goals of modifying the behavior of our program based on the in-channels, and causing some persistent change on the world outside the program through the out-channels. Channels in and out of our program have side effects as a feature, not as a bug.

As for internal state, I often find during IO operations that some behavior-modifying internal state is absolutely unavoidable. For example, if I have to make a large number of HTTP requests to some API, I have to pipe these requests to a network call throttling proxy, which keeps me from exceeding that API's rate limits. This proxy needs to use internal state to keep track of how many requests it has made during a specified interval in time, when the current running request counter will reset, and a queue of all the calls loaded into the proxy during the wait time. |
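As a rough illustration of the kind of stateful step described above, here is a sketch of a fixed-window throttle whose counter and window start live on an instance. Everything here (`Throttle`, `wait_turn`, `throttled`, `FakeClock`) is a hypothetical name, not part of aiostream, and the sketch has no explicit queue: pending items simply wait in the source iterator. The clock and sleep function are injectable so the state stays testable.

```python
import asyncio
import time


class Throttle:
    """Let at most `limit` items through per `interval` seconds (sketch only)."""

    def __init__(self, limit, interval, clock=time.monotonic, sleep=asyncio.sleep):
        self.limit = limit
        self.interval = interval
        self._clock = clock
        self._sleep = sleep
        self._window_start = None  # when the current counting window began
        self._count = 0            # items let through in the current window

    async def wait_turn(self):
        now = self._clock()
        if self._window_start is None or now - self._window_start >= self.interval:
            # First call, or the previous window has expired: start a new one.
            self._window_start, self._count = now, 0
        if self._count >= self.limit:
            # Window is full: sleep until it ends, then start a new one.
            await self._sleep(self._window_start + self.interval - now)
            self._window_start, self._count = self._clock(), 0
        self._count += 1

    async def throttled(self, source):
        """Async generator method: re-yield items no faster than the limit."""
        async for item in source:
            await self.wait_turn()
            yield item


class FakeClock:
    """Deterministic clock for tests; sleeping just advances the time."""

    def __init__(self):
        self.now = 0.0
        self.sleeps = []

    def __call__(self):
        return self.now

    async def sleep(self, delay):
        self.sleeps.append(delay)
        self.now += delay
```

A bound method such as `Throttle(5, 1.0).throttled` can then be applied to any async iterable of requests.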
Hi @MaxwellWibert, thanks for sharing your thoughts :) I agree with pretty much everything you say, there's no real point in sticking to functional purity in the context of a lib like
The problem, as I said in my first post, is that:
So it's not trivial to implement, possibly painful to maintain, and very hard to type correctly. And there are alternatives to that which are:
What is your opinion about this? |
I have an existing project which I'd like to convert to using your library. I've modified one of your examples (listed at the foot of this message) to show the structure I'm aiming for, i.e. using async class methods as steps in a transformation pipeline.
This does not work due to the presence of the @operator decorator on class methods, and fails with the message:
Is there some alternative way in which I can have async class methods process objects in the pipeline? I'd prefer to keep all of the logic in the pipeline code if I can, and all of the existing logic in the existing classes.
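One possible way to get this structure without decorating methods at all is to keep the methods as plain async generators and compose the bound methods directly. The sketch below uses only the standard library; `Scaler`, `numbers` and `chain` are hypothetical names, and whether this fits a given aiostream pipeline is an assumption to check against the library's documentation.

```python
import asyncio


class Scaler:
    """Hypothetical existing class whose state is used by a pipeline step."""

    def __init__(self, factor):
        self.factor = factor

    async def scale(self, source):
        # Plain async generator method: `self` is bound as usual, and the
        # step only needs an async iterable as input.
        async for item in source:
            await asyncio.sleep(0)  # stand-in for real asynchronous work
            yield item * self.factor


async def numbers(n):
    """Finite source of integers for the demonstration."""
    for i in range(n):
        yield i


def chain(source, *steps):
    """Feed `source` through each step in order and return the result."""
    for step in steps:
        source = step(source)
    return source


async def main():
    double, triple = Scaler(2), Scaler(3)
    pipeline = chain(numbers(4), double.scale, triple.scale)
    return [item async for item in pipeline]


print(asyncio.run(main()))
```

The result of `chain` is an ordinary async iterable, so the existing logic stays in the existing classes while the pipeline definition stays in one place.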