From 3f726cc6331c2452863b97c8a745a4215363d8de Mon Sep 17 00:00:00 2001 From: Lukas Vogel Date: Mon, 20 Apr 2020 08:06:09 +0200 Subject: [PATCH] integration: speed up dockerized end2end test (#3719) The overhead of running a binary in a docker container with docker-compose exec is around 1 second. Since our integration tests are serialized and a cartesian product between all ASes is taken, this overhead easily amounts to multiple minutes. With this PR, all servers are started in parallel for the end2end test. All pairs with the same source are executed in one single docker exec command. And all executions are done in parallel. To get a progress update, the end2end binary talks through a unix socket to the integration binary. We can eventually move the server wait to this RPC framework as well. With this style of invoking the end2end test, we can reduce the execution time on CI from >10 minutes (timeout) to a bit more than 1 minute. Most of the execution time is actually spent in starting the topology, not in executing the tests. 
Co-authored-by: Oncilla --- .buildkite/pipeline.yml | 17 ++ go/examples/pingpong/pp_integration/main.go | 2 +- go/integration/BUILD.bazel | 2 + go/integration/cert_req_integration/main.go | 2 +- go/integration/common.go | 14 ++ go/integration/end2end/main.go | 19 +- .../end2end_integration/BUILD.bazel | 2 + go/integration/end2end_integration/main.go | 176 +++++++++++++++--- go/lib/integration/BUILD.bazel | 3 + go/lib/integration/binary.go | 25 ++- go/lib/integration/cmd.go | 96 ++++++++++ go/lib/integration/docker.go | 24 ++- go/lib/integration/done.go | 53 ++++++ go/lib/integration/integration.go | 32 +--- go/lib/integration/progress/BUILD.bazel | 9 + go/lib/integration/progress/progress.go | 70 +++++++ topology/Default.topo | 16 +- 17 files changed, 486 insertions(+), 76 deletions(-) create mode 100644 go/lib/integration/cmd.go create mode 100644 go/lib/integration/done.go create mode 100644 go/lib/integration/progress/BUILD.bazel create mode 100644 go/lib/integration/progress/progress.go diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 12ba23b43b..05a1ea52c6 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -122,3 +122,20 @@ steps: automatic: - exit_status: -1 # Agent was lost - exit_status: 255 # Forced agent shutdown + - label: ":docker: Integration: end2end_integration full topology" + if: build.message !~ /\[doc\]/ + command: + - bazel run //docker/perapp:prod >/dev/null 2>&1 + - ./scion.sh topology -t -d + - ./scion.sh run + - docker-compose -f gen/scion-dc.yml -p scion up -d $(docker-compose -f gen/scion-dc.yml config --services | grep tester) + - sleep 10 + - ./bin/end2end_integration -d -log.console warn + artifact_paths: + - "artifacts.out/**/*" + timeout_in_minutes: 10 + key: docker_integration_e2e_default + retry: + automatic: + - exit_status: -1 # Agent was lost + - exit_status: 255 # Forced agent shutdown diff --git a/go/examples/pingpong/pp_integration/main.go b/go/examples/pingpong/pp_integration/main.go 
index c0a0872a0c..4533431e28 100644 --- a/go/examples/pingpong/pp_integration/main.go +++ b/go/examples/pingpong/pp_integration/main.go @@ -72,7 +72,7 @@ func runTests(in integration.Integration, pairs []integration.IAPair) error { for i, conn := range pairs { testInfo := fmt.Sprintf("%v -> %v (%v/%v)", conn.Src.IA, conn.Dst.IA, i+1, len(pairs)) log.Info(fmt.Sprintf("Test %v: %s", in.Name(), testInfo)) - if err := integration.RunClient(in, conn, 5*time.Second); err != nil { + if err := integration.RunClient(in, conn, 10*time.Second); err != nil { log.Error(fmt.Sprintf("Error in client: %s", testInfo), "err", err) return err } diff --git a/go/integration/BUILD.bazel b/go/integration/BUILD.bazel index bbd54c1db4..e2739c485c 100644 --- a/go/integration/BUILD.bazel +++ b/go/integration/BUILD.bazel @@ -6,8 +6,10 @@ go_library( importpath = "github.com/scionproto/scion/go/integration", visibility = ["//visibility:public"], deps = [ + "//go/lib/addr:go_default_library", "//go/lib/env:go_default_library", "//go/lib/integration:go_default_library", + "//go/lib/integration/progress:go_default_library", "//go/lib/log:go_default_library", "//go/lib/sciond:go_default_library", "//go/lib/snet:go_default_library", diff --git a/go/integration/cert_req_integration/main.go b/go/integration/cert_req_integration/main.go index b3a3a4c29f..dd58c7e776 100644 --- a/go/integration/cert_req_integration/main.go +++ b/go/integration/cert_req_integration/main.go @@ -28,7 +28,7 @@ import ( var ( name = "cert_req_integration" cmd = "./bin/cert_req" - attempts = flag.Int("attempts", 2, "Number of attempts before giving up.") + attempts = flag.Int("attempts", 4, "Number of attempts before giving up.") ) func main() { diff --git a/go/integration/common.go b/go/integration/common.go index 6b02cd60a1..0e2eb6bb10 100644 --- a/go/integration/common.go +++ b/go/integration/common.go @@ -24,8 +24,10 @@ import ( "github.com/opentracing/opentracing-go" + "github.com/scionproto/scion/go/lib/addr" 
"github.com/scionproto/scion/go/lib/env" "github.com/scionproto/scion/go/lib/integration" + "github.com/scionproto/scion/go/lib/integration/progress" "github.com/scionproto/scion/go/lib/log" "github.com/scionproto/scion/go/lib/sciond" "github.com/scionproto/scion/go/lib/snet" @@ -41,6 +43,7 @@ const ( var ( Local snet.UDPAddr Mode string + Progress string sciondAddr string networksFile string Attempts int @@ -55,6 +58,7 @@ func Setup() { func addFlags() { flag.Var((*snet.UDPAddr)(&Local), "local", "(Mandatory) address to listen on") flag.StringVar(&Mode, "mode", ModeClient, "Run in "+ModeClient+" or "+ModeServer+" mode") + flag.StringVar(&Progress, "progress", "", "Socket to write progress to") flag.StringVar(&sciondAddr, "sciond", sciond.DefaultSCIONDAddress, "SCIOND address") flag.StringVar(&networksFile, "networks", integration.SCIONDAddressesFile, "File containing network definitions") @@ -146,6 +150,16 @@ func AttemptRepeatedly(name string, attempt AttemptFunc) int { return 1 } +// Done informs the integration test that a test binary has finished. +func Done(src, dst addr.IA) { + if Progress == "" { + return + } + if doneErr := (progress.Client{Socket: Progress}).Done(src, dst); doneErr != nil { + log.Error("Unable to send done", "err", doneErr) + } +} + // LogFatal logs a critical error and exits with 1 func LogFatal(msg string, a ...interface{}) { log.Crit(msg, a...) 
diff --git a/go/integration/end2end/main.go b/go/integration/end2end/main.go index e1205cec86..6dbfa4bea8 100644 --- a/go/integration/end2end/main.go +++ b/go/integration/end2end/main.go @@ -49,7 +49,7 @@ const ( var ( remote snet.UDPAddr - timeout = &util.DurWrap{Duration: 2 * time.Second} + timeout = &util.DurWrap{Duration: 10 * time.Second} ) func main() { @@ -99,6 +99,9 @@ type server struct { } func (s server) run() { + log.Info("Starting server", "ia", integration.Local.IA) + defer log.Info("Finished server", "ia", integration.Local.IA) + connFactory := &snet.DefaultPacketDispatcherService{ Dispatcher: reliable.NewDispatcher(""), SCMPHandler: snet.NewSCMPHandler( @@ -113,7 +116,7 @@ func (s server) run() { if len(os.Getenv(libint.GoIntegrationEnv)) > 0 { // Needed for integration test ready signal. fmt.Printf("Port=%d\n", port) - fmt.Printf("%s%s\n", libint.ReadySignal, integration.Local.IA) + fmt.Printf("%s%s\n\n", libint.ReadySignal, integration.Local.IA) } log.Debug("Listening", "local", fmt.Sprintf("%v:%d", integration.Local.Host, port)) // Receive ping message @@ -127,8 +130,7 @@ func (s server) run() { if string(p.Payload.(common.RawBytes)) != ping+integration.Local.IA.String() { integration.LogFatal("Received unexpected data", "data", p.Payload.(common.RawBytes)) } - log.Debug(fmt.Sprintf("Ping received from %s, sending pong.", - p.Source)) + log.Debug(fmt.Sprintf("Ping received from %s, sending pong.", p.Source)) // Send pong if p.Path != nil { @@ -143,7 +145,7 @@ func (s server) run() { if err := conn.WriteTo(&p, &ov); err != nil { integration.LogFatal("Unable to send reply", "err", err) } - log.Debug(fmt.Sprintf("Sent pong to %s", p.Destination)) + log.Info("Sent pong to", "client", p.Destination) } } @@ -154,6 +156,10 @@ type client struct { } func (c client) run() int { + pair := fmt.Sprintf("%s -> %s", integration.Local.IA, remote.IA) + log.Info("Starting", "pair", pair) + defer log.Info("Finished", "pair", pair) + defer 
integration.Done(integration.Local.IA, remote.IA) connFactory := &snet.DefaultPacketDispatcherService{ Dispatcher: reliable.NewDispatcher(""), SCMPHandler: snet.NewSCMPHandler( @@ -195,7 +201,6 @@ func (c client) attemptRequest(n int) bool { ext.Error.Set(span, true) return false } - logger.Info("Received pong") return true } @@ -275,7 +280,7 @@ func (c client) pong(ctx context.Context) error { return common.NewBasicError("Received unexpected data", nil, "data", string(p.Payload.(common.RawBytes)), "expected", expected) } - log.Debug(fmt.Sprintf("Received pong from %s", remote.IA)) + log.Info("Received pong", "server", p.Source) return nil } diff --git a/go/integration/end2end_integration/BUILD.bazel b/go/integration/end2end_integration/BUILD.bazel index 2f6b971d9f..01618dd640 100644 --- a/go/integration/end2end_integration/BUILD.bazel +++ b/go/integration/end2end_integration/BUILD.bazel @@ -11,6 +11,8 @@ go_library( "//go/lib/common:go_default_library", "//go/lib/integration:go_default_library", "//go/lib/log:go_default_library", + "//go/lib/serrors:go_default_library", + "//go/lib/snet:go_default_library", "//go/lib/util:go_default_library", ], ) diff --git a/go/integration/end2end_integration/main.go b/go/integration/end2end_integration/main.go index 93207bbd9d..5cf6e30f89 100644 --- a/go/integration/end2end_integration/main.go +++ b/go/integration/end2end_integration/main.go @@ -15,39 +15,47 @@ package main import ( + "context" "flag" "fmt" "os" "strconv" "strings" + "sync" "time" "github.com/scionproto/scion/go/lib/addr" "github.com/scionproto/scion/go/lib/common" "github.com/scionproto/scion/go/lib/integration" "github.com/scionproto/scion/go/lib/log" + "github.com/scionproto/scion/go/lib/serrors" + "github.com/scionproto/scion/go/lib/snet" "github.com/scionproto/scion/go/lib/util" ) const ( - name = "end2end_integration" - cmd = "./bin/end2end" + nameE2E, cmdE2E = "end2end_integration", "./bin/end2end" + logDir = "logs/end2end_integration" ) var ( - subset 
string - attempts int - runAll bool - timeout = &util.DurWrap{Duration: 5 * time.Second} + subset string + attempts int + timeout = &util.DurWrap{Duration: 10 * time.Second} + parallelism int ) +func getCmd() (string, bool) { + return cmdE2E, true +} + func main() { os.Exit(realMain()) } func realMain() int { addFlags() - if err := integration.Init(name); err != nil { + if err := integration.Init(nameE2E); err != nil { fmt.Fprintf(os.Stderr, "Failed to init: %s\n", err) return 1 } @@ -62,11 +70,18 @@ func realMain() int { "-remote", integration.DstAddrPattern + ":" + integration.ServerPortReplace, } serverArgs := []string{ - "-log.console", "debug", "-mode", "server", "-sciond", integration.SCIOND, "-local", integration.DstAddrPattern + ":0", } + + cmd, name := cmdE2E, nameE2E + + if err := os.MkdirAll(logDir, os.ModePerm); err != nil { + log.Error("Error creating logging directory", "err", err) + return 1 + } + in := integration.NewBinaryIntegration(name, cmd, clientArgs, serverArgs) pairs, err := getPairs() if err != nil { @@ -83,44 +98,151 @@ func realMain() int { // addFlags adds the necessary flags. func addFlags() { flag.IntVar(&attempts, "attempts", 1, "Number of attempts per client before giving up.") - flag.BoolVar(&runAll, "all", false, "Run all tests, instead of exiting on first error.") flag.Var(timeout, "timeout", "The timeout for each attempt") flag.StringVar(&subset, "subset", "all", "Subset of pairs to run (all|core-core|"+ "noncore-localcore|noncore-core|noncore-noncore)") + flag.IntVar(&parallelism, "parallelism", 1, "How many end2end tests run in parallel.") } // runTests runs the end2end tests for all pairs. In case of an error the // function is terminated immediately. func runTests(in integration.Integration, pairs []integration.IAPair) error { return integration.ExecuteTimed(in.Name(), func() error { + // Make sure that all executed commands can write to the RPC server + // after shutdown. 
+ defer time.Sleep(time.Second) + + // Estimating the timeout we should have is hard. CI will abort after 10 + // minutes anyway. Thus this value. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + // First run all servers - var lastErr error - dsts := integration.ExtractUniqueDsts(pairs) - for _, dst := range dsts { - s, err := integration.StartServer(in, dst) - if err != nil { - log.Error(fmt.Sprintf("Error in server: %s", dst.String()), "err", err) - return err + type srvResult struct { + cleaner func() + err error + } + // Start servers in parallel. + srvResults := make(chan srvResult) + for _, dst := range integration.ExtractUniqueDsts(pairs) { + go func(dst *snet.UDPAddr) { + defer log.HandlePanic() + + srvCtx, cancel := context.WithCancel(ctx) + waiter, err := in.StartServer(srvCtx, dst) + if err != nil { + log.Error(fmt.Sprintf("Error in server: %s", dst.String()), "err", err) + } + cleaner := func() { + cancel() + if waiter != nil { + waiter.Wait() + } + } + srvResults <- srvResult{cleaner: cleaner, err: err} + }(dst) + } + // Wait for all servers being started. + var errs serrors.List + for range integration.ExtractUniqueDsts(pairs) { + res := <-srvResults + // We need to register a cleanup for all servers. + // Do not short-cut exit here. + if res.err != nil { + errs = append(errs, res.err) } - defer s.Close() + defer res.cleaner() + } + if err := errs.ToError(); err != nil { + return err } - // Now start the clients for srcDest pair - for i, conn := range pairs { - testInfo := fmt.Sprintf("%v -> %v (%v/%v)", conn.Src.IA, conn.Dst.IA, i+1, len(pairs)) + + // Start a done signal listener. This is how the end2end binary + // communicates with this integration test. This is solely used to print + // the progress of the test. 
+ var ctrMtx sync.Mutex + var ctr int + socket, clean, err := integration.ListenDone(func(src, dst addr.IA) { + ctrMtx.Lock() + defer ctrMtx.Unlock() + ctr++ + testInfo := fmt.Sprintf("%v -> %v (%v/%v)", src, dst, ctr, len(pairs)) log.Info(fmt.Sprintf("Test %v: %s", in.Name(), testInfo)) - t := integration.DefaultRunTimeout + timeout.Duration*time.Duration(attempts) - if err := integration.RunClient(in, conn, t); err != nil { - log.Error(fmt.Sprintf("Error in client: %s", testInfo), "err", err) - lastErr = err - if !runAll { - return err + }) + if err != nil { + return err + } + defer clean() + + // CI collapses if parallelism is too high. + semaphore := make(chan struct{}, parallelism) + + // Docker exec comes with a 1 second overhead. We group all the pairs by + // the clients. And run all pairs for a given client in one execution. + // Thus, reducing the overhead dramatically. + groups := integration.GroupBySource(pairs) + clientResults := make(chan error, len(groups)) + for src, dsts := range groups { + go func(src *snet.UDPAddr, dsts []*snet.UDPAddr) { + defer log.HandlePanic() + + semaphore <- struct{}{} + defer func() { <-semaphore }() + // Aggregate all the commands that need to be run. 
+ cmds := make([]integration.Cmd, 0, len(dsts)) + for _, dst := range dsts { + cmd, err := clientTemplate(socket).Template(src, dst) + if err != nil { + clientResults <- err + return + } + cmds = append(cmds, cmd) + } + var tester string + if *integration.Docker { + tester = integration.TesterID(src) } + logFile := fmt.Sprintf("%s/client_%s.log", logDir, src.IA.FileFmt(false)) + err := integration.Run(ctx, integration.RunConfig{ + Commands: cmds, + LogFile: logFile, + Tester: tester, + }) + if err != nil { + err = serrors.WithCtx(err, "file", logFile) + } + clientResults <- err + }(src, dsts) + } + for range groups { + err := <-clientResults + if err != nil { + return err } } - return lastErr + return nil }) } +func clientTemplate(progressSock string) integration.Cmd { + bin, progress := getCmd() + cmd := integration.Cmd{ + Binary: bin, + Args: []string{ + "-log.console", "debug", + "-attempts", strconv.Itoa(attempts), + "-timeout", timeout.String(), + "-sciond", integration.SCIOND, + "-local", integration.SrcAddrPattern + ":0", + "-remote", integration.DstAddrPattern + ":" + integration.ServerPortReplace, + }, + } + if progress { + cmd.Args = append(cmd.Args, "-progress", progressSock) + } + return cmd +} + // getPairs returns the pairs to test according to the specified subset. 
func getPairs() ([]integration.IAPair, error) { pairs := integration.IAPairs(integration.DispAddr) diff --git a/go/lib/integration/BUILD.bazel b/go/lib/integration/BUILD.bazel index 755a334a33..ee957f3c3e 100644 --- a/go/lib/integration/BUILD.bazel +++ b/go/lib/integration/BUILD.bazel @@ -4,7 +4,9 @@ go_library( name = "go_default_library", srcs = [ "binary.go", + "cmd.go", "docker.go", + "done.go", "integration.go", ], importpath = "github.com/scionproto/scion/go/lib/integration", @@ -12,6 +14,7 @@ go_library( deps = [ "//go/lib/addr:go_default_library", "//go/lib/common:go_default_library", + "//go/lib/integration/progress:go_default_library", "//go/lib/log:go_default_library", "//go/lib/sciond:go_default_library", "//go/lib/serrors:go_default_library", diff --git a/go/lib/integration/binary.go b/go/lib/integration/binary.go index c65407cd81..702a00e059 100644 --- a/go/lib/integration/binary.go +++ b/go/lib/integration/binary.go @@ -22,6 +22,7 @@ import ( "os" "os/exec" "strings" + "sync" "time" "github.com/scionproto/scion/go/lib/addr" @@ -63,7 +64,11 @@ const ( ) var ( - serverPorts = make(map[addr.IA]string) + // FIXME(roosd): The caller to StartServer and StartClient + // should take care of aggregating the data. I would prefer not to use a + // global here. 
+ serverPortsMtx sync.Mutex + serverPorts = make(map[addr.IA]string) ) var _ Integration = (*binaryIntegration)(nil) @@ -116,6 +121,7 @@ func (bi *binaryIntegration) StartServer(ctx context.Context, dst *snet.UDPAddr) r := &binaryWaiter{ exec.CommandContext(ctx, bi.cmd, args...), } + log.Info(fmt.Sprintf("%v %v\n", bi.cmd, strings.Join(args, " "))) r.Env = os.Environ() r.Env = append(r.Env, fmt.Sprintf("%s=1", GoIntegrationEnv)) ep, err := r.StderrPipe() @@ -136,9 +142,15 @@ func (bi *binaryIntegration) StartServer(ctx context.Context, dst *snet.UDPAddr) init := true scanner := bufio.NewScanner(sp) for scanner.Scan() { + if scanner.Err() != nil { + log.Error("Error during reading of stdout", "err", scanner.Err()) + return + } line := scanner.Text() if strings.HasPrefix(line, portString) { + serverPortsMtx.Lock() serverPorts[dst.IA] = strings.TrimPrefix(line, portString) + serverPortsMtx.Unlock() } if init && signal == line { close(ready) @@ -150,6 +162,7 @@ func (bi *binaryIntegration) StartServer(ctx context.Context, dst *snet.UDPAddr) defer log.HandlePanic() bi.writeLog("server", dst.IA.FileFmt(false), dst.IA.FileFmt(false), ep) }() + if err = r.Start(); err != nil { return nil, common.NewBasicError("Failed to start server", err, "dst", dst.IA) } @@ -179,16 +192,26 @@ func (bi *binaryIntegration) StartClient(ctx context.Context, r := &binaryWaiter{ exec.CommandContext(ctx, bi.cmd, args...), } + log.Info(fmt.Sprintf("%v %v\n", bi.cmd, strings.Join(args, " "))) r.Env = os.Environ() r.Env = append(r.Env, fmt.Sprintf("%s=1", GoIntegrationEnv)) ep, err := r.StderrPipe() if err != nil { return nil, err } + sp, err := r.StdoutPipe() + if err != nil { + return nil, err + } + go func() { defer log.HandlePanic() bi.writeLog("client", clientId(src, dst), fmt.Sprintf("%s -> %s", src.IA, dst.IA), ep) }() + go func() { + defer log.HandlePanic() + bi.writeLog("client", clientId(src, dst), fmt.Sprintf("%s -> %s", src.IA, dst.IA), sp) + }() return r, r.Start() } diff --git 
a/go/lib/integration/cmd.go b/go/lib/integration/cmd.go new file mode 100644 index 0000000000..59a52c0bed --- /dev/null +++ b/go/lib/integration/cmd.go @@ -0,0 +1,96 @@ +// Copyright 2020 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "fmt" + "os" + "os/exec" + "strings" + + "github.com/scionproto/scion/go/lib/log" + "github.com/scionproto/scion/go/lib/serrors" + "github.com/scionproto/scion/go/lib/snet" +) + +// Cmd represents a single command. +type Cmd struct { + // Binary is the path to the application binary. + Binary string + Args []string +} + +// Template returns a command with the place holder arguments filled in. 
+func (c Cmd) Template(src, dst *snet.UDPAddr) (Cmd, error) { + args := replacePattern(SrcIAReplace, src.IA.String(), c.Args) + args = replacePattern(SrcHostReplace, src.Host.IP.String(), args) + args = replacePattern(DstIAReplace, dst.IA.String(), args) + args = replacePattern(DstHostReplace, dst.Host.IP.String(), args) + args = replacePattern(ServerPortReplace, serverPorts[dst.IA], args) + if needSCIOND(args) { + sciond, err := GetSCIONDAddress(SCIONDAddressesFile, src.IA) + if err != nil { + return Cmd{}, serrors.WrapStr("unable to determine SCIOND address", err) + } + args = replacePattern(SCIOND, sciond, args) + } + return Cmd{Binary: c.Binary, Args: args}, nil +} + +func (c Cmd) String() string { + return fmt.Sprintf("%v %v", c.Binary, strings.Join(c.Args, " ")) +} + +// RunConfig is used to configure the run. +type RunConfig struct { + Commands []Cmd + LogFile string + // Tester is the tester container to run the commands in. If it is empty, + // the commands are run directly, instead of in a tester container. + Tester string +} + +// Run runs the commands of the run config. The caller must ensure that all +// commands are executable when run in a tester container. E.g., for end-to-end +// tests this means the source address is the same for all. +func Run(ctx context.Context, cfg RunConfig) error { + file, err := os.OpenFile(cfg.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644) + if err != nil { + return err + } + var cmd *exec.Cmd + if cfg.Tester != "" { + args := append([]string{}, dockerArgs...) + args = append(args, cfg.Tester, "sh", "-c", joinCmds(cfg.Commands)) + cmd = exec.CommandContext(ctx, "docker-compose", args...) 
+ log.Debug("Running docker command", "cmd", cmd) + } else { + cmd = exec.CommandContext(ctx, "sh", "-c", joinCmds(cfg.Commands)) + cmd.Env = append(os.Environ(), fmt.Sprintf("%s=1", GoIntegrationEnv)) + log.Debug("Running command", "cmd", cmd) + } + cmd.Stdout, cmd.Stderr = file, file + return cmd.Run() +} + +// Join joins the commands with the provided operator. +func joinCmds(l []Cmd) string { + cmds := make([]string, 0, len(l)) + for _, cmd := range l { + cmds = append(cmds, cmd.String()) + } + return strings.Join(cmds, " && ") +} diff --git a/go/lib/integration/docker.go b/go/lib/integration/docker.go index a24b8d11d5..b89bf9d952 100644 --- a/go/lib/integration/docker.go +++ b/go/lib/integration/docker.go @@ -18,14 +18,15 @@ import ( "context" "flag" "fmt" + "os" + "strings" "github.com/scionproto/scion/go/lib/log" "github.com/scionproto/scion/go/lib/snet" ) const ( - dockerCmd = "./tools/dc" - dockerArg = "exec_tester" + dockerCmd = "docker-compose" ) var ( @@ -33,6 +34,9 @@ var ( Docker = flag.Bool("d", false, "Run tests in a docker container") ) +var dockerArgs = []string{"-f", "gen/scion-dc.yml", "-p", "scion", "exec", "-T", "-e", + fmt.Sprintf("%s=1", GoIntegrationEnv)} + var _ Integration = (*dockerIntegration)(nil) type dockerIntegration struct { @@ -51,9 +55,7 @@ func dockerize(bi *binaryIntegration) Integration { // StartServer starts a server and blocks until the ReadySignal is received on Stdout. func (di *dockerIntegration) StartServer(ctx context.Context, dst *snet.UDPAddr) (Waiter, error) { bi := *di.binaryIntegration - env := fmt.Sprintf("%s=1", GoIntegrationEnv) - bi.serverArgs = append([]string{dockerArg, dst.IA.FileFmt(false), env, bi.cmd}, - bi.serverArgs...) + bi.serverArgs = append(dockerArgs, append([]string{TesterID(dst), bi.cmd}, bi.serverArgs...)...) 
bi.cmd = dockerCmd log.Debug(fmt.Sprintf("Starting server for %s in a docker container", dst.IA.FileFmt(false))) return bi.StartServer(ctx, dst) @@ -62,8 +64,18 @@ func (di *dockerIntegration) StartServer(ctx context.Context, dst *snet.UDPAddr) func (di *dockerIntegration) StartClient(ctx context.Context, src, dst *snet.UDPAddr) (Waiter, error) { bi := *di.binaryIntegration - bi.clientArgs = append([]string{dockerArg, src.IA.FileFmt(false), bi.cmd}, bi.clientArgs...) + bi.clientArgs = append(dockerArgs, append([]string{TesterID(src), bi.cmd}, bi.clientArgs...)...) bi.cmd = dockerCmd log.Debug(fmt.Sprintf("Starting client for %s in a docker container", src.IA.FileFmt(false))) return bi.StartClient(ctx, src, dst) } + +// TesterID returns the ID of the tester container. +func TesterID(a *snet.UDPAddr) string { + ia := a.IA.FileFmt(false) + envID, ok := os.LookupEnv(fmt.Sprintf("tester_%s", strings.Replace(ia, "-", "_", -1))) + if !ok { + return fmt.Sprintf("tester_%s", ia) + } + return envID +} diff --git a/go/lib/integration/done.go b/go/lib/integration/done.go new file mode 100644 index 0000000000..6d3c3e95ca --- /dev/null +++ b/go/lib/integration/done.go @@ -0,0 +1,53 @@ +// Copyright 2020 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package integration + +import ( + "io/ioutil" + "net" + "os" + + "github.com/scionproto/scion/go/lib/addr" + "github.com/scionproto/scion/go/lib/integration/progress" + "github.com/scionproto/scion/go/lib/log" +) + +// ListenDone opens a RPC server to listen for done signals. +func ListenDone(onDone func(src, dst addr.IA)) (string, func(), error) { + if err := os.MkdirAll("logs/socks", os.ModePerm); err != nil { + return "", nil, err + } + file, err := ioutil.TempFile("logs/socks", "integration-*.sock") + if err != nil { + return "", nil, err + } + name := file.Name() + if err := file.Close(); err != nil { + return "", nil, err + } + if err := os.Remove(name); err != nil { + return "", nil, err + } + l, err := net.Listen("unix", name) + if err != nil { + return "", nil, err + } + srv := progress.Server{OnDone: onDone} + go func() { + defer log.HandlePanic() + srv.Serve(l) + }() + return name, func() { os.Remove(name) }, nil +} diff --git a/go/lib/integration/integration.go b/go/lib/integration/integration.go index 46c4ed95f4..03cddcb3e7 100644 --- a/go/lib/integration/integration.go +++ b/go/lib/integration/integration.go @@ -40,7 +40,7 @@ import ( const ( // StartServerTimeout is the timeout for starting a server. - StartServerTimeout = 6 * time.Second + StartServerTimeout = 20 * time.Second // DefaultRunTimeout is the timeout when running a server or a client. DefaultRunTimeout = 8 * time.Second // CtxTimeout is the timeout a context waits before being killed @@ -286,31 +286,13 @@ func ExtractUniqueDsts(pairs []IAPair) []*snet.UDPAddr { return res } -// RunBinaryTests runs the client and server for each IAPair. A number of tests are run in parallel -// In case of an error the function is terminated immediately. -func RunBinaryTests(in Integration, pairs []IAPair, timeout time.Duration) error { - if timeout == 0 { - timeout = DefaultRunTimeout +// GroupBySource groups the ISD-AS pairs by source. 
+func GroupBySource(pairs []IAPair) map[*snet.UDPAddr][]*snet.UDPAddr { + groups := make(map[*snet.UDPAddr][]*snet.UDPAddr) + for _, pair := range pairs { + groups[pair.Src] = append(groups[pair.Src], pair.Dst) } - return runTests(in, pairs, 1, func(idx int, pair IAPair) error { - // Start server - s, err := StartServer(in, pair.Dst) - if err != nil { - log.Error(fmt.Sprintf("Error in server: %s", pair.Dst.String()), "err", err) - return err - } - defer s.Close() - // Start client - log.Info(fmt.Sprintf("Test %v: %v -> %v (%v/%v)", in.Name(), pair.Src.IA, pair.Dst.IA, - idx+1, len(pairs))) - if err := RunClient(in, pair, timeout); err != nil { - msg := fmt.Sprintf("Error in client: %v -> %v (%v/%v)", - pair.Src.IA, pair.Dst.IA, idx+1, len(pairs)) - log.Error(msg, "err", err) - return err - } - return nil - }) + return groups } // RunUnaryTests runs the client for each IAPair. diff --git a/go/lib/integration/progress/BUILD.bazel b/go/lib/integration/progress/BUILD.bazel new file mode 100644 index 0000000000..0742ececf9 --- /dev/null +++ b/go/lib/integration/progress/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["progress.go"], + importpath = "github.com/scionproto/scion/go/lib/integration/progress", + visibility = ["//visibility:public"], + deps = ["//go/lib/addr:go_default_library"], +) diff --git a/go/lib/integration/progress/progress.go b/go/lib/integration/progress/progress.go new file mode 100644 index 0000000000..275770ac6b --- /dev/null +++ b/go/lib/integration/progress/progress.go @@ -0,0 +1,70 @@ +// Copyright 2018 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package progress + +import ( + "net" + "net/http" + "net/rpc" + + "github.com/scionproto/scion/go/lib/addr" +) + +// RPC defines the progress RPC API. +type RPC struct { + onDone func(src, dst addr.IA) +} + +// Done exposes the RPC. +func (r *RPC) Done(done *Done, rep *bool) error { + *rep = true + r.onDone(done.Src, done.Dst) + return nil +} + +// Done is the RPC call to indicate a test is done. +type Done struct { + Src, Dst addr.IA +} + +// Client is the client side of the RPCs between the testing binary and the +// integration test. +type Client struct { + Socket string +} + +// Done tells the integration test, that the testing binary is done. +func (c Client) Done(src, dst addr.IA) error { + client, err := rpc.DialHTTP("unix", c.Socket) + if err != nil { + return err + } + args := &Done{Src: src, Dst: dst} + var ignore bool + return client.Call("RPC.Done", args, &ignore) +} + +// Server is the server side of the RPCs between the testing binary and the +// integration test. +type Server struct { + OnDone func(src, dst addr.IA) +} + +// Serve starts serving the RPCs. 
+func (s *Server) Serve(l net.Listener) error { + rpc.Register(&RPC{onDone: s.OnDone}) + rpc.HandleHTTP() + return http.Serve(l, nil) +} diff --git a/topology/Default.topo b/topology/Default.topo index 2ba78ec94e..cfccc36eca 100644 --- a/topology/Default.topo +++ b/topology/Default.topo @@ -59,24 +59,24 @@ ASes: underlay: UDP/IPv6 links: - {a: "1-ff00:0:110#1", b: "1-ff00:0:120-A#6", linkAtoB: CORE} - - {a: "1-ff00:0:110#2", b: "1-ff00:0:130-A#1004", linkAtoB: CORE, underlay: UDP/IPv6} + - {a: "1-ff00:0:110#2", b: "1-ff00:0:130-A#104", linkAtoB: CORE, underlay: UDP/IPv6} - {a: "1-ff00:0:110#3", b: "2-ff00:0:210#453", linkAtoB: CORE} - - {a: "1-ff00:0:120-A#1", b: "1-ff00:0:130-B#1005", linkAtoB: CORE} + - {a: "1-ff00:0:120-A#1", b: "1-ff00:0:130-B#105", linkAtoB: CORE} - {a: "1-ff00:0:120-B#2", b: "2-ff00:0:220#501", linkAtoB: CORE, mtu: 1350} - {a: "1-ff00:0:120-B#3", b: "2-ff00:0:220#502", linkAtoB: CORE, mtu: 1400} - {a: "1-ff00:0:120-B#4", b: "1-ff00:0:121#3", linkAtoB: CHILD} - {a: "1-ff00:0:120#5", b: "1-ff00:0:111-B#104", linkAtoB: CHILD} - - {a: "1-ff00:0:130-A#1001", b: "1-ff00:0:131#4079", linkAtoB: CHILD} - - {a: "1-ff00:0:130-B#1002", b: "1-ff00:0:111-A#105", linkAtoB: CHILD, underlay: UDP/IPv6} - - {a: "1-ff00:0:130-A#1003", b: "1-ff00:0:112#4095", linkAtoB: CHILD} + - {a: "1-ff00:0:130-A#111", b: "1-ff00:0:131#479", linkAtoB: CHILD} + - {a: "1-ff00:0:130-B#112", b: "1-ff00:0:111-A#105", linkAtoB: CHILD, underlay: UDP/IPv6} + - {a: "1-ff00:0:130-A#113", b: "1-ff00:0:112#495", linkAtoB: CHILD} - {a: "1-ff00:0:111-C#100", b: "1-ff00:0:121#4", linkAtoB: PEER} - {a: "1-ff00:0:111-B#101", b: "2-ff00:0:211-A#5", linkAtoB: PEER, underlay: UDP/IPv6} - {a: "1-ff00:0:111-C#102", b: "2-ff00:0:211-A#6", linkAtoB: PEER} - - {a: "1-ff00:0:111-A#103", b: "1-ff00:0:112#4094", linkAtoB: CHILD} - - {a: "1-ff00:0:121#1", b: "1-ff00:0:131#4080", linkAtoB: PEER} + - {a: "1-ff00:0:111-A#103", b: "1-ff00:0:112#494", linkAtoB: CHILD} + - {a: "1-ff00:0:121#1", b: 
"1-ff00:0:131#480", linkAtoB: PEER} - {a: "1-ff00:0:121#2", b: "1-ff00:0:122#2", linkAtoB: CHILD, underlay: UDP/IPv6} - {a: "1-ff00:0:122#1", b: "1-ff00:0:133#1", linkAtoB: PEER} - - {a: "1-ff00:0:131#4078", b: "1-ff00:0:132#2", linkAtoB: CHILD} + - {a: "1-ff00:0:131#478", b: "1-ff00:0:132#2", linkAtoB: CHILD} - {a: "1-ff00:0:132#1", b: "1-ff00:0:133#2", linkAtoB: CHILD} - {a: "2-ff00:0:210#450", b: "2-ff00:0:220#503", linkAtoB: CORE, underlay: UDP/IPv6} - {a: "2-ff00:0:210#451", b: "2-ff00:0:211-A#7", linkAtoB: CHILD}