From 2325b3a66d9aa6cf4bc0159446bbffc3bd95ff25 Mon Sep 17 00:00:00 2001 From: Katya Katsenelenbogen Date: Tue, 8 Jun 2021 17:27:19 +0300 Subject: [PATCH] bug fix in reading non existing column from v3io when keys are list (#233) --- integration/test_aggregation_integration.py | 24 +++++++++++++++++++-- storey/aggregations.py | 2 +- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/integration/test_aggregation_integration.py b/integration/test_aggregation_integration.py index b44de405..2c1c9d24 100644 --- a/integration/test_aggregation_integration.py +++ b/integration/test_aggregation_integration.py @@ -1166,7 +1166,7 @@ def test_aggregate_multiple_keys(setup_teardown_test): controller = build_flow([ SyncEmitSource(), QueryByKey(['number_of_stuff_sum_1h'], - other_table, keys=['first_name', 'last_name']), + other_table, key=['first_name', 'last_name']), Reduce([], lambda acc, x: append_return(acc, x)), ]).run() @@ -1299,7 +1299,7 @@ def map_multiply(x): controller = build_flow([ SyncEmitSource(), QueryByKey(['number_of_stuff_avg_1h', 'number_of_stuff2_sum_2h'], - other_table, keys=['first_name']), + other_table, key=['first_name']), Reduce([], lambda acc, x: append_return(acc, x)), ]).run() @@ -1344,3 +1344,23 @@ def test_write_read_first_last(setup_teardown_test): assert result == [{'mykey': 'onekey', 'attr_first_1h': 0.0, 'attr_last_1h': 9.0}, {'mykey': 'onekey', 'attr_first_1h': 0.0, 'attr_last_1h': 90.0}] + + +def test_non_existing_key_query_by_key_from_v3io_key_is_list(setup_teardown_test): + table = Table(setup_teardown_test, V3ioDriver()) + df = pd.DataFrame([['katya', 'green', 'hod hasharon'], ['dina', 'blue', 'ramat gan']], columns=['name', 'color', 'city']) + controller = build_flow([ + DataframeSource(df, key_field='name'), + NoSqlTarget(table), + ]).run() + controller.await_termination() + + controller = build_flow([ + SyncEmitSource(), + QueryByKey(["color"], table, key=["name"]), + QueryByKey(["city"], table, key="name"), + ]).run() + + controller.emit({'nameeeee': 'katya'}, 'katya') + controller.terminate() + controller.await_termination() diff --git a/storey/aggregations.py b/storey/aggregations.py index c6d4ff05..0efc224b 100644 --- a/storey/aggregations.py +++ b/storey/aggregations.py @@ -268,7 +268,7 @@ async def _do(self, event): if self.key_extractor: if element: key = self.key_extractor(element) - if key is None or element is None: + if key is None or key == [None] or element is None: event.body = None await self._do_downstream(event) return