Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13857] Add K:V flags for expansion service jars and addresses to Go ITs. #16908

Merged
merged 3 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions sdks/go/test/integration/expansions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"fmt"
"strconv"
"time"

"github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/ports"
)

// ExpansionServices is a struct used for getting addresses and starting expansion services, based
// on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this
// instead of accessing the flags directly is to let it handle jar startup and shutdown.
//
// Usage
//
// Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for
// every expansion service needed for the test. Call Shutdown on it before finishing TestMain (or
// simply defer a call to it).
//
// ExpansionServices is not concurrency safe, and so a single instance should not be used within
// multiple individual tests, due to the possibility of those tests being run concurrently. It is
// recommended to only use ExpansionServices in TestMain to avoid this.
//
// Example:
// var retCode int
// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers.
// services := integration.NewExpansionServices()
// defer func() { services.Shutdown() }()
// addr, err := services.GetAddr("example")
// if err != nil {
// retCode = 1
// panic(err)
// }
// expansionAddr = addr // Save address to a package-level variable used by tests.
// retCode = ptest.MainRet(m)
type ExpansionServices struct {
addrs map[string]string
jars map[string]string
procs []jars.Process
// Callback for running jars, stored this way for testing purposes.
run func(time.Duration, string, ...string) (jars.Process, error)
waitTime time.Duration // Time to sleep after running jar. Tests can adjust this.
}

// NewExpansionServices creates and initializes an ExpansionServices instance.
func NewExpansionServices() *ExpansionServices {
return &ExpansionServices{
addrs: GetExpansionAddrs(),
jars: GetExpansionJars(),
procs: make([]jars.Process, 0),
run: jars.Run,
waitTime: 3 * time.Second,
}
}

// GetAddr gets the address for the expansion service with the given label. The label corresponds to
// the labels used in the --expansion_jar and --expansion_addr flags. If an expansion service is
// provided as a jar, then that jar will be run to retrieve the address, and the jars are not
// guaranteed to be shut down unless Shutdown is called.
//
// Note: If this function starts a jar, it waits a few seconds for it to initialize. Do not use
// this function if the possibility of a few seconds of latency is not acceptable.
func (es *ExpansionServices) GetAddr(label string) (string, error) {
// Always default to existing address before running a jar.
if addr, ok := es.addrs[label]; ok {
return addr, nil
}
jar, ok := es.jars[label]
if !ok {
err := fmt.Errorf("no --expansion_jar or --expansion_addr flag provided with label \"%s\"", label)
return "", fmt.Errorf("expansion service labeled \"%s\" not found: %w", label, err)
}

// Start jar on open port.
port, err := ports.GetOpenTCP()
if err != nil {
return "", fmt.Errorf("cannot get open port for expansion service labeled \"%s\": %w", label, err)
}
portStr := strconv.Itoa(port)

// Run jar and cache its info.
proc, err := es.run(*ExpansionTimeout, jar, portStr)
if err != nil {
return "", fmt.Errorf("cannot run jar for expansion service labeled \"%s\": %w", label, err)
}
time.Sleep(es.waitTime) // Wait a bit for the jar to start.
es.procs = append(es.procs, proc)
addr := "localhost:" + portStr
es.addrs[label] = addr
return addr, nil
}

// Shutdown shuts down any jars started by the ExpansionServices struct and should get called if it
// was used at all.
func (es *ExpansionServices) Shutdown() {
for _, p := range es.procs {
p.Kill()
}
es.jars = nil
es.addrs = nil
es.procs = nil
}
179 changes: 179 additions & 0 deletions sdks/go/test/integration/expansions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"fmt"
"testing"
"time"

_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

type testProcess struct {
killed bool
jar string
}

func (p *testProcess) Kill() error {
p.killed = true
return nil
}

func failRun(_ time.Duration, _ string, _ ...string) (jars.Process, error) {
return nil, fmt.Errorf("unexpectedly running a jar, failing")
}

func succeedRun(_ time.Duration, jar string, _ ...string) (jars.Process, error) {
return &testProcess{jar: jar}, nil
}

// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided addresses.
func TestExpansionServices_GetAddr_Addresses(t *testing.T) {
addrsMap := map[string]string{
"label1": "testAddr1",
"label2": "testAddr2",
"label3": "testAddr3",
}
jarsMap := map[string]string{
"label2": "jarFilepath2",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: failRun,
waitTime: 0,
}

// Ensure we get the same map we put in, and that addresses take priority over jars if
// both are given for the same label.
for label, wantAddr := range addrsMap {
gotAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
if gotAddr != wantAddr {
t.Errorf("incorrect address for \"%v\", want %v, got %v", label, wantAddr, gotAddr)
}
}
// Check that nonexistent labels fail.
if _, err := es.GetAddr("nonexistent_label"); err == nil {
t.Errorf("did not receive error when calling GetAddr with nonexistent label")
}
}

// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars.
func TestExpansionServices_GetAddr_Jars(t *testing.T) {
addrsMap := map[string]string{}
jarsMap := map[string]string{
"label1": "jarFilepath1",
"label2": "jarFilepath2",
"label3": "jarFilepath3",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
}

// Call GetAddr on each jar twice, checking that the addresses remain consistent.
gotMap := make(map[string]string)
for label := range jarsMap {
gotAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
gotMap[label] = gotAddr
}
for label, gotAddr := range gotMap {
secondAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
if secondAddr != gotAddr {
t.Errorf("getAddr returned different address when called twice for \"%v\", "+
"attempt 1: %v, attempt 2: %v", label, gotAddr, secondAddr)
}
}
// Check that all jars were run.
gotJars := make([]string, 0)
for _, proc := range es.procs {
testProc := proc.(*testProcess)
gotJars = append(gotJars, testProc.jar)
}
wantJars := make([]string, 0)
for _, jar := range jarsMap {
wantJars = append(wantJars, jar)
}
lessFunc := func(a, b string) bool { return a < b }
if diff := cmp.Diff(wantJars, gotJars, cmpopts.SortSlices(lessFunc)); diff != "" {
t.Errorf("processes in ExpansionServices does not match jars that should be running: diff(-want,+got):\n%v", diff)
}
}

// TestExpansionServices_Shutdown tests that a shutdown correctly kills all jars started by an
// ExpansionServices.
func TestExpansionServices_Shutdown(t *testing.T) {
addrsMap := map[string]string{}
jarsMap := map[string]string{
"label1": "jarFilepath1",
"label2": "jarFilepath2",
"label3": "jarFilepath3",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
}
// Call getAddr on each label to run jars.
for label := range addrsMap {
_, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
}

// Shutdown and confirm that jars are killed and addresses can no longer be retrieved.
procs := es.procs
es.Shutdown()
for _, proc := range procs {
testProc := proc.(*testProcess)
if !testProc.killed {
t.Errorf("process for jar %v was not killed on Shutdown()", testProc.jar)
}
}
for label := range addrsMap {
_, err := es.GetAddr(label)
if err == nil {
t.Errorf("calling GetAddr after Shutdown did not return an error for \"%v\"", label)
}
}
}
86 changes: 85 additions & 1 deletion sdks/go/test/integration/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

package integration

import "flag"
import (
"flag"
"fmt"
"strings"
)

// The following flags are flags used in one or more integration tests, and that
// may be used by scripts that execute "go test ./sdks/go/test/integration/...".
Expand Down Expand Up @@ -53,4 +57,84 @@ var (
KafkaJarTimeout = flag.String("kafka_jar_timeout", "10m",
"Sets an auto-shutdown timeout to the Kafka cluster. "+
"Requires the timeout command to be present in Path, unless the value is set to \"\".")

// ExpansionJars contains elements in the form "label:jar" describing jar
// filepaths for expansion services to use in integration tests, and the
// corresponding labels. Once provided through this flag, those jars can
// be used in tests via the ExpansionServices struct.
ExpansionJars stringSlice

// ExpansionAddrs contains elements in the form "label:address" describing
// endpoints for expansion services to use in integration tests, and the
// corresponding labels. Once provided through this flag, those addresses
// can be used in tests via the ExpansionServices struct.
ExpansionAddrs stringSlice

// ExpansionTimeout attempts to apply an auto-shutdown timeout to any
// expansion services started by integration tests.
ExpansionTimeout = flag.Duration("expansion_timeout", 0,
"Sets an auto-shutdown timeout to any started expansion services. "+
"Requires the timeout command to be present in Path, unless the value is set to 0.")
)

func init() {
flag.Var(&ExpansionJars, "expansion_jar",
"Define jar locations for expansion services. Each entry consists of "+
"two values, an arbitrary label and a jar filepath, separated by a "+
"\":\", in the form \"label:jar\". Jars provided through this flag "+
"can be started by tests.")
flag.Var(&ExpansionAddrs, "expansion_addr",
"Define addresses for expansion services. Each entry consists of "+
"two values, an arbitrary label and an address, separated by a "+
"\":\", in the form \"label:address\". Addresses provided through "+
"this flag can be used as expansion addresses by tests.")
}

// GetExpansionJars gets all the jars given to --expansion_jar as a map of label to jar location.
func GetExpansionJars() map[string]string {
ret := make(map[string]string)
for _, jar := range ExpansionJars {
splits := strings.SplitN(jar, ":", 2)
ret[splits[0]] = splits[1]
}
return ret
}

// GetExpansionAddrs gets all the addresses given to --expansion_addr as a map of label to address.
func GetExpansionAddrs() map[string]string {
ret := make(map[string]string)
for _, addr := range ExpansionAddrs {
splits := strings.SplitN(addr, ":", 2)
ret[splits[0]] = splits[1]
}
return ret
}

// stringSlice is a flag.Value implementation for string slices, that allows
// multiple strings to be assigned to one flag by specifying multiple instances
// of the flag.
//
// Example:
// var myFlags stringSlice
// flag.Var(&myFlags, "my_flag", "A list of flags")
// With the example above, the slice can be set to contain ["foo", "bar"]:
// cmd -my_flag foo -my_flag bar
type stringSlice []string

// String implements the String method of flag.Value. This outputs the value
// of the flag as a string.
func (s *stringSlice) String() string {
return fmt.Sprintf("%v", *s)
}

// Set implements the Set method of flag.Value. This stores a string input to
// the flag into a stringSlice representation.
func (s *stringSlice) Set(value string) error {
*s = append(*s, value)
return nil
}

// Get returns the instance itself.
func (s stringSlice) Get() interface{} {
return s
}
Loading