diff --git a/temporalcli/commands.server.go b/temporalcli/commands.server.go index 0437d81b..927bb6f7 100644 --- a/temporalcli/commands.server.go +++ b/temporalcli/commands.server.go @@ -40,6 +40,12 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string) } else if err := opts.LogLevel.UnmarshalText([]byte(logLevel)); err != nil { return fmt.Errorf("invalid log level %q: %w", logLevel, err) } + if err := devserver.CheckPortFree(opts.FrontendIP, opts.FrontendPort); err != nil { + return fmt.Errorf("can't set frontend port %d: %w", opts.FrontendPort, err) + } + if err := devserver.CheckPortFree(opts.FrontendIP, opts.FrontendHTTPPort); err != nil { + return fmt.Errorf("can't set frontend HTTP port %d: %w", opts.FrontendHTTPPort, err) + } // Setup UI if !t.Headless { opts.UIIP, opts.UIPort = t.Ip, t.UiPort @@ -48,6 +54,13 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string) } if opts.UIPort == 0 { opts.UIPort = t.Port + 1000 + if err := devserver.CheckPortFree(opts.UIIP, opts.UIPort); err != nil { + return fmt.Errorf("can't use default UI port %d (%d + 1000): %w", opts.UIPort, t.Port, err) + } + } else { + if err := devserver.CheckPortFree(opts.UIIP, opts.UIPort); err != nil { + return fmt.Errorf("can't set UI port %d: %w", opts.UIPort, err) + } } opts.UIAssetPath, opts.UICodecEndpoint = t.UiAssetPath, t.UiCodecEndpoint } @@ -87,7 +100,11 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string) } // Grab a free port for metrics ahead-of-time so we know what port is selected if opts.MetricsPort == 0 { - opts.MetricsPort = devserver.MustGetFreePort() + opts.MetricsPort = devserver.MustGetFreePort(t.Ip) + } else { + if err := devserver.CheckPortFree(t.Ip, opts.MetricsPort); err != nil { + return fmt.Errorf("can't set metrics port %d: %w", opts.MetricsPort, err) + } } // Start, wait for context complete, then stop diff --git a/temporalcli/commands.server_test.go 
b/temporalcli/commands.server_test.go index 918a10bd..72462c56 100644 --- a/temporalcli/commands.server_test.go +++ b/temporalcli/commands.server_test.go @@ -3,6 +3,8 @@ package temporalcli_test import ( "context" "strconv" + "sync" + "sync/atomic" "testing" "time" @@ -21,7 +23,7 @@ func TestServer_StartDev_Simple(t *testing.T) { defer h.Close() // Start in background, then wait for client to be able to connect - port := strconv.Itoa(devserver.MustGetFreePort()) + port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) resCh := make(chan *CommandResult, 1) // TODO(cretz): Remove --headless when // https://github.com/temporalio/ui/issues/1773 fixed @@ -38,7 +40,7 @@ func TestServer_StartDev_Simple(t *testing.T) { } var err error cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port}) - require.NoError(t, err) + assert.NoError(t, err) }, 3*time.Second, 200*time.Millisecond) defer cl.Close() @@ -60,3 +62,82 @@ func TestServer_StartDev_Simple(t *testing.T) { h.NoError(res.Err) } } + +func TestServer_StartDev_ConcurrentStarts(t *testing.T) { + startOne := func() { + h := NewCommandHarness(t) + defer h.Close() + + // Start in background, then wait for client to be able to connect + port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) + resCh := make(chan *CommandResult, 1) + go func() { + resCh <- h.Execute("server", "start-dev", "-p", port, "--headless", "--log-level", "never") + }() + + // Try to connect for a bit while checking for error + var cl client.Client + h.EventuallyWithT(func(t *assert.CollectT) { + select { + case res := <-resCh: + require.NoError(t, res.Err) + require.Fail(t, "got early server result") + default: + } + var err error + cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port, Logger: testLogger{t: h.t}}) + assert.NoError(t, err) + }, 3*time.Second, 200*time.Millisecond) + defer cl.Close() + + // Send an interrupt by cancelling context + h.CancelContext() + + // FIXME: We should technically wait for server 
cleanup, but this is + // slowing down the test considerably, presumably due to the issue fixed + // in https://github.com/temporalio/temporal/pull/5459. Uncomment the + // following code when the server dependency is updated to 1.24.0. + // + // select { + // case <-time.After(20 * time.Second): + // h.Fail("didn't cleanup after 20 seconds") + // case res := <-resCh: + // h.NoError(res.Err) + // } + } + + // Start 80 dev server instances, with 8 concurrent executions + instanceCounter := atomic.Int32{} + instanceCounter.Store(80) + wg := &sync.WaitGroup{} + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for instanceCounter.Add(-1) >= 0 { + startOne() + } + }() + } + wg.Wait() +} + +type testLogger struct { + t *testing.T +} + +func (l testLogger) Debug(msg string, keysAndValues ...interface{}) { + l.t.Log(append([]interface{}{"DEBUG: " + msg}, keysAndValues...)...) +} + +func (l testLogger) Info(msg string, keysAndValues ...interface{}) { + l.t.Log(append([]interface{}{"INFO: " + msg}, keysAndValues...)...) +} + +func (l testLogger) Warn(msg string, keysAndValues ...interface{}) { + l.t.Log(append([]interface{}{"WARN: " + msg}, keysAndValues...)...) +} + +func (l testLogger) Error(msg string, keysAndValues ...interface{}) { + l.t.Log(append([]interface{}{"ERROR: " + msg}, keysAndValues...)...) 
+} diff --git a/temporalcli/commands_test.go b/temporalcli/commands_test.go index 39a15421..839e6d7a 100644 --- a/temporalcli/commands_test.go +++ b/temporalcli/commands_test.go @@ -273,7 +273,7 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer { d.Options.FrontendIP = "127.0.0.1" } if d.Options.FrontendPort == 0 { - d.Options.FrontendPort = devserver.MustGetFreePort() + d.Options.FrontendPort = devserver.MustGetFreePort(d.Options.FrontendIP) } if len(d.Options.Namespaces) == 0 { d.Options.Namespaces = []string{ diff --git a/temporalcli/devserver/freeport.go b/temporalcli/devserver/freeport.go index 2847279f..9acca842 100644 --- a/temporalcli/devserver/freeport.go +++ b/temporalcli/devserver/freeport.go @@ -1,84 +1,95 @@ -// The MIT License -// -// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Copyright (c) 2021 Datadog, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - package devserver import ( "fmt" "net" + "runtime" ) -// Modified from https://github.com/phayes/freeport/blob/95f893ade6f232a5f1511d61735d89b1ae2df543/freeport.go - -func MustGetFreePort() int { - p := NewPortProvider() - defer p.Close() - return p.MustGetFreePort() -} - -func NewPortProvider() *PortProvider { - return &PortProvider{} -} - -type PortProvider struct { - listeners []*net.TCPListener -} - -// GetFreePort asks the kernel for a free open port that is ready to use. -func (p *PortProvider) GetFreePort() (int, error) { - addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") +// GetFreePort returns a TCP port that is available to listen on, for the given (local) host. +// +// This works by binding a new TCP socket on port 0, which requests the OS to +// allocate a free port. There is no strict guarantee that the port will remain +// available after this function returns, but it should be safe to assume that +// a given port will not be allocated again to any process on this machine +// within a few seconds. +// +// On Unix-based systems, binding to the port returned by this function requires +// setting the `SO_REUSEADDR` socket option (Go already does that by default, +// but other languages may not); otherwise, the OS may fail with a message such +// as "address already in use". Windows default behavior is already appropriate +// in this regard; on that platform, `SO_REUSEADDR` has a different meaning and +// should not be set (setting it may have unpredictable consequences). 
+func GetFreePort(host string) (int, error) { + l, err := net.Listen("tcp", net.JoinHostPort(host, "0")) if err != nil { - if addr, err = net.ResolveTCPAddr("tcp6", "[::1]:0"); err != nil { - panic(fmt.Sprintf("temporal: failed to get free port: %v", err)) - } + return 0, fmt.Errorf("failed to assign a free port: %w", err) } + defer l.Close() + port := l.Addr().(*net.TCPAddr).Port - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return 0, err + // On Linux and some BSD variants, ephemeral ports are randomized, and may + // consequently repeat within a short time frame after the listening end + // has been closed. To avoid this, we make a connection to the port, then + // close that connection from the server's side (this is very important), + // which puts the connection in TIME_WAIT state for some time (by default, + // 60s on Linux). While it remains in that state, the OS will not reallocate + // that port number for bind(:0) syscalls, yet we are not prevented from + // explicitly binding to it (thanks to SO_REUSEADDR). + // + // On macOS and Windows, the above technique is not necessary, as the OS + // allocates ephemeral ports sequentially, meaning a port number will only + // be reused after the entire range has been exhausted. Quite the opposite, + // given that these OSes use a significantly smaller range for ephemeral + // ports, making an extra connection just to reserve a port might actually + // be harmful (by hastening ephemeral port exhaustion). 
+ if runtime.GOOS != "darwin" && runtime.GOOS != "windows" { + r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr)) + if err != nil { + return 0, fmt.Errorf("failed to assign a free port: %w", err) + } + defer r.Close() + c, err := l.Accept() + if err != nil { + return 0, fmt.Errorf("failed to assign a free port: %w", err) + } + // Close the accepted connection first, i.e. from the server side + c.Close() } - p.listeners = append(p.listeners, l) - - return l.Addr().(*net.TCPAddr).Port, nil + return port, nil } -func (p *PortProvider) MustGetFreePort() int { - port, err := p.GetFreePort() +// MustGetFreePort returns a TCP port that is available to listen on, for the given (local) +// host; panics if no port is available. +// +// This works by binding a new TCP socket on port 0, which requests the OS to +// allocate a free port. There is no strict guarantee that the port will remain +// available after this function returns, but it should be safe to assume that +// a given port will not be allocated again to any process on this machine +// within a few seconds. +// +// On Unix-based systems, binding to the port returned by this function requires +// setting the `SO_REUSEADDR` socket option (Go already does that by default, +// but other languages may not); otherwise, the OS may fail with a message such +// as "address already in use". Windows default behavior is already appropriate +// in this regard; on that platform, `SO_REUSEADDR` has a different meaning and +// should not be set (setting it may have unpredictable consequences). +func MustGetFreePort(host string) int { + port, err := GetFreePort(host) if err != nil { - panic(err) + panic(fmt.Errorf("failed assigning ephemeral port: %w", err)) } return port } -func (p *PortProvider) Close() error { - for _, l := range p.listeners { - if err := l.Close(); err != nil { - return err - } +// CheckPortFree asserts that the given TCP port is available to listen on, for the given +// (local) host; it returns an error if it is not. 
+func CheckPortFree(host string, port int) error { + l, err := net.Listen("tcp", net.JoinHostPort(host, fmt.Sprint(port))) + if err != nil { + return err } + l.Close() return nil } diff --git a/temporalcli/devserver/freeport_test.go b/temporalcli/devserver/freeport_test.go new file mode 100644 index 00000000..e0e49a0d --- /dev/null +++ b/temporalcli/devserver/freeport_test.go @@ -0,0 +1,66 @@ +package devserver_test + +import ( + "fmt" + "net" + "testing" + + "github.com/temporalio/cli/temporalcli/devserver" +) + +func TestFreePort_NoDouble(t *testing.T) { + host := "127.0.0.1" + portSet := make(map[int]bool) + + for i := 0; i < 2000; i++ { + p, err := devserver.GetFreePort(host) + if err != nil { + t.Fatalf("Error: %s", err) + } + + if _, exists := portSet[p]; exists { + t.Fatalf("Port %d has been assigned more than once", p) + } + + // Add port to the set + portSet[p] = true + } +} + +func TestFreePort_CanBindImmediatelySameProcess(t *testing.T) { + host := "127.0.0.1" + + for i := 0; i < 500; i++ { + p, err := devserver.GetFreePort(host) + if err != nil { + t.Fatalf("Error: %s", err) + } + err = tryListenAndDialOn(host, p) + if err != nil { + t.Fatalf("Error: %s", err) + } + } +} + +// tryListenAndDialOn checks that the given port can be listened on, dialed and accepted on right away; it returns the first error encountered. +func tryListenAndDialOn(host string, port int) error { + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + return err + } + defer l.Close() + + r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr)) + if err != nil { + return err + } + defer r.Close() + + c, err := l.Accept() + if err != nil { + return err + } + defer c.Close() + + return nil +} diff --git a/temporalcli/devserver/server.go b/temporalcli/devserver/server.go index b0983f0e..2bbde0b4 100644 --- a/temporalcli/devserver/server.go +++ b/temporalcli/devserver/server.go @@ -260,13 +260,11 @@ func (s *StartOptions) buildServerConfig() (*config.Config, error) { } } conf.DCRedirectionPolicy.Policy = 
"noop" - portProvider := NewPortProvider() - defer portProvider.Close() conf.Services = map[string]config.Service{ - "frontend": s.buildServiceConfig(portProvider, true), - "history": s.buildServiceConfig(portProvider, false), - "matching": s.buildServiceConfig(portProvider, false), - "worker": s.buildServiceConfig(portProvider, false), + "frontend": s.buildServiceConfig(true), + "history": s.buildServiceConfig(false), + "matching": s.buildServiceConfig(false), + "worker": s.buildServiceConfig(false), } conf.Archival.History.State = "disabled" conf.Archival.Visibility.State = "disabled" @@ -319,7 +317,7 @@ func (s *StartOptions) buildSQLConfig() (*config.SQL, error) { return &conf, nil } -func (s *StartOptions) buildServiceConfig(p *PortProvider, frontend bool) config.Service { +func (s *StartOptions) buildServiceConfig(frontend bool) config.Service { var conf config.Service if frontend { conf.RPC.GRPCPort = s.FrontendPort @@ -328,9 +326,9 @@ func (s *StartOptions) buildServiceConfig(p *PortProvider, frontend bool) config conf.RPC.HTTPPort = s.FrontendHTTPPort } } else { - conf.RPC.GRPCPort = p.MustGetFreePort() + conf.RPC.GRPCPort = MustGetFreePort("127.0.0.1") conf.RPC.BindOnLocalHost = true } - conf.RPC.MembershipPort = p.MustGetFreePort() + conf.RPC.MembershipPort = MustGetFreePort("127.0.0.1") return conf } diff --git a/temporalcli/internal/printer/printer_test.go b/temporalcli/internal/printer/printer_test.go index 58672331..8965ac60 100644 --- a/temporalcli/internal/printer/printer_test.go +++ b/temporalcli/internal/printer/printer_test.go @@ -159,7 +159,7 @@ func TestPrinter_NoPanicIfNoStdout(t *testing.T) { if err != nil { t.Fatalf("Error finding go executable: %v", err) } - // Don't use exec.Command here, as it silently replace nil file descriptors + // Don't use exec.Command here, as it silently replaces nil file descriptors // with /dev/null on the parent side. We specifically want to test what // happens when stdout is nil. p, err := os.StartProcess(