diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go index 55d33b491b4be..7eadd46eb6fa6 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go @@ -525,6 +525,9 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) { case coder.String: return b.internBuiltInCoder(urnStringCoder), nil + case coder.IW: + return b.internBuiltInCoder(urnIntervalWindow), nil + case coder.Row: rt := c.T.Type() s, err := schema.FromType(rt) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go index aad15df0f23f6..bd2c8450e8c5b 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go @@ -84,6 +84,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) { "baz", baz, }, + { + name: "IW", + c: coder.NewIntervalWindowCoder(), + }, { "W", coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()), diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go index efdf4ef140f4a..1a76c7d0b5388 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go @@ -164,6 +164,9 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) { } return &CoderRef{Type: windowedValueType, Components: []*CoderRef{elm, w}, IsWrapper: true}, nil + case coder.IW: + return &CoderRef{Type: intervalWindowType}, nil + case coder.Bytes: return &CoderRef{Type: bytesType}, nil @@ -301,6 +304,9 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { return decodeDataflowCustomCoder(subC.Type) } + case intervalWindowType: + return coder.NewIntervalWindowCoder(), nil + case windowedValueType: if len(c.Components) != 2 { return nil, errors.Errorf("bad windowed value: %+v", c)