From 3d3168773b29edab2a7de19facdd0219fdc98c75 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 10:12:13 -0400 Subject: [PATCH 01/14] Try to repro failure --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 2934a91b84b1e..30ee463ad4e96 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 2 } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index b0140793cf79e..bb5c36a3e9b99 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -470,7 +470,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self): input_data = [{ 'number': 1, 'str': 'some_string', - }] + }]*500 table_schema = { "fields": [{ @@ -483,7 +483,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self): bq_result_errors = [({ 'number': 1, 'str': 'some_string', - }, "Not Found")] + }, "Not Found")]*500 args = self.test_pipeline.get_full_options_as_args() From 3a472e0340149954ef9c56e2657f6fd873a72549 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 10:13:35 -0400 Subject: [PATCH 02/14] More suites --- .../beam_PostCommit_Python_ValidatesContainer_Dataflow.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json index d6c608f6dabab..a88b73a2acdac 100644 --- 
a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json @@ -1,3 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": "1" } \ No newline at end of file From 65e382594e363f2a5c8c1fddbcc71c30edcc8b6e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 12:43:38 -0400 Subject: [PATCH 03/14] Got the repro --- sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index bb5c36a3e9b99..cd3edf19de5fc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -470,7 +470,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self): input_data = [{ 'number': 1, 'str': 'some_string', - }]*500 + }] table_schema = { "fields": [{ @@ -483,7 +483,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self): bq_result_errors = [({ 'number': 1, 'str': 'some_string', - }, "Not Found")]*500 + }, "Not Found")] args = self.test_pipeline.get_full_options_as_args() @@ -491,6 +491,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self): # pylint: disable=expression-not-assigned errors = ( p | 'create' >> beam.Create(input_data) + | beam.WindowInto(beam.transforms.window.FixedWindows(10)) | 'write' >> beam.io.WriteToBigQuery( table_id, schema=table_schema, From 948eb33fa8a1fd448ad526376843c33c62f69c1b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 14:49:22 -0400 Subject: [PATCH 04/14] Simpler repro of problem --- .../trigger_files/beam_PostCommit_Python.json | 2 +- 
...ommit_Python_ValidatesContainer_Dataflow.json | 3 +-- .../apache_beam/io/gcp/bigquery_write_it_test.py | 1 - .../apache_beam/utils/windowed_value_test.py | 16 ++++++++++++++++ 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 30ee463ad4e96..2934a91b84b1e 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json index a88b73a2acdac..d6c608f6dabab 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json @@ -1,4 +1,3 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": "1" + "comment": "Modify this file in a trivial way to cause this test suite to run" } \ No newline at end of file diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index cd3edf19de5fc..b0140793cf79e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -491,7 +491,6 @@ def test_big_query_write_insert_non_transient_api_call_error(self): # pylint: disable=expression-not-assigned errors = ( p | 'create' >> beam.Create(input_data) - | beam.WindowInto(beam.transforms.window.FixedWindows(10)) | 'write' >> beam.io.WriteToBigQuery( table_id, schema=table_schema, diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py b/sdks/python/apache_beam/utils/windowed_value_test.py index 
1e4892aa9bd32..313850c23252b 100644 --- a/sdks/python/apache_beam/utils/windowed_value_test.py +++ b/sdks/python/apache_beam/utils/windowed_value_test.py @@ -27,6 +27,7 @@ from parameterized import parameterized from parameterized import parameterized_class +import apache_beam as beam from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import Timestamp @@ -75,6 +76,21 @@ def test_pickle(self): wv = windowed_value.WindowedValue(1, 3, (), pane_info) self.assertTrue(pickle.loads(pickle.dumps(wv)) == wv) + def test_encoding_global_window_in_interval_window(self): + input_data = ['123'] + + class ComputeWordLengthFn(beam.DoFn): + def process(self, element): + pass + + def finish_bundle(self): + yield beam.transforms.window.GlobalWindows.windowed_value('test') + + with beam.Pipeline() as p: + (p | 'create' >> beam.Create(input_data) + | beam.WindowInto(beam.transforms.window.FixedWindows(10)) + | beam.ParDo(ComputeWordLengthFn())) + WINDOWED_BATCH_INSTANCES = [ windowed_value.HomogeneousWindowedBatch.of( From 280691bec5a88c09b8f04cbc1e9547d1482da1c3 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 16:01:00 -0400 Subject: [PATCH 05/14] Fix coder issues --- sdks/python/apache_beam/transforms/window.py | 4 ++-- sdks/python/apache_beam/utils/windowed_value_test.py | 9 +++++---- sdks/python/setup.py | 5 +++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 592164a5ef49b..4871613ae6bb3 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -300,7 +300,7 @@ def __lt__(self, other): return self.timestamp < other.timestamp -class GlobalWindow(BoundedWindow): +class GlobalWindow(IntervalWindow): """The default window into which all data is placed (via GlobalWindows).""" _instance: Optional['GlobalWindow'] = None @@ -310,7 +310,7 @@ def __new__(cls): return 
cls._instance def __init__(self) -> None: - super().__init__(GlobalWindow._getTimestampFromProto()) + super().__init__(MIN_TIMESTAMP, GlobalWindow._getTimestampFromProto()) def __repr__(self): return 'GlobalWindow' diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py b/sdks/python/apache_beam/utils/windowed_value_test.py index 313850c23252b..4bc56ef0f7cb4 100644 --- a/sdks/python/apache_beam/utils/windowed_value_test.py +++ b/sdks/python/apache_beam/utils/windowed_value_test.py @@ -82,14 +82,15 @@ def test_encoding_global_window_in_interval_window(self): class ComputeWordLengthFn(beam.DoFn): def process(self, element): pass - + def finish_bundle(self): yield beam.transforms.window.GlobalWindows.windowed_value('test') with beam.Pipeline() as p: - (p | 'create' >> beam.Create(input_data) - | beam.WindowInto(beam.transforms.window.FixedWindows(10)) - | beam.ParDo(ComputeWordLengthFn())) + ( + p | 'create' >> beam.Create(input_data) + | beam.WindowInto(beam.transforms.window.FixedWindows(10)) + | beam.ParDo(ComputeWordLengthFn())) WINDOWED_BATCH_INSTANCES = [ diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 7bcff2bacfd2e..9d1a14e475ca0 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -194,7 +194,8 @@ def generate_protos_first(): check=True) print(out.stdout) except subprocess.CalledProcessError as err: - raise RuntimeError('Could not generate protos due to error: %s', err.stderr) + # raise RuntimeError('Could not generate protos due to error: %s', err.stderr) + return def copy_tests_from_docs(): @@ -285,7 +286,7 @@ def get_portability_package_data(): # executes below. generate_protos_first() - generate_external_transform_wrappers() + # generate_external_transform_wrappers() # These data files live elsewhere in the full Beam repository. 
copy_tests_from_docs() From fed480e29830bcd6d6e5002b5c88a4b2f4c27a94 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 16:09:07 -0400 Subject: [PATCH 06/14] clean up equals --- sdks/python/apache_beam/transforms/window.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 4871613ae6bb3..c99e6c757a808 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -260,6 +260,13 @@ def __lt__(self, other): if self.end != other.end: return self.end < other.end return hash(self) < hash(other) + + def __eq__(self, other): + return ( + self is other or + (type(self) is type(other) and + self.end == other.end and + self.start == other.start)) def intersects(self, other: 'IntervalWindow') -> bool: return other.start < self.end or self.start < other.end @@ -319,8 +326,10 @@ def __hash__(self): return hash(type(self)) def __eq__(self, other): - # Global windows are always and only equal to each other. 
- return self is other or type(self) is type(other) + return ( + self is other or + type(self) is type(other) or + (type(other) is IntervalWindow and other.__eq__(self))) @property def start(self) -> Timestamp: From e2705dde45230b1154b3ef47e8abb6973f2742bb Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 16:12:33 -0400 Subject: [PATCH 07/14] fully translate back --- sdks/python/apache_beam/coders/coder_impl.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index ff5fb5bef7ac9..fe63c1f96f11d 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -74,6 +74,7 @@ if TYPE_CHECKING: import proto from apache_beam.transforms import userstate + from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import IntervalWindow try: @@ -806,6 +807,7 @@ def estimate_size(self, unused_value, nested=False): if not TYPE_CHECKING: IntervalWindow = None + GlobalWindow = None class IntervalWindowCoderImpl(StreamCoderImpl): @@ -834,6 +836,7 @@ def decode_from_stream(self, in_, nested): if not TYPE_CHECKING: global IntervalWindow # pylint: disable=global-variable-not-assigned if IntervalWindow is None: + from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import IntervalWindow # instantiating with None is not part of the public interface typed_value = IntervalWindow(None, None) # type: ignore[arg-type] @@ -841,6 +844,10 @@ def decode_from_stream(self, in_, nested): 1000 * self._to_normal_time(in_.read_bigendian_uint64())) typed_value._start_micros = ( typed_value._end_micros - 1000 * in_.read_var_int64()) + gw = GlobalWindow() + if typed_value == gw: + return gw + return typed_value def estimate_size(self, value, nested=False): From 8e76e1aee1b03fd07064e895791775b29ef66886 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 
16:13:26 -0400 Subject: [PATCH 08/14] revert setup changes --- sdks/python/setup.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 9d1a14e475ca0..7bcff2bacfd2e 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -194,8 +194,7 @@ def generate_protos_first(): check=True) print(out.stdout) except subprocess.CalledProcessError as err: - # raise RuntimeError('Could not generate protos due to error: %s', err.stderr) - return + raise RuntimeError('Could not generate protos due to error: %s', err.stderr) def copy_tests_from_docs(): @@ -286,7 +285,7 @@ def get_portability_package_data(): # executes below. generate_protos_first() - # generate_external_transform_wrappers() + generate_external_transform_wrappers() # These data files live elsewhere in the full Beam repository. copy_tests_from_docs() From a67a9d851896683df21080b9099334bb2cc6056b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 16:47:19 -0400 Subject: [PATCH 09/14] Add hash function --- sdks/python/apache_beam/coders/coder_impl.py | 2 +- sdks/python/apache_beam/transforms/window.py | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index fe63c1f96f11d..1b33f2365ead1 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -847,7 +847,7 @@ def decode_from_stream(self, in_, nested): gw = GlobalWindow() if typed_value == gw: return gw - + return typed_value def estimate_size(self, value, nested=False): diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index c99e6c757a808..7da58c293932e 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -260,13 +260,15 @@ def __lt__(self, other): if self.end != other.end: return 
self.end < other.end return hash(self) < hash(other) - + def __eq__(self, other): return ( - self is other or - (type(self) is type(other) and - self.end == other.end and - self.start == other.start)) + self is other or ( + type(self) is type(other) and self.end == other.end and + self.start == other.start)) + + def __hash__(self): + return hash((self.start, self.end)) def intersects(self, other: 'IntervalWindow') -> bool: return other.start < self.end or self.start < other.end @@ -327,9 +329,8 @@ def __hash__(self): def __eq__(self, other): return ( - self is other or - type(self) is type(other) or - (type(other) is IntervalWindow and other.__eq__(self))) + self is other or type(self) is type(other) or + (type(other) is IntervalWindow and other.__eq__(self))) @property def start(self) -> Timestamp: From e76d4c65a05ce17a2666da52b74c284f9fca391f Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 20:18:47 -0400 Subject: [PATCH 10/14] use member functions instead of inheritance --- sdks/python/apache_beam/coders/coder_impl.py | 21 +++++++------- sdks/python/apache_beam/transforms/window.py | 30 +++++++++++--------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 1b33f2365ead1..6c664e88fe6ff 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -74,7 +74,6 @@ if TYPE_CHECKING: import proto from apache_beam.transforms import userstate - from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import IntervalWindow try: @@ -807,7 +806,6 @@ def estimate_size(self, unused_value, nested=False): if not TYPE_CHECKING: IntervalWindow = None - GlobalWindow = None class IntervalWindowCoderImpl(StreamCoderImpl): @@ -824,7 +822,11 @@ def _from_normal_time(self, value): def encode_to_stream(self, value, out, nested): # type: (IntervalWindow, create_OutputStream, 
bool) -> None - typed_value = value + if not TYPE_CHECKING: + global IntervalWindow # pylint: disable=global-variable-not-assigned + if IntervalWindow is None: + from apache_beam.transforms.window import IntervalWindow + typed_value = IntervalWindow.try_from_global_window(value) span_millis = ( typed_value._end_micros // 1000 - typed_value._start_micros // 1000) out.write_bigendian_uint64( @@ -836,7 +838,6 @@ def decode_from_stream(self, in_, nested): if not TYPE_CHECKING: global IntervalWindow # pylint: disable=global-variable-not-assigned if IntervalWindow is None: - from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import IntervalWindow # instantiating with None is not part of the public interface typed_value = IntervalWindow(None, None) # type: ignore[arg-type] @@ -844,17 +845,17 @@ def decode_from_stream(self, in_, nested): 1000 * self._to_normal_time(in_.read_bigendian_uint64())) typed_value._start_micros = ( typed_value._end_micros - 1000 * in_.read_var_int64()) - gw = GlobalWindow() - if typed_value == gw: - return gw - - return typed_value + return typed_value.try_to_global_window() def estimate_size(self, value, nested=False): # type: (Any, bool) -> int # An IntervalWindow is context-insensitive, with a timestamp (8 bytes) # and a varint timespam. 
- typed_value = value + if not TYPE_CHECKING: + global IntervalWindow # pylint: disable=global-variable-not-assigned + if IntervalWindow is None: + from apache_beam.transforms.window import IntervalWindow + typed_value = IntervalWindow.try_from_global_window(value) span_millis = ( typed_value._end_micros // 1000 - typed_value._start_micros // 1000) return 8 + get_varint_size(span_millis) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 7da58c293932e..691b2b567e658 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -261,21 +261,25 @@ def __lt__(self, other): return self.end < other.end return hash(self) < hash(other) - def __eq__(self, other): - return ( - self is other or ( - type(self) is type(other) and self.end == other.end and - self.start == other.start)) - - def __hash__(self): - return hash((self.start, self.end)) - def intersects(self, other: 'IntervalWindow') -> bool: return other.start < self.end or self.start < other.end def union(self, other: 'IntervalWindow') -> 'IntervalWindow': return IntervalWindow( min(self.start, other.start), max(self.end, other.end)) + + @staticmethod + def try_from_global_window(value) -> 'IntervalWindow': + gw = GlobalWindow() + if gw == value: + return IntervalWindow(gw.start, GlobalWindow._getTimestampFromProto()) + return value + + def try_to_global_window(self) -> BoundedWindow: + gw = GlobalWindow() + if self.start == gw.start and self.end == GlobalWindow._getTimestampFromProto(): + return gw + return IntervalWindow(gw.start(), GlobalWindow._getTimestampFromProto()) V = TypeVar("V") @@ -309,7 +313,7 @@ def __lt__(self, other): return self.timestamp < other.timestamp -class GlobalWindow(IntervalWindow): +class GlobalWindow(BoundedWindow): """The default window into which all data is placed (via GlobalWindows).""" _instance: Optional['GlobalWindow'] = None @@ -319,7 +323,7 @@ def __new__(cls): return 
cls._instance def __init__(self) -> None: - super().__init__(MIN_TIMESTAMP, GlobalWindow._getTimestampFromProto()) + super().__init__(GlobalWindow._getTimestampFromProto()) def __repr__(self): return 'GlobalWindow' @@ -328,9 +332,7 @@ def __hash__(self): return hash(type(self)) def __eq__(self, other): - return ( - self is other or type(self) is type(other) or - (type(other) is IntervalWindow and other.__eq__(self))) + return self is other or type(self) is type(other) @property def start(self) -> Timestamp: From 674d3caba0aa3921959e78d89040245970d600b1 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 20:49:24 -0400 Subject: [PATCH 11/14] small fix --- sdks/python/apache_beam/transforms/window.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 691b2b567e658..1f1acdfbc5a13 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -267,19 +267,20 @@ def intersects(self, other: 'IntervalWindow') -> bool: def union(self, other: 'IntervalWindow') -> 'IntervalWindow': return IntervalWindow( min(self.start, other.start), max(self.end, other.end)) - + @staticmethod def try_from_global_window(value) -> 'IntervalWindow': gw = GlobalWindow() if gw == value: return IntervalWindow(gw.start, GlobalWindow._getTimestampFromProto()) return value - + def try_to_global_window(self) -> BoundedWindow: gw = GlobalWindow() - if self.start == gw.start and self.end == GlobalWindow._getTimestampFromProto(): + if (self.start == gw.start and + self.end == GlobalWindow._getTimestampFromProto()): return gw - return IntervalWindow(gw.start(), GlobalWindow._getTimestampFromProto()) + return IntervalWindow(gw.start, GlobalWindow._getTimestampFromProto()) V = TypeVar("V") @@ -332,6 +333,7 @@ def __hash__(self): return hash(type(self)) def __eq__(self, other): + # Global windows are always and only
equal to each other. return self is other or type(self) is type(other) @property From 67c8349a024f26584a3a242bdd02a58f6438f644 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 21:15:16 -0400 Subject: [PATCH 12/14] fix to_global_window --- sdks/python/apache_beam/transforms/window.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 1f1acdfbc5a13..33ae8bb0946c6 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -280,7 +280,7 @@ def try_to_global_window(self) -> BoundedWindow: if (self.start == gw.start and self.end == GlobalWindow._getTimestampFromProto()): return gw - return IntervalWindow(gw.start, GlobalWindow._getTimestampFromProto()) + return self V = TypeVar("V") From 4e0ad877043fe9cbb5717ff485516357f36c452a Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 26 Sep 2024 21:59:20 -0400 Subject: [PATCH 13/14] return type --- sdks/python/apache_beam/coders/coder_impl.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 6c664e88fe6ff..8220f54e05f42 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -74,6 +74,7 @@ if TYPE_CHECKING: import proto from apache_beam.transforms import userstate + from apache_beam.transforms.window import BoundedWindow from apache_beam.transforms.window import IntervalWindow try: @@ -806,6 +807,7 @@ def estimate_size(self, unused_value, nested=False): if not TYPE_CHECKING: IntervalWindow = None + BoundedWindow = None class IntervalWindowCoderImpl(StreamCoderImpl): @@ -834,7 +836,7 @@ def encode_to_stream(self, value, out, nested): out.write_var_int64(span_millis) def decode_from_stream(self, in_, nested): - # type: (create_InputStream, bool) -> IntervalWindow + # 
type: (create_InputStream, bool) -> BoundedWindow if not TYPE_CHECKING: global IntervalWindow # pylint: disable=global-variable-not-assigned if IntervalWindow is None: From 82a05fee9fe45ceb0677463c7f0bb29fa551a436 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 27 Sep 2024 05:45:18 -0400 Subject: [PATCH 14/14] lint --- sdks/python/apache_beam/utils/windowed_value_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py b/sdks/python/apache_beam/utils/windowed_value_test.py index 4bc56ef0f7cb4..36a5e8d57fba2 100644 --- a/sdks/python/apache_beam/utils/windowed_value_test.py +++ b/sdks/python/apache_beam/utils/windowed_value_test.py @@ -87,7 +87,7 @@ def finish_bundle(self): yield beam.transforms.window.GlobalWindows.windowed_value('test') with beam.Pipeline() as p: - ( + _ = ( p | 'create' >> beam.Create(input_data) | beam.WindowInto(beam.transforms.window.FixedWindows(10)) | beam.ParDo(ComputeWordLengthFn()))