diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index ff5fb5bef7ac..fe63c1f96f11 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -74,6 +74,7 @@ if TYPE_CHECKING: import proto from apache_beam.transforms import userstate + from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import IntervalWindow try: @@ -806,6 +807,7 @@ def estimate_size(self, unused_value, nested=False): if not TYPE_CHECKING: IntervalWindow = None + GlobalWindow = None class IntervalWindowCoderImpl(StreamCoderImpl): @@ -834,6 +836,7 @@ def decode_from_stream(self, in_, nested): if not TYPE_CHECKING: global IntervalWindow # pylint: disable=global-variable-not-assigned if IntervalWindow is None: + from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import IntervalWindow # instantiating with None is not part of the public interface typed_value = IntervalWindow(None, None) # type: ignore[arg-type] @@ -841,6 +844,10 @@ def decode_from_stream(self, in_, nested): 1000 * self._to_normal_time(in_.read_bigendian_uint64())) typed_value._start_micros = ( typed_value._end_micros - 1000 * in_.read_var_int64()) + gw = GlobalWindow() + if typed_value == gw: + return gw + return typed_value def estimate_size(self, value, nested=False):