From 0d12851f7f48df0f4da1e281a19aa13b0c0da9b7 Mon Sep 17 00:00:00 2001 From: Bill Neubauer Date: Thu, 22 Mar 2018 15:15:41 -0700 Subject: [PATCH 1/5] Add float support for the SDK. --- sdks/go/pkg/beam/coder.go | 7 + sdks/go/pkg/beam/core/runtime/coderx/float.go | 75 ++++++++ .../beam/core/runtime/coderx/float_test.go | 50 ++++++ .../pkg/beam/core/runtime/graphx/serialize.go | 8 + .../pkg/beam/core/runtime/graphx/v1/v1.pb.go | 165 +++++++++--------- .../pkg/beam/core/runtime/graphx/v1/v1.proto | 2 + 6 files changed, 228 insertions(+), 79 deletions(-) create mode 100644 sdks/go/pkg/beam/core/runtime/coderx/float.go create mode 100644 sdks/go/pkg/beam/core/runtime/coderx/float_test.go diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go index c7a2d4cabc571..b7f62186135d4 100644 --- a/sdks/go/pkg/beam/coder.go +++ b/sdks/go/pkg/beam/coder.go @@ -107,6 +107,13 @@ func inferCoder(t FullType) (*coder.Coder, error) { return nil, err } return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil + case reflectx.Float32, reflectx.Float64: + c, err := coderx.NewFloat(t.Type()) + if err != nil { + return nil, err + } + return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil + case reflectx.String, reflectx.ByteSlice: // TODO(BEAM-3580): we should stop encoding string using the bytecoder. It forces // conversions at runtime in inconvenient places. diff --git a/sdks/go/pkg/beam/core/runtime/coderx/float.go b/sdks/go/pkg/beam/core/runtime/coderx/float.go new file mode 100644 index 0000000000000..403cd30ac1dc9 --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/coderx/float.go @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coderx + +import ( + "fmt" + "math" + "math/bits" + "reflect" + + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" + "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" +) + +func init() { + runtime.RegisterFunction(encFloat) + runtime.RegisterFunction(decFloat) +} + +func encFloat(v typex.T) []byte { + var val float64 + switch n := v.(type) { + case float32: + val = float64(n) + case float64: + val = n + default: + panic(fmt.Sprintf("received unknown value type: want a float, got %T", n)) + } + + return encVarUintZ(bits.ReverseBytes64(math.Float64bits(val))) +} + +func decFloat(t reflect.Type, data []byte) (typex.T, error) { + uval, err := decVarUintZ(reflectx.Uint64, data) + if err != nil { + return nil, fmt.Errorf("invalid float encoding for %v: %v", data, err) + } + + n := math.Float64frombits(bits.ReverseBytes64(uval.(uint64))) + switch t.Kind() { + case reflect.Float64: + return n, nil + case reflect.Float32: + return float32(n), nil + default: + panic(fmt.Sprintf("unreachable statement: expected a float, got %v", t)) + } +} + +// NewFloat returns a coder for the given float type. It uses the same +// encoding scheme as the gob package. 
+func NewFloat(t reflect.Type) (*coder.CustomCoder, error) { + switch t.Kind() { + case reflect.Float32, reflect.Float64: + return coder.NewCustomCoder("float", t, encFloat, decFloat) + default: + return nil, fmt.Errorf("not a float type: %v", t) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/coderx/float_test.go b/sdks/go/pkg/beam/core/runtime/coderx/float_test.go new file mode 100644 index 0000000000000..460057e900b8d --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/coderx/float_test.go @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package coderx + +import ( + "reflect" + "testing" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" +) + +func TestFloat(t *testing.T) { + tests := []interface{}{ + float32(0.0), + float32(94.5), + float64(-127.25), + float64(8675309.4286), + } + + for _, v := range tests { + typ := reflect.ValueOf(v).Type() + + data := encFloat(v) + result, err := decFloat(typ, data) + if err != nil { + t.Fatalf("dec(enc(%v)) failed: %v", v, err) + } + + if v != result { + t.Errorf("dec(enc(%v)) = %v, want id", v, result) + } + resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type() + if resultT != typ { + t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT) + } + } +} diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go index f38a698d39172..5615b71dfbea4 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go @@ -347,6 +347,10 @@ func encodeType(t reflect.Type) (*v1.Type, error) { return &v1.Type{Kind: v1.Type_UINT32}, nil case reflect.Uint64: return &v1.Type{Kind: v1.Type_UINT64}, nil + case reflect.Float32: + return &v1.Type{Kind: v1.Type_FLOAT32}, nil + case reflect.Float64: + return &v1.Type{Kind: v1.Type_FLOAT64}, nil case reflect.String: return &v1.Type{Kind: v1.Type_STRING}, nil @@ -485,6 +489,10 @@ func decodeType(t *v1.Type) (reflect.Type, error) { return reflectx.Uint32, nil case v1.Type_UINT64: return reflectx.Uint64, nil + case v1.Type_FLOAT32: + return reflectx.Float32, nil + case v1.Type_FLOAT64: + return reflectx.Float64, nil case v1.Type_STRING: return reflectx.String, nil diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go index 97f591f070049..94dfd64847a21 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go @@ -42,18 +42,20 @@ type Type_Kind int32 const ( Type_INVALID Type_Kind = 
0 // Primitive. - Type_BOOL Type_Kind = 1 - Type_INT Type_Kind = 2 - Type_INT8 Type_Kind = 3 - Type_INT16 Type_Kind = 4 - Type_INT32 Type_Kind = 5 - Type_INT64 Type_Kind = 6 - Type_UINT Type_Kind = 7 - Type_UINT8 Type_Kind = 8 - Type_UINT16 Type_Kind = 9 - Type_UINT32 Type_Kind = 10 - Type_UINT64 Type_Kind = 11 - Type_STRING Type_Kind = 12 + Type_BOOL Type_Kind = 1 + Type_INT Type_Kind = 2 + Type_INT8 Type_Kind = 3 + Type_INT16 Type_Kind = 4 + Type_INT32 Type_Kind = 5 + Type_INT64 Type_Kind = 6 + Type_UINT Type_Kind = 7 + Type_UINT8 Type_Kind = 8 + Type_UINT16 Type_Kind = 9 + Type_UINT32 Type_Kind = 10 + Type_UINT64 Type_Kind = 11 + Type_STRING Type_Kind = 12 + Type_FLOAT32 Type_Kind = 13 + Type_FLOAT64 Type_Kind = 14 // Non-primitive types. Type_SLICE Type_Kind = 20 Type_STRUCT Type_Kind = 21 @@ -78,6 +80,8 @@ var Type_Kind_name = map[int32]string{ 10: "UINT32", 11: "UINT64", 12: "STRING", + 13: "FLOAT32", + 14: "FLOAT64", 20: "SLICE", 21: "STRUCT", 22: "FUNC", @@ -100,6 +104,8 @@ var Type_Kind_value = map[string]int32{ "UINT32": 10, "UINT64": 11, "STRING": 12, + "FLOAT32": 13, + "FLOAT64": 14, "SLICE": 20, "STRUCT": 21, "FUNC": 22, @@ -749,73 +755,74 @@ func init() { func init() { proto.RegisterFile("v1.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1079 bytes of a gzipped FileDescriptorProto + // 1091 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdb, 0x6e, 0xdb, 0x46, - 0x13, 0x0e, 0x45, 0x89, 0x87, 0x91, 0x94, 0x6c, 0xf6, 0x77, 0xfc, 0x33, 0x46, 0x8a, 0x28, 0xbc, - 0xa9, 0xda, 0x18, 0x2a, 0x2c, 0x07, 0x46, 0xd1, 0x3b, 0x45, 0xa2, 0x1d, 0xc2, 0x32, 0x25, 0xac, + 0x13, 0x0e, 0x45, 0x89, 0x87, 0x91, 0xe4, 0x6c, 0xf6, 0x77, 0xfc, 0x33, 0x46, 0x8a, 0x28, 0xbc, + 0xa9, 0xda, 0x18, 0x2a, 0x2c, 0x1b, 0x46, 0xd1, 0x3b, 0x45, 0xa2, 0x1c, 0xc2, 0x34, 0x25, 0xac, 0x28, 0xd9, 0xe9, 0x8d, 0xc0, 0x88, 0x2b, 0x99, 0xb5, 0xb4, 0x64, 0x49, 0xca, 0x88, 0xd0, 0x57, - 0x29, 0xfa, 0x1c, 0x7d, 0x87, 
0x3e, 0x47, 0xde, 0xa3, 0xd8, 0x25, 0x29, 0x5b, 0xae, 0x8b, 0x02, - 0xb9, 0xda, 0xd9, 0x99, 0xef, 0xdb, 0x39, 0xec, 0xec, 0x90, 0xa0, 0xdd, 0x1e, 0xb5, 0xa2, 0x38, - 0x4c, 0x43, 0x5c, 0xba, 0x3d, 0x32, 0xbf, 0xa8, 0x50, 0x76, 0x37, 0x11, 0xc5, 0x6f, 0xa0, 0x7c, - 0x13, 0x30, 0xdf, 0x90, 0x1a, 0x52, 0xf3, 0x69, 0xbb, 0xde, 0xba, 0x3d, 0x6a, 0x71, 0x7d, 0xeb, - 0x3c, 0x60, 0x3e, 0x11, 0x26, 0x6c, 0x82, 0x4a, 0x97, 0x74, 0x45, 0x59, 0x6a, 0x94, 0x1a, 0x52, - 0xb3, 0xda, 0xd6, 0x0a, 0x14, 0x29, 0x0c, 0xf8, 0x10, 0x94, 0x79, 0x40, 0x97, 0x7e, 0x62, 0xc8, - 0x0d, 0xb9, 0x59, 0x6d, 0xef, 0x6d, 0x0f, 0x1a, 0xa5, 0xf1, 0x7a, 0x96, 0x9e, 0x72, 0x23, 0xc9, - 0x31, 0xf8, 0x08, 0x9e, 0x45, 0x5e, 0xec, 0xad, 0x68, 0x4a, 0xe3, 0x69, 0xba, 0x89, 0x68, 0x62, - 0x94, 0x05, 0xed, 0xee, 0xe4, 0xa7, 0x5b, 0x00, 0xdf, 0x26, 0xf8, 0x2d, 0xd4, 0x62, 0x9a, 0xae, - 0x63, 0x96, 0xe3, 0x2b, 0x0f, 0xf0, 0xd5, 0xcc, 0x9a, 0x81, 0x5f, 0x43, 0x35, 0x48, 0xa6, 0xb7, - 0x5e, 0x1c, 0x78, 0x7e, 0x30, 0x33, 0x94, 0x86, 0xd4, 0xd4, 0x08, 0x04, 0xc9, 0x24, 0xd7, 0xe0, - 0xb7, 0xa0, 0xcd, 0xae, 0x3d, 0x36, 0xf5, 0x83, 0xd8, 0x50, 0x45, 0xe6, 0x68, 0x1b, 0x70, 0xf7, - 0xda, 0x63, 0xbd, 0x20, 0x26, 0xea, 0x2c, 0x13, 0xf0, 0xf7, 0xa0, 0x26, 0x11, 0x9d, 0x05, 0xde, - 0xd2, 0xd0, 0x1e, 0x60, 0x47, 0x99, 0x9e, 0x14, 0x00, 0xfc, 0x06, 0x6a, 0xf4, 0x73, 0x4a, 0x63, - 0xe6, 0x2d, 0xa7, 0x37, 0x74, 0x63, 0xe8, 0x0d, 0xa9, 0xa9, 0x93, 0x6a, 0xa1, 0x3b, 0xa7, 0x9b, - 0x83, 0x3f, 0x25, 0xa8, 0xde, 0x2b, 0x0a, 0xc6, 0x50, 0x66, 0xde, 0x8a, 0x8a, 0x1b, 0xd0, 0x89, - 0x90, 0xf1, 0x4b, 0xd0, 0xa2, 0x9b, 0xc5, 0x34, 0xf2, 0xd2, 0x6b, 0x51, 0x73, 0x9d, 0xa8, 0xd1, - 0xcd, 0x62, 0xe8, 0xa5, 0xd7, 0xf8, 0x15, 0x94, 0x79, 0x05, 0x0c, 0xf9, 0xc1, 0x55, 0x08, 0x2d, - 0x46, 0x20, 0xa7, 0xde, 0xc2, 0x28, 0x0b, 0x0e, 0x17, 0xf1, 0x3e, 0x28, 0xe1, 0x7c, 0x9e, 0xd0, - 0xd4, 0xa8, 0x34, 0xa4, 0xa6, 0x4c, 0xf2, 0x1d, 0xde, 0x83, 0x4a, 0xc0, 0x7c, 0xfa, 0xd9, 0x50, - 0x1a, 0x72, 0xb3, 0x42, 0xb2, 0x0d, 0x7e, 0x05, 0xba, 0xc7, 0x42, 
0xb6, 0x59, 0x85, 0xeb, 0x44, - 0x54, 0x46, 0x23, 0x77, 0x0a, 0xf3, 0x8b, 0x04, 0x65, 0xde, 0x18, 0xb8, 0x0a, 0xaa, 0xed, 0x4c, - 0x3a, 0x7d, 0xbb, 0x87, 0x9e, 0x60, 0x0d, 0xca, 0xef, 0x07, 0x83, 0x3e, 0x92, 0xb0, 0x0a, 0xb2, - 0xed, 0xb8, 0xa8, 0xc4, 0x55, 0xb6, 0xe3, 0xfe, 0x88, 0x64, 0xac, 0x43, 0xc5, 0x76, 0xdc, 0xa3, - 0x13, 0x54, 0xce, 0xc5, 0xe3, 0x36, 0xaa, 0xe4, 0xe2, 0xc9, 0x3b, 0xa4, 0x70, 0xe8, 0x98, 0x93, - 0x54, 0xae, 0x1c, 0x0b, 0x96, 0x86, 0x01, 0x94, 0x71, 0x46, 0xd3, 0x0b, 0xf9, 0xb8, 0x8d, 0xa0, - 0x90, 0x4f, 0xde, 0xa1, 0x2a, 0x97, 0x47, 0x2e, 0xb1, 0x9d, 0x33, 0x54, 0xe3, 0xd4, 0x51, 0xdf, - 0xee, 0x5a, 0x68, 0x2f, 0x57, 0x8f, 0xbb, 0x2e, 0x7a, 0xc1, 0xcf, 0x3e, 0x1d, 0x3b, 0x5d, 0xb4, - 0xcf, 0xa5, 0xee, 0x87, 0x8e, 0x83, 0xfe, 0xcf, 0x63, 0x1c, 0xba, 0x04, 0x19, 0x3c, 0x87, 0xd1, - 0xd0, 0xea, 0xda, 0x9d, 0x3e, 0x7a, 0x89, 0x6b, 0xa0, 0x59, 0x57, 0xae, 0x45, 0x9c, 0x4e, 0x1f, - 0x1d, 0x98, 0xdf, 0x82, 0x9a, 0x77, 0x01, 0x27, 0x12, 0xab, 0x3b, 0xc9, 0xd2, 0x1c, 0x59, 0x4e, - 0x0f, 0x49, 0x59, 0xc2, 0xee, 0x07, 0x54, 0x32, 0xff, 0x90, 0x40, 0xcd, 0x7b, 0x40, 0xd4, 0xa4, - 0xdf, 0xb7, 0xce, 0x3a, 0x7d, 0xf4, 0x84, 0x07, 0x64, 0x11, 0x32, 0x20, 0x48, 0xe2, 0xfa, 0xee, - 0xc0, 0x71, 0xad, 0xab, 0xbc, 0x30, 0xee, 0xc7, 0xa1, 0x85, 0x64, 0x5c, 0x07, 0xdd, 0x9a, 0x58, - 0x8e, 0xeb, 0xda, 0x17, 0x16, 0x02, 0xac, 0x40, 0xe9, 0x7c, 0x82, 0xaa, 0x9c, 0xd8, 0x1d, 0x9c, - 0xbd, 0x3f, 0x47, 0x75, 0xfc, 0x1c, 0xea, 0x97, 0xb6, 0xd3, 0x1b, 0x5c, 0x5a, 0xbd, 0x49, 0xa7, - 0x3f, 0xb6, 0xd0, 0x53, 0x5c, 0x01, 0xc9, 0x45, 0xcf, 0xf8, 0x32, 0x46, 0x88, 0x2f, 0x13, 0xf4, - 0x9c, 0x2f, 0x97, 0x08, 0xf3, 0xe5, 0x0a, 0xfd, 0x8f, 0x2f, 0x1f, 0xd1, 0x1e, 0x5f, 0x7e, 0x46, - 0x2f, 0xcc, 0x09, 0x68, 0xa7, 0xeb, 0xe5, 0x52, 0x3c, 0xf5, 0xa2, 0x73, 0xa4, 0x47, 0x3b, 0xe7, - 0x10, 0x60, 0x16, 0xae, 0xa2, 0x90, 0x51, 0x96, 0x26, 0x46, 0x49, 0x3c, 0xaf, 0x1a, 0xc7, 0x14, - 0x7c, 0x72, 0xcf, 0x6e, 0xfe, 0x04, 0xca, 0x38, 0xa1, 0xf1, 0x29, 0x7b, 0xb4, 0x7d, 0x0b, 0x4f, - 0xa5, 
0xc7, 0x3c, 0x99, 0x53, 0xa8, 0xf4, 0x36, 0xec, 0x6b, 0xa8, 0x9c, 0xe1, 0x7b, 0xa9, 0x27, - 0x9a, 0xbf, 0x46, 0x84, 0xcc, 0x5b, 0x7e, 0x41, 0x59, 0xd1, 0xf2, 0x0b, 0xca, 0xcc, 0x5f, 0xa1, - 0x74, 0xca, 0xf0, 0x01, 0x94, 0xe6, 0x2c, 0x4f, 0x16, 0xf8, 0x39, 0x59, 0xc0, 0xa4, 0x34, 0x67, - 0xff, 0xe1, 0x05, 0x81, 0x1c, 0x46, 0xa9, 0x70, 0xa2, 0x13, 0x2e, 0xe2, 0xd7, 0x50, 0xf1, 0x37, - 0x6c, 0x9e, 0x79, 0xa9, 0xb6, 0x75, 0x4e, 0x10, 0x39, 0x90, 0x4c, 0x6f, 0xfe, 0x06, 0xd5, 0xee, - 0x3a, 0x49, 0xc3, 0x55, 0x37, 0xf4, 0x69, 0xfc, 0x15, 0x99, 0xbd, 0x02, 0x99, 0xb2, 0x59, 0xfe, - 0xaa, 0xef, 0x87, 0xcb, 0xd5, 0xdc, 0xea, 0xd3, 0x59, 0xee, 0x7d, 0xc7, 0xea, 0xd3, 0x99, 0xf9, - 0xbb, 0x0c, 0xfa, 0xc5, 0x7a, 0x99, 0x06, 0x96, 0xbf, 0xa0, 0x78, 0xff, 0x5e, 0xde, 0x8a, 0xb8, - 0xc0, 0x2c, 0x67, 0x3e, 0x08, 0xa2, 0x59, 0xe8, 0xd3, 0xbc, 0x54, 0xf9, 0x0e, 0xff, 0x00, 0x6a, - 0xc0, 0x3e, 0x85, 0x6b, 0xe6, 0xe7, 0xb7, 0xfe, 0x82, 0x93, 0xb6, 0xe7, 0xb5, 0xec, 0xcc, 0x48, - 0x0a, 0x14, 0x6e, 0x83, 0x16, 0xae, 0xd3, 0x8c, 0x91, 0x4d, 0xfb, 0xfd, 0x5d, 0xc6, 0x20, 0xb7, - 0x92, 0x2d, 0xee, 0xe0, 0x2f, 0x09, 0xd4, 0xfc, 0x20, 0x7c, 0xbc, 0xf3, 0xc9, 0x79, 0xfd, 0xa8, - 0xb7, 0x96, 0xcd, 0xa2, 0x75, 0x7a, 0xef, 0x23, 0xd4, 0xd8, 0xa9, 0xde, 0x6e, 0x63, 0x66, 0x6d, - 0x15, 0x80, 0xbe, 0x25, 0xfd, 0x63, 0x40, 0x5d, 0x74, 0x6c, 0x07, 0x49, 0xfc, 0xd1, 0x8d, 0x6c, - 0xe7, 0xac, 0x6f, 0xb9, 0x03, 0x07, 0x95, 0xee, 0xc6, 0x86, 0xcc, 0xc7, 0xc2, 0x45, 0x67, 0x88, - 0xca, 0x7c, 0x12, 0x5c, 0x8c, 0xfb, 0xae, 0xcd, 0x77, 0x15, 0x31, 0xc8, 0x5c, 0x8b, 0x20, 0x85, - 0xcf, 0x15, 0x62, 0x09, 0x59, 0x3d, 0x38, 0x04, 0xad, 0xc8, 0x71, 0x1b, 0x98, 0xf4, 0xaf, 0x81, - 0x7d, 0x03, 0x75, 0x9b, 0xfd, 0x42, 0x67, 0xe9, 0xd0, 0xdb, 0x2c, 0x43, 0xcf, 0xc7, 0x35, 0x90, - 0xb2, 0x0b, 0xaa, 0x10, 0x89, 0x99, 0x31, 0x20, 0x37, 0xf6, 0x58, 0x32, 0x0f, 0xe3, 0x55, 0x81, - 0x40, 0x20, 0xaf, 0x63, 0x96, 0xb7, 0x0f, 0x17, 0xf9, 0x77, 0x9a, 0xfa, 0x8b, 0x22, 0xff, 0xfa, - 0x4e, 0xd1, 0x88, 0x30, 0xe1, 0xef, 0x40, 
0x09, 0x84, 0x9f, 0xbc, 0x8b, 0x9e, 0x73, 0xd0, 0x8e, - 0x67, 0x92, 0x03, 0x3e, 0x29, 0xe2, 0x4f, 0xe0, 0xf8, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6b, - 0x77, 0x19, 0xb6, 0x15, 0x08, 0x00, 0x00, + 0x29, 0xfa, 0x1c, 0x7d, 0x87, 0x3e, 0x54, 0x8a, 0xe5, 0x41, 0xb6, 0x5c, 0x17, 0x05, 0x72, 0xb5, + 0xb3, 0x33, 0xdf, 0xb7, 0x73, 0xd8, 0xd9, 0x21, 0x41, 0xb9, 0x3b, 0x6e, 0x85, 0x51, 0x90, 0x04, + 0xb8, 0x74, 0x77, 0xac, 0x7f, 0x91, 0xa1, 0xec, 0x6c, 0x42, 0x8a, 0xdf, 0x42, 0xf9, 0xd6, 0x67, + 0x9e, 0x26, 0x34, 0x84, 0xe6, 0x5e, 0xbb, 0xde, 0xba, 0x3b, 0x6e, 0x71, 0x7d, 0xeb, 0xc2, 0x67, + 0x1e, 0x49, 0x4d, 0x58, 0x07, 0x99, 0x2e, 0xe9, 0x8a, 0xb2, 0x44, 0x2b, 0x35, 0x84, 0x66, 0xb5, + 0xad, 0x14, 0x28, 0x52, 0x18, 0xf0, 0x11, 0x48, 0x73, 0x9f, 0x2e, 0xbd, 0x58, 0x13, 0x1b, 0x62, + 0xb3, 0xda, 0xde, 0xdf, 0x1e, 0x34, 0x4a, 0xa2, 0xf5, 0x2c, 0xe9, 0x73, 0x23, 0xc9, 0x31, 0xf8, + 0x18, 0x9e, 0x87, 0x6e, 0xe4, 0xae, 0x68, 0x42, 0xa3, 0x69, 0xb2, 0x09, 0x69, 0xac, 0x95, 0x53, + 0xda, 0xfd, 0xc9, 0x7b, 0x5b, 0x00, 0xdf, 0xc6, 0xf8, 0x1d, 0xd4, 0x22, 0x9a, 0xac, 0x23, 0x96, + 0xe3, 0x2b, 0x8f, 0xf0, 0xd5, 0xcc, 0x9a, 0x81, 0xdf, 0x40, 0xd5, 0x8f, 0xa7, 0x77, 0x6e, 0xe4, + 0xbb, 0x9e, 0x3f, 0xd3, 0xa4, 0x86, 0xd0, 0x54, 0x08, 0xf8, 0xf1, 0x24, 0xd7, 0xe0, 0x77, 0xa0, + 0xcc, 0x6e, 0x5c, 0x36, 0xf5, 0xfc, 0x48, 0x93, 0xd3, 0xcc, 0xd1, 0x36, 0xe0, 0xee, 0x8d, 0xcb, + 0x7a, 0x7e, 0x44, 0xe4, 0x59, 0x26, 0xe0, 0xef, 0x41, 0x8e, 0x43, 0x3a, 0xf3, 0xdd, 0xa5, 0xa6, + 0x3c, 0xc2, 0x8e, 0x32, 0x3d, 0x29, 0x00, 0xf8, 0x2d, 0xd4, 0xe8, 0xe7, 0x84, 0x46, 0xcc, 0x5d, + 0x4e, 0x6f, 0xe9, 0x46, 0x53, 0x1b, 0x42, 0x53, 0x25, 0xd5, 0x42, 0x77, 0x41, 0x37, 0x87, 0x7f, + 0x0a, 0x50, 0x7d, 0x50, 0x14, 0x8c, 0xa1, 0xcc, 0xdc, 0x15, 0x4d, 0x6f, 0x40, 0x25, 0xa9, 0x8c, + 0x5f, 0x81, 0x12, 0xde, 0x2e, 0xa6, 0xa1, 0x9b, 0xdc, 0xa4, 0x35, 0x57, 0x89, 0x1c, 0xde, 0x2e, + 0x86, 0x6e, 0x72, 0x83, 0x5f, 0x43, 0x99, 0x57, 0x40, 0x13, 0x1f, 0x5d, 0x45, 0xaa, 0xc5, 0x08, + 0xc4, 0xc4, 0x5d, 0x68, 0xe5, 0x94, 
0xc3, 0x45, 0x7c, 0x00, 0x52, 0x30, 0x9f, 0xc7, 0x34, 0xd1, + 0x2a, 0x0d, 0xa1, 0x29, 0x92, 0x7c, 0x87, 0xf7, 0xa1, 0xe2, 0x33, 0x8f, 0x7e, 0xd6, 0xa4, 0x86, + 0xd8, 0xac, 0x90, 0x6c, 0x83, 0x5f, 0x83, 0xea, 0xb2, 0x80, 0x6d, 0x56, 0xc1, 0x3a, 0x4e, 0x2b, + 0xa3, 0x90, 0x7b, 0x85, 0xfe, 0x45, 0x80, 0x32, 0x6f, 0x0c, 0x5c, 0x05, 0xd9, 0xb4, 0x27, 0x1d, + 0xcb, 0xec, 0xa1, 0x67, 0x58, 0x81, 0xf2, 0xfb, 0xc1, 0xc0, 0x42, 0x02, 0x96, 0x41, 0x34, 0x6d, + 0x07, 0x95, 0xb8, 0xca, 0xb4, 0x9d, 0x1f, 0x91, 0x88, 0x55, 0xa8, 0x98, 0xb6, 0x73, 0x7c, 0x86, + 0xca, 0xb9, 0x78, 0xd2, 0x46, 0x95, 0x5c, 0x3c, 0x3b, 0x45, 0x12, 0x87, 0x8e, 0x39, 0x49, 0xe6, + 0xca, 0x71, 0xca, 0x52, 0x30, 0x80, 0x34, 0xce, 0x68, 0x6a, 0x21, 0x9f, 0xb4, 0x11, 0x14, 0xf2, + 0xd9, 0x29, 0xaa, 0x72, 0x79, 0xe4, 0x10, 0xd3, 0x3e, 0x47, 0x35, 0x1e, 0x4f, 0xdf, 0x1a, 0x74, + 0x38, 0xa8, 0xbe, 0xdd, 0x9c, 0x9d, 0xa2, 0x3d, 0x7e, 0xe8, 0xc8, 0x32, 0xbb, 0x06, 0xda, 0xcf, + 0x09, 0xe3, 0xae, 0x83, 0x5e, 0x72, 0xaf, 0xfd, 0xb1, 0xdd, 0x45, 0x07, 0x5c, 0xea, 0x7e, 0xe8, + 0xd8, 0xe8, 0xff, 0x3c, 0xfa, 0xa1, 0x43, 0x90, 0xc6, 0x0f, 0x18, 0x0d, 0x8d, 0xae, 0xd9, 0xb1, + 0xd0, 0x2b, 0x5c, 0x03, 0xc5, 0xb8, 0x76, 0x0c, 0x62, 0x77, 0x2c, 0x74, 0xa8, 0x7f, 0x0b, 0x72, + 0xde, 0x1f, 0x9c, 0x48, 0x8c, 0xee, 0x24, 0x2b, 0xc0, 0xc8, 0xb0, 0x7b, 0x48, 0xc8, 0x4a, 0xe1, + 0x7c, 0x40, 0x25, 0xfd, 0x0f, 0x01, 0xe4, 0xbc, 0x3b, 0xd2, 0x6a, 0x59, 0x96, 0x71, 0xde, 0xb1, + 0xd0, 0x33, 0x1e, 0x90, 0x41, 0xc8, 0x80, 0x20, 0x81, 0xeb, 0xbb, 0x03, 0xdb, 0x31, 0xae, 0xf3, + 0x92, 0x39, 0x1f, 0x87, 0x06, 0x12, 0x71, 0x1d, 0x54, 0x63, 0x62, 0xd8, 0x8e, 0x63, 0x5e, 0x1a, + 0x08, 0xb0, 0x04, 0xa5, 0x8b, 0x09, 0xaa, 0x72, 0x62, 0x77, 0x70, 0xfe, 0xfe, 0x02, 0xd5, 0xf1, + 0x0b, 0xa8, 0x5f, 0x99, 0x76, 0x6f, 0x70, 0x65, 0xf4, 0x26, 0x1d, 0x6b, 0x6c, 0xa0, 0x3d, 0x5c, + 0x01, 0xc1, 0x41, 0xcf, 0xf9, 0x32, 0x46, 0x88, 0x2f, 0x13, 0xf4, 0x82, 0x2f, 0x57, 0x08, 0xf3, + 0xe5, 0x1a, 0xfd, 0x8f, 0x2f, 0x1f, 0xd1, 0x3e, 0x5f, 0x7e, 0x46, 0x2f, 
0xf5, 0x09, 0x28, 0xfd, + 0xf5, 0x72, 0x99, 0x0e, 0x81, 0xa2, 0xa7, 0x84, 0x27, 0x7b, 0xea, 0x08, 0x60, 0x16, 0xac, 0xc2, + 0x80, 0x51, 0x96, 0xc4, 0x5a, 0x29, 0x7d, 0x78, 0x35, 0x8e, 0x29, 0xf8, 0xe4, 0x81, 0x5d, 0xff, + 0x09, 0xa4, 0x71, 0x4c, 0xa3, 0x3e, 0x7b, 0xb2, 0xb1, 0x0b, 0x4f, 0xa5, 0xa7, 0x3c, 0xe9, 0x53, + 0xa8, 0xf4, 0x36, 0xec, 0x6b, 0xa8, 0x9c, 0xe1, 0xb9, 0x89, 0x9b, 0x3e, 0x8b, 0x1a, 0x49, 0x65, + 0xfe, 0x18, 0x16, 0x94, 0x15, 0x8f, 0x61, 0x41, 0x99, 0xfe, 0x2b, 0x94, 0xfa, 0x0c, 0x1f, 0x42, + 0x69, 0xce, 0xf2, 0x64, 0x81, 0x9f, 0x93, 0x05, 0x4c, 0x4a, 0x73, 0xf6, 0x1f, 0x5e, 0x10, 0x88, + 0x41, 0x98, 0xa4, 0x4e, 0x54, 0xc2, 0x45, 0xfc, 0x06, 0x2a, 0xde, 0x86, 0xcd, 0x33, 0x2f, 0xd5, + 0xb6, 0xca, 0x09, 0x69, 0x0e, 0x24, 0xd3, 0xeb, 0xbf, 0x41, 0xb5, 0xbb, 0x8e, 0x93, 0x60, 0xd5, + 0x0d, 0x3c, 0x1a, 0x7d, 0x45, 0x66, 0xaf, 0x41, 0xa4, 0x6c, 0x96, 0xbf, 0xf7, 0x87, 0xe1, 0x72, + 0x35, 0xb7, 0x7a, 0x74, 0x96, 0x7b, 0xdf, 0xb1, 0x7a, 0x74, 0xa6, 0xff, 0x2e, 0x82, 0x7a, 0xb9, + 0x5e, 0x26, 0xbe, 0xe1, 0x2d, 0x28, 0x3e, 0x78, 0x90, 0xb7, 0x94, 0x5e, 0x60, 0x96, 0x33, 0x1f, + 0x11, 0xe1, 0x2c, 0xf0, 0x68, 0x5e, 0xaa, 0x7c, 0x87, 0x7f, 0x00, 0xd9, 0x67, 0x9f, 0x82, 0x35, + 0xf3, 0xf2, 0x5b, 0x7f, 0xc9, 0x49, 0xdb, 0xf3, 0x5a, 0x66, 0x66, 0x24, 0x05, 0x0a, 0xb7, 0x41, + 0x09, 0xd6, 0x49, 0xc6, 0xc8, 0xbe, 0x03, 0x07, 0xbb, 0x8c, 0x41, 0x6e, 0x25, 0x5b, 0xdc, 0xe1, + 0x5f, 0x02, 0xc8, 0xf9, 0x41, 0xf8, 0x64, 0xe7, 0x63, 0xf4, 0xe6, 0x49, 0x6f, 0x2d, 0x93, 0x85, + 0xeb, 0xe4, 0xc1, 0xe7, 0xa9, 0xb1, 0x53, 0xbd, 0xdd, 0xc6, 0xcc, 0xda, 0xca, 0x07, 0x75, 0x4b, + 0xfa, 0xc7, 0xe8, 0xba, 0xec, 0x98, 0x36, 0x12, 0xf8, 0xa3, 0x1b, 0x99, 0xf6, 0xb9, 0x65, 0x38, + 0x03, 0x1b, 0x95, 0xee, 0xc7, 0x86, 0xc8, 0xc7, 0xc2, 0x65, 0x67, 0x88, 0xca, 0x7c, 0x12, 0x5c, + 0x8e, 0x2d, 0xc7, 0xe4, 0xbb, 0x4a, 0x3a, 0xe2, 0x1c, 0x83, 0x20, 0x89, 0xcf, 0x15, 0x62, 0xa4, + 0xb2, 0x7c, 0x78, 0x04, 0x4a, 0x91, 0xe3, 0x36, 0x30, 0xe1, 0x5f, 0x03, 0xfb, 0x06, 0xea, 0x26, + 0xfb, 0x85, 
0xce, 0x92, 0xa1, 0xbb, 0x59, 0x06, 0xae, 0x87, 0x6b, 0x20, 0x64, 0x17, 0x54, 0x21, + 0x02, 0xd3, 0x23, 0x40, 0x4e, 0xe4, 0xb2, 0x78, 0x1e, 0x44, 0xab, 0x02, 0x81, 0x40, 0x5c, 0x47, + 0x2c, 0x6f, 0x1f, 0x2e, 0xf2, 0x2f, 0x38, 0xf5, 0x16, 0x45, 0xfe, 0xf5, 0x9d, 0xa2, 0x91, 0xd4, + 0x84, 0xbf, 0x03, 0xc9, 0x4f, 0xfd, 0xe4, 0x5d, 0xf4, 0x82, 0x83, 0x76, 0x3c, 0x93, 0x1c, 0xf0, + 0x49, 0x4a, 0xff, 0x11, 0x4e, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x4c, 0x68, 0x2d, 0xd6, 0x2f, + 0x08, 0x00, 0x00, } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto index a1a357312ca8b..273cf2ef82c59 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto +++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto @@ -46,6 +46,8 @@ message Type { UINT32 = 10; UINT64 = 11; STRING = 12; + FLOAT32 = 13; + FLOAT64 = 14; // Non-primitive types. SLICE = 20; From 071718adccadd34d27fcaf0502515f9fb5b728d3 Mon Sep 17 00:00:00 2001 From: Bill Neubauer Date: Fri, 6 Apr 2018 08:55:15 -0700 Subject: [PATCH 2/5] Add diagnostics for sinks. Track time and number of elements processed by sinks as we do for sources. Fix the source logging to use the provided context rather than context.Background() --- sdks/go/pkg/beam/core/runtime/exec/datasink.go | 13 +++++++++++-- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go b/sdks/go/pkg/beam/core/runtime/exec/datasink.go index ebf8ddc240763..73f4aab651575 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go @@ -20,8 +20,11 @@ import ( "context" "fmt" "io" + "sync/atomic" + "time" "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/log" ) // DataSink is a Node. 
@@ -31,8 +34,10 @@ type DataSink struct { Target Target Coder *coder.Coder - enc ElementEncoder - w io.WriteCloser + enc ElementEncoder + w io.WriteCloser + count int64 + start time.Time } func (n *DataSink) ID() UnitID { @@ -52,6 +57,8 @@ func (n *DataSink) StartBundle(ctx context.Context, id string, data DataManager) return err } n.w = w + atomic.StoreInt64(&n.count, 0) + n.start = time.Now() return nil } @@ -60,6 +67,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values . // unit. var b bytes.Buffer + atomic.AddInt64(&n.count, 1) if err := EncodeWindowedValueHeader(value.Timestamp, &b); err != nil { return err } @@ -75,6 +83,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values . } func (n *DataSink) FinishBundle(ctx context.Context) error { + log.Infof(ctx, "DataSink: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start)) return n.w.Close() } diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index af7dbb97bb124..12ce8847ff47e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -171,7 +171,7 @@ func (n *DataSource) Process(ctx context.Context) error { } func (n *DataSource) FinishBundle(ctx context.Context) error { - log.Infof(context.Background(), "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start)) + log.Infof(ctx, "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start)) n.sid = StreamID{} n.source = nil return n.Out.FinishBundle(ctx) From dea8f6e7ae2074ea5d8a4087d1277e2609bfcca1 Mon Sep 17 00:00:00 2001 From: Bill Neubauer Date: Thu, 21 Dec 2017 13:58:47 -0800 Subject: [PATCH 3/5] Add hooks to the Go SDK. Hooks allow a user to tailor behavior of the SDK runtime. This implementation allows hooks to run at startup, and hook into the request/response processing of FnAPI messages. 
Hooks included in this commit allow for profiling and tracing, tailoring the behavior of the gRPC transport, and recording sessions of FnAPI data to be used for playback with the session runner. --- .../pkg/beam/core/runtime/harness/harness.go | 25 +-- .../pkg/beam/core/runtime/harness/session.go | 97 +++++---- sdks/go/pkg/beam/core/util/hooks/hooks.go | 186 ++++++++++++++++++ sdks/go/pkg/beam/runners/dataflow/dataflow.go | 40 +++- sdks/go/pkg/beam/runners/dataflow/messages.go | 6 +- sdks/go/pkg/beam/runners/session/session.go | 6 +- sdks/go/pkg/beam/util/grpcx/hook.go | 86 ++++++++ sdks/go/pkg/beam/x/hooks/perf/perf.go | 170 ++++++++++++++++ 8 files changed, 550 insertions(+), 66 deletions(-) create mode 100644 sdks/go/pkg/beam/core/util/hooks/hooks.go create mode 100644 sdks/go/pkg/beam/util/grpcx/hook.go create mode 100644 sdks/go/pkg/beam/x/hooks/perf/perf.go diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 5e545c550f986..f8efffece71ad 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -17,16 +17,14 @@ package harness import ( - "bytes" "context" "fmt" "io" - "io/ioutil" - "runtime/pprof" "sync" "time" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" "github.com/apache/beam/sdks/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx" @@ -37,12 +35,13 @@ import ( // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin). // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not -// "pipeline-construction time" -- on each worker. It is a Fn API client and +// "pipeline-construction time" -- on each worker. It is a FnAPI client and // ultimately responsible for correctly executing user code. 
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { - setupRemoteLogging(ctx, loggingEndpoint) - setupDiagnosticRecording() + hooks.DeserializeHooksFromOptions() + hooks.RunInitHooks(ctx) + setupRemoteLogging(ctx, loggingEndpoint) recordHeader() // Connect to FnAPI control server. Receive and execute work. @@ -87,8 +86,6 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { data: &DataManager{}, } - var cpuProfBuf bytes.Buffer - // gRPC requires all readers of a stream be the same goroutine, so this goroutine // is responsible for managing the network data. All it does is pull data from // the stream, and hand off the message to a goroutine to actually be handled, @@ -112,18 +109,10 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req)) recordInstructionRequest(req) - if isEnabled("cpu_profiling") { - cpuProfBuf.Reset() - pprof.StartCPUProfile(&cpuProfBuf) - } + hooks.RunRequestHooks(ctx, req) resp := ctrl.handleInstruction(ctx, req) - if isEnabled("cpu_profiling") { - pprof.StopCPUProfile() - if err := ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), cpuProfBuf.Bytes(), 0644); err != nil { - log.Warnf(ctx, "Failed to write CPU profile for instruction %s: %v", req.InstructionId, err) - } - } + hooks.RunResponseHooks(ctx, req, resp) recordInstructionResponse(resp) if resp != nil { diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go b/sdks/go/pkg/beam/core/runtime/harness/session.go index 67f577f5035ba..69a1d9927863d 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/session.go +++ b/sdks/go/pkg/beam/core/runtime/harness/session.go @@ -16,13 +16,14 @@ package harness import ( + "context" "fmt" - "os" + "io" "sync" - "time" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session" + 
"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/proto" ) @@ -35,12 +36,13 @@ const ( dataSend ) +// capture is set by the capture hook below. +var capture io.WriteCloser + var ( selectedOptions = make(map[string]bool) - // TODO(wcn): add a buffered writer around capture and use it. - capture *os.File - sessionLock sync.Mutex - bufPool = sync.Pool{ + sessionLock sync.Mutex + bufPool = sync.Pool{ New: func() interface{} { return proto.NewBuffer(nil) }, @@ -49,35 +51,6 @@ var ( storagePath string ) -// TODO(wcn): the plan is to make these hooks available in the harness in a fashion -// similar to net/http/httptrace. They are simple function calls now to get this -// code underway. -func setupDiagnosticRecording() error { - // No recording options specified? We're done. - if runtime.GlobalOptions.Get("cpu_profiling") == "" && runtime.GlobalOptions.Get("session_recording") == "" { - return nil - } - - var err error - - storagePath = runtime.GlobalOptions.Get("storage_path") - // Any form of recording requires the destination directory to exist. - if err = os.MkdirAll(storagePath, 0755); err != nil { - return fmt.Errorf("Unable to create session directory: %v", err) - } - - if !isEnabled("session_recording") { - return nil - } - - // Set up the session recorder. - if capture, err = os.Create(fmt.Sprintf("%s/session-%v", storagePath, time.Now().Unix())); err != nil { - return fmt.Errorf("Unable to create session file: %v", err) - } - - return nil -} - func isEnabled(option string) bool { return runtime.GlobalOptions.Get(option) == "true" } @@ -213,3 +186,57 @@ func recordFooter() error { }, }) } + +// CaptureHook writes the messaging content consumed and +// produced by the worker, allowing the data to be used as +// an input for the session runner. 
Since workers can exist +// in a variety of environments, this allows the runner +// to tailor the behavior best for its particular needs. +type CaptureHook io.WriteCloser + +// CaptureHookFactory produces a CaptureHook from the supplied +// options. +type CaptureHookFactory func([]string) CaptureHook + +var captureHookRegistry = make(map[string]CaptureHookFactory) + +func init() { + hf := func(opts []string) hooks.Hook { + return hooks.Hook{ + Init: func(_ context.Context) error { + if len(opts) > 0 { + name, opts := hooks.Decode(opts[0]) + capture = captureHookRegistry[name](opts) + } + return nil + }, + } + } + + hooks.RegisterHook("session", hf) +} + +// RegisterCaptureHook registers a CaptureHookFactory for the +// supplied identifier. +func RegisterCaptureHook(name string, c CaptureHookFactory) { + if _, exists := captureHookRegistry[name]; exists { + panic(fmt.Sprintf("RegisterSessionCaptureHook: %s registered twice", name)) + } + captureHookRegistry[name] = c +} + +// EnableCaptureHook is called to request the use of a hook in a pipeline. +// It updates the supplied pipelines to capture this request. +func EnableCaptureHook(name string, opts []string) { + if _, exists := captureHookRegistry[name]; !exists { + panic(fmt.Sprintf("EnableHook: %s not registered", name)) + } + if exists, opts := hooks.IsEnabled("session"); exists { + n, _ := hooks.Decode(opts[0]) + if n != name { + panic(fmt.Sprintf("EnableHook: can't enable hook %s, hook %s already enabled", name, n)) + } + } + + hooks.EnableHook("session", hooks.Encode(name, opts)) +} diff --git a/sdks/go/pkg/beam/core/util/hooks/hooks.go b/sdks/go/pkg/beam/core/util/hooks/hooks.go new file mode 100644 index 0000000000000..ea30cc6e50fbb --- /dev/null +++ b/sdks/go/pkg/beam/core/util/hooks/hooks.go @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package hooks allows runners to tailor execution of the worker harness. +// +// Examples of customization: +// +// gRPC integration +// session recording +// profile recording +// +// Registration methods for hooks must be called prior to calling beam.Init() +// Request methods for hooks must be called as part of building the pipeline +// request for the runner's Execute method. +package hooks + +import ( + "bytes" + "context" + "encoding/csv" + "encoding/json" + "fmt" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" + "github.com/apache/beam/sdks/go/pkg/beam/log" + fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" +) + +var ( + hookRegistry = make(map[string]HookFactory) + enabledHooks = make(map[string][]string) + activeHooks = make(map[string]Hook) +) + +// A Hook is a set of hooks to run at various stages of executing a +// pipeline. +type Hook struct { + // Init is called once at the startup of the worker prior to + // connecting to the FnAPI services. + Init InitHook + // Req is called each time the worker handles a FnAPI instruction request. + Req RequestHook + // Resp is called each time the worker generates a FnAPI instruction response. 
+ Resp ResponseHook +} + +// InitHook is a hook that is called when the harness +// initializes. +type InitHook func(context.Context) error + +// HookFactory is a function that produces a Hook from the supplied arguments. +type HookFactory func([]string) Hook + +// RegisterHook registers a Hook for the +// supplied identifier. +func RegisterHook(name string, h HookFactory) { + hookRegistry[name] = h +} + +// RunInitHooks runs the init hooks. +func RunInitHooks(ctx context.Context) error { + // If an init hook fails to complete, the invariants of the + // system are compromised and we can't run a workflow. + // The hooks can run in any order. They should not be + // interdependent or interfere with each other. + for _, h := range activeHooks { + if h.Init != nil { + if err := h.Init(ctx); err != nil { + return err + } + } + } + return nil +} + +// RequestHook is called when handling a FnAPI instruction. +type RequestHook func(context.Context, *fnpb.InstructionRequest) error + +// RunRequestHooks runs the hooks that handle a FnAPI request. +func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) { + // The request hooks should not modify the request. + for n, h := range activeHooks { + if h.Req != nil { + if err := h.Req(ctx, req); err != nil { + log.Infof(ctx, "request hook %s failed: %v", n, err) + } + } + } +} + +// ResponseHook is called when sending a FnAPI instruction response. +type ResponseHook func(context.Context, *fnpb.InstructionRequest, *fnpb.InstructionResponse) error + +// RunResponseHooks runs the hooks that handle a FnAPI response. 
+func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) { + for n, h := range activeHooks { + if h.Resp != nil { + if err := h.Resp(ctx, req, resp); err != nil { + log.Infof(ctx, "response hook %s failed: %v", n, err) + } + } + } +} + +// SerializeHooksToOptions serializes the activated hooks and their configuration into a JSON string +// that can be deserialized later by the runner. +func SerializeHooksToOptions() { + data, err := json.Marshal(enabledHooks) + if err != nil { + // Shouldn't happen, since all the data is strings. + panic(fmt.Sprintf("Couldn't serialize hooks: %v", err)) + } + runtime.GlobalOptions.Set("hooks", string(data)) +} + +// DeserializeHooksFromOptions extracts the hook configuration information from the options and configures +// the hooks with the supplied options. +func DeserializeHooksFromOptions() { + cfg := runtime.GlobalOptions.Get("hooks") + if err := json.Unmarshal([]byte(cfg), &enabledHooks); err != nil { + // Shouldn't happen, since all the data is strings. + panic(fmt.Sprintf("DeserializeHooks failed on input %q: %v", cfg, err)) + } + + for h, opts := range enabledHooks { + activeHooks[h] = hookRegistry[h](opts) + } +} + +// EnableHook enables the hook to be run for the pipeline. It will +// receive the supplied args when the pipeline executes. It is safe +// to enable the same hook with different options, as this is necessary +// if a hook wants to compose behavior. +func EnableHook(name string, args ...string) error { + if _, ok := hookRegistry[name]; !ok { + return fmt.Errorf("EnableHook: hook %s not found", name) + } + enabledHooks[name] = args + return nil +} + +// IsEnabled returns true and the registered options if the hook is +// already enabled. +func IsEnabled(name string) (bool, []string) { + opts, ok := enabledHooks[name] + return ok, opts +} + +// Encode encodes a hook name and its arguments into a single string. 
+// This is a convenience function for users of this package that are composing +// hooks. +func Encode(name string, opts []string) string { + var cfg bytes.Buffer + w := csv.NewWriter(&cfg) + // This should never happen since a bytes.Buffer doesn't fail to write. + if err := w.Write(append([]string{name}, opts...)); err != nil { + panic(fmt.Sprintf("error encoding arguments: %v", err)) + } + w.Flush() + return cfg.String() +} + +// Decode decodes a hook name and its arguments from a single string. +// This is a convenience function for users of this package that are composing +// hooks. +func Decode(in string) (string, []string) { + r := csv.NewReader(strings.NewReader(in)) + s, err := r.Read() + if err != nil { + panic(fmt.Sprintf("malformed input for decoding: %s %v", in, err)) + } + return s[0], s[1:] +} diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 247a2d1487ff1..4a9f4335e3466 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -22,6 +22,7 @@ import ( "errors" "flag" "fmt" + "io" "os" "os/user" "path" @@ -31,12 +32,14 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" // Importing to get the side effect of the remote execution hook. See init(). 
_ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/go/pkg/beam/log" "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts" "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts" "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib" "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx" + "github.com/apache/beam/sdks/go/pkg/beam/x/hooks/perf" "github.com/golang/protobuf/proto" "golang.org/x/oauth2/google" df "google.golang.org/api/dataflow/v1b3" @@ -55,18 +58,19 @@ var ( teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).") // SDK options - cpuProfiling = flag.String("cpu_profiling", "", "Job records CPU profiles") + cpuProfiling = flag.String("cpu_profiling", "", "Job records CPU profiles to this GCS location (optional)") sessionRecording = flag.String("session_recording", "", "Job records session transcripts") ) func init() { // Note that we also _ import harness/init to setup the remote execution hook. beam.RegisterRunner("dataflow", Execute) + + perf.RegisterProfCaptureHook("gcs_profile_writer", gcsRecorderHook) } type dataflowOptions struct { - Options map[string]string `json:"options"` - PipelineURL string `json:"pipelineUrl"` + PipelineURL string `json:"pipelineUrl"` } // Execute runs the given pipeline on Google Cloud Dataflow. 
It uses the @@ -90,15 +94,20 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { } if *cpuProfiling != "" { - beam.PipelineOptions.Set("cpu_profiling", "true") - beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces") + perf.EnableProfCaptureHook("gcs_profile_writer", *cpuProfiling) } if *sessionRecording != "" { - beam.PipelineOptions.Set("session_recording", "true") - beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces") + // TODO(wcn): BEAM-4017 + // It's a bit inconvenient for GCS because the whole object is written in + // one pass, whereas the session logs are constantly appended. We wouldn't + // want to hold all the logs in memory to flush at the end of the pipeline + // as we'd blow out memory on the worker. The implementation of the + // CaptureHook should create an internal buffer and write chunks out to GCS + // once they get to an appropriate size (50M or so?) } + hooks.SerializeHooksToOptions() options := beam.PipelineOptions.Export() // (1) Upload Go binary and model to GCS. 
@@ -147,9 +156,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { SdkPipelineOptions: newMsg(pipelineOptions{ DisplayData: findPipelineFlags(), Options: dataflowOptions{ - Options: options.Options, PipelineURL: modelURL, }, + GoOptions: options, }), WorkerPools: []*df.WorkerPool{{ Kind: "harness", @@ -317,3 +326,18 @@ func printJob(ctx context.Context, job *df.Job) { } log.Info(ctx, string(str)) } + +func gcsRecorderHook(opts []string) perf.CaptureHook { + bucket, prefix, err := gcsx.ParseObject(opts[0]) + if err != nil { + panic(fmt.Sprintf("Invalid hook configuration for gcsRecorderHook: %s", opts)) + } + + return func(ctx context.Context, spec string, r io.Reader) error { + client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope) + if err != nil { + return fmt.Errorf("couldn't establish GCS client: %v", err) + } + return gcsx.WriteObject(client, bucket, path.Join(prefix, spec), r) + } +} diff --git a/sdks/go/pkg/beam/runners/dataflow/messages.go b/sdks/go/pkg/beam/runners/dataflow/messages.go index e4abea67ac7cb..80fcebdadf0e8 100644 --- a/sdks/go/pkg/beam/runners/dataflow/messages.go +++ b/sdks/go/pkg/beam/runners/dataflow/messages.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "google.golang.org/api/googleapi" ) @@ -34,8 +35,9 @@ func newMsg(msg interface{}) googleapi.RawMessage { // pipelineOptions models Job/Environment/SdkPipelineOptions type pipelineOptions struct { - DisplayData []*displayData `json:"display_data,omitempty"` - Options interface{} `json:"options,omitempty"` + DisplayData []*displayData `json:"display_data,omitempty"` + Options interface{} `json:"options,omitempty"` + GoOptions runtime.RawOptions `json:"beam:option:go_options:v1,omitempty"` } // NOTE(herohde) 2/9/2017: most of the v1b3 messages are weakly-typed json diff --git a/sdks/go/pkg/beam/runners/session/session.go 
b/sdks/go/pkg/beam/runners/session/session.go index b123466dcfc08..aa8ae1ec899e3 100644 --- a/sdks/go/pkg/beam/runners/session/session.go +++ b/sdks/go/pkg/beam/runners/session/session.go @@ -51,7 +51,7 @@ func init() { var sessionFile = flag.String("session_file", "", "Session file for the runner") -// controlServer manages the Fn API control channel. +// controlServer manages the FnAPI control channel. type controlServer struct { filename string wg *sync.WaitGroup // used to signal when the session is completed @@ -207,7 +207,7 @@ func extractPortSpec(spec *rapi_pb.FunctionSpec) string { panic("unable to extract port") } -// dataServer manages the Fn API data channel. +// dataServer manages the FnAPI data channel. type dataServer struct { ctrl *controlServer } @@ -235,7 +235,7 @@ func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error { } } -// loggingServer manages the Fn API logging channel. +// loggingServer manages the FnAPI logging channel. type loggingServer struct{} // no data content func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) error { diff --git a/sdks/go/pkg/beam/util/grpcx/hook.go b/sdks/go/pkg/beam/util/grpcx/hook.go new file mode 100644 index 0000000000000..6978915aaa6c1 --- /dev/null +++ b/sdks/go/pkg/beam/util/grpcx/hook.go @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcx + +import ( + "context" + "fmt" + "time" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" + "google.golang.org/grpc" +) + +// Hook allows a runner to customize various aspects of gRPC +// communication with the FnAPI harness. Each member of the struct +// is optional; the default behavior will be used if a value is not +// supplied. +type Hook struct { + // Dialer allows the runner to customize the gRPC dialing behavior. + Dialer func(context.Context, string, time.Duration) (*grpc.ClientConn, error) + // TODO(wcn): expose other hooks here. +} + +type HookFactory func([]string) Hook + +var hookRegistry = make(map[string]HookFactory) + +// RegisterHook registers a HookFactory for the +// supplied identifier. It panics if the same identifier is +// registered twice. +func RegisterHook(name string, c HookFactory) { + if _, exists := hookRegistry[name]; exists { + panic(fmt.Sprintf("grpc.Hook: %s registered twice", name)) + } + hookRegistry[name] = c + + hf := func(opts []string) hooks.Hook { + return hooks.Hook{ + Init: func(_ context.Context) error { + if len(opts) == 0 { + return nil + } + + name, opts := hooks.Decode(opts[0]) + grpcHook := hookRegistry[name](opts) + if grpcHook.Dialer != nil { + Dial = grpcHook.Dialer + } + return nil + }, + } + } + hooks.RegisterHook("grpc", hf) +} + +// EnableHook is called to request the use of the gRPC +// hook in a pipeline. 
+func EnableHook(name string, opts ...string) { + _, exists := hookRegistry[name] + if !exists { + panic(fmt.Sprintf("EnableHook: %s not registered", name)) + } + // Only one hook can be enabled. If the pipeline has two conflicting views about how to use gRPC + // that won't end well. + if exists, opts := hooks.IsEnabled("grpc"); exists { + n, _ := hooks.Decode(opts[0]) + if n != name { + panic(fmt.Sprintf("EnableHook: can't enable hook %s, hook %s already enabled", name, n)) + } + } + + hooks.EnableHook("grpc", hooks.Encode(name, opts)) +} diff --git a/sdks/go/pkg/beam/x/hooks/perf/perf.go b/sdks/go/pkg/beam/x/hooks/perf/perf.go new file mode 100644 index 0000000000000..93fd611845913 --- /dev/null +++ b/sdks/go/pkg/beam/x/hooks/perf/perf.go @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package perf is to add performance measuring hooks to a runner, such as cpu, or trace profiles. 
+package perf + +import ( + "bytes" + "context" + "fmt" + "io" + "runtime/pprof" + "runtime/trace" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" + fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" +) + +// CaptureHook is used by the harness to have the runner +// persist a trace record with the supplied name and comment. +// The type of trace can be determined by the prefix of the string. +// +// * prof: A profile compatible with traces produced by runtime/pprof +// * trace: A trace compatible with traces produced by runtime/trace +type CaptureHook func(context.Context, string, io.Reader) error + +// CaptureHookFactory creates a CaptureHook from the supplied options. +type CaptureHookFactory func([]string) CaptureHook + +var profCaptureHookRegistry = make(map[string]CaptureHookFactory) + +var enabledProfCaptureHooks []string + +func init() { + hf := func(opts []string) hooks.Hook { + enabledProfCaptureHooks = opts + enabled := len(enabledProfCaptureHooks) > 0 + var cpuProfBuf bytes.Buffer + return hooks.Hook{ + Req: func(_ context.Context, _ *fnpb.InstructionRequest) error { + if !enabled { + return nil + } + cpuProfBuf.Reset() + return pprof.StartCPUProfile(&cpuProfBuf) + }, + Resp: func(ctx context.Context, req *fnpb.InstructionRequest, _ *fnpb.InstructionResponse) error { + if !enabled { + return nil + } + pprof.StopCPUProfile() + for _, h := range enabledProfCaptureHooks { + name, opts := hooks.Decode(h) + if err := profCaptureHookRegistry[name](opts)(ctx, fmt.Sprintf("prof%s", req.InstructionId), &cpuProfBuf); err != nil { + return err + } + } + return nil + }, + } + } + hooks.RegisterHook("prof", hf) + + hf = func(opts []string) hooks.Hook { + var traceProfBuf bytes.Buffer + enabledTraceCaptureHooks = opts + enabled := len(enabledTraceCaptureHooks) > 0 + return hooks.Hook{ + Req: func(_ context.Context, _ *fnpb.InstructionRequest) error { + if !enabled { + return nil + } + traceProfBuf.Reset() + return 
trace.Start(&traceProfBuf) + }, + Resp: func(ctx context.Context, req *fnpb.InstructionRequest, _ *fnpb.InstructionResponse) error { + if !enabled { + return nil + } + trace.Stop() + for _, h := range enabledTraceCaptureHooks { + name, opts := hooks.Decode(h) + if err := traceCaptureHookRegistry[name](opts)(ctx, fmt.Sprintf("trace_prof%s", req.InstructionId), &traceProfBuf); err != nil { + return err + } + } + return nil + }, + } + } + hooks.RegisterHook("trace", hf) +} + +// RegisterProfCaptureHook registers a CaptureHookFactory for the +// supplied identifier. It panics if the same identifier is +// registered twice. +func RegisterProfCaptureHook(name string, c CaptureHookFactory) { + if _, exists := profCaptureHookRegistry[name]; exists { + panic(fmt.Sprintf("RegisterProfCaptureHook: %s registered twice", name)) + } + profCaptureHookRegistry[name] = c +} + +// EnableProfCaptureHook activates a registered profile capture hook for a given pipeline. +func EnableProfCaptureHook(name string, opts ...string) { + _, exists := profCaptureHookRegistry[name] + if !exists { + panic(fmt.Sprintf("EnableProfCaptureHook: %s not registered", name)) + } + + enc := hooks.Encode(name, opts) + + for i, h := range enabledProfCaptureHooks { + n, _ := hooks.Decode(h) + if h == n { + // Rewrite the registration with the current arguments + enabledProfCaptureHooks[i] = enc + hooks.EnableHook("prof", enabledProfCaptureHooks...) + return + } + } + + enabledProfCaptureHooks = append(enabledProfCaptureHooks, enc) + hooks.EnableHook("prof", enabledProfCaptureHooks...) +} + +var traceCaptureHookRegistry = make(map[string]CaptureHookFactory) +var enabledTraceCaptureHooks []string + +// RegisterTraceCaptureHook registers a CaptureHookFactory for the +// supplied identifier. It panics if the same identifier is +// registered twice. 
+func RegisterTraceCaptureHook(name string, c CaptureHookFactory) { + if _, exists := traceCaptureHookRegistry[name]; exists { + panic(fmt.Sprintf("RegisterTraceCaptureHook: %s registered twice", name)) + } + traceCaptureHookRegistry[name] = c +} + +// EnableTraceCaptureHook activates a registered trace capture hook for a given pipeline. +func EnableTraceCaptureHook(name string, opts ...string) { + if _, exists := traceCaptureHookRegistry[name]; !exists { + panic(fmt.Sprintf("EnableTraceCaptureHook: %s not registered", name)) + } + + enc := hooks.Encode(name, opts) + for i, h := range enabledTraceCaptureHooks { + n, _ := hooks.Decode(h) + if h == n { + // Rewrite the registration with the current arguments + enabledTraceCaptureHooks[i] = enc + hooks.EnableHook("trace", enabledTraceCaptureHooks...) + return + } + } + enabledTraceCaptureHooks = append(enabledTraceCaptureHooks, enc) + hooks.EnableHook("trace", enabledTraceCaptureHooks...) +} From 1d8d3266be2c59e23d060bcd46bb40e5153866f7 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 6 Apr 2018 11:13:28 -0700 Subject: [PATCH 4/5] [BEAM-3250] Add JUnit test reporting to Jenkins. 
--- .../job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy | 5 +++++ .../job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy | 5 +++++ .../job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy | 5 +++++ .../job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy | 5 +++++ .../job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy | 5 +++++ 5 files changed, 25 insertions(+) diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy index 512cfa976a14b..f1c7e18c066a5 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy @@ -36,6 +36,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Apex_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. common_job_properties.setPostCommit(delegate) diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy index 318c834957fc4..5159f8afae286 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy @@ -37,6 +37,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. 
common_job_properties.setPostCommit(delegate) diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy index af9855adc892f..6daed0bf95b7f 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy @@ -34,6 +34,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Flink_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. common_job_properties.setPostCommit(delegate) diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy index 8ba0a71dc59b4..ef6b34ccdf85e 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy @@ -39,6 +39,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. // 0 5 31 2 * will run on Feb 31 (i.e. never) according to job properties. // In post-commit this job triggers only on SCM changes. 
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy index 33c0d3f8f4d46..7824e91e8d256 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy @@ -36,6 +36,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Spark_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. common_job_properties.setPostCommit(delegate) From b83fb93ec72884051fdf0d36bd364c46ec76c0e0 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Fri, 6 Apr 2018 12:24:28 -0700 Subject: [PATCH 5/5] Migrate JUnit test archiving to Python Gradle build --- ...b_beam_PreCommit_Python_GradleBuild.groovy | 6 ++ ..._beam_PreCommit_Python_MavenInstall.groovy | 57 ------------------- sdks/python/setup.cfg | 3 + sdks/python/tox.ini | 6 +- 4 files changed, 12 insertions(+), 60 deletions(-) delete mode 100644 .test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy diff --git a/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy b/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy index d2fdef72ad3b7..255277d471a88 100644 --- a/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy +++ b/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy @@ -41,6 +41,12 @@ job('beam_PreCommit_Python_GradleBuild') { '--rerun-tasks', ] + // Publish all test results to Jenkins. Note that Nose documentation + // specifically mentions that it produces JUnit compatible test results. + publishers { + archiveJunit('**/nosetests.xml') + } + def gradle_command_line = './gradlew ' + gradle_switches.join(' ') + ' :pythonPreCommit' // Sets that this is a PreCommit job. 
common_job_properties.setPreCommit(delegate, gradle_command_line, 'Run Python Gradle PreCommit') diff --git a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy deleted file mode 100644 index f0429e4aa1866..0000000000000 --- a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import common_job_properties - -// This is the Python precommit which runs a maven install, and the current set -// of precommit tests. -mavenJob('beam_PreCommit_Python_MavenInstall') { - description('Runs an install of the current GitHub Pull Request.') - - previousNames('beam_PreCommit_MavenVerify') - - // Execute concurrent builds if necessary. - concurrentBuild() - - // Set common parameters. - common_job_properties.setTopLevelMainJobProperties( - delegate, - 'master', - 150) - - // Set Maven parameters. - common_job_properties.setMavenConfig(delegate) - - // Sets that this is a PreCommit job. 
- common_job_properties.setPreCommit(delegate, 'mvn clean install -pl sdks/python -am -amd', 'Run Python PreCommit') - - // Maven modules for this job: The Python SDK, its dependencies, and things that depend on it, - // excluding the container. - goals([ - '--batch-mode', - '--errors', - '--activate-profiles release', - '--projects sdks/python,!sdks/python/container', - '--also-make', - '--also-make-dependents', - '-D pullRequest=$ghprbPullId', - 'help:effective-settings', - 'clean', - 'install', - ].join(' ')) -} diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg index 0f6acea2c0985..041d6fa55f49d 100644 --- a/sdks/python/setup.cfg +++ b/sdks/python/setup.cfg @@ -26,6 +26,9 @@ verbosity=2 # fast_coders_test and typecoders_test. exclude=fast_coders_test|typecoders_test +# Creates an xml file compatible with standard XUnit XML format. +with-xunit=1 + # Configurations to control coverage.py. [coverage:run] branch = True diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index ff88ac42fa831..3cc1682368435 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -50,7 +50,7 @@ commands = pip --version {toxinidir}/run_tox_cleanup.sh python apache_beam/examples/complete/autocomplete_test.py - python setup.py test + python setup.py nosetests {toxinidir}/run_tox_cleanup.sh [testenv:py27-cython] @@ -64,7 +64,7 @@ commands = pip --version {toxinidir}/run_tox_cleanup.sh python apache_beam/examples/complete/autocomplete_test.py - python setup.py test + python setup.py nosetests {toxinidir}/run_tox_cleanup.sh [testenv:py27-gcp] @@ -74,7 +74,7 @@ commands = pip --version {toxinidir}/run_tox_cleanup.sh python apache_beam/examples/complete/autocomplete_test.py - python setup.py test + python setup.py nosetests {toxinidir}/run_tox_cleanup.sh [testenv:py27-lint]