Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add map_windows support to Go SDK #24307

Merged
merged 6 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* Pipeline Resource Hints now supported via `--resource_hints` flag (Go) ([#23990](https://github.com/apache/beam/pull/23990)).
* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).
* RunInference model handlers now support the specification of a custom inference function in Python ([#22572](https://github.com/apache/beam/issues/22572))
* Support for `map_windows` urn added to Go SDK ([#24307](https://github.apache/beam/pull/24307)).

## Breaking Changes

Expand Down
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/core/graph/coder/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ const (
KV Kind = "KV"
LP Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.

// IW stands for IntervalWindow and uses the short name to avoid a collision with the
// WindowCoder kind. This Kind is used when the window is provided as a value instead
// of a window for the value.
IW Kind = "IW"

Window Kind = "window" // A debug wrapper around a window coder.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It bother's me we can't re-use this, but I agree we shouldn't mix these up just now. It's an internal implementation detail at least, so it can be cleaned up at some future point.


// CoGBK is currently equivalent to either
Expand Down Expand Up @@ -294,6 +299,11 @@ func NewString() *Coder {
return &Coder{Kind: String, T: typex.New(reflectx.String)}
}

// NewIntervalWindowCoder returns a new IntervalWindow coder using the built-in scheme.
func NewIntervalWindowCoder() *Coder {
return &Coder{Kind: IW, T: typex.New(reflect.TypeOf((*struct{ Start, End int64 })(nil)).Elem())}
}

// IsW returns true iff the coder is for a WindowedValue.
func IsW(c *Coder) bool {
return c.Kind == WindowedValue
Expand Down
42 changes: 39 additions & 3 deletions sdks/go/pkg/beam/core/runtime/exec/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
package exec

import (
"bytes"
"fmt"
"io"
"reflect"
"strings"

"bytes"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
Expand Down Expand Up @@ -117,6 +116,9 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder {
snd: MakeElementEncoder(c.Components[1]),
}

case coder.IW:
return &intervalWindowValueEncoder{}

case coder.Window:
return &wrappedWindowEncoder{
enc: MakeWindowEncoder(c.Window),
Expand Down Expand Up @@ -229,6 +231,9 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder {
snd: MakeElementDecoder(c.Components[1]),
}

case coder.IW:
return &intervalWindowValueDecoder{}

// The following cases are not expected to be executed in the normal
// course of a pipeline, however including them here enables simpler
// end to end validation of standard coders against
Expand Down Expand Up @@ -589,7 +594,8 @@ func (c *kvDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
// Elm will be the decoded type.
//
// Example:
// KV<int, KV<...>> decodes to *FullValue{Elm: int, Elm2: *FullValue{...}}
//
// KV<int, KV<...>> decodes to *FullValue{Elm: int, Elm2: *FullValue{...}}
func (c *kvDecoder) Decode(r io.Reader) (*FullValue, error) {
fv := &FullValue{}
if err := c.DecodeTo(r, fv); err != nil {
Expand Down Expand Up @@ -1180,6 +1186,36 @@ func (*intervalWindowDecoder) DecodeSingle(r io.Reader) (typex.Window, error) {
return window.IntervalWindow{Start: mtime.FromMilliseconds(end.Milliseconds() - int64(duration)), End: end}, nil
}

type intervalWindowValueEncoder struct {
intervalWindowEncoder
}

func (e *intervalWindowValueEncoder) Encode(v *FullValue, w io.Writer) error {
return e.EncodeSingle(v.Elm.(window.IntervalWindow), w)
}

type intervalWindowValueDecoder struct {
intervalWindowDecoder
}

func (d *intervalWindowValueDecoder) Decode(r io.Reader) (*FullValue, error) {
fv := &FullValue{}
err := d.DecodeTo(r, fv)
if err != nil {
return nil, err
}
return fv, nil
}

func (d *intervalWindowValueDecoder) DecodeTo(r io.Reader, value *FullValue) error {
w, err := d.DecodeSingle(r)
if err != nil {
return err
}
value.Elm = w
return nil
}

// EncodeWindowedValueHeader serializes a windowed value header.
func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.EventTime, p typex.PaneInfo, w io.Writer) error {
// Encoding: Timestamp, Window, Pane (header) + Element
Expand Down
3 changes: 3 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/coder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func TestCoders(t *testing.T) {
}, {
coder: coder.NewN(coder.NewBytes()),
val: &FullValue{Elm: []byte("myBytes")},
}, {
coder: coder.NewIntervalWindowCoder(),
val: &FullValue{Elm: window.IntervalWindow{Start: 0, End: 100}},
},
} {
t.Run(fmt.Sprintf("%v", test.coder), func(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,17 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
}
u = &WindowInto{UID: b.idgen.New(), Fn: wfn, Out: out[0]}

case graphx.URNMapWindows:
var fn pipepb.FunctionSpec
if err := proto.Unmarshal(payload, &fn); err != nil {
return nil, errors.Wrapf(err, "invalid SideInput payload for %v", transform)
}
mapper, err := unmarshalAndMakeWindowMapping(&fn)
if err != nil {
return nil, err
}
u = &MapWindows{UID: b.idgen.New(), Fn: mapper, Out: out[0]}

case graphx.URNFlatten:
u = &Flatten{UID: b.idgen.New(), N: len(transform.Inputs), Out: out[0]}

Expand Down
50 changes: 50 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)

// WindowInto places each element in one or more windows.
Expand Down Expand Up @@ -97,6 +98,55 @@ func (w *WindowInto) String() string {
return fmt.Sprintf("WindowInto[%v]. Out:%v", w.Fn, w.Out.ID())
}

type MapWindows struct {
UID UnitID
Fn WindowMapper
Out Node
}

func (m *MapWindows) ID() UnitID {
return m.UID
}

func (m *MapWindows) Up(_ context.Context) error {
return nil
}

func (m *MapWindows) StartBundle(ctx context.Context, id string, data DataContext) error {
return m.Out.StartBundle(ctx, id, data)
}

func (m *MapWindows) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
w, ok := elm.Elm2.(window.IntervalWindow)
if !ok {
return errors.Errorf("not an IntervalWindow, got %T", elm.Elm2)
}
newW, err := m.Fn.MapWindow(w)
if err != nil {
return err
}
out := &FullValue{
Elm: elm.Elm,
camphillips22 marked this conversation as resolved.
Show resolved Hide resolved
Elm2: newW,
Timestamp: elm.Timestamp,
Windows: elm.Windows,
Pane: elm.Pane,
}
return m.Out.ProcessElement(ctx, out, values...)
}

func (m *MapWindows) FinishBundle(ctx context.Context) error {
return m.Out.FinishBundle(ctx)
}

func (m *MapWindows) Down(_ context.Context) error {
return nil
}

func (m *MapWindows) String() string {
return fmt.Sprintf("MapWindows[%v]. Out:%v", m.Fn, m.Out.ID())
}

// WindowMapper defines an interface maps windows from a main input window space
// to windows from a side input window space. Used during side input materialization.
type WindowMapper interface {
Expand Down
63 changes: 63 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package exec

import (
"context"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -170,3 +172,64 @@ func TestMapWindow(t *testing.T) {
}
}
}

func TestMapWindows(t *testing.T) {
tests := []struct {
name string
wFn *window.Fn
in []typex.Window
expect []typex.Window
}{
{
"fixed2fixed",
window.NewFixedWindows(1000 * time.Millisecond),
[]typex.Window{
window.IntervalWindow{Start: 100, End: 200},
window.IntervalWindow{Start: 100, End: 1100},
},
[]typex.Window{
window.IntervalWindow{Start: 0, End: 1000},
window.IntervalWindow{Start: 1000, End: 2000},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
inV, expected := makeNoncedWindowValues(tc.in, tc.expect)

out := &CaptureNode{UID: 1}
unit := &MapWindows{UID: 2, Fn: &windowMapper{wfn: tc.wFn}, Out: out}
a := &FixedRoot{UID: 3, Elements: inV, Out: unit}

p, err := NewPlan(tc.name, []Unit{a, unit, out})
if err != nil {
t.Fatalf("failed to construct plan: %s", err)
}
ctx := context.Background()
if err := p.Execute(ctx, "1", DataContext{}); err != nil {
t.Fatalf("execute failed: %s", err)
}
if err := p.Down(ctx); err != nil {
t.Fatalf("down failed: %s", err)
}
if !equalList(out.Elements, expected) {
t.Errorf("map_windows returned %v, want %v", extractValues(out.Elements...), extractValues(expected...))
}
})
}
}

func makeNoncedWindowValues(in []typex.Window, expect []typex.Window) ([]MainInput, []FullValue) {
if len(in) != len(expect) {
panic("provided window slices must be the same length")
}
inV := make([]MainInput, len(in))
expectV := make([]FullValue, len(in))
for i := range in {
nonce := make([]byte, 4)
rand.Read(nonce)
inV[i] = MainInput{Key: makeKV(nonce, in[i])[0]}
expectV[i] = makeKV(nonce, expect[i])[0]
}
return inV, expectV
}
21 changes: 10 additions & 11 deletions sdks/go/pkg/beam/core/runtime/graphx/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,18 +378,14 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
return nil, err
}
return coder.NewN(elm), nil

// Special handling for window coders so they can be treated as
// a general coder. Generally window coders are not used outside of
// specific contexts, but this enables improved testing.
// Window types are not permitted to be fulltypes, so
// we use assignably equivalent anonymous struct types.
case urnIntervalWindow:
camphillips22 marked this conversation as resolved.
Show resolved Hide resolved
w, err := b.WindowCoder(id)
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{ Start, End int64 })(nil)).Elem()), Window: w}, nil
return coder.NewIntervalWindowCoder(), nil

// Special handling for the global window coder so it can be treated as
// a general coder. Generally window coders are not used outside of
// specific contexts, but this enables improved testing.
// Window types are not permitted to be fulltypes, so
// we use assignably equivalent anonymous struct types.
case urnGlobalWindow:
w, err := b.WindowCoder(id)
if err != nil {
Expand Down Expand Up @@ -528,6 +524,9 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {
case coder.String:
return b.internBuiltInCoder(urnStringCoder), nil

case coder.IW:
return b.internBuiltInCoder(urnIntervalWindow), nil

case coder.Row:
rt := c.T.Type()
s, err := schema.FromType(rt)
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
name: "baz",
c: baz,
},
{
name: "IW",
c: coder.NewIntervalWindowCoder(),
},
{
name: "W<bytes>",
c: coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()),
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
}
return &CoderRef{Type: windowedValueType, Components: []*CoderRef{elm, w}, IsWrapper: true}, nil

case coder.IW:
return &CoderRef{Type: intervalWindowType}, nil

case coder.Bytes:
return &CoderRef{Type: bytesType}, nil

Expand Down Expand Up @@ -305,6 +308,9 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
return decodeDataflowCustomCoder(subC.Type)
}

case intervalWindowType:
return coder.NewIntervalWindowCoder(), nil

case windowedValueType:
if len(c.Components) != 2 {
return nil, errors.Errorf("bad windowed value: %+v", c)
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
URNReshuffle = "beam:transform:reshuffle:v1"
URNCombinePerKey = "beam:transform:combine_per_key:v1"
URNWindow = "beam:transform:window_into:v1"
URNMapWindows = "beam:transform:map_windows:v1"

URNIterableSideInput = "beam:side_input:iterable:v1"
URNMultimapSideInput = "beam:side_input:multimap:v1"
Expand Down