diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index 294a92b7f..c716f3dd3 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -18,6 +18,7 @@ package runtimehooks import ( "flag" + "math" "time" "k8s.io/apimachinery/pkg/util/runtime" @@ -113,6 +114,11 @@ type Config struct { RuntimeHookHostEndpoint string RuntimeHookDisableStages []string RuntimeHooksNRI bool + RuntimeHooksNRIConnectTimeout time.Duration + RuntimeHooksNRIBackOffDuration time.Duration + RuntimeHooksNRIBackOffCap time.Duration + RuntimeHooksNRIBackOffFactor float64 + RuntimeHooksNRIBackOffSteps int RuntimeHooksNRISocketPath string RuntimeHookReconcileInterval time.Duration } @@ -127,6 +133,11 @@ func NewDefaultConfig() *Config { RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock", RuntimeHookDisableStages: []string{}, RuntimeHooksNRI: true, + RuntimeHooksNRIConnectTimeout: 6 * time.Second, + RuntimeHooksNRIBackOffDuration: 1 * time.Second, + RuntimeHooksNRIBackOffCap: 1<<62 - 1, + RuntimeHooksNRIBackOffSteps: math.MaxInt32, + RuntimeHooksNRIBackOffFactor: 2, RuntimeHooksNRISocketPath: "nri/nri.sock", RuntimeHookReconcileInterval: 10 * time.Second, } @@ -139,6 +150,11 @@ func (c *Config) InitFlags(fs *flag.FlagSet) { fs.StringVar(&c.RuntimeHooksPluginFailurePolicy, "runtime-hooks-plugin-failure-policy", c.RuntimeHooksPluginFailurePolicy, "stop running other hooks once someone failed") fs.StringVar(&c.RuntimeHookConfigFilePath, "runtime-hooks-config-path", c.RuntimeHookConfigFilePath, "config file path for runtime hooks") fs.StringVar(&c.RuntimeHookHostEndpoint, "runtime-hooks-host-endpoint", c.RuntimeHookHostEndpoint, "host endpoint of runtime proxy") + fs.DurationVar(&c.RuntimeHooksNRIConnectTimeout, "runtime-hooks-nri-connect-timeout", c.RuntimeHooksNRIConnectTimeout, "nri server connect time out, it should be a little more than default plugin registration timeout(5 seconds) which is defined in containerd config") + fs.DurationVar(&c.RuntimeHooksNRIBackOffDuration, "runtime-hooks-nri-backoff-duration", c.RuntimeHooksNRIBackOffDuration, "nri server backoff duration") + fs.DurationVar(&c.RuntimeHooksNRIBackOffCap, "runtime-hooks-nri-backoff-cap", c.RuntimeHooksNRIBackOffCap, "nri server backoff cap") + fs.IntVar(&c.RuntimeHooksNRIBackOffSteps, "runtime-hooks-nri-backoff-steps", c.RuntimeHooksNRIBackOffSteps, "nri server backoff steps") + fs.Float64Var(&c.RuntimeHooksNRIBackOffFactor, "runtime-hooks-nri-backoff-factor", c.RuntimeHooksNRIBackOffFactor, "nri server reconnect backoff factor") fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks") fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode") fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins") diff --git a/pkg/koordlet/runtimehooks/config_test.go b/pkg/koordlet/runtimehooks/config_test.go index 05aaef3de..551a94bc2 100644 --- a/pkg/koordlet/runtimehooks/config_test.go +++ b/pkg/koordlet/runtimehooks/config_test.go @@ -18,6 +18,7 @@ package runtimehooks import ( "flag" + "math" "testing" "time" @@ -36,6 +37,11 @@ func Test_NewDefaultConfig(t *testing.T) { RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock", RuntimeHookDisableStages: []string{}, RuntimeHooksNRI: true, + RuntimeHooksNRIConnectTimeout: 6 * time.Second, + RuntimeHooksNRIBackOffDuration: 1 * time.Second, + RuntimeHooksNRIBackOffCap: 1<<62 - 1, + RuntimeHooksNRIBackOffSteps: math.MaxInt32, + RuntimeHooksNRIBackOffFactor: 2, RuntimeHooksNRISocketPath: "nri/nri.sock", RuntimeHookReconcileInterval: 10 * time.Second, } diff --git a/pkg/koordlet/runtimehooks/nri/server.go b/pkg/koordlet/runtimehooks/nri/server.go index 6c047deee..bbf1954d4 100644 --- a/pkg/koordlet/runtimehooks/nri/server.go +++ b/pkg/koordlet/runtimehooks/nri/server.go @@ -21,9 +21,11 @@ import ( "fmt" "path/filepath" "strings" + "time" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "sigs.k8s.io/yaml" @@ -39,12 +41,14 @@ type nriConfig struct { } type Options struct { - NriSocketPath string + NriSocketPath string + NriConnectTimeout time.Duration // support stop running other hooks once someone failed PluginFailurePolicy rmconfig.FailurePolicyType // todo: add support for disable stages DisableStages map[string]struct{} Executor resourceexecutor.ResourceUpdateExecutor + BackOff wait.Backoff } func (o Options) Validate() error { @@ -71,11 +75,12 @@ const ( ) var ( - _ = stub.ConfigureInterface(&NriServer{}) - _ = stub.SynchronizeInterface(&NriServer{}) - _ = stub.RunPodInterface(&NriServer{}) - _ = stub.CreateContainerInterface(&NriServer{}) - _ = stub.UpdateContainerInterface(&NriServer{}) + _ = stub.ConfigureInterface(&NriServer{}) + _ = stub.SynchronizeInterface(&NriServer{}) + _ = stub.RunPodInterface(&NriServer{}) + _ = stub.CreateContainerInterface(&NriServer{}) + _ = stub.UpdateContainerInterface(&NriServer{}) + opts []stub.Option ) func NewNriServer(opt Options) (*NriServer, error) { @@ -84,7 +89,6 @@ func NewNriServer(opt Options) (*NriServer, error) { return nil, fmt.Errorf("failed to validate nri server, err: %w", err) } - var opts []stub.Option opts = append(opts, stub.WithPluginName(pluginName)) opts = append(opts, stub.WithPluginIdx(pluginIdx)) opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath))) @@ -104,19 +108,34 @@ func NewNriServer(opt Options) (*NriServer, error) { } func (p *NriServer) Start() error { - go func() { + err := p.options.Validate() + if err != nil { + return err + } + success := time.After(p.options.NriConnectTimeout) + errorChan := make(chan error) + + go func(chan error) { if p.stub != nil { err := p.stub.Run(context.Background()) if err != nil { klog.Errorf("nri server exited with error: %v", err) + errorChan <- err } else { klog.V(4).Info("nri server started") } } else { - klog.V(4).Info("nri stub is nil") + err := fmt.Errorf("nri stub is nil") + errorChan <- err } - }() - return nil + }(errorChan) + + select { + case <-success: + return nil + case <-errorChan: + return fmt.Errorf("nri start fail") + } } func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) { @@ -209,6 +228,34 @@ func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Containe } func (p *NriServer) onClose() { - p.stub.Stop() - klog.V(6).Infof("NRI server closes") + //TODO: consider the pod status during restart + retryFunc := func() (bool, error) { + stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...) + if err != nil { + klog.Errorf("failed to create plugin stub: %v", err) + return false, nil + } + + p.stub = stub + err = p.Start() + if err != nil { + completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath) + targetErr := fmt.Errorf("nri socket path %q does not exist", completeNriSocketPath) + if err.Error() == targetErr.Error() { + return false, err + } + //TODO: check the error type, if nri server disable nri, we should also break backoff + klog.Warningf("nri reconnect failed") + return false, nil + } else { + klog.V(4).Info("nri server restart success") + return true, nil + } + } + + // TODO: high version wait not support BackoffUntil with BackOffManger as parameters, when updated to v0.27.0 version wait, we can refine ExponentialBackoff. + err := wait.ExponentialBackoff(p.options.BackOff, retryFunc) + if err != nil { + klog.Errorf("nri server restart failed after several times retry") + } } diff --git a/pkg/koordlet/runtimehooks/nri/server_test.go b/pkg/koordlet/runtimehooks/nri/server_test.go index 75826f43f..241420899 100644 --- a/pkg/koordlet/runtimehooks/nri/server_test.go +++ b/pkg/koordlet/runtimehooks/nri/server_test.go @@ -19,6 +19,7 @@ package nri import ( "reflect" "testing" + "time" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" @@ -58,19 +59,19 @@ func TestNriServer_Start(t *testing.T) { stub: nil, mask: api.EventMask(1), options: Options{ + NriConnectTimeout: time.Second, PluginFailurePolicy: "Ignore", DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}), Executor: nil, }, }, - wantErr: false, + wantErr: true, }, { fields: fields{ stub: nil, }, }, - {}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/koordlet/runtimehooks/runtimehooks.go b/pkg/koordlet/runtimehooks/runtimehooks.go index 9a888ba42..8a8cdb95b 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks.go +++ b/pkg/koordlet/runtimehooks/runtimehooks.go @@ -19,6 +19,7 @@ package runtimehooks import ( "fmt" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "github.com/koordinator-sh/koordinator/pkg/features" @@ -57,12 +58,14 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error { return err } if r.nriServer != nil { - if err := r.nriServer.Start(); err != nil { - // if NRI is not enabled or container runtime not support NRI, we just skip NRI server start - klog.Errorf("nri mode runtime hook server start failed: %v", err) - } else { - klog.V(4).Infof("nri mode runtime hook server has started") - } + go func() { + if err := r.nriServer.Start(); err != nil { + // if NRI is not enabled or container runtime not support NRI, we just skip NRI server start + klog.Warningf("nri mode runtime hook server start failed: %v", err) + } else { + klog.V(4).Infof("nri mode runtime hook server has started") + } + }() } if err := r.reconciler.Run(stopCh); err != nil { return err @@ -101,13 +104,22 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, Executor: e, } + backOff := wait.Backoff{ + Duration: cfg.RuntimeHooksNRIBackOffDuration, + Factor: cfg.RuntimeHooksNRIBackOffFactor, + Jitter: 0.1, + Steps: cfg.RuntimeHooksNRIBackOffSteps, + Cap: cfg.RuntimeHooksNRIBackOffCap, + } var nriServer *nri.NriServer if cfg.RuntimeHooksNRI { nriServerOptions := nri.Options{ NriSocketPath: cfg.RuntimeHooksNRISocketPath, + NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout, PluginFailurePolicy: pluginFailurePolicy, DisableStages: getDisableStagesMap(cfg.RuntimeHookDisableStages), Executor: e, + BackOff: backOff, } nriServer, err = nri.NewNriServer(nriServerOptions) if err != nil {