diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index a1eeeba02c4b..eecebde3d693 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -244,7 +244,7 @@ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, com kc := coders[kcID] ec := coders[ecID] - data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders, watermark)) + data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders)) if len(data[0]) == 0 { panic("no data for GBK") } @@ -290,7 +290,7 @@ func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingSt } // gbkBytes re-encodes gbk inputs in a gbk result. -func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte { +func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder) []byte { // Pick how the timestamp of the aggregated output is computed. var outputTime func(typex.Window, mtime.Time, mtime.Time) mtime.Time switch ws.GetOutputTime() { @@ -333,9 +333,8 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat kd := pullDecoder(kc, coders) vd := pullDecoder(vc, coders) - // Right, need to get the key coder, and the element coder. - // Cus I'll need to pull out anything the runner knows how to deal with. - // And repeat. + // Aggregate by windows and keys, using the window coder and KV coders. + // We need to extract and split the key bytes from the element bytes. for _, data := range toAggregate { // Parse out each element's data, and repeat. buf := bytes.NewBuffer(data) @@ -388,34 +387,41 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat } // Use a decreasing sort (latest to earliest) so we can correct // the output timestamp to the new end of window immeadiately. - // TODO need to correct this if output time is different. sort.Slice(ordered, func(i, j int) bool { return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp() }) cur := ordered[0] sessionData := windows[cur] + delete(windows, cur) for _, iw := range ordered[1:] { - // If they overlap, then we merge the data. + // Check if the gap between windows is less than the gapSize. + // If not, this window is done, and we start a next window. if iw.End+gapSize < cur.Start { - // Start a new session. + // Store current data with the current window. windows[cur] = sessionData + // Use the incoming window instead, and clear it from the map. cur = iw sessionData = windows[iw] + delete(windows, cur) + // There's nothing to merge, since we've just started with this windowed data. continue } - // Extend the session + // Extend the session with the incoming window, and merge the the incoming window's data. cur.Start = iw.Start toMerge := windows[iw] delete(windows, iw) for k, kt := range toMerge { skt := sessionData[k] + // Ensure the output time matches the given function. + skt.time = outputTime(cur, kt.time, skt.time) skt.key = kt.key skt.w = cur skt.values = append(skt.values, kt.values...) sessionData[k] = skt } } + windows[cur] = sessionData } // Everything's aggregated! // Time to turn things into a windowed KV> diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index f1ccf66a2289..324fe5a17b54 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -40,7 +40,9 @@ from apache_beam.runners.portability import portable_runner_test from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms import window from apache_beam.transforms.sql import SqlTransform +from apache_beam.utils import timestamp # Run as # @@ -178,6 +180,26 @@ def create_options(self): return options + # Slightly more robust session window test: + # Validates that an inner grouping doesn't duplicate data either. + # Copied also because the timestamp in fn_runner_test.py isn't being + # inferred correctly as seconds for some reason, but as micros. + # The belabored specification is validating the timestamp type works at least. + # See https://github.com/apache/beam/issues/32085 + def test_windowing(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create([1, 2, 100, 101, 102, 123]) + | beam.Map( + lambda t: window.TimestampedValue( + ('k', t), timestamp.Timestamp.of(t).micros)) + | beam.WindowInto(beam.transforms.window.Sessions(10)) + | beam.GroupByKey() + | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) + assert_that( + res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) + # Can't read host files from within docker, read a "local" file there. def test_read(self): print('name:', __name__)