Skip to content

Commit

Permalink
Add two flags to identify the kubelet plugin registration
Browse files Browse the repository at this point in the history
Usage is documented in #152
  • Loading branch information
mauriciopoppe committed Jun 24, 2021
1 parent 36eaedc commit d72b499
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 1,301 deletions.
48 changes: 37 additions & 11 deletions cmd/csi-node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/kubernetes-csi/csi-lib-utils/connection"
csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc"
"github.com/kubernetes-csi/node-driver-registrar/pkg/util"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)

Expand All @@ -41,12 +42,6 @@ const (
sleepDuration = 2 * time.Minute
)

var (
// kubeletRegistrationCallbackReceived is set to true when the kubelet calls the GetInfo callback
// meaning that the registration process is successful
kubeletRegistrationCallbackReceived = false
)

// Command line flags
var (
connectionTimeout = flag.Duration("connection-timeout", 0, "The --connection-timeout flag is deprecated")
Expand All @@ -57,7 +52,13 @@ var (
healthzPort = flag.Int("health-port", 0, "(deprecated) TCP port for healthz requests. Set to 0 to disable the healthz server. Only one of `--health-port` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including the health check indicating whether the registration socket exists, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--health-port` and `--http-endpoint` can be set.")
showVersion = flag.Bool("version", false, "Show version.")
version = "unknown"

// kubelet registration succeed flags
kubeletRegistrationAckPath = flag.String("kubelet-registration-ack-path", "", "If set, a temp file with this name will be created after the kubelet registration process succeeds.")
kubeletRegistrationCheckAck = flag.String("kubelet-registration-check-ack", "", "Checks that the kubelet plugin registration ack file exists, if set it must be the same value as kubelet-registration-ack-path.")

// Set during compilation time
version = "unknown"

// List of supported versions
supportedVersions = []string{"1.0.0"}
Expand All @@ -84,7 +85,16 @@ func newRegistrationServer(driverName string, endpoint string, versions []string
// GetInfo is the RPC invoked by plugin watcher
func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
klog.Infof("Received GetInfo call: %+v", req)
kubeletRegistrationCallbackReceived = true

if *kubeletRegistrationAckPath != "" {
err := util.TouchFile(*kubeletRegistrationAckPath)
if err != nil {
klog.ErrorS(err, "Failed to write lock file=%s", *kubeletRegistrationAckPath)
os.Exit(1)
}
klog.Infof("Lock file=%s created.", *kubeletRegistrationAckPath)
}

return &registerapi.PluginInfo{
Type: registerapi.CSIPlugin,
Name: e.driverName,
Expand All @@ -108,15 +118,31 @@ func main() {
flag.Set("logtostderr", "true")
flag.Parse()

if *kubeletRegistrationPath == "" {
klog.Error("kubelet-registration-path is a required parameter")
os.Exit(1)
if *kubeletRegistrationCheckAck != "" {
// check for the existence of the lock file
exists, err := util.DoesFileExist(*kubeletRegistrationCheckAck)
if err != nil {
klog.ErrorS(err, "Failed to check that the path=%s exists", *kubeletRegistrationCheckAck)
os.Exit(1)
}
if !exists {
klog.Errorf("path=%s doesn't exist, the kubelet plugin registration hasn't succeeded yet", *kubeletRegistrationCheckAck)
os.Exit(1)
}
klog.Infof("kubelet plugin registration succeded")
// registration succeeded, the lock file is no longer needed
os.Exit(0)
}

if *showVersion {
fmt.Println(os.Args[0], version)
return
}

if *kubeletRegistrationPath == "" {
klog.Error("kubelet-registration-path is a required parameter")
os.Exit(1)
}
klog.Infof("Version: %s", version)

if *healthzPort > 0 && *httpEndpoint != "" {
Expand Down
29 changes: 16 additions & 13 deletions cmd/csi-node-driver-registrar/node_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import (
"os/signal"
"runtime"
"syscall"
"time"

"google.golang.org/grpc"

"github.com/kubernetes-csi/node-driver-registrar/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)
Expand Down Expand Up @@ -62,21 +60,21 @@ func nodeRegister(csiDriverName, httpEndpoint string) {
}
klog.Infof("Registration Server started at: %s\n", socketPath)
grpcServer := grpc.NewServer()
// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrar)

// Sometimes on windows after registration with the kubelet plugin we don't
// get a callback through GetInfo, as a workaround if we don't get a callback within
// the next 10 seconds we'll restart
go func() {
err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
return kubeletRegistrationCallbackReceived, nil
})
// Make sure that the lock file doesn't exist,
// it could exist because the container was forcefully shut down
if *kubeletRegistrationAckPath != "" {
// the file might not exist, an error is only returned if there was a failure trying
// to remove a file that already exists or if we couldn't get do a file stat
err = util.CleanupFile(*kubeletRegistrationAckPath)
if err != nil {
klog.Errorf("Timed out waiting for kubelet registration callback")
klog.Errorf("Failed to cleanup file=%s with error: %+v", *kubeletRegistrationAckPath, err)
os.Exit(1)
}
}()
}

// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrar)

go healthzServer(socketPath, httpEndpoint)
go removeRegSocket(csiDriverName)
Expand All @@ -85,6 +83,11 @@ func nodeRegister(csiDriverName, httpEndpoint string) {
klog.Errorf("Registration Server stopped serving: %v", err)
os.Exit(1)
}

if *kubeletRegistrationAckPath != "" {
// delete the lock file on graceful shutdown
_ = util.CleanupFile(*kubeletRegistrationAckPath)
}
// If gRPC server is gracefully shutdown, exit
os.Exit(0)
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
golang.org/x/text v0.3.5 // indirect
google.golang.org/genproto v0.0.0-20210317182105-75c7a8546eb9 // indirect
google.golang.org/grpc v1.36.0
k8s.io/apimachinery v0.21.0
k8s.io/client-go v0.21.0
k8s.io/klog/v2 v2.8.0
k8s.io/kubelet v0.21.0
Expand Down
39 changes: 39 additions & 0 deletions pkg/util/util_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,42 @@ func DoesSocketExist(socketPath string) (bool, error) {
}
return false, nil
}

func CleanupFile(filePath string) error {
fileExists, err := DoesFileExist(filePath)
if err != nil {
return err
}
if fileExists {
if err := os.Remove(filePath); err != nil {
return fmt.Errorf("failed to remove stale file=%s with error: %+v", filePath, err)
}
}
return nil
}

func DoesFileExist(filePath string) (bool, error) {
info, err := os.Stat(filePath)
if err == nil {
return !info.IsDir(), nil
}
if err != nil && !os.IsNotExist(err) {
return false, fmt.Errorf("Failed to stat the file=%s with error: %+v", filePath, err)
}
return false, nil
}

func TouchFile(filePath string) error {
exists, err := DoesFileExist(filePath)
if err != nil {
return err
}
if !exists {
file, err := os.Create(filePath)
if err != nil {
return err
}
file.Close()
}
return nil
}
39 changes: 39 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

var socketFileName = "reg.sock"
var kubeletRegistrationFileName = "kubelet-registration-ack"

// TestSocketFileDoesNotExist - Test1: file does not exist. So clean up should be successful.
func TestSocketFileDoesNotExist(t *testing.T) {
Expand Down Expand Up @@ -173,3 +174,41 @@ func TestSocketRegularFile(t *testing.T) {
}
}
}

// TestTouchFile creates a file if it doesn't exist
func TestTouchFile(t *testing.T) {
// Create a temp directory
testDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("could not create temp dir: %v", err)
}
defer os.RemoveAll(testDir)

filePath := filepath.Join(testDir, kubeletRegistrationFileName)
fileExists, err := DoesFileExist(filePath)
if err != nil {
t.Fatalf("Failed to execute file exist: %+v", err)
}
if fileExists {
t.Fatalf("File %s must not exist", filePath)
}

// returns an error only if it failed to clean the file, not if the file didn't exist
err = CleanupFile(filePath)
if err != nil {
t.Fatalf("Failed to execute file cleanup: %+v", err)
}

err = TouchFile(filePath)
if err != nil {
t.Fatalf("Failed to execute file touch: %+v", err)
}

fileExists, err = DoesFileExist(filePath)
if err != nil {
t.Fatalf("Failed to execute file exist: %+v", err)
}
if !fileExists {
t.Fatalf("File %s must exist", filePath)
}
}
39 changes: 39 additions & 0 deletions pkg/util/util_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,42 @@ func DoesSocketExist(socketPath string) (bool, error) {
}
return true, nil
}

func CleanupFile(filePath string) error {
fileExists, err := DoesFileExist(filePath)
if err != nil {
return err
}
if fileExists {
if err := os.Remove(filePath); err != nil {
return fmt.Errorf("failed to remove stale file=%s with error: %+v", filePath, err)
}
}
return nil
}

func DoesFileExist(filePath string) (bool, error) {
info, err := os.Lstat(filePath)
if err == nil {
return !info.IsDir(), nil
}
if err != nil && !os.IsNotExist(err) {
return false, fmt.Errorf("Failed to stat the file=%s with error: %+v", filePath, err)
}
return false, nil
}

func TouchFile(filePath string) error {
exists, err := DoesFileExist(filePath)
if err != nil {
return err
}
if !exists {
file, err := os.Create(filePath)
if err != nil {
return err
}
file.Close()
}
return nil
}
Loading

0 comments on commit d72b499

Please sign in to comment.