Skip to content

Commit

Permalink
🐛 Use non blocking file locking for flock library
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Jul 27, 2021
1 parent 3c78540 commit c439bb6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
20 changes: 20 additions & 0 deletions pkg/internal/flock/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

var (
// ErrAlreadyLocked is returned when the file is already locked.
ErrAlreadyLocked = errors.New("the file is already locked")
)
23 changes: 20 additions & 3 deletions pkg/internal/flock/flock_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,35 @@ limitations under the License.

package flock

import "golang.org/x/sys/unix"
import (
"errors"
"fmt"
"os"

"golang.org/x/sys/unix"
)

var (
// ErrAlreadyLocked is returned when the file is already locked.
ErrAlreadyLocked = errors.New("the file is already locked")
)

// Acquire acquires a lock on a file for the duration of the process. This method
// is reentrant.
func Acquire(path string) error {
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
if err != nil {
if errors.Is(err, os.ErrExist) {
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}

// We don't need to close the fd since we should hold
// it until the process exits.

return unix.Flock(fd, unix.LOCK_EX)
err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX)
if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB.
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}
20 changes: 10 additions & 10 deletions pkg/internal/testing/addr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package addr

import (
"errors"
"fmt"
"io/fs"
"net"
Expand All @@ -31,7 +32,7 @@ import (
// TODO(directxman12): interface / release functionality for external port managers

const (
portReserveTime = 10 * time.Minute
portReserveTime = 2 * time.Minute
portConflictRetry = 100
portFilePrefix = "port-"
)
Expand Down Expand Up @@ -76,7 +77,8 @@ func (c *portCache) add(port int) (bool, error) {
return false, err
}
// Try allocating new port, by acquiring a file.
if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) {
path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)
if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) {
return false, nil
} else if err != nil {
return false, err
Expand All @@ -86,22 +88,19 @@ func (c *portCache) add(port int) (bool, error) {

var cache = &portCache{}

func suggest(listenHost string) (int, string, error) {
func suggest(listenHost string) (*net.TCPListener, int, string, error) {
if listenHost == "" {
listenHost = "localhost"
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
if err != nil {
return -1, "", err
return nil, -1, "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return -1, "", err
return nil, -1, "", err
}
if err := l.Close(); err != nil {
return -1, "", err
}
return l.Addr().(*net.TCPAddr).Port,
return l, l.Addr().(*net.TCPAddr).Port,
addr.IP.String(),
nil
}
Expand All @@ -112,10 +111,11 @@ func suggest(listenHost string) (int, string, error) {
// allocated within 1 minute.
func Suggest(listenHost string) (int, string, error) {
for i := 0; i < portConflictRetry; i++ {
port, resolvedHost, err := suggest(listenHost)
listener, port, resolvedHost, err := suggest(listenHost)
if err != nil {
return -1, "", err
}
defer listener.Close()
if ok, err := cache.add(port); ok {
return port, resolvedHost, nil
} else if err != nil {
Expand Down

0 comments on commit c439bb6

Please sign in to comment.