diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy index 512cfa976a14b..f1c7e18c066a5 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Apex.groovy @@ -36,6 +36,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Apex_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. common_job_properties.setPostCommit(delegate) diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy index 318c834957fc4..5159f8afae286 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy @@ -37,6 +37,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. common_job_properties.setPostCommit(delegate) diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy index 314e96881254e..f06c9d91a84c3 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy @@ -34,6 +34,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Flink_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. 
common_job_properties.setPostCommit(delegate) diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy index 8ba0a71dc59b4..ef6b34ccdf85e 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Gearpump.groovy @@ -39,6 +39,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. // 0 5 31 2 * will run on Feb 31 (i.e. never) according to job properties. // In post-commit this job triggers only on SCM changes. diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy index 33c0d3f8f4d46..7824e91e8d256 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Spark.groovy @@ -36,6 +36,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Spark_Gradle') { '--rerun-tasks', ] + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + // Sets that this is a PostCommit job. common_job_properties.setPostCommit(delegate) diff --git a/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy b/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy index d2fdef72ad3b7..255277d471a88 100644 --- a/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy +++ b/.test-infra/jenkins/job_beam_PreCommit_Python_GradleBuild.groovy @@ -41,6 +41,12 @@ job('beam_PreCommit_Python_GradleBuild') { '--rerun-tasks', ] + // Publish all test results to Jenkins. 
Note that Nose documentation + // specifically mentions that it produces JUnit compatible test results. + publishers { + archiveJunit('**/nosetests.xml') + } + def gradle_command_line = './gradlew ' + gradle_switches.join(' ') + ' :pythonPreCommit' // Sets that this is a PreCommit job. common_job_properties.setPreCommit(delegate, gradle_command_line, 'Run Python Gradle PreCommit') diff --git a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy deleted file mode 100644 index f0429e4aa1866..0000000000000 --- a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import common_job_properties - -// This is the Python precommit which runs a maven install, and the current set -// of precommit tests. -mavenJob('beam_PreCommit_Python_MavenInstall') { - description('Runs an install of the current GitHub Pull Request.') - - previousNames('beam_PreCommit_MavenVerify') - - // Execute concurrent builds if necessary. - concurrentBuild() - - // Set common parameters. 
- common_job_properties.setTopLevelMainJobProperties( - delegate, - 'master', - 150) - - // Set Maven parameters. - common_job_properties.setMavenConfig(delegate) - - // Sets that this is a PreCommit job. - common_job_properties.setPreCommit(delegate, 'mvn clean install -pl sdks/python -am -amd', 'Run Python PreCommit') - - // Maven modules for this job: The Python SDK, its dependencies, and things that depend on it, - // excluding the container. - goals([ - '--batch-mode', - '--errors', - '--activate-profiles release', - '--projects sdks/python,!sdks/python/container', - '--also-make', - '--also-make-dependents', - '-D pullRequest=$ghprbPullId', - 'help:effective-settings', - 'clean', - 'install', - ].join(' ')) -} diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go index c7a2d4cabc571..b7f62186135d4 100644 --- a/sdks/go/pkg/beam/coder.go +++ b/sdks/go/pkg/beam/coder.go @@ -107,6 +107,13 @@ func inferCoder(t FullType) (*coder.Coder, error) { return nil, err } return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil + case reflectx.Float32, reflectx.Float64: + c, err := coderx.NewFloat(t.Type()) + if err != nil { + return nil, err + } + return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil + case reflectx.String, reflectx.ByteSlice: // TODO(BEAM-3580): we should stop encoding string using the bytecoder. It forces // conversions at runtime in inconvenient places. diff --git a/sdks/go/pkg/beam/core/runtime/coderx/float.go b/sdks/go/pkg/beam/core/runtime/coderx/float.go new file mode 100644 index 0000000000000..403cd30ac1dc9 --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/coderx/float.go @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coderx + +import ( + "fmt" + "math" + "math/bits" + "reflect" + + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" + "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" +) + +func init() { + runtime.RegisterFunction(encFloat) + runtime.RegisterFunction(decFloat) +} + +func encFloat(v typex.T) []byte { + var val float64 + switch n := v.(type) { + case float32: + val = float64(n) + case float64: + val = n + default: + panic(fmt.Sprintf("received unknown value type: want a float, got %T", n)) + } + + return encVarUintZ(bits.ReverseBytes64(math.Float64bits(val))) +} + +func decFloat(t reflect.Type, data []byte) (typex.T, error) { + uval, err := decVarUintZ(reflectx.Uint64, data) + if err != nil { + return nil, fmt.Errorf("invalid float encoding for: %v", data) + } + + n := math.Float64frombits(bits.ReverseBytes64(uval.(uint64))) + switch t.Kind() { + case reflect.Float64: + return n, nil + case reflect.Float32: + return float32(n), nil + default: + panic(fmt.Sprintf("unreachable statement: expected a float, got %v", t)) + } +} + +// NewFloat returns a coder for the given float type. It uses the same +// encoding scheme as the gob package. 
+func NewFloat(t reflect.Type) (*coder.CustomCoder, error) { + switch t.Kind() { + case reflect.Float32, reflect.Float64: + return coder.NewCustomCoder("float", t, encFloat, decFloat) + default: + return nil, fmt.Errorf("not a float type: %v", t) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/coderx/float_test.go b/sdks/go/pkg/beam/core/runtime/coderx/float_test.go new file mode 100644 index 0000000000000..460057e900b8d --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/coderx/float_test.go @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package coderx + +import ( + "reflect" + "testing" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" +) + +func TestFloat(t *testing.T) { + tests := []interface{}{ + float32(0.0), + float32(94.5), + float64(-127.25), + float64(8675309.4286), + } + + for _, v := range tests { + typ := reflect.ValueOf(v).Type() + + data := encFloat(v) + result, err := decFloat(typ, data) + if err != nil { + t.Fatalf("dec(enc(%v)) failed: %v", v, err) + } + + if v != result { + t.Errorf("dec(enc(%v)) = %v, want id", v, result) + } + resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type() + if resultT != typ { + t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT) + } + } +} diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go b/sdks/go/pkg/beam/core/runtime/exec/datasink.go index ebf8ddc240763..73f4aab651575 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go @@ -20,8 +20,11 @@ import ( "context" "fmt" "io" + "sync/atomic" + "time" "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/log" ) // DataSink is a Node. @@ -31,8 +34,10 @@ type DataSink struct { Target Target Coder *coder.Coder - enc ElementEncoder - w io.WriteCloser + enc ElementEncoder + w io.WriteCloser + count int64 + start time.Time } func (n *DataSink) ID() UnitID { @@ -52,6 +57,8 @@ func (n *DataSink) StartBundle(ctx context.Context, id string, data DataManager) return err } n.w = w + atomic.StoreInt64(&n.count, 0) + n.start = time.Now() return nil } @@ -60,6 +67,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values . // unit. var b bytes.Buffer + atomic.AddInt64(&n.count, 1) if err := EncodeWindowedValueHeader(value.Timestamp, &b); err != nil { return err } @@ -75,6 +83,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values . 
} func (n *DataSink) FinishBundle(ctx context.Context) error { + log.Infof(ctx, "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start)) return n.w.Close() } diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index af7dbb97bb124..12ce8847ff47e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -171,7 +171,7 @@ func (n *DataSource) Process(ctx context.Context) error { } func (n *DataSource) FinishBundle(ctx context.Context) error { - log.Infof(context.Background(), "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start)) + log.Infof(ctx, "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start)) n.sid = StreamID{} n.source = nil return n.Out.FinishBundle(ctx) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go index f38a698d39172..5615b71dfbea4 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go @@ -347,6 +347,10 @@ func encodeType(t reflect.Type) (*v1.Type, error) { return &v1.Type{Kind: v1.Type_UINT32}, nil case reflect.Uint64: return &v1.Type{Kind: v1.Type_UINT64}, nil + case reflect.Float32: + return &v1.Type{Kind: v1.Type_FLOAT32}, nil + case reflect.Float64: + return &v1.Type{Kind: v1.Type_FLOAT64}, nil case reflect.String: return &v1.Type{Kind: v1.Type_STRING}, nil @@ -485,6 +489,10 @@ func decodeType(t *v1.Type) (reflect.Type, error) { return reflectx.Uint32, nil case v1.Type_UINT64: return reflectx.Uint64, nil + case v1.Type_FLOAT32: + return reflectx.Float32, nil + case v1.Type_FLOAT64: + return reflectx.Float64, nil case v1.Type_STRING: return reflectx.String, nil diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go index 
97f591f070049..94dfd64847a21 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go @@ -42,18 +42,20 @@ type Type_Kind int32 const ( Type_INVALID Type_Kind = 0 // Primitive. - Type_BOOL Type_Kind = 1 - Type_INT Type_Kind = 2 - Type_INT8 Type_Kind = 3 - Type_INT16 Type_Kind = 4 - Type_INT32 Type_Kind = 5 - Type_INT64 Type_Kind = 6 - Type_UINT Type_Kind = 7 - Type_UINT8 Type_Kind = 8 - Type_UINT16 Type_Kind = 9 - Type_UINT32 Type_Kind = 10 - Type_UINT64 Type_Kind = 11 - Type_STRING Type_Kind = 12 + Type_BOOL Type_Kind = 1 + Type_INT Type_Kind = 2 + Type_INT8 Type_Kind = 3 + Type_INT16 Type_Kind = 4 + Type_INT32 Type_Kind = 5 + Type_INT64 Type_Kind = 6 + Type_UINT Type_Kind = 7 + Type_UINT8 Type_Kind = 8 + Type_UINT16 Type_Kind = 9 + Type_UINT32 Type_Kind = 10 + Type_UINT64 Type_Kind = 11 + Type_STRING Type_Kind = 12 + Type_FLOAT32 Type_Kind = 13 + Type_FLOAT64 Type_Kind = 14 // Non-primitive types. Type_SLICE Type_Kind = 20 Type_STRUCT Type_Kind = 21 @@ -78,6 +80,8 @@ var Type_Kind_name = map[int32]string{ 10: "UINT32", 11: "UINT64", 12: "STRING", + 13: "FLOAT32", + 14: "FLOAT64", 20: "SLICE", 21: "STRUCT", 22: "FUNC", @@ -100,6 +104,8 @@ var Type_Kind_value = map[string]int32{ "UINT32": 10, "UINT64": 11, "STRING": 12, + "FLOAT32": 13, + "FLOAT64": 14, "SLICE": 20, "STRUCT": 21, "FUNC": 22, @@ -749,73 +755,74 @@ func init() { func init() { proto.RegisterFile("v1.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1079 bytes of a gzipped FileDescriptorProto + // 1091 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdb, 0x6e, 0xdb, 0x46, - 0x13, 0x0e, 0x45, 0x89, 0x87, 0x91, 0x94, 0x6c, 0xf6, 0x77, 0xfc, 0x33, 0x46, 0x8a, 0x28, 0xbc, - 0xa9, 0xda, 0x18, 0x2a, 0x2c, 0x07, 0x46, 0xd1, 0x3b, 0x45, 0xa2, 0x1d, 0xc2, 0x32, 0x25, 0xac, + 0x13, 0x0e, 0x45, 0x89, 0x87, 0x91, 0xe4, 0x6c, 0xf6, 0x77, 0xfc, 0x33, 0x46, 0x8a, 0x28, 0xbc, + 0xa9, 
0xda, 0x18, 0x2a, 0x2c, 0x1b, 0x46, 0xd1, 0x3b, 0x45, 0xa2, 0x1c, 0xc2, 0x34, 0x25, 0xac, 0x28, 0xd9, 0xe9, 0x8d, 0xc0, 0x88, 0x2b, 0x99, 0xb5, 0xb4, 0x64, 0x49, 0xca, 0x88, 0xd0, 0x57, - 0x29, 0xfa, 0x1c, 0x7d, 0x87, 0x3e, 0x47, 0xde, 0xa3, 0xd8, 0x25, 0x29, 0x5b, 0xae, 0x8b, 0x02, - 0xb9, 0xda, 0xd9, 0x99, 0xef, 0xdb, 0x39, 0xec, 0xec, 0x90, 0xa0, 0xdd, 0x1e, 0xb5, 0xa2, 0x38, - 0x4c, 0x43, 0x5c, 0xba, 0x3d, 0x32, 0xbf, 0xa8, 0x50, 0x76, 0x37, 0x11, 0xc5, 0x6f, 0xa0, 0x7c, - 0x13, 0x30, 0xdf, 0x90, 0x1a, 0x52, 0xf3, 0x69, 0xbb, 0xde, 0xba, 0x3d, 0x6a, 0x71, 0x7d, 0xeb, - 0x3c, 0x60, 0x3e, 0x11, 0x26, 0x6c, 0x82, 0x4a, 0x97, 0x74, 0x45, 0x59, 0x6a, 0x94, 0x1a, 0x52, - 0xb3, 0xda, 0xd6, 0x0a, 0x14, 0x29, 0x0c, 0xf8, 0x10, 0x94, 0x79, 0x40, 0x97, 0x7e, 0x62, 0xc8, - 0x0d, 0xb9, 0x59, 0x6d, 0xef, 0x6d, 0x0f, 0x1a, 0xa5, 0xf1, 0x7a, 0x96, 0x9e, 0x72, 0x23, 0xc9, - 0x31, 0xf8, 0x08, 0x9e, 0x45, 0x5e, 0xec, 0xad, 0x68, 0x4a, 0xe3, 0x69, 0xba, 0x89, 0x68, 0x62, - 0x94, 0x05, 0xed, 0xee, 0xe4, 0xa7, 0x5b, 0x00, 0xdf, 0x26, 0xf8, 0x2d, 0xd4, 0x62, 0x9a, 0xae, - 0x63, 0x96, 0xe3, 0x2b, 0x0f, 0xf0, 0xd5, 0xcc, 0x9a, 0x81, 0x5f, 0x43, 0x35, 0x48, 0xa6, 0xb7, - 0x5e, 0x1c, 0x78, 0x7e, 0x30, 0x33, 0x94, 0x86, 0xd4, 0xd4, 0x08, 0x04, 0xc9, 0x24, 0xd7, 0xe0, - 0xb7, 0xa0, 0xcd, 0xae, 0x3d, 0x36, 0xf5, 0x83, 0xd8, 0x50, 0x45, 0xe6, 0x68, 0x1b, 0x70, 0xf7, - 0xda, 0x63, 0xbd, 0x20, 0x26, 0xea, 0x2c, 0x13, 0xf0, 0xf7, 0xa0, 0x26, 0x11, 0x9d, 0x05, 0xde, - 0xd2, 0xd0, 0x1e, 0x60, 0x47, 0x99, 0x9e, 0x14, 0x00, 0xfc, 0x06, 0x6a, 0xf4, 0x73, 0x4a, 0x63, - 0xe6, 0x2d, 0xa7, 0x37, 0x74, 0x63, 0xe8, 0x0d, 0xa9, 0xa9, 0x93, 0x6a, 0xa1, 0x3b, 0xa7, 0x9b, - 0x83, 0x3f, 0x25, 0xa8, 0xde, 0x2b, 0x0a, 0xc6, 0x50, 0x66, 0xde, 0x8a, 0x8a, 0x1b, 0xd0, 0x89, - 0x90, 0xf1, 0x4b, 0xd0, 0xa2, 0x9b, 0xc5, 0x34, 0xf2, 0xd2, 0x6b, 0x51, 0x73, 0x9d, 0xa8, 0xd1, - 0xcd, 0x62, 0xe8, 0xa5, 0xd7, 0xf8, 0x15, 0x94, 0x79, 0x05, 0x0c, 0xf9, 0xc1, 0x55, 0x08, 0x2d, - 0x46, 0x20, 0xa7, 0xde, 0xc2, 0x28, 0x0b, 0x0e, 
0x17, 0xf1, 0x3e, 0x28, 0xe1, 0x7c, 0x9e, 0xd0, - 0xd4, 0xa8, 0x34, 0xa4, 0xa6, 0x4c, 0xf2, 0x1d, 0xde, 0x83, 0x4a, 0xc0, 0x7c, 0xfa, 0xd9, 0x50, - 0x1a, 0x72, 0xb3, 0x42, 0xb2, 0x0d, 0x7e, 0x05, 0xba, 0xc7, 0x42, 0xb6, 0x59, 0x85, 0xeb, 0x44, - 0x54, 0x46, 0x23, 0x77, 0x0a, 0xf3, 0x8b, 0x04, 0x65, 0xde, 0x18, 0xb8, 0x0a, 0xaa, 0xed, 0x4c, - 0x3a, 0x7d, 0xbb, 0x87, 0x9e, 0x60, 0x0d, 0xca, 0xef, 0x07, 0x83, 0x3e, 0x92, 0xb0, 0x0a, 0xb2, - 0xed, 0xb8, 0xa8, 0xc4, 0x55, 0xb6, 0xe3, 0xfe, 0x88, 0x64, 0xac, 0x43, 0xc5, 0x76, 0xdc, 0xa3, - 0x13, 0x54, 0xce, 0xc5, 0xe3, 0x36, 0xaa, 0xe4, 0xe2, 0xc9, 0x3b, 0xa4, 0x70, 0xe8, 0x98, 0x93, - 0x54, 0xae, 0x1c, 0x0b, 0x96, 0x86, 0x01, 0x94, 0x71, 0x46, 0xd3, 0x0b, 0xf9, 0xb8, 0x8d, 0xa0, - 0x90, 0x4f, 0xde, 0xa1, 0x2a, 0x97, 0x47, 0x2e, 0xb1, 0x9d, 0x33, 0x54, 0xe3, 0xd4, 0x51, 0xdf, - 0xee, 0x5a, 0x68, 0x2f, 0x57, 0x8f, 0xbb, 0x2e, 0x7a, 0xc1, 0xcf, 0x3e, 0x1d, 0x3b, 0x5d, 0xb4, - 0xcf, 0xa5, 0xee, 0x87, 0x8e, 0x83, 0xfe, 0xcf, 0x63, 0x1c, 0xba, 0x04, 0x19, 0x3c, 0x87, 0xd1, - 0xd0, 0xea, 0xda, 0x9d, 0x3e, 0x7a, 0x89, 0x6b, 0xa0, 0x59, 0x57, 0xae, 0x45, 0x9c, 0x4e, 0x1f, - 0x1d, 0x98, 0xdf, 0x82, 0x9a, 0x77, 0x01, 0x27, 0x12, 0xab, 0x3b, 0xc9, 0xd2, 0x1c, 0x59, 0x4e, - 0x0f, 0x49, 0x59, 0xc2, 0xee, 0x07, 0x54, 0x32, 0xff, 0x90, 0x40, 0xcd, 0x7b, 0x40, 0xd4, 0xa4, - 0xdf, 0xb7, 0xce, 0x3a, 0x7d, 0xf4, 0x84, 0x07, 0x64, 0x11, 0x32, 0x20, 0x48, 0xe2, 0xfa, 0xee, - 0xc0, 0x71, 0xad, 0xab, 0xbc, 0x30, 0xee, 0xc7, 0xa1, 0x85, 0x64, 0x5c, 0x07, 0xdd, 0x9a, 0x58, - 0x8e, 0xeb, 0xda, 0x17, 0x16, 0x02, 0xac, 0x40, 0xe9, 0x7c, 0x82, 0xaa, 0x9c, 0xd8, 0x1d, 0x9c, - 0xbd, 0x3f, 0x47, 0x75, 0xfc, 0x1c, 0xea, 0x97, 0xb6, 0xd3, 0x1b, 0x5c, 0x5a, 0xbd, 0x49, 0xa7, - 0x3f, 0xb6, 0xd0, 0x53, 0x5c, 0x01, 0xc9, 0x45, 0xcf, 0xf8, 0x32, 0x46, 0x88, 0x2f, 0x13, 0xf4, - 0x9c, 0x2f, 0x97, 0x08, 0xf3, 0xe5, 0x0a, 0xfd, 0x8f, 0x2f, 0x1f, 0xd1, 0x1e, 0x5f, 0x7e, 0x46, - 0x2f, 0xcc, 0x09, 0x68, 0xa7, 0xeb, 0xe5, 0x52, 0x3c, 0xf5, 0xa2, 0x73, 0xa4, 0x47, 
0x3b, 0xe7, - 0x10, 0x60, 0x16, 0xae, 0xa2, 0x90, 0x51, 0x96, 0x26, 0x46, 0x49, 0x3c, 0xaf, 0x1a, 0xc7, 0x14, - 0x7c, 0x72, 0xcf, 0x6e, 0xfe, 0x04, 0xca, 0x38, 0xa1, 0xf1, 0x29, 0x7b, 0xb4, 0x7d, 0x0b, 0x4f, - 0xa5, 0xc7, 0x3c, 0x99, 0x53, 0xa8, 0xf4, 0x36, 0xec, 0x6b, 0xa8, 0x9c, 0xe1, 0x7b, 0xa9, 0x27, - 0x9a, 0xbf, 0x46, 0x84, 0xcc, 0x5b, 0x7e, 0x41, 0x59, 0xd1, 0xf2, 0x0b, 0xca, 0xcc, 0x5f, 0xa1, - 0x74, 0xca, 0xf0, 0x01, 0x94, 0xe6, 0x2c, 0x4f, 0x16, 0xf8, 0x39, 0x59, 0xc0, 0xa4, 0x34, 0x67, - 0xff, 0xe1, 0x05, 0x81, 0x1c, 0x46, 0xa9, 0x70, 0xa2, 0x13, 0x2e, 0xe2, 0xd7, 0x50, 0xf1, 0x37, - 0x6c, 0x9e, 0x79, 0xa9, 0xb6, 0x75, 0x4e, 0x10, 0x39, 0x90, 0x4c, 0x6f, 0xfe, 0x06, 0xd5, 0xee, - 0x3a, 0x49, 0xc3, 0x55, 0x37, 0xf4, 0x69, 0xfc, 0x15, 0x99, 0xbd, 0x02, 0x99, 0xb2, 0x59, 0xfe, - 0xaa, 0xef, 0x87, 0xcb, 0xd5, 0xdc, 0xea, 0xd3, 0x59, 0xee, 0x7d, 0xc7, 0xea, 0xd3, 0x99, 0xf9, - 0xbb, 0x0c, 0xfa, 0xc5, 0x7a, 0x99, 0x06, 0x96, 0xbf, 0xa0, 0x78, 0xff, 0x5e, 0xde, 0x8a, 0xb8, - 0xc0, 0x2c, 0x67, 0x3e, 0x08, 0xa2, 0x59, 0xe8, 0xd3, 0xbc, 0x54, 0xf9, 0x0e, 0xff, 0x00, 0x6a, - 0xc0, 0x3e, 0x85, 0x6b, 0xe6, 0xe7, 0xb7, 0xfe, 0x82, 0x93, 0xb6, 0xe7, 0xb5, 0xec, 0xcc, 0x48, - 0x0a, 0x14, 0x6e, 0x83, 0x16, 0xae, 0xd3, 0x8c, 0x91, 0x4d, 0xfb, 0xfd, 0x5d, 0xc6, 0x20, 0xb7, - 0x92, 0x2d, 0xee, 0xe0, 0x2f, 0x09, 0xd4, 0xfc, 0x20, 0x7c, 0xbc, 0xf3, 0xc9, 0x79, 0xfd, 0xa8, - 0xb7, 0x96, 0xcd, 0xa2, 0x75, 0x7a, 0xef, 0x23, 0xd4, 0xd8, 0xa9, 0xde, 0x6e, 0x63, 0x66, 0x6d, - 0x15, 0x80, 0xbe, 0x25, 0xfd, 0x63, 0x40, 0x5d, 0x74, 0x6c, 0x07, 0x49, 0xfc, 0xd1, 0x8d, 0x6c, - 0xe7, 0xac, 0x6f, 0xb9, 0x03, 0x07, 0x95, 0xee, 0xc6, 0x86, 0xcc, 0xc7, 0xc2, 0x45, 0x67, 0x88, - 0xca, 0x7c, 0x12, 0x5c, 0x8c, 0xfb, 0xae, 0xcd, 0x77, 0x15, 0x31, 0xc8, 0x5c, 0x8b, 0x20, 0x85, - 0xcf, 0x15, 0x62, 0x09, 0x59, 0x3d, 0x38, 0x04, 0xad, 0xc8, 0x71, 0x1b, 0x98, 0xf4, 0xaf, 0x81, - 0x7d, 0x03, 0x75, 0x9b, 0xfd, 0x42, 0x67, 0xe9, 0xd0, 0xdb, 0x2c, 0x43, 0xcf, 0xc7, 0x35, 0x90, - 0xb2, 0x0b, 0xaa, 0x10, 
0x89, 0x99, 0x31, 0x20, 0x37, 0xf6, 0x58, 0x32, 0x0f, 0xe3, 0x55, 0x81, - 0x40, 0x20, 0xaf, 0x63, 0x96, 0xb7, 0x0f, 0x17, 0xf9, 0x77, 0x9a, 0xfa, 0x8b, 0x22, 0xff, 0xfa, - 0x4e, 0xd1, 0x88, 0x30, 0xe1, 0xef, 0x40, 0x09, 0x84, 0x9f, 0xbc, 0x8b, 0x9e, 0x73, 0xd0, 0x8e, - 0x67, 0x92, 0x03, 0x3e, 0x29, 0xe2, 0x4f, 0xe0, 0xf8, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6b, - 0x77, 0x19, 0xb6, 0x15, 0x08, 0x00, 0x00, + 0x29, 0xfa, 0x1c, 0x7d, 0x87, 0x3e, 0x54, 0x8a, 0xe5, 0x41, 0xb6, 0x5c, 0x17, 0x05, 0x72, 0xb5, + 0xb3, 0x33, 0xdf, 0xb7, 0x73, 0xd8, 0xd9, 0x21, 0x41, 0xb9, 0x3b, 0x6e, 0x85, 0x51, 0x90, 0x04, + 0xb8, 0x74, 0x77, 0xac, 0x7f, 0x91, 0xa1, 0xec, 0x6c, 0x42, 0x8a, 0xdf, 0x42, 0xf9, 0xd6, 0x67, + 0x9e, 0x26, 0x34, 0x84, 0xe6, 0x5e, 0xbb, 0xde, 0xba, 0x3b, 0x6e, 0x71, 0x7d, 0xeb, 0xc2, 0x67, + 0x1e, 0x49, 0x4d, 0x58, 0x07, 0x99, 0x2e, 0xe9, 0x8a, 0xb2, 0x44, 0x2b, 0x35, 0x84, 0x66, 0xb5, + 0xad, 0x14, 0x28, 0x52, 0x18, 0xf0, 0x11, 0x48, 0x73, 0x9f, 0x2e, 0xbd, 0x58, 0x13, 0x1b, 0x62, + 0xb3, 0xda, 0xde, 0xdf, 0x1e, 0x34, 0x4a, 0xa2, 0xf5, 0x2c, 0xe9, 0x73, 0x23, 0xc9, 0x31, 0xf8, + 0x18, 0x9e, 0x87, 0x6e, 0xe4, 0xae, 0x68, 0x42, 0xa3, 0x69, 0xb2, 0x09, 0x69, 0xac, 0x95, 0x53, + 0xda, 0xfd, 0xc9, 0x7b, 0x5b, 0x00, 0xdf, 0xc6, 0xf8, 0x1d, 0xd4, 0x22, 0x9a, 0xac, 0x23, 0x96, + 0xe3, 0x2b, 0x8f, 0xf0, 0xd5, 0xcc, 0x9a, 0x81, 0xdf, 0x40, 0xd5, 0x8f, 0xa7, 0x77, 0x6e, 0xe4, + 0xbb, 0x9e, 0x3f, 0xd3, 0xa4, 0x86, 0xd0, 0x54, 0x08, 0xf8, 0xf1, 0x24, 0xd7, 0xe0, 0x77, 0xa0, + 0xcc, 0x6e, 0x5c, 0x36, 0xf5, 0xfc, 0x48, 0x93, 0xd3, 0xcc, 0xd1, 0x36, 0xe0, 0xee, 0x8d, 0xcb, + 0x7a, 0x7e, 0x44, 0xe4, 0x59, 0x26, 0xe0, 0xef, 0x41, 0x8e, 0x43, 0x3a, 0xf3, 0xdd, 0xa5, 0xa6, + 0x3c, 0xc2, 0x8e, 0x32, 0x3d, 0x29, 0x00, 0xf8, 0x2d, 0xd4, 0xe8, 0xe7, 0x84, 0x46, 0xcc, 0x5d, + 0x4e, 0x6f, 0xe9, 0x46, 0x53, 0x1b, 0x42, 0x53, 0x25, 0xd5, 0x42, 0x77, 0x41, 0x37, 0x87, 0x7f, + 0x0a, 0x50, 0x7d, 0x50, 0x14, 0x8c, 0xa1, 0xcc, 0xdc, 0x15, 0x4d, 0x6f, 0x40, 0x25, 0xa9, 0x8c, + 0x5f, 0x81, 0x12, 
0xde, 0x2e, 0xa6, 0xa1, 0x9b, 0xdc, 0xa4, 0x35, 0x57, 0x89, 0x1c, 0xde, 0x2e, + 0x86, 0x6e, 0x72, 0x83, 0x5f, 0x43, 0x99, 0x57, 0x40, 0x13, 0x1f, 0x5d, 0x45, 0xaa, 0xc5, 0x08, + 0xc4, 0xc4, 0x5d, 0x68, 0xe5, 0x94, 0xc3, 0x45, 0x7c, 0x00, 0x52, 0x30, 0x9f, 0xc7, 0x34, 0xd1, + 0x2a, 0x0d, 0xa1, 0x29, 0x92, 0x7c, 0x87, 0xf7, 0xa1, 0xe2, 0x33, 0x8f, 0x7e, 0xd6, 0xa4, 0x86, + 0xd8, 0xac, 0x90, 0x6c, 0x83, 0x5f, 0x83, 0xea, 0xb2, 0x80, 0x6d, 0x56, 0xc1, 0x3a, 0x4e, 0x2b, + 0xa3, 0x90, 0x7b, 0x85, 0xfe, 0x45, 0x80, 0x32, 0x6f, 0x0c, 0x5c, 0x05, 0xd9, 0xb4, 0x27, 0x1d, + 0xcb, 0xec, 0xa1, 0x67, 0x58, 0x81, 0xf2, 0xfb, 0xc1, 0xc0, 0x42, 0x02, 0x96, 0x41, 0x34, 0x6d, + 0x07, 0x95, 0xb8, 0xca, 0xb4, 0x9d, 0x1f, 0x91, 0x88, 0x55, 0xa8, 0x98, 0xb6, 0x73, 0x7c, 0x86, + 0xca, 0xb9, 0x78, 0xd2, 0x46, 0x95, 0x5c, 0x3c, 0x3b, 0x45, 0x12, 0x87, 0x8e, 0x39, 0x49, 0xe6, + 0xca, 0x71, 0xca, 0x52, 0x30, 0x80, 0x34, 0xce, 0x68, 0x6a, 0x21, 0x9f, 0xb4, 0x11, 0x14, 0xf2, + 0xd9, 0x29, 0xaa, 0x72, 0x79, 0xe4, 0x10, 0xd3, 0x3e, 0x47, 0x35, 0x1e, 0x4f, 0xdf, 0x1a, 0x74, + 0x38, 0xa8, 0xbe, 0xdd, 0x9c, 0x9d, 0xa2, 0x3d, 0x7e, 0xe8, 0xc8, 0x32, 0xbb, 0x06, 0xda, 0xcf, + 0x09, 0xe3, 0xae, 0x83, 0x5e, 0x72, 0xaf, 0xfd, 0xb1, 0xdd, 0x45, 0x07, 0x5c, 0xea, 0x7e, 0xe8, + 0xd8, 0xe8, 0xff, 0x3c, 0xfa, 0xa1, 0x43, 0x90, 0xc6, 0x0f, 0x18, 0x0d, 0x8d, 0xae, 0xd9, 0xb1, + 0xd0, 0x2b, 0x5c, 0x03, 0xc5, 0xb8, 0x76, 0x0c, 0x62, 0x77, 0x2c, 0x74, 0xa8, 0x7f, 0x0b, 0x72, + 0xde, 0x1f, 0x9c, 0x48, 0x8c, 0xee, 0x24, 0x2b, 0xc0, 0xc8, 0xb0, 0x7b, 0x48, 0xc8, 0x4a, 0xe1, + 0x7c, 0x40, 0x25, 0xfd, 0x0f, 0x01, 0xe4, 0xbc, 0x3b, 0xd2, 0x6a, 0x59, 0x96, 0x71, 0xde, 0xb1, + 0xd0, 0x33, 0x1e, 0x90, 0x41, 0xc8, 0x80, 0x20, 0x81, 0xeb, 0xbb, 0x03, 0xdb, 0x31, 0xae, 0xf3, + 0x92, 0x39, 0x1f, 0x87, 0x06, 0x12, 0x71, 0x1d, 0x54, 0x63, 0x62, 0xd8, 0x8e, 0x63, 0x5e, 0x1a, + 0x08, 0xb0, 0x04, 0xa5, 0x8b, 0x09, 0xaa, 0x72, 0x62, 0x77, 0x70, 0xfe, 0xfe, 0x02, 0xd5, 0xf1, + 0x0b, 0xa8, 0x5f, 0x99, 0x76, 0x6f, 0x70, 0x65, 0xf4, 
0x26, 0x1d, 0x6b, 0x6c, 0xa0, 0x3d, 0x5c, + 0x01, 0xc1, 0x41, 0xcf, 0xf9, 0x32, 0x46, 0x88, 0x2f, 0x13, 0xf4, 0x82, 0x2f, 0x57, 0x08, 0xf3, + 0xe5, 0x1a, 0xfd, 0x8f, 0x2f, 0x1f, 0xd1, 0x3e, 0x5f, 0x7e, 0x46, 0x2f, 0xf5, 0x09, 0x28, 0xfd, + 0xf5, 0x72, 0x99, 0x0e, 0x81, 0xa2, 0xa7, 0x84, 0x27, 0x7b, 0xea, 0x08, 0x60, 0x16, 0xac, 0xc2, + 0x80, 0x51, 0x96, 0xc4, 0x5a, 0x29, 0x7d, 0x78, 0x35, 0x8e, 0x29, 0xf8, 0xe4, 0x81, 0x5d, 0xff, + 0x09, 0xa4, 0x71, 0x4c, 0xa3, 0x3e, 0x7b, 0xb2, 0xb1, 0x0b, 0x4f, 0xa5, 0xa7, 0x3c, 0xe9, 0x53, + 0xa8, 0xf4, 0x36, 0xec, 0x6b, 0xa8, 0x9c, 0xe1, 0xb9, 0x89, 0x9b, 0x3e, 0x8b, 0x1a, 0x49, 0x65, + 0xfe, 0x18, 0x16, 0x94, 0x15, 0x8f, 0x61, 0x41, 0x99, 0xfe, 0x2b, 0x94, 0xfa, 0x0c, 0x1f, 0x42, + 0x69, 0xce, 0xf2, 0x64, 0x81, 0x9f, 0x93, 0x05, 0x4c, 0x4a, 0x73, 0xf6, 0x1f, 0x5e, 0x10, 0x88, + 0x41, 0x98, 0xa4, 0x4e, 0x54, 0xc2, 0x45, 0xfc, 0x06, 0x2a, 0xde, 0x86, 0xcd, 0x33, 0x2f, 0xd5, + 0xb6, 0xca, 0x09, 0x69, 0x0e, 0x24, 0xd3, 0xeb, 0xbf, 0x41, 0xb5, 0xbb, 0x8e, 0x93, 0x60, 0xd5, + 0x0d, 0x3c, 0x1a, 0x7d, 0x45, 0x66, 0xaf, 0x41, 0xa4, 0x6c, 0x96, 0xbf, 0xf7, 0x87, 0xe1, 0x72, + 0x35, 0xb7, 0x7a, 0x74, 0x96, 0x7b, 0xdf, 0xb1, 0x7a, 0x74, 0xa6, 0xff, 0x2e, 0x82, 0x7a, 0xb9, + 0x5e, 0x26, 0xbe, 0xe1, 0x2d, 0x28, 0x3e, 0x78, 0x90, 0xb7, 0x94, 0x5e, 0x60, 0x96, 0x33, 0x1f, + 0x11, 0xe1, 0x2c, 0xf0, 0x68, 0x5e, 0xaa, 0x7c, 0x87, 0x7f, 0x00, 0xd9, 0x67, 0x9f, 0x82, 0x35, + 0xf3, 0xf2, 0x5b, 0x7f, 0xc9, 0x49, 0xdb, 0xf3, 0x5a, 0x66, 0x66, 0x24, 0x05, 0x0a, 0xb7, 0x41, + 0x09, 0xd6, 0x49, 0xc6, 0xc8, 0xbe, 0x03, 0x07, 0xbb, 0x8c, 0x41, 0x6e, 0x25, 0x5b, 0xdc, 0xe1, + 0x5f, 0x02, 0xc8, 0xf9, 0x41, 0xf8, 0x64, 0xe7, 0x63, 0xf4, 0xe6, 0x49, 0x6f, 0x2d, 0x93, 0x85, + 0xeb, 0xe4, 0xc1, 0xe7, 0xa9, 0xb1, 0x53, 0xbd, 0xdd, 0xc6, 0xcc, 0xda, 0xca, 0x07, 0x75, 0x4b, + 0xfa, 0xc7, 0xe8, 0xba, 0xec, 0x98, 0x36, 0x12, 0xf8, 0xa3, 0x1b, 0x99, 0xf6, 0xb9, 0x65, 0x38, + 0x03, 0x1b, 0x95, 0xee, 0xc7, 0x86, 0xc8, 0xc7, 0xc2, 0x65, 0x67, 0x88, 0xca, 0x7c, 0x12, 
0x5c, + 0x8e, 0x2d, 0xc7, 0xe4, 0xbb, 0x4a, 0x3a, 0xe2, 0x1c, 0x83, 0x20, 0x89, 0xcf, 0x15, 0x62, 0xa4, + 0xb2, 0x7c, 0x78, 0x04, 0x4a, 0x91, 0xe3, 0x36, 0x30, 0xe1, 0x5f, 0x03, 0xfb, 0x06, 0xea, 0x26, + 0xfb, 0x85, 0xce, 0x92, 0xa1, 0xbb, 0x59, 0x06, 0xae, 0x87, 0x6b, 0x20, 0x64, 0x17, 0x54, 0x21, + 0x02, 0xd3, 0x23, 0x40, 0x4e, 0xe4, 0xb2, 0x78, 0x1e, 0x44, 0xab, 0x02, 0x81, 0x40, 0x5c, 0x47, + 0x2c, 0x6f, 0x1f, 0x2e, 0xf2, 0x2f, 0x38, 0xf5, 0x16, 0x45, 0xfe, 0xf5, 0x9d, 0xa2, 0x91, 0xd4, + 0x84, 0xbf, 0x03, 0xc9, 0x4f, 0xfd, 0xe4, 0x5d, 0xf4, 0x82, 0x83, 0x76, 0x3c, 0x93, 0x1c, 0xf0, + 0x49, 0x4a, 0xff, 0x11, 0x4e, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x4c, 0x68, 0x2d, 0xd6, 0x2f, + 0x08, 0x00, 0x00, } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto index a1a357312ca8b..273cf2ef82c59 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto +++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto @@ -46,6 +46,8 @@ message Type { UINT32 = 10; UINT64 = 11; STRING = 12; + FLOAT32 = 13; + FLOAT64 = 14; // Non-primitive types. SLICE = 20; diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 5e545c550f986..f8efffece71ad 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -17,16 +17,14 @@ package harness import ( - "bytes" "context" "fmt" "io" - "io/ioutil" - "runtime/pprof" "sync" "time" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" "github.com/apache/beam/sdks/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx" @@ -37,12 +35,13 @@ import ( // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin). // Main is the main entrypoint for the Go harness. 
It runs at "runtime" -- not -// "pipeline-construction time" -- on each worker. It is a Fn API client and +// "pipeline-construction time" -- on each worker. It is a FnAPI client and // ultimately responsible for correctly executing user code. func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { - setupRemoteLogging(ctx, loggingEndpoint) - setupDiagnosticRecording() + hooks.DeserializeHooksFromOptions() + hooks.RunInitHooks(ctx) + setupRemoteLogging(ctx, loggingEndpoint) recordHeader() // Connect to FnAPI control server. Receive and execute work. @@ -87,8 +86,6 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { data: &DataManager{}, } - var cpuProfBuf bytes.Buffer - // gRPC requires all readers of a stream be the same goroutine, so this goroutine // is responsible for managing the network data. All it does is pull data from // the stream, and hand off the message to a goroutine to actually be handled, @@ -112,18 +109,10 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req)) recordInstructionRequest(req) - if isEnabled("cpu_profiling") { - cpuProfBuf.Reset() - pprof.StartCPUProfile(&cpuProfBuf) - } + hooks.RunRequestHooks(ctx, req) resp := ctrl.handleInstruction(ctx, req) - if isEnabled("cpu_profiling") { - pprof.StopCPUProfile() - if err := ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), cpuProfBuf.Bytes(), 0644); err != nil { - log.Warnf(ctx, "Failed to write CPU profile for instruction %s: %v", req.InstructionId, err) - } - } + hooks.RunResponseHooks(ctx, req, resp) recordInstructionResponse(resp) if resp != nil { diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go b/sdks/go/pkg/beam/core/runtime/harness/session.go index 67f577f5035ba..69a1d9927863d 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/session.go +++ b/sdks/go/pkg/beam/core/runtime/harness/session.go @@ -16,13 +16,14 
@@ package harness import ( + "context" "fmt" - "os" + "io" "sync" - "time" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/proto" ) @@ -35,12 +36,13 @@ const ( dataSend ) +// capture is set by the capture hook below. +var capture io.WriteCloser + var ( selectedOptions = make(map[string]bool) - // TODO(wcn): add a buffered writer around capture and use it. - capture *os.File - sessionLock sync.Mutex - bufPool = sync.Pool{ + sessionLock sync.Mutex + bufPool = sync.Pool{ New: func() interface{} { return proto.NewBuffer(nil) }, @@ -49,35 +51,6 @@ var ( storagePath string ) -// TODO(wcn): the plan is to make these hooks available in the harness in a fashion -// similar to net/http/httptrace. They are simple function calls now to get this -// code underway. -func setupDiagnosticRecording() error { - // No recording options specified? We're done. - if runtime.GlobalOptions.Get("cpu_profiling") == "" && runtime.GlobalOptions.Get("session_recording") == "" { - return nil - } - - var err error - - storagePath = runtime.GlobalOptions.Get("storage_path") - // Any form of recording requires the destination directory to exist. - if err = os.MkdirAll(storagePath, 0755); err != nil { - return fmt.Errorf("Unable to create session directory: %v", err) - } - - if !isEnabled("session_recording") { - return nil - } - - // Set up the session recorder. 
- if capture, err = os.Create(fmt.Sprintf("%s/session-%v", storagePath, time.Now().Unix())); err != nil { - return fmt.Errorf("Unable to create session file: %v", err) - } - - return nil -} - func isEnabled(option string) bool { return runtime.GlobalOptions.Get(option) == "true" } @@ -213,3 +186,57 @@ func recordFooter() error { }, }) } + +// CaptureHook writes the messaging content consumed and +// produced by the worker, allowing the data to be used as +// an input for the session runner. Since workers can exist +// in a variety of environments, this allows the runner +// to tailor the behavior best for its particular needs. +type CaptureHook io.WriteCloser + +// CaptureHookFactory produces a CaptureHook from the supplied +// options. +type CaptureHookFactory func([]string) CaptureHook + +var captureHookRegistry = make(map[string]CaptureHookFactory) + +func init() { + hf := func(opts []string) hooks.Hook { + return hooks.Hook{ + Init: func(_ context.Context) error { + if len(opts) > 0 { + name, opts := hooks.Decode(opts[0]) + capture = captureHookRegistry[name](opts) + } + return nil + }, + } + } + + hooks.RegisterHook("session", hf) +} + +// RegisterCaptureHook registers a CaptureHookFactory for the +// supplied identifier. +func RegisterCaptureHook(name string, c CaptureHookFactory) { + if _, exists := captureHookRegistry[name]; exists { + panic(fmt.Sprintf("RegisterSessionCaptureHook: %s registered twice", name)) + } + captureHookRegistry[name] = c +} + +// EnableCaptureHook is called to request the use of a hook in a pipeline. +// It updates the supplied pipelines to capture this request. 
+func EnableCaptureHook(name string, opts []string) { + if _, exists := captureHookRegistry[name]; !exists { + panic(fmt.Sprintf("EnableHook: %s not registered", name)) + } + if exists, opts := hooks.IsEnabled("session"); exists { + n, _ := hooks.Decode(opts[0]) + if n != name { + panic(fmt.Sprintf("EnableHook: can't enable hook %s, hook %s already enabled", name, n)) + } + } + + hooks.EnableHook("session", hooks.Encode(name, opts)) +} diff --git a/sdks/go/pkg/beam/core/util/hooks/hooks.go b/sdks/go/pkg/beam/core/util/hooks/hooks.go new file mode 100644 index 0000000000000..ea30cc6e50fbb --- /dev/null +++ b/sdks/go/pkg/beam/core/util/hooks/hooks.go @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package hooks allows runners to tailor execution of the worker harness. +// +// Examples of customization: +// +// gRPC integration +// session recording +// profile recording +// +// Registration methods for hooks must be called prior to calling beam.Init() +// Request methods for hooks must be called as part of building the pipeline +// request for the runner's Execute method. 
+package hooks + +import ( + "bytes" + "context" + "encoding/csv" + "encoding/json" + "fmt" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" + "github.com/apache/beam/sdks/go/pkg/beam/log" + fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" +) + +var ( + hookRegistry = make(map[string]HookFactory) + enabledHooks = make(map[string][]string) + activeHooks = make(map[string]Hook) +) + +// A Hook is a set of hooks to run at various stages of executing a +// pipeline. +type Hook struct { + // Init is called once at the startup of the worker prior to + // connecting to the FnAPI services. + Init InitHook + // Req is called each time the worker handles a FnAPI instruction request. + Req RequestHook + // Resp is called each time the worker generates a FnAPI instruction response. + Resp ResponseHook +} + +// InitHook is a hook that is called when the harness +// initializes. +type InitHook func(context.Context) error + +// HookFactory is a function that produces a Hook from the supplied arguments. +type HookFactory func([]string) Hook + +// RegisterHook registers a Hook for the +// supplied identifier. +func RegisterHook(name string, h HookFactory) { + hookRegistry[name] = h +} + +// RunInitHooks runs the init hooks. +func RunInitHooks(ctx context.Context) error { + // If an init hook fails to complete, the invariants of the + // system are compromised and we can't run a workflow. + // The hooks can run in any order. They should not be + // interdependent or interfere with each other. + for _, h := range activeHooks { + if h.Init != nil { + if err := h.Init(ctx); err != nil { + return err + } + } + } + return nil +} + +// RequestHook is called when handling a FnAPI instruction. +type RequestHook func(context.Context, *fnpb.InstructionRequest) error + +// RunRequestHooks runs the hooks that handle a FnAPI request. 
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) { + // The request hooks should not modify the request. + for n, h := range activeHooks { + if h.Req != nil { + if err := h.Req(ctx, req); err != nil { + log.Infof(ctx, "request hook %s failed: %v", n, err) + } + } + } +} + +// ResponseHook is called when sending a FnAPI instruction response. +type ResponseHook func(context.Context, *fnpb.InstructionRequest, *fnpb.InstructionResponse) error + +// RunResponseHooks runs the hooks that handle a FnAPI response. +func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) { + for n, h := range activeHooks { + if h.Resp != nil { + if err := h.Resp(ctx, req, resp); err != nil { + log.Infof(ctx, "response hook %s failed: %v", n, err) + } + } + } +} + +// SerializeHooksToOptions serializes the activated hooks and their configuration into a JSON string +// that can be deserialized later by the runner. +func SerializeHooksToOptions() { + data, err := json.Marshal(enabledHooks) + if err != nil { + // Shouldn't happen, since all the data is strings. + panic(fmt.Sprintf("Couldn't serialize hooks: %v", err)) + } + runtime.GlobalOptions.Set("hooks", string(data)) +} + +// DeserializeHooksFromOptions extracts the hook configuration information from the options and configures +// the hooks with the supplied options. +func DeserializeHooksFromOptions() { + cfg := runtime.GlobalOptions.Get("hooks") + if err := json.Unmarshal([]byte(cfg), &enabledHooks); err != nil { + // Shouldn't happen, since all the data is strings. + panic(fmt.Sprintf("DeserializeHooks failed on input %q: %v", cfg, err)) + } + + for h, opts := range enabledHooks { + activeHooks[h] = hookRegistry[h](opts) + } +} + +// EnableHook enables the hook to be run for the pipeline. It will +// receive the supplied args when the pipeline executes.
It is safe +// to enable the same hook with different options, as this is necessary +// if a hook wants to compose behavior. +func EnableHook(name string, args ...string) error { + if _, ok := hookRegistry[name]; !ok { + return fmt.Errorf("EnableHook: hook %s not found", name) + } + enabledHooks[name] = args + return nil +} + +// IsEnabled returns true and the registered options if the hook is +// already enabled. +func IsEnabled(name string) (bool, []string) { + opts, ok := enabledHooks[name] + return ok, opts +} + +// Encode encodes a hook name and its arguments into a single string. +// This is a convenience function for users of this package that are composing +// hooks. +func Encode(name string, opts []string) string { + var cfg bytes.Buffer + w := csv.NewWriter(&cfg) + // This should never happen since a bytes.Buffer doesn't fail to write. + if err := w.Write(append([]string{name}, opts...)); err != nil { + panic(fmt.Sprintf("error encoding arguments: %v", err)) + } + w.Flush() + return cfg.String() +} + +// Decode decodes a hook name and its arguments from a single string. +// This is a convenience function for users of this package that are composing +// hooks. +func Decode(in string) (string, []string) { + r := csv.NewReader(strings.NewReader(in)) + s, err := r.Read() + if err != nil { + panic(fmt.Sprintf("malformed input for decoding: %s %v", in, err)) + } + return s[0], s[1:] +} diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 247a2d1487ff1..4a9f4335e3466 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -22,6 +22,7 @@ import ( "errors" "flag" "fmt" + "io" "os" "os/user" "path" @@ -31,12 +32,14 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" // Importing to get the side effect of the remote execution hook. See init(). 
_ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/go/pkg/beam/log" "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts" "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts" "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib" "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx" + "github.com/apache/beam/sdks/go/pkg/beam/x/hooks/perf" "github.com/golang/protobuf/proto" "golang.org/x/oauth2/google" df "google.golang.org/api/dataflow/v1b3" @@ -55,18 +58,19 @@ var ( teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).") // SDK options - cpuProfiling = flag.String("cpu_profiling", "", "Job records CPU profiles") + cpuProfiling = flag.String("cpu_profiling", "", "Job records CPU profiles to this GCS location (optional)") sessionRecording = flag.String("session_recording", "", "Job records session transcripts") ) func init() { // Note that we also _ import harness/init to setup the remote execution hook. beam.RegisterRunner("dataflow", Execute) + + perf.RegisterProfCaptureHook("gcs_profile_writer", gcsRecorderHook) } type dataflowOptions struct { - Options map[string]string `json:"options"` - PipelineURL string `json:"pipelineUrl"` + PipelineURL string `json:"pipelineUrl"` } // Execute runs the given pipeline on Google Cloud Dataflow. 
It uses the @@ -90,15 +94,20 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { } if *cpuProfiling != "" { - beam.PipelineOptions.Set("cpu_profiling", "true") - beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces") + perf.EnableProfCaptureHook("gcs_profile_writer", *cpuProfiling) } if *sessionRecording != "" { - beam.PipelineOptions.Set("session_recording", "true") - beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces") + // TODO(wcn): BEAM-4017 + // It's a bit inconvenient for GCS because the whole object is written in + // one pass, whereas the session logs are constantly appended. We wouldn't + // want to hold all the logs in memory to flush at the end of the pipeline + // as we'd blow out memory on the worker. The implementation of the + // CaptureHook should create an internal buffer and write chunks out to GCS + // once they get to an appropriate size (50M or so?) } + hooks.SerializeHooksToOptions() options := beam.PipelineOptions.Export() // (1) Upload Go binary and model to GCS. 
@@ -147,9 +156,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { SdkPipelineOptions: newMsg(pipelineOptions{ DisplayData: findPipelineFlags(), Options: dataflowOptions{ - Options: options.Options, PipelineURL: modelURL, }, + GoOptions: options, }), WorkerPools: []*df.WorkerPool{{ Kind: "harness", @@ -317,3 +326,18 @@ func printJob(ctx context.Context, job *df.Job) { } log.Info(ctx, string(str)) } + +func gcsRecorderHook(opts []string) perf.CaptureHook { + bucket, prefix, err := gcsx.ParseObject(opts[0]) + if err != nil { + panic(fmt.Sprintf("Invalid hook configuration for gcsRecorderHook: %s", opts)) + } + + return func(ctx context.Context, spec string, r io.Reader) error { + client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope) + if err != nil { + return fmt.Errorf("couldn't establish GCS client: %v", err) + } + return gcsx.WriteObject(client, bucket, path.Join(prefix, spec), r) + } +} diff --git a/sdks/go/pkg/beam/runners/dataflow/messages.go b/sdks/go/pkg/beam/runners/dataflow/messages.go index e4abea67ac7cb..80fcebdadf0e8 100644 --- a/sdks/go/pkg/beam/runners/dataflow/messages.go +++ b/sdks/go/pkg/beam/runners/dataflow/messages.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "google.golang.org/api/googleapi" ) @@ -34,8 +35,9 @@ func newMsg(msg interface{}) googleapi.RawMessage { // pipelineOptions models Job/Environment/SdkPipelineOptions type pipelineOptions struct { - DisplayData []*displayData `json:"display_data,omitempty"` - Options interface{} `json:"options,omitempty"` + DisplayData []*displayData `json:"display_data,omitempty"` + Options interface{} `json:"options,omitempty"` + GoOptions runtime.RawOptions `json:"beam:option:go_options:v1,omitempty"` } // NOTE(herohde) 2/9/2017: most of the v1b3 messages are weakly-typed json diff --git a/sdks/go/pkg/beam/runners/session/session.go 
b/sdks/go/pkg/beam/runners/session/session.go index b123466dcfc08..aa8ae1ec899e3 100644 --- a/sdks/go/pkg/beam/runners/session/session.go +++ b/sdks/go/pkg/beam/runners/session/session.go @@ -51,7 +51,7 @@ func init() { var sessionFile = flag.String("session_file", "", "Session file for the runner") -// controlServer manages the Fn API control channel. +// controlServer manages the FnAPI control channel. type controlServer struct { filename string wg *sync.WaitGroup // used to signal when the session is completed @@ -207,7 +207,7 @@ func extractPortSpec(spec *rapi_pb.FunctionSpec) string { panic("unable to extract port") } -// dataServer manages the Fn API data channel. +// dataServer manages the FnAPI data channel. type dataServer struct { ctrl *controlServer } @@ -235,7 +235,7 @@ func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error { } } -// loggingServer manages the Fn API logging channel. +// loggingServer manages the FnAPI logging channel. type loggingServer struct{} // no data content func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) error { diff --git a/sdks/go/pkg/beam/util/grpcx/hook.go b/sdks/go/pkg/beam/util/grpcx/hook.go new file mode 100644 index 0000000000000..6978915aaa6c1 --- /dev/null +++ b/sdks/go/pkg/beam/util/grpcx/hook.go @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcx + +import ( + "context" + "fmt" + "time" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" + "google.golang.org/grpc" +) + +// Hook allows a runner to customize various aspects of gRPC +// communication with the FnAPI harness. Each member of the struct +// is optional; the default behavior will be used if a value is not +// supplied. +type Hook struct { + // Dialer allows the runner to customize the gRPC dialing behavior. + Dialer func(context.Context, string, time.Duration) (*grpc.ClientConn, error) + // TODO(wcn): expose other hooks here. +} + +type HookFactory func([]string) Hook + +var hookRegistry = make(map[string]HookFactory) + +// RegisterHook registers a HookFactory for the +// supplied identifier. It panics if the same identifier is +// registered twice. +func RegisterHook(name string, c HookFactory) { + if _, exists := hookRegistry[name]; exists { + panic(fmt.Sprintf("grpc.Hook: %s registered twice", name)) + } + hookRegistry[name] = c + + hf := func(opts []string) hooks.Hook { + return hooks.Hook{ + Init: func(_ context.Context) error { + if len(opts) == 0 { + return nil + } + + name, opts := hooks.Decode(opts[0]) + grpcHook := hookRegistry[name](opts) + if grpcHook.Dialer != nil { + Dial = grpcHook.Dialer + } + return nil + }, + } + } + hooks.RegisterHook("grpc", hf) +} + +// EnableHook is called to request the use of the gRPC +// hook in a pipeline.
+func EnableHook(name string, opts ...string) { + _, exists := hookRegistry[name] + if !exists { + panic(fmt.Sprintf("EnableHook: %s not registered", name)) + } + // Only one hook can be enabled. If the pipeline has two conflicting views about how to use gRPC + // that won't end well. + if exists, opts := hooks.IsEnabled("grpc"); exists { + n, _ := hooks.Decode(opts[0]) + if n != name { + panic(fmt.Sprintf("EnableHook: can't enable hook %s, hook %s already enabled", name, n)) + } + } + + hooks.EnableHook("grpc", hooks.Encode(name, opts)) +} diff --git a/sdks/go/pkg/beam/x/hooks/perf/perf.go b/sdks/go/pkg/beam/x/hooks/perf/perf.go new file mode 100644 index 0000000000000..93fd611845913 --- /dev/null +++ b/sdks/go/pkg/beam/x/hooks/perf/perf.go @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package perf is to add performance measuring hooks to a runner, such as cpu, or trace profiles. 
+package perf + +import ( + "bytes" + "context" + "fmt" + "io" + "runtime/pprof" + "runtime/trace" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" + fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" +) + +// CaptureHook is used by the harness to have the runner +// persist a trace record with the supplied name and comment. +// The type of trace can be determined by the prefix of the string. +// +// * prof: A profile compatible with traces produced by runtime/pprof +// * trace: A trace compatible with traces produced by runtime/trace +type CaptureHook func(context.Context, string, io.Reader) error + +// CaptureHookFactory creates a CaptureHook from the supplied options. +type CaptureHookFactory func([]string) CaptureHook + +var profCaptureHookRegistry = make(map[string]CaptureHookFactory) + +var enabledProfCaptureHooks []string + +func init() { + hf := func(opts []string) hooks.Hook { + enabledProfCaptureHooks = opts + enabled := len(enabledProfCaptureHooks) > 0 + var cpuProfBuf bytes.Buffer + return hooks.Hook{ + Req: func(_ context.Context, _ *fnpb.InstructionRequest) error { + if !enabled { + return nil + } + cpuProfBuf.Reset() + return pprof.StartCPUProfile(&cpuProfBuf) + }, + Resp: func(ctx context.Context, req *fnpb.InstructionRequest, _ *fnpb.InstructionResponse) error { + if !enabled { + return nil + } + pprof.StopCPUProfile() + for _, h := range enabledProfCaptureHooks { + name, opts := hooks.Decode(h) + if err := profCaptureHookRegistry[name](opts)(ctx, fmt.Sprintf("prof%s", req.InstructionId), &cpuProfBuf); err != nil { + return err + } + } + return nil + }, + } + } + hooks.RegisterHook("prof", hf) + + hf = func(opts []string) hooks.Hook { + var traceProfBuf bytes.Buffer + enabledTraceCaptureHooks = opts + enabled := len(enabledTraceCaptureHooks) > 0 + return hooks.Hook{ + Req: func(_ context.Context, _ *fnpb.InstructionRequest) error { + if !enabled { + return nil + } + traceProfBuf.Reset() + return 
trace.Start(&traceProfBuf) + }, + Resp: func(ctx context.Context, req *fnpb.InstructionRequest, _ *fnpb.InstructionResponse) error { + if !enabled { + return nil + } + trace.Stop() + for _, h := range enabledTraceCaptureHooks { + name, opts := hooks.Decode(h) + if err := traceCaptureHookRegistry[name](opts)(ctx, fmt.Sprintf("trace_prof%s", req.InstructionId), &traceProfBuf); err != nil { + return err + } + } + return nil + }, + } + } + hooks.RegisterHook("trace", hf) +} + +// RegisterProfCaptureHook registers a CaptureHookFactory for the +// supplied identifier. It panics if the same identifier is +// registered twice. +func RegisterProfCaptureHook(name string, c CaptureHookFactory) { + if _, exists := profCaptureHookRegistry[name]; exists { + panic(fmt.Sprintf("RegisterProfCaptureHook: %s registered twice", name)) + } + profCaptureHookRegistry[name] = c +} + +// EnableProfCaptureHook activates a registered profile capture hook for a given pipeline. +func EnableProfCaptureHook(name string, opts ...string) { + _, exists := profCaptureHookRegistry[name] + if !exists { + panic(fmt.Sprintf("EnableProfCaptureHook: %s not registered", name)) + } + + enc := hooks.Encode(name, opts) + + for i, h := range enabledProfCaptureHooks { + n, _ := hooks.Decode(h) + if h == n { + // Rewrite the registration with the current arguments + enabledProfCaptureHooks[i] = enc + hooks.EnableHook("prof", enabledProfCaptureHooks...) + return + } + } + + enabledProfCaptureHooks = append(enabledProfCaptureHooks, enc) + hooks.EnableHook("prof", enabledProfCaptureHooks...) +} + +var traceCaptureHookRegistry = make(map[string]CaptureHookFactory) +var enabledTraceCaptureHooks []string + +// RegisterTraceCaptureHook registers a CaptureHookFactory for the +// supplied identifier. It panics if the same identifier is +// registered twice.
+func RegisterTraceCaptureHook(name string, c CaptureHookFactory) { + if _, exists := traceCaptureHookRegistry[name]; exists { + panic(fmt.Sprintf("RegisterTraceCaptureHook: %s registered twice", name)) + } + traceCaptureHookRegistry[name] = c +} + +// EnableTraceCaptureHook activates a registered trace capture hook for a given pipeline. +func EnableTraceCaptureHook(name string, opts ...string) { + if _, exists := traceCaptureHookRegistry[name]; !exists { + panic(fmt.Sprintf("EnableTraceCaptureHook: %s not registered", name)) + } + + enc := hooks.Encode(name, opts) + for i, h := range enabledTraceCaptureHooks { + n, _ := hooks.Decode(h) + if h == n { + // Rewrite the registration with the current arguments + enabledTraceCaptureHooks[i] = enc + hooks.EnableHook("trace", enabledTraceCaptureHooks...) + return + } + } + enabledTraceCaptureHooks = append(enabledTraceCaptureHooks, enc) + hooks.EnableHook("trace", enabledTraceCaptureHooks...) +} diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg index 0f6acea2c0985..041d6fa55f49d 100644 --- a/sdks/python/setup.cfg +++ b/sdks/python/setup.cfg @@ -26,6 +26,9 @@ verbosity=2 # fast_coders_test and typecoders_test. exclude=fast_coders_test|typecoders_test +# Creates an xml file compatible with standard XUnit XML format. +with-xunit=1 + # Configurations to control coverage.py.
[coverage:run] branch = True diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index ff88ac42fa831..3cc1682368435 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -50,7 +50,7 @@ commands = pip --version {toxinidir}/run_tox_cleanup.sh python apache_beam/examples/complete/autocomplete_test.py - python setup.py test + python setup.py nosetests {toxinidir}/run_tox_cleanup.sh [testenv:py27-cython] @@ -64,7 +64,7 @@ commands = pip --version {toxinidir}/run_tox_cleanup.sh python apache_beam/examples/complete/autocomplete_test.py - python setup.py test + python setup.py nosetests {toxinidir}/run_tox_cleanup.sh [testenv:py27-gcp] @@ -74,7 +74,7 @@ commands = pip --version {toxinidir}/run_tox_cleanup.sh python apache_beam/examples/complete/autocomplete_test.py - python setup.py test + python setup.py nosetests {toxinidir}/run_tox_cleanup.sh [testenv:py27-lint]