Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃悰 Use non blocking file locking for flock library #1605

Merged
merged 1 commit into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/internal/flock/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flock

import "errors"

var (
// ErrAlreadyLocked is returned when the file is already locked.
ErrAlreadyLocked = errors.New("the file is already locked")
)
18 changes: 15 additions & 3 deletions pkg/internal/flock/flock_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,30 @@ limitations under the License.

package flock

import "golang.org/x/sys/unix"
import (
"errors"
"fmt"
"os"

"golang.org/x/sys/unix"
)

// Acquire acquires a lock on a file for the duration of the process. This method
// is reentrant.
func Acquire(path string) error {
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
if err != nil {
if errors.Is(err, os.ErrExist) {
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}

// We don't need to close the fd since we should hold
// it until the process exits.

return unix.Flock(fd, unix.LOCK_EX)
err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX)
if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB.
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}
20 changes: 10 additions & 10 deletions pkg/internal/testing/addr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package addr

import (
"errors"
"fmt"
"io/fs"
"net"
Expand All @@ -31,7 +32,7 @@ import (
// TODO(directxman12): interface / release functionality for external port managers

const (
portReserveTime = 10 * time.Minute
portReserveTime = 2 * time.Minute
portConflictRetry = 100
portFilePrefix = "port-"
)
Expand Down Expand Up @@ -76,7 +77,8 @@ func (c *portCache) add(port int) (bool, error) {
return false, err
}
// Try allocating new port, by acquiring a file.
if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) {
path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)
if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) {
return false, nil
} else if err != nil {
return false, err
Expand All @@ -86,22 +88,19 @@ func (c *portCache) add(port int) (bool, error) {

var cache = &portCache{}

func suggest(listenHost string) (int, string, error) {
func suggest(listenHost string) (*net.TCPListener, int, string, error) {
if listenHost == "" {
listenHost = "localhost"
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
if err != nil {
return -1, "", err
return nil, -1, "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return -1, "", err
return nil, -1, "", err
}
if err := l.Close(); err != nil {
return -1, "", err
}
return l.Addr().(*net.TCPAddr).Port,
return l, l.Addr().(*net.TCPAddr).Port,
addr.IP.String(),
nil
}
Expand All @@ -112,10 +111,11 @@ func suggest(listenHost string) (int, string, error) {
// allocated within 1 minute.
func Suggest(listenHost string) (int, string, error) {
for i := 0; i < portConflictRetry; i++ {
port, resolvedHost, err := suggest(listenHost)
listener, port, resolvedHost, err := suggest(listenHost)
if err != nil {
return -1, "", err
}
defer listener.Close()
if ok, err := cache.add(port); ok {
return port, resolvedHost, nil
} else if err != nil {
Expand Down