diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index a632318e02c7..bc8449c72b39 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -45,6 +45,14 @@ type element struct { holdTimestamp mtime.Time // only used for Timers pane typex.PaneInfo transform, family, tag string // only used for Timers. + // Used to ensure ordering within a key when sorting the heap, + // which isn't using a stable sort. + // Since ordering is weak across multiple bundles, it needs only + // be consistent between exiting a stage and entering a stateful stage. + // No synchronization is required in specifying this, + // since keyed elements are only processed by a single bundle at a time, + // if stateful stages are concerned. + sequence int elmBytes []byte // When nil, indicates this is a timer. keyBytes []byte @@ -103,7 +111,8 @@ func (h elementHeap) Less(i, j int) bool { } else if h[i].IsData() && h[j].IsTimer() { return true // i before j. } - // They're the same kind, fall through to timestamp less for consistency. + // They're the same kind, so compare by the sequence value. + return h[i].sequence < h[j].sequence } // Otherwise compare by timestamp. return h[i].timestamp < h[j].timestamp @@ -688,6 +697,7 @@ func reElementResiduals(residuals []Residual, inputInfo PColInfo, rb RunBundle) pane: pn, elmBytes: elmBytes, keyBytes: keyBytes, + sequence: len(unprocessedElements), }) } } @@ -704,6 +714,7 @@ func reElementResiduals(residuals []Residual, inputInfo PColInfo, rb RunBundle) // PersistBundle takes in the stage ID, ID of the bundle associated with the pending // input elements, and the committed output elements. func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals Residuals) { + var seq int for output, data := range d.Raw { info := col2Coders[output] var newPending []element @@ -743,7 +754,9 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol pane: pn, elmBytes: elmBytes, keyBytes: keyBytes, + sequence: seq, }) + seq++ } } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 3f52ebc4510c..787d27858a0e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -74,6 +74,7 @@ func decodeTimer(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byt timestamp: firing, holdTimestamp: hold, pane: pane, + sequence: len(ret), }) } return keyBytes, tag, ret