Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept kubelet socket from command line #790

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions cmd/nvidia-device-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"syscall"
"time"

Expand Down Expand Up @@ -105,6 +106,12 @@ func main() {
Usage: "ensure that containers are started with NVIDIA_MOFED=enabled",
EnvVars: []string{"MOFED_ENABLED"},
},
&cli.StringFlag{
Name: "kubelet-socket",
Value: pluginapi.KubeletSocket,
Usage: "specify the socket for communicating with the kubelet; if this is empty, no connection with the kubelet is attempted",
EnvVars: []string{"KUBELET_SOCKET"},
},
&cli.StringFlag{
Name: "config-file",
Usage: "the path to a config file as an alternative to command line options or environment variables",
Expand Down Expand Up @@ -196,8 +203,10 @@ func loadConfig(c *cli.Context, flags []cli.Flag) (*spec.Config, error) {
}

func start(c *cli.Context, flags []cli.Flag) error {
klog.Info("Starting FS watcher.")
watcher, err := watch.Files(pluginapi.DevicePluginPath)
kubeletSocket := c.String("kubelet-socket")
kubeletSocketDir := filepath.Dir(kubeletSocket)
klog.Infof("Starting FS watcher for %v", kubeletSocketDir)
tariq1890 marked this conversation as resolved.
Show resolved Hide resolved
watcher, err := watch.Files(kubeletSocketDir)
if err != nil {
return fmt.Errorf("failed to create FS watcher for %s: %v", pluginapi.DevicePluginPath, err)
}
Expand Down Expand Up @@ -242,8 +251,8 @@ restart:
// 'pluginapi.KubeletSocket' file. When this occurs, restart this loop,
// restarting all of the plugins in the process.
case event := <-watcher.Events:
if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
klog.Infof("inotify: %s created, restarting.", pluginapi.KubeletSocket)
if kubeletSocket != "" && event.Name == kubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
klog.Infof("inotify: %s created, restarting.", kubeletSocket)
goto restart
}

Expand Down Expand Up @@ -315,7 +324,8 @@ func startPlugins(c *cli.Context, flags []cli.Flag) ([]plugin.Interface, bool, e

// Get the set of plugins.
klog.Info("Retrieving plugins.")
pluginManager, err := NewPluginManager(infolib, nvmllib, devicelib, config)
kubeletSocket := c.String("kubelet-socket")
pluginManager, err := NewPluginManager(infolib, nvmllib, devicelib, kubeletSocket, config)
if err != nil {
return nil, false, fmt.Errorf("error creating plugin manager: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/nvidia-device-plugin/plugin-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// NewPluginManager creates an NVML-based plugin manager
func NewPluginManager(infolib info.Interface, nvmllib nvml.Interface, devicelib device.Interface, config *spec.Config) (manager.Interface, error) {
func NewPluginManager(infolib info.Interface, nvmllib nvml.Interface, devicelib device.Interface, kubeletSocket string, config *spec.Config) (manager.Interface, error) {
var err error
switch *config.Flags.MigStrategy {
case spec.MigStrategyNone:
Expand Down Expand Up @@ -67,6 +67,7 @@ func NewPluginManager(infolib info.Interface, nvmllib nvml.Interface, devicelib
manager.WithCDIHandler(cdiHandler),
manager.WithConfig(config),
manager.WithFailOnInitError(*config.Flags.FailOnInitError),
manager.WithKubeletSocket(kubeletSocket),
manager.WithMigStrategy(*config.Flags.MigStrategy),
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/plugin/manager/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type manager struct {

cdiHandler cdi.Interface
config *spec.Config

kubeletSocket string
}

// New creates a new plugin manager with the supplied options.
Expand Down
2 changes: 1 addition & 1 deletion internal/plugin/manager/nvml.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (m *nvmlmanager) GetPlugins() ([]plugin.Interface, error) {

var plugins []plugin.Interface
for _, r := range rms {
plugin, err := plugin.NewNvidiaDevicePlugin(m.config, r, m.cdiHandler)
plugin, err := plugin.NewNvidiaDevicePlugin(m.config, m.kubeletSocket, r, m.cdiHandler)
if err != nil {
return nil, fmt.Errorf("failed to create plugin: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions internal/plugin/manager/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ func WithConfig(config *spec.Config) Option {
m.config = config
}
}

func WithKubeletSocket(kubeletSocket string) Option {
return func(m *manager) {
m.kubeletSocket = kubeletSocket
}
}
2 changes: 1 addition & 1 deletion internal/plugin/manager/tegra.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (m *tegramanager) GetPlugins() ([]plugin.Interface, error) {

var plugins []plugin.Interface
for _, r := range rms {
plugin, err := plugin.NewNvidiaDevicePlugin(m.config, r, m.cdiHandler)
plugin, err := plugin.NewNvidiaDevicePlugin(m.config, m.kubeletSocket, r, m.cdiHandler)
if err != nil {
return nil, fmt.Errorf("failed to create plugin: %w", err)
}
Expand Down
11 changes: 9 additions & 2 deletions internal/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type NvidiaDevicePlugin struct {
deviceListEnvvar string
deviceListStrategies spec.DeviceListStrategies
socket string
kubeletSocket string

cdiHandler cdi.Interface
cdiEnabled bool
Expand All @@ -69,7 +70,7 @@ type NvidiaDevicePlugin struct {
}

// NewNvidiaDevicePlugin returns an initialized NvidiaDevicePlugin
func NewNvidiaDevicePlugin(config *spec.Config, resourceManager rm.ResourceManager, cdiHandler cdi.Interface) (*NvidiaDevicePlugin, error) {
func NewNvidiaDevicePlugin(config *spec.Config, kubeletSocket string, resourceManager rm.ResourceManager, cdiHandler cdi.Interface) (*NvidiaDevicePlugin, error) {
_, name := resourceManager.Resource().Split()

deviceListStrategies, _ := spec.NewDeviceListStrategies(*config.Flags.Plugin.DeviceListStrategy)
Expand Down Expand Up @@ -102,6 +103,7 @@ func NewNvidiaDevicePlugin(config *spec.Config, resourceManager rm.ResourceManag
mpsDaemon: mpsDaemon,
mpsHostRoot: mpsHostRoot,

kubeletSocket: kubeletSocket,
// These will be reinitialized every
// time the plugin server is restarted.
server: nil,
Expand Down Expand Up @@ -245,7 +247,12 @@ func (plugin *NvidiaDevicePlugin) Serve() error {

// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *NvidiaDevicePlugin) Register() error {
conn, err := plugin.dial(pluginapi.KubeletSocket, 5*time.Second)
if plugin.kubeletSocket == "" {
klog.Info("Skipping registration with Kubelet")
return nil
}

conn, err := plugin.dial(plugin.kubeletSocket, 5*time.Second)
if err != nil {
return err
}
Expand Down