From 101d675b92463d8ffd31fd80932ca5a4f5e8bd0f Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 07:50:20 -0500 Subject: [PATCH 01/12] create unit test --- .../apache_beam/typehints/typed_pipeline_test.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 9cb3fcdbb91d..23a385e40a97 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -1005,5 +1005,21 @@ def filter_fn(element: int) -> bool: self.assertEqual(th.output_types, ((int, ), {})) +class TestFlatMapTuple(unittest.TestCase): + def test_flatmap(self): + from typing import Tuple + + def identity(x: Tuple[str, int]) -> Tuple[str, int]: + return x + + with beam.Pipeline() as p: + with beam.Pipeline() as p: + ( + p + | "Generate input" >> beam.Create([('P1', [2])]) + | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) + | "Identity" >> beam.Map(identity)) + + if __name__ == '__main__': unittest.main() From b06cf9bab21a9239bb79f6940dd8c75344d882c5 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 07:57:19 -0500 Subject: [PATCH 02/12] minimize to not using flatmaptuple --- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index c7cb18b66384..24cc08c933b8 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -1006,12 +1006,17 @@ def test_flatmap(self): def identity(x: Tuple[str, int]) -> Tuple[str, int]: return x + def inner(k, vs): + return [(k, v) for v in vs] + + wrapper = lambda x: inner(*x) with beam.Pipeline() as p: with beam.Pipeline() as p: ( p | "Generate input" >> beam.Create([('P1', [2])]) - | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) + #| "Flat" >> beam.FlatMapTuple(inner) + | "Flat" >> beam.FlatMap(wrapper) | "Identity" >> beam.Map(identity)) From e7fe17e8817450a0c534070c2fec7b5ecb85fcb8 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 08:15:11 -0500 Subject: [PATCH 03/12] fix by adding a tuple conersion in flatmaptuple --- sdks/python/apache_beam/transforms/core.py | 2 +- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 9c798d3ce6dc..4d1678d72a69 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2238,7 +2238,7 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name if defaults or args or kwargs: wrapper = lambda x, *args, **kwargs: fn(*(tuple(x) + args), **kwargs) else: - wrapper = lambda x: fn(*x) + wrapper = lambda x: fn(*tuple(x)) # Proxy the type-hint information from the original function to this new # wrapped function. diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 24cc08c933b8..c7cb18b66384 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -1006,17 +1006,12 @@ def test_flatmap(self): def identity(x: Tuple[str, int]) -> Tuple[str, int]: return x - def inner(k, vs): - return [(k, v) for v in vs] - - wrapper = lambda x: inner(*x) with beam.Pipeline() as p: with beam.Pipeline() as p: ( p | "Generate input" >> beam.Create([('P1', [2])]) - #| "Flat" >> beam.FlatMapTuple(inner) - | "Flat" >> beam.FlatMap(wrapper) + | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) | "Identity" >> beam.Map(identity)) From 05be9d186cb4f42886511abc4c92ca14b25e2c38 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 08:29:57 -0500 Subject: [PATCH 04/12] add comment referring to ticket --- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index c7cb18b66384..3001cb0b144f 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -20,6 +20,7 @@ # pytype: skip-file import typing +from typing import Tuple import unittest import apache_beam as beam @@ -1000,14 +1001,16 @@ def filter_fn(element: int) -> bool: class TestFlatMapTuple(unittest.TestCase): - def test_flatmap(self): - from typing import Tuple + def test_flatmaptuple(self): + # Regression test. See + # https://github.com/apache/beam/issues/33014 def identity(x: Tuple[str, int]) -> Tuple[str, int]: return x with beam.Pipeline() as p: with beam.Pipeline() as p: + # Just checking that this doesn't raise an exception. ( p | "Generate input" >> beam.Create([('P1', [2])]) From 5f96f92defd59b5be333cbb06e3828710523993c Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 08:47:29 -0500 Subject: [PATCH 05/12] remove extra pipeline --- .../apache_beam/typehints/typed_pipeline_test.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 3001cb0b144f..57c68aafc1c8 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -1009,13 +1009,12 @@ def identity(x: Tuple[str, int]) -> Tuple[str, int]: return x with beam.Pipeline() as p: - with beam.Pipeline() as p: - # Just checking that this doesn't raise an exception. - ( - p - | "Generate input" >> beam.Create([('P1', [2])]) - | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) - | "Identity" >> beam.Map(identity)) + # Just checking that this doesn't raise an exception. + ( + p + | "Generate input" >> beam.Create([('P1', [2])]) + | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) + | "Identity" >> beam.Map(identity)) if __name__ == '__main__': From 5d2a83eb18109fa13b63aed11549a6f7dd36ce85 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 10:57:37 -0500 Subject: [PATCH 06/12] manually isort --- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 57c68aafc1c8..820f78fa9ef5 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -20,8 +20,8 @@ # pytype: skip-file import typing -from typing import Tuple import unittest +from typing import Tuple import apache_beam as beam from apache_beam import pvalue From 5f9eb87d8e54a8a18e128b334a2ed08e762be715 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 12:17:45 -0500 Subject: [PATCH 07/12] retrigger builder --- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 820f78fa9ef5..442c52267a82 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -22,7 +22,6 @@ import typing import unittest from typing import Tuple - import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints From 8f798ff91a876bf7a79b15db03185d6d556bb209 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 12:17:53 -0500 Subject: [PATCH 08/12] retrigger builder --- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 442c52267a82..0c82eeba0206 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -23,6 +23,7 @@ import unittest from typing import Tuple import apache_beam as beam + from apache_beam import pvalue from apache_beam import typehints from apache_beam.options.pipeline_options import OptionsContext From 0a2c7264d2239b39fee9a07b5c055e07457aabdd Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 12:54:56 -0500 Subject: [PATCH 09/12] isort? --- sdks/python/apache_beam/typehints/trivial_inference_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py index 48ccc8a6a2ed..83bcda31db5a 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference_test.py +++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py @@ -21,8 +21,8 @@ import types import unittest - import apache_beam as beam + from apache_beam.typehints import row_type from apache_beam.typehints import trivial_inference from apache_beam.typehints import typehints From a0fac321a15a07169fb27e217b61be3edc73d157 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 14:38:31 -0500 Subject: [PATCH 10/12] try manually isorting again --- .../typehints/trivial_inference_test.py | 2 +- .../typehints/typed_pipeline_test.py | 20 +------------------ 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py index 83bcda31db5a..48ccc8a6a2ed 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference_test.py +++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py @@ -21,8 +21,8 @@ import types import unittest -import apache_beam as beam +import apache_beam as beam from apache_beam.typehints import row_type from apache_beam.typehints import trivial_inference from apache_beam.typehints import typehints diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 0c82eeba0206..44318fa44a8c 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -21,9 +21,8 @@ import typing import unittest -from typing import Tuple -import apache_beam as beam +import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints from apache_beam.options.pipeline_options import OptionsContext @@ -1000,22 +999,5 @@ def filter_fn(element: int) -> bool: self.assertEqual(th.output_types, ((int, ), {})) -class TestFlatMapTuple(unittest.TestCase): - def test_flatmaptuple(self): - # Regression test. See - # https://github.com/apache/beam/issues/33014 - - def identity(x: Tuple[str, int]) -> Tuple[str, int]: - return x - - with beam.Pipeline() as p: - # Just checking that this doesn't raise an exception. - ( - p - | "Generate input" >> beam.Create([('P1', [2])]) - | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) - | "Identity" >> beam.Map(identity)) - - if __name__ == '__main__': unittest.main() From e2a53e83e84395671b7abe547ed315a03775b16a Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 15:10:22 -0500 Subject: [PATCH 11/12] Revert "try manually isorting again" This reverts commit a0fac321a15a07169fb27e217b61be3edc73d157. --- .../typehints/trivial_inference_test.py | 2 +- .../typehints/typed_pipeline_test.py | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py index 48ccc8a6a2ed..83bcda31db5a 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference_test.py +++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py @@ -21,8 +21,8 @@ import types import unittest - import apache_beam as beam + from apache_beam.typehints import row_type from apache_beam.typehints import trivial_inference from apache_beam.typehints import typehints diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 44318fa44a8c..0c82eeba0206 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -21,8 +21,9 @@ import typing import unittest - +from typing import Tuple import apache_beam as beam + from apache_beam import pvalue from apache_beam import typehints from apache_beam.options.pipeline_options import OptionsContext @@ -999,5 +1000,22 @@ def filter_fn(element: int) -> bool: self.assertEqual(th.output_types, ((int, ), {})) +class TestFlatMapTuple(unittest.TestCase): + def test_flatmaptuple(self): + # Regression test. See + # https://github.com/apache/beam/issues/33014 + + def identity(x: Tuple[str, int]) -> Tuple[str, int]: + return x + + with beam.Pipeline() as p: + # Just checking that this doesn't raise an exception. + ( + p + | "Generate input" >> beam.Create([('P1', [2])]) + | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) + | "Identity" >> beam.Map(identity)) + + if __name__ == '__main__': unittest.main() From 9260db4355bcc2b02d4bc587b058ec8f561ccf27 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 6 Dec 2024 15:10:54 -0500 Subject: [PATCH 12/12] manually fix isort --- sdks/python/apache_beam/typehints/trivial_inference_test.py | 2 +- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py index 83bcda31db5a..48ccc8a6a2ed 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference_test.py +++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py @@ -21,8 +21,8 @@ import types import unittest -import apache_beam as beam +import apache_beam as beam from apache_beam.typehints import row_type from apache_beam.typehints import trivial_inference from apache_beam.typehints import typehints diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 0c82eeba0206..820f78fa9ef5 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -22,8 +22,8 @@ import typing import unittest from typing import Tuple -import apache_beam as beam +import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints from apache_beam.options.pipeline_options import OptionsContext