From ef91bc3550e5f0cdfbf2ec005ea4e2f3b1b5d631 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Tue, 27 Jul 2021 12:07:46 -0700 Subject: [PATCH] :bug: Use non blocking file locking for flock library Signed-off-by: Vince Prignano --- go.mod | 1 + pkg/internal/flock/flock_unix.go | 17 ++++++++++++++--- pkg/internal/testing/addr/manager.go | 20 ++++++++++---------- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 0ce3b6c132..8e8e3f0d5f 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/imdario/mergo v0.3.12 // indirect github.com/onsi/ginkgo v1.16.4 github.com/onsi/gomega v1.13.0 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_model v0.2.0 go.uber.org/goleak v1.1.10 diff --git a/pkg/internal/flock/flock_unix.go b/pkg/internal/flock/flock_unix.go index 3dae621b73..c33a95cb7e 100644 --- a/pkg/internal/flock/flock_unix.go +++ b/pkg/internal/flock/flock_unix.go @@ -18,7 +18,15 @@ limitations under the License. package flock -import "golang.org/x/sys/unix" +import ( + "github.com/pkg/errors" + "golang.org/x/sys/unix" +) + +var ( + // ErrAlreadyLocked is returned when the file is already locked. + ErrAlreadyLocked = errors.New("the file is already locked") +) // Acquire acquires a lock on a file for the duration of the process. This method // is reentrant. @@ -30,6 +38,9 @@ func Acquire(path string) error { // We don't need to close the fd since we should hold // it until the process exits. - - return unix.Flock(fd, unix.LOCK_EX) + err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX) + if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB. + return errors.Wrapf(ErrAlreadyLocked, "cannot lock file %q", path) + } + return err } diff --git a/pkg/internal/testing/addr/manager.go b/pkg/internal/testing/addr/manager.go index 2326af1569..9dcbaa437f 100644 --- a/pkg/internal/testing/addr/manager.go +++ b/pkg/internal/testing/addr/manager.go @@ -17,6 +17,7 @@ limitations under the License. package addr import ( + "errors" "fmt" "io/fs" "net" @@ -31,7 +32,7 @@ import ( // TODO(directxman12): interface / release functionality for external port managers const ( - portReserveTime = 10 * time.Minute + portReserveTime = 2 * time.Minute portConflictRetry = 100 portFilePrefix = "port-" ) @@ -76,7 +77,8 @@ func (c *portCache) add(port int) (bool, error) { return false, err } // Try allocating new port, by acquiring a file. - if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) { + path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port) + if err := flock.Acquire(path); errors.Is(err, os.ErrExist) || errors.Is(err, flock.ErrAlreadyLocked) { return false, nil } else if err != nil { return false, err @@ -86,22 +88,19 @@ func (c *portCache) add(port int) (bool, error) { var cache = &portCache{} -func suggest(listenHost string) (int, string, error) { +func suggest(listenHost string) (*net.TCPListener, int, string, error) { if listenHost == "" { listenHost = "localhost" } addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0")) if err != nil { - return -1, "", err + return nil, -1, "", err } l, err := net.ListenTCP("tcp", addr) if err != nil { - return -1, "", err + return nil, -1, "", err } - if err := l.Close(); err != nil { - return -1, "", err - } - return l.Addr().(*net.TCPAddr).Port, + return l, l.Addr().(*net.TCPAddr).Port, addr.IP.String(), nil } @@ -112,10 +111,11 @@ func suggest(listenHost string) (int, string, error) { // allocated within 1 minute. func Suggest(listenHost string) (int, string, error) { for i := 0; i < portConflictRetry; i++ { - port, resolvedHost, err := suggest(listenHost) + listener, port, resolvedHost, err := suggest(listenHost) if err != nil { return -1, "", err } + defer listener.Close() if ok, err := cache.add(port); ok { return port, resolvedHost, nil } else if err != nil {