diff --git a/pynumaflow/reducer/async_server.py b/pynumaflow/reducer/async_server.py index b461cc43..2c2de147 100644 --- a/pynumaflow/reducer/async_server.py +++ b/pynumaflow/reducer/async_server.py @@ -35,7 +35,7 @@ def get_handler( """ if inspect.isfunction(reducer_handler): if len(init_args) > 0 or len(init_kwargs) > 0: - # if the init_args or init_kwargs are passed, then the reducer_handler + # if the init_args or init_kwargs are passed, then the reducer_instance # can only be of class Reducer type raise TypeError("Cannot pass function handler with init args or kwargs") # return the function handler @@ -58,7 +58,7 @@ class ReduceAsyncServer(NumaflowServer): A new servicer instance is created and attached to the server. The server instance is returned. Args: - reducer_handler: The reducer instance to be used for Reduce UDF + reducer_instance: The reducer instance to be used for Reduce UDF sock_path: The UNIX socket path to be used for the server max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; @@ -115,7 +115,7 @@ async def reduce_handler(keys: list[str], def __init__( self, - reducer_handler: ReduceCallable, + reducer_instance: ReduceCallable, init_args: tuple = (), init_kwargs: dict = None, sock_path=REDUCE_SOCK_PATH, @@ -137,7 +137,7 @@ def __init__( """ if init_kwargs is None: init_kwargs = {} - self.reducer_handler = get_handler(reducer_handler, init_args, init_kwargs) + self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs) self.sock_path = f"unix://{sock_path}" self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) diff --git a/pynumaflow/reducestreamer/async_server.py b/pynumaflow/reducestreamer/async_server.py index c411abf5..f899e8e6 100644 --- a/pynumaflow/reducestreamer/async_server.py +++ b/pynumaflow/reducestreamer/async_server.py @@ -35,7 +35,7 @@ def get_handler( """ if inspect.isfunction(reducer_handler): if init_args or init_kwargs: - # if the init_args or init_kwargs are passed, then the reduce_stream_handler + # if the init_args or init_kwargs are passed, then the reduce_stream_instance # can only be of class ReduceStreamer type raise TypeError("Cannot pass function handler with init args or kwargs") # return the function handler @@ -60,7 +60,7 @@ class ReduceStreamAsyncServer(NumaflowServer): A new servicer instance is created and attached to the server. The server instance is returned. Args: - reduce_stream_handler: The reducer instance to be used for + reduce_stream_instance: The reducer instance to be used for Reduce Streaming UDF init_args: The arguments to be passed to the reduce_stream_handler init_kwargs: The keyword arguments to be passed to the @@ -128,7 +128,7 @@ async def reduce_handler( def __init__( self, - reduce_stream_handler: ReduceStreamCallable, + reduce_stream_instance: ReduceStreamCallable, init_args: tuple = (), init_kwargs: dict = None, sock_path=REDUCE_STREAM_SOCK_PATH, @@ -141,7 +141,7 @@ def __init__( A new servicer instance is created and attached to the server. The server instance is returned. Args: - reduce_stream_handler: The reducer instance to be used for + reduce_stream_instance: The reducer instance to be used for Reduce Streaming UDF init_args: The arguments to be passed to the reduce_stream_handler init_kwargs: The keyword arguments to be passed to the @@ -154,7 +154,7 @@ def __init__( """ if init_kwargs is None: init_kwargs = {} - self.reduce_stream_handler = get_handler(reduce_stream_handler, init_args, init_kwargs) + self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs) self.sock_path = f"unix://{sock_path}" self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) diff --git a/tests/reduce/test_async_reduce.py b/tests/reduce/test_async_reduce.py index 91e3c210..3eda5854 100644 --- a/tests/reduce/test_async_reduce.py +++ b/tests/reduce/test_async_reduce.py @@ -231,7 +231,7 @@ def __stub(self): return reduce_pb2_grpc.ReduceStub(_channel) def test_error_init(self): - # Check that reducer_handler in required + # Check that reducer_instance in required with self.assertRaises(TypeError): ReduceAsyncServer() # Check that the init_args and init_kwargs are passed @@ -248,19 +248,19 @@ class ExampleBadClass: pass with self.assertRaises(TypeError): - ReduceAsyncServer(reducer_handler=ExampleBadClass) + ReduceAsyncServer(reducer_instance=ExampleBadClass) def test_max_threads(self): # max cap at 16 - server = ReduceAsyncServer(reducer_handler=ExampleClass, max_threads=32) + server = ReduceAsyncServer(reducer_instance=ExampleClass, max_threads=32) self.assertEqual(server.max_threads, 16) # use argument provided - server = ReduceAsyncServer(reducer_handler=ExampleClass, max_threads=5) + server = ReduceAsyncServer(reducer_instance=ExampleClass, max_threads=5) self.assertEqual(server.max_threads, 5) # defaults to 4 - server = ReduceAsyncServer(reducer_handler=ExampleClass) + server = ReduceAsyncServer(reducer_instance=ExampleClass) self.assertEqual(server.max_threads, 4) diff --git a/tests/reducestreamer/test_async_reduce.py b/tests/reducestreamer/test_async_reduce.py index 327f8fa9..fd7a49bd 100644 --- a/tests/reducestreamer/test_async_reduce.py +++ b/tests/reducestreamer/test_async_reduce.py @@ -262,7 +262,7 @@ def __stub(self): return reduce_pb2_grpc.ReduceStub(_channel) def test_error_init(self): - # Check that reducer_handler in required + # Check that reducer_instance in required with self.assertRaises(TypeError): ReduceStreamAsyncServer() # Check that the init_args and init_kwargs are passed @@ -279,19 +279,19 @@ class ExampleBadClass: pass with self.assertRaises(TypeError): - ReduceStreamAsyncServer(reduce_stream_handler=ExampleBadClass) + ReduceStreamAsyncServer(reduce_stream_instance=ExampleBadClass) def test_max_threads(self): # max cap at 16 - server = ReduceStreamAsyncServer(reduce_stream_handler=ExampleClass, max_threads=32) + server = ReduceStreamAsyncServer(reduce_stream_instance=ExampleClass, max_threads=32) self.assertEqual(server.max_threads, 16) # use argument provided - server = ReduceStreamAsyncServer(reduce_stream_handler=ExampleClass, max_threads=5) + server = ReduceStreamAsyncServer(reduce_stream_instance=ExampleClass, max_threads=5) self.assertEqual(server.max_threads, 5) # defaults to 4 - server = ReduceStreamAsyncServer(reduce_stream_handler=ExampleClass) + server = ReduceStreamAsyncServer(reduce_stream_instance=ExampleClass) self.assertEqual(server.max_threads, 4)