From 1e15912f4531e6f016548b584b333763ce4a901a Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Tue, 27 Jul 2021 12:07:46 -0700 Subject: [PATCH] :bug: Use non blocking file locking for flock library Signed-off-by: Vince Prignano --- pkg/internal/flock/errors.go | 24 ++++++++++++++++++++++++ pkg/internal/flock/flock_unix.go | 18 +++++++++++++++--- pkg/internal/testing/addr/manager.go | 20 ++++++++++---------- 3 files changed, 49 insertions(+), 13 deletions(-) create mode 100644 pkg/internal/flock/errors.go diff --git a/pkg/internal/flock/errors.go b/pkg/internal/flock/errors.go new file mode 100644 index 0000000000..ee7a434372 --- /dev/null +++ b/pkg/internal/flock/errors.go @@ -0,0 +1,24 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flock + +import "errors" + +var ( + // ErrAlreadyLocked is returned when the file is already locked. + ErrAlreadyLocked = errors.New("the file is already locked") +) diff --git a/pkg/internal/flock/flock_unix.go b/pkg/internal/flock/flock_unix.go index 3dae621b73..3a904f3f5f 100644 --- a/pkg/internal/flock/flock_unix.go +++ b/pkg/internal/flock/flock_unix.go @@ -18,18 +18,30 @@ limitations under the License. package flock -import "golang.org/x/sys/unix" +import ( + "errors" + "fmt" + "os" + + "golang.org/x/sys/unix" +) // Acquire acquires a lock on a file for the duration of the process. This method // is reentrant. func Acquire(path string) error { fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600) if err != nil { + if errors.Is(err, os.ErrExist) { + return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked) + } return err } // We don't need to close the fd since we should hold // it until the process exits. - - return unix.Flock(fd, unix.LOCK_EX) + err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX) + if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB. + return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked) + } + return err } diff --git a/pkg/internal/testing/addr/manager.go b/pkg/internal/testing/addr/manager.go index 2326af1569..caaafa2627 100644 --- a/pkg/internal/testing/addr/manager.go +++ b/pkg/internal/testing/addr/manager.go @@ -17,6 +17,7 @@ limitations under the License. package addr import ( + "errors" "fmt" "io/fs" "net" @@ -31,7 +32,7 @@ import ( // TODO(directxman12): interface / release functionality for external port managers const ( - portReserveTime = 10 * time.Minute + portReserveTime = 2 * time.Minute portConflictRetry = 100 portFilePrefix = "port-" ) @@ -76,7 +77,8 @@ func (c *portCache) add(port int) (bool, error) { return false, err } // Try allocating new port, by acquiring a file. - if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) { + path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port) + if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) { return false, nil } else if err != nil { return false, err @@ -86,22 +88,19 @@ func (c *portCache) add(port int) (bool, error) { var cache = &portCache{} -func suggest(listenHost string) (int, string, error) { +func suggest(listenHost string) (*net.TCPListener, int, string, error) { if listenHost == "" { listenHost = "localhost" } addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0")) if err != nil { - return -1, "", err + return nil, -1, "", err } l, err := net.ListenTCP("tcp", addr) if err != nil { - return -1, "", err + return nil, -1, "", err } - if err := l.Close(); err != nil { - return -1, "", err - } - return l.Addr().(*net.TCPAddr).Port, + return l, l.Addr().(*net.TCPAddr).Port, addr.IP.String(), nil } @@ -112,10 +111,11 @@ func suggest(listenHost string) (int, string, error) { // allocated within 1 minute. func Suggest(listenHost string) (int, string, error) { for i := 0; i < portConflictRetry; i++ { - port, resolvedHost, err := suggest(listenHost) + listener, port, resolvedHost, err := suggest(listenHost) if err != nil { return -1, "", err } + defer listener.Close() if ok, err := cache.add(port); ok { return port, resolvedHost, nil } else if err != nil {