Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow GlobalWindows to be encoded as IntervalWindows #32569

Closed
wants to merge 14 commits into from
18 changes: 14 additions & 4 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
if TYPE_CHECKING:
import proto
from apache_beam.transforms import userstate
from apache_beam.transforms.window import BoundedWindow
from apache_beam.transforms.window import IntervalWindow

try:
Expand Down Expand Up @@ -806,6 +807,7 @@ def estimate_size(self, unused_value, nested=False):

if not TYPE_CHECKING:
IntervalWindow = None
BoundedWindow = None


class IntervalWindowCoderImpl(StreamCoderImpl):
Expand All @@ -822,15 +824,19 @@ def _from_normal_time(self, value):

def encode_to_stream(self, value, out, nested):
# type: (IntervalWindow, create_OutputStream, bool) -> None
typed_value = value
if not TYPE_CHECKING:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not be checking this at every element encoding; instead lift the import.

global IntervalWindow # pylint: disable=global-variable-not-assigned
if IntervalWindow is None:
from apache_beam.transforms.window import IntervalWindow
typed_value = IntervalWindow.try_from_global_window(value)
span_millis = (
typed_value._end_micros // 1000 - typed_value._start_micros // 1000)
out.write_bigendian_uint64(
self._from_normal_time(typed_value._end_micros // 1000))
out.write_var_int64(span_millis)

def decode_from_stream(self, in_, nested):
# type: (create_InputStream, bool) -> IntervalWindow
# type: (create_InputStream, bool) -> BoundedWindow
if not TYPE_CHECKING:
global IntervalWindow # pylint: disable=global-variable-not-assigned
if IntervalWindow is None:
Expand All @@ -841,13 +847,17 @@ def decode_from_stream(self, in_, nested):
1000 * self._to_normal_time(in_.read_bigendian_uint64()))
typed_value._start_micros = (
typed_value._end_micros - 1000 * in_.read_var_int64())
return typed_value
return typed_value.try_to_global_window()

def estimate_size(self, value, nested=False):
  # type: (Any, bool) -> int
  # The encoded form is context-insensitive: an 8-byte end timestamp
  # followed by a varint holding the window's span in milliseconds.
  window = value
  if not TYPE_CHECKING:
    # IntervalWindow is imported lazily, on first use.
    global IntervalWindow  # pylint: disable=global-variable-not-assigned
    if IntervalWindow is None:
      from apache_beam.transforms.window import IntervalWindow
    # A value equal to the global window is swapped for its interval form.
    window = IntervalWindow.try_from_global_window(value)
  end_millis = window._end_micros // 1000
  start_millis = window._start_micros // 1000
  return 8 + get_varint_size(end_millis - start_millis)
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/transforms/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,20 @@ def union(self, other: 'IntervalWindow') -> 'IntervalWindow':
return IntervalWindow(
min(self.start, other.start), max(self.end, other.end))

@staticmethod
def try_from_global_window(value) -> 'IntervalWindow':
  """Return the IntervalWindow form of a global window, else ``value``.

  When ``value`` compares equal to a ``GlobalWindow`` instance, an
  ``IntervalWindow`` running from that window's start to
  ``GlobalWindow._getTimestampFromProto()`` is returned. Any other value
  is handed back unchanged.
  """
  global_window = GlobalWindow()
  return (
      IntervalWindow(
          global_window.start, GlobalWindow._getTimestampFromProto())
      if global_window == value else value)

def try_to_global_window(self) -> BoundedWindow:
  """Return a GlobalWindow if this window has the global bounds, else self.

  The bounds match when ``self.start`` equals a ``GlobalWindow``'s start and
  ``self.end`` equals ``GlobalWindow._getTimestampFromProto()``.
  """
  global_window = GlobalWindow()
  has_global_bounds = (
      self.start == global_window.start and
      self.end == GlobalWindow._getTimestampFromProto())
  return global_window if has_global_bounds else self


V = TypeVar("V")

Expand Down
17 changes: 17 additions & 0 deletions sdks/python/apache_beam/utils/windowed_value_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from parameterized import parameterized
from parameterized import parameterized_class

import apache_beam as beam
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import Timestamp

Expand Down Expand Up @@ -75,6 +76,22 @@ def test_pickle(self):
wv = windowed_value.WindowedValue(1, 3, (), pane_info)
self.assertTrue(pickle.loads(pickle.dumps(wv)) == wv)

def test_encoding_global_window_in_interval_window(self):
input_data = ['123']

class ComputeWordLengthFn(beam.DoFn):
def process(self, element):
pass

def finish_bundle(self):
yield beam.transforms.window.GlobalWindows.windowed_value('test')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One should only emit to windows that were part of the input.


with beam.Pipeline() as p:
_ = (
p | 'create' >> beam.Create(input_data)
| beam.WindowInto(beam.transforms.window.FixedWindows(10))
| beam.ParDo(ComputeWordLengthFn()))


WINDOWED_BATCH_INSTANCES = [
windowed_value.HomogeneousWindowedBatch.of(
Expand Down
Loading