Skip to content

Commit

Permalink
Merge pull request apache#10 from swegner/merge-master
Browse files Browse the repository at this point in the history
Merge master
  • Loading branch information
lukecwik authored Apr 6, 2018
2 parents 2e56a09 + 6052e15 commit 5301662
Show file tree
Hide file tree
Showing 25 changed files with 827 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Apex_Gradle') {
'--rerun-tasks',
]

// Publish all test results to Jenkins
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}

// Sets that this is a PostCommit job.
common_job_properties.setPostCommit(delegate)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle') {
'--rerun-tasks',
]

// Publish all test results to Jenkins
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}

// Sets that this is a PostCommit job.
common_job_properties.setPostCommit(delegate)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Flink_Gradle') {
'--rerun-tasks',
]

// Publish all test results to Jenkins
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}

// Sets that this is a PostCommit job.
common_job_properties.setPostCommit(delegate)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle') {
'--rerun-tasks',
]

// Publish all test results to Jenkins
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}

// Sets that this is a PostCommit job.
// 0 5 31 2 * will run on Feb 31 (i.e. never) according to job properties.
// In post-commit this job triggers only on SCM changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ job('beam_PostCommit_Java_ValidatesRunner_Spark_Gradle') {
'--rerun-tasks',
]

// Publish all test results to Jenkins
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}

// Sets that this is a PostCommit job.
common_job_properties.setPostCommit(delegate)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ job('beam_PreCommit_Python_GradleBuild') {
'--rerun-tasks',
]

// Publish all test results to Jenkins. Note that Nose documentation
// specifically mentions that it produces JUnit compatible test results.
publishers {
archiveJunit('**/nosetests.xml')
}

def gradle_command_line = './gradlew ' + gradle_switches.join(' ') + ' :pythonPreCommit'
// Sets that this is a PreCommit job.
common_job_properties.setPreCommit(delegate, gradle_command_line, 'Run Python Gradle PreCommit')
Expand Down
57 changes: 0 additions & 57 deletions .test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy

This file was deleted.

7 changes: 7 additions & 0 deletions sdks/go/pkg/beam/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func inferCoder(t FullType) (*coder.Coder, error) {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case reflectx.Float32, reflectx.Float64:
c, err := coderx.NewFloat(t.Type())
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil

case reflectx.String, reflectx.ByteSlice:
// TODO(BEAM-3580): we should stop encoding string using the bytecoder. It forces
// conversions at runtime in inconvenient places.
Expand Down
75 changes: 75 additions & 0 deletions sdks/go/pkg/beam/core/runtime/coderx/float.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package coderx

import (
"fmt"
"math"
"math/bits"
"reflect"

"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)

func init() {
runtime.RegisterFunction(encFloat)
runtime.RegisterFunction(decFloat)
}

func encFloat(v typex.T) []byte {
var val float64
switch n := v.(type) {
case float32:
val = float64(n)
case float64:
val = n
default:
panic(fmt.Sprintf("received unknown value type: want a float, got %T", n))
}

return encVarUintZ(bits.ReverseBytes64(math.Float64bits(val)))
}

func decFloat(t reflect.Type, data []byte) (typex.T, error) {
uval, err := decVarUintZ(reflectx.Uint64, data)
if err != nil {
return nil, fmt.Errorf("invalid float encoding for: %v", data)
}

n := math.Float64frombits(bits.ReverseBytes64(uval.(uint64)))
switch t.Kind() {
case reflect.Float64:
return n, nil
case reflect.Float32:
return float32(n), nil
default:
panic(fmt.Sprintf("unreachable statement: expected a float, got %v", t))
}
}

// NewFloat returns a coder for the given float type. It uses the same
// encoding scheme as the gob package.
func NewFloat(t reflect.Type) (*coder.CustomCoder, error) {
switch t.Kind() {
case reflect.Float32, reflect.Float64:
return coder.NewCustomCoder("float", t, encFloat, decFloat)
default:
return nil, fmt.Errorf("not a float type: %v", t)
}
}
50 changes: 50 additions & 0 deletions sdks/go/pkg/beam/core/runtime/coderx/float_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package coderx

import (
"reflect"
"testing"

"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)

func TestFloat(t *testing.T) {
tests := []interface{}{
float32(0.0),
float32(94.5),
float64(-127.25),
float64(8675309.4286),
}

for _, v := range tests {
typ := reflect.ValueOf(v).Type()

data := encFloat(v)
result, err := decFloat(typ, data)
if err != nil {
t.Fatalf("dec(enc(%v)) failed: %v", v, err)
}

if v != result {
t.Errorf("dec(enc(%v)) = %v, want id", v, result)
}
resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
if resultT != typ {
t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT)
}
}
}
13 changes: 11 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/datasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/log"
)

// DataSink is a Node.
Expand All @@ -31,8 +34,10 @@ type DataSink struct {
Target Target
Coder *coder.Coder

enc ElementEncoder
w io.WriteCloser
enc ElementEncoder
w io.WriteCloser
count int64
start time.Time
}

func (n *DataSink) ID() UnitID {
Expand All @@ -52,6 +57,8 @@ func (n *DataSink) StartBundle(ctx context.Context, id string, data DataManager)
return err
}
n.w = w
atomic.StoreInt64(&n.count, 0)
n.start = time.Now()
return nil
}

Expand All @@ -60,6 +67,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values .
// unit.
var b bytes.Buffer

atomic.AddInt64(&n.count, 1)
if err := EncodeWindowedValueHeader(value.Timestamp, &b); err != nil {
return err
}
Expand All @@ -75,6 +83,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values .
}

func (n *DataSink) FinishBundle(ctx context.Context) error {
log.Infof(ctx, "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start))
return n.w.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (n *DataSource) Process(ctx context.Context) error {
}

func (n *DataSource) FinishBundle(ctx context.Context) error {
log.Infof(context.Background(), "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start))
log.Infof(ctx, "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Now().Sub(n.start))
n.sid = StreamID{}
n.source = nil
return n.Out.FinishBundle(ctx)
Expand Down
8 changes: 8 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
return &v1.Type{Kind: v1.Type_UINT32}, nil
case reflect.Uint64:
return &v1.Type{Kind: v1.Type_UINT64}, nil
case reflect.Float32:
return &v1.Type{Kind: v1.Type_FLOAT32}, nil
case reflect.Float64:
return &v1.Type{Kind: v1.Type_FLOAT64}, nil
case reflect.String:
return &v1.Type{Kind: v1.Type_STRING}, nil

Expand Down Expand Up @@ -485,6 +489,10 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
return reflectx.Uint32, nil
case v1.Type_UINT64:
return reflectx.Uint64, nil
case v1.Type_FLOAT32:
return reflectx.Float32, nil
case v1.Type_FLOAT64:
return reflectx.Float64, nil
case v1.Type_STRING:
return reflectx.String, nil

Expand Down
Loading

0 comments on commit 5301662

Please sign in to comment.