From 1e9b75b690c9251da765a783f5e738c322c37d8a Mon Sep 17 00:00:00 2001 From: Zhang Kang Date: Fri, 26 Apr 2024 16:58:09 +0800 Subject: [PATCH] add nri reconnect Signed-off-by: Zhang Kang --- pkg/koordlet/runtimehooks/config.go | 55 +++++++++------ pkg/koordlet/runtimehooks/config_test.go | 25 ++++--- pkg/koordlet/runtimehooks/nri/server.go | 72 ++++++++++++++++---- pkg/koordlet/runtimehooks/nri/server_test.go | 5 +- pkg/koordlet/runtimehooks/runtimehooks.go | 24 +++++-- 5 files changed, 130 insertions(+), 51 deletions(-) diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index 294a92b7f..d351e6a53 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -105,30 +105,40 @@ var ( ) type Config struct { - RuntimeHooksNetwork string - RuntimeHooksAddr string - RuntimeHooksFailurePolicy string - RuntimeHooksPluginFailurePolicy string - RuntimeHookConfigFilePath string - RuntimeHookHostEndpoint string - RuntimeHookDisableStages []string - RuntimeHooksNRI bool - RuntimeHooksNRISocketPath string - RuntimeHookReconcileInterval time.Duration + RuntimeHooksNetwork string + RuntimeHooksAddr string + RuntimeHooksFailurePolicy string + RuntimeHooksPluginFailurePolicy string + RuntimeHookConfigFilePath string + RuntimeHookHostEndpoint string + RuntimeHookDisableStages []string + RuntimeHooksNRI bool + RuntimeHooksNRIConnectTimeout time.Duration + RuntimeHooksNRIReconnectInitInterval time.Duration + RuntimeHooksNRIReconnectMaxDuration time.Duration + RuntimeHooksNRIReconnectMul float64 + RuntimeHooksNRIReconnectLimit int + RuntimeHooksNRISocketPath string + RuntimeHookReconcileInterval time.Duration } func NewDefaultConfig() *Config { return &Config{ - RuntimeHooksNetwork: "unix", - RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock", - RuntimeHooksFailurePolicy: "Ignore", - RuntimeHooksPluginFailurePolicy: "Ignore", - RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir, - RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock", - RuntimeHookDisableStages: []string{}, - RuntimeHooksNRI: true, - RuntimeHooksNRISocketPath: "nri/nri.sock", - RuntimeHookReconcileInterval: 10 * time.Second, + RuntimeHooksNetwork: "unix", + RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock", + RuntimeHooksFailurePolicy: "Ignore", + RuntimeHooksPluginFailurePolicy: "Ignore", + RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir, + RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock", + RuntimeHookDisableStages: []string{}, + RuntimeHooksNRI: true, + RuntimeHooksNRIConnectTimeout: 2 * time.Second, + RuntimeHooksNRIReconnectInitInterval: 1 * time.Second, + RuntimeHooksNRIReconnectMaxDuration: 10 * time.Minute, + RuntimeHooksNRIReconnectLimit: 100, + RuntimeHooksNRIReconnectMul: 2, + RuntimeHooksNRISocketPath: "nri/nri.sock", + RuntimeHookReconcileInterval: 10 * time.Second, } } @@ -139,6 +149,11 @@ func (c *Config) InitFlags(fs *flag.FlagSet) { fs.StringVar(&c.RuntimeHooksPluginFailurePolicy, "runtime-hooks-plugin-failure-policy", c.RuntimeHooksPluginFailurePolicy, "stop running other hooks once someone failed") fs.StringVar(&c.RuntimeHookConfigFilePath, "runtime-hooks-config-path", c.RuntimeHookConfigFilePath, "config file path for runtime hooks") fs.StringVar(&c.RuntimeHookHostEndpoint, "runtime-hooks-host-endpoint", c.RuntimeHookHostEndpoint, "host endpoint of runtime proxy") + fs.DurationVar(&c.RuntimeHooksNRIConnectTimeout, "runtime-hooks-nri-connect-timeout", c.RuntimeHooksNRIConnectTimeout, "nri server connect time out") + fs.DurationVar(&c.RuntimeHooksNRIReconnectInitInterval, "runtime-hooks-nri-reconnect-init-interval", c.RuntimeHooksNRIReconnectInitInterval, "nri server reconnection init interval") + fs.DurationVar(&c.RuntimeHooksNRIReconnectMaxDuration, "runtime-hooks-nri-reconnect-max-duration", c.RuntimeHooksNRIReconnectMaxDuration, "nri server reconnection max duration, when hit the limit, will not reconnect") + fs.IntVar(&c.RuntimeHooksNRIReconnectLimit, "runtime-hooks-nri-reconnect-max-times", c.RuntimeHooksNRIReconnectLimit, "nri server reconnect max limit") + fs.Float64Var(&c.RuntimeHooksNRIReconnectMul, "runtime-hooks-nri-reconnect-mul", c.RuntimeHooksNRIReconnectMul, "nri server reconnect backoff mul") fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks") fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode") fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins") diff --git a/pkg/koordlet/runtimehooks/config_test.go b/pkg/koordlet/runtimehooks/config_test.go index 05aaef3de..e9c88c423 100644 --- a/pkg/koordlet/runtimehooks/config_test.go +++ b/pkg/koordlet/runtimehooks/config_test.go @@ -28,16 +28,21 @@ import ( func Test_NewDefaultConfig(t *testing.T) { expectConfig := &Config{ - RuntimeHooksNetwork: "unix", - RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock", - RuntimeHooksFailurePolicy: "Ignore", - RuntimeHooksPluginFailurePolicy: "Ignore", - RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir, - RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock", - RuntimeHookDisableStages: []string{}, - RuntimeHooksNRI: true, - RuntimeHooksNRISocketPath: "nri/nri.sock", - RuntimeHookReconcileInterval: 10 * time.Second, + RuntimeHooksNetwork: "unix", + RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock", + RuntimeHooksFailurePolicy: "Ignore", + RuntimeHooksPluginFailurePolicy: "Ignore", + RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir, + RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock", + RuntimeHookDisableStages: []string{}, + RuntimeHooksNRI: true, + RuntimeHooksNRIConnectTimeout: 2 * time.Second, + RuntimeHooksNRIReconnectInitInterval: 1 * time.Second, + RuntimeHooksNRIReconnectMaxDuration: 10 * time.Minute, + RuntimeHooksNRIReconnectLimit: 100, + RuntimeHooksNRIReconnectMul: 2, + RuntimeHooksNRISocketPath: "nri/nri.sock", + RuntimeHookReconcileInterval: 10 * time.Second, } defaultConfig := NewDefaultConfig() assert.Equal(t, expectConfig, defaultConfig) diff --git a/pkg/koordlet/runtimehooks/nri/server.go b/pkg/koordlet/runtimehooks/nri/server.go index 6c047deee..513f148c6 100644 --- a/pkg/koordlet/runtimehooks/nri/server.go +++ b/pkg/koordlet/runtimehooks/nri/server.go @@ -21,9 +21,11 @@ import ( "fmt" "path/filepath" "strings" + "time" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "sigs.k8s.io/yaml" @@ -39,12 +41,14 @@ type nriConfig struct { } type Options struct { - NriSocketPath string + NriSocketPath string + NriConnectTimeout time.Duration // support stop running other hooks once someone failed PluginFailurePolicy rmconfig.FailurePolicyType // todo: add support for disable stages DisableStages map[string]struct{} Executor resourceexecutor.ResourceUpdateExecutor + BackOff wait.Backoff } func (o Options) Validate() error { @@ -71,11 +75,12 @@ const ( ) var ( - _ = stub.ConfigureInterface(&NriServer{}) - _ = stub.SynchronizeInterface(&NriServer{}) - _ = stub.RunPodInterface(&NriServer{}) - _ = stub.CreateContainerInterface(&NriServer{}) - _ = stub.UpdateContainerInterface(&NriServer{}) + _ = stub.ConfigureInterface(&NriServer{}) + _ = stub.SynchronizeInterface(&NriServer{}) + _ = stub.RunPodInterface(&NriServer{}) + _ = stub.CreateContainerInterface(&NriServer{}) + _ = stub.UpdateContainerInterface(&NriServer{}) + opts []stub.Option ) func NewNriServer(opt Options) (*NriServer, error) { @@ -84,7 +89,6 @@ func NewNriServer(opt Options) (*NriServer, error) { return nil, fmt.Errorf("failed to validate nri server, err: %w", err) } - var opts []stub.Option opts = append(opts, stub.WithPluginName(pluginName)) opts = append(opts, stub.WithPluginIdx(pluginIdx)) opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath))) @@ -104,19 +108,34 @@ func NewNriServer(opt Options) (*NriServer, error) { } func (p *NriServer) Start() error { - go func() { + err := p.options.Validate() + if err != nil { + return err + } + success := time.After(p.options.NriConnectTimeout) + errorChan := make(chan error) + + go func(chan error) { if p.stub != nil { err := p.stub.Run(context.Background()) if err != nil { klog.Errorf("nri server exited with error: %v", err) + errorChan <- err } else { klog.V(4).Info("nri server started") } } else { - klog.V(4).Info("nri stub is nil") + err := fmt.Errorf("nri stub is nil") + errorChan <- err } - }() - return nil + }(errorChan) + + select { + case <-success: + return nil + case <-errorChan: + return fmt.Errorf("nri start fail") + } } func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) { @@ -209,6 +228,33 @@ func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Containe } func (p *NriServer) onClose() { - p.stub.Stop() - klog.V(6).Infof("NRI server closes") + //TODO: consider the pod status during restart + retryFunc := func() (bool, error) { + stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...) + if err != nil { + klog.Errorf("failed to create plugin stub: %v", err) + return false, nil + } + + p.stub = stub + err = p.Start() + if err != nil { + completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath) + targetErr := fmt.Errorf("nri socket path %q does not exist", completeNriSocketPath) + if err.Error() == targetErr.Error() { + return false, err + } + //TODO: check the error type, if nri server disable nri, we should also break backoff + klog.Warningf("nri reconnect failed") + return false, nil + } else { + klog.V(4).Info("nri server restart success") + return true, nil + } + } + + err := wait.ExponentialBackoff(p.options.BackOff, retryFunc) + if err != nil { + klog.Errorf("nri server restart failed after several times retry") + } } diff --git a/pkg/koordlet/runtimehooks/nri/server_test.go b/pkg/koordlet/runtimehooks/nri/server_test.go index 75826f43f..241420899 100644 --- a/pkg/koordlet/runtimehooks/nri/server_test.go +++ b/pkg/koordlet/runtimehooks/nri/server_test.go @@ -19,6 +19,7 @@ package nri import ( "reflect" "testing" + "time" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" @@ -58,19 +59,19 @@ func TestNriServer_Start(t *testing.T) { stub: nil, mask: api.EventMask(1), options: Options{ + NriConnectTimeout: time.Second, PluginFailurePolicy: "Ignore", DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}), Executor: nil, }, }, - wantErr: false, + wantErr: true, }, { fields: fields{ stub: nil, }, }, - {}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/koordlet/runtimehooks/runtimehooks.go b/pkg/koordlet/runtimehooks/runtimehooks.go index 9a888ba42..cc464bb6e 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks.go +++ b/pkg/koordlet/runtimehooks/runtimehooks.go @@ -19,6 +19,7 @@ package runtimehooks import ( "fmt" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "github.com/koordinator-sh/koordinator/pkg/features" @@ -57,12 +58,14 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error { return err } if r.nriServer != nil { - if err := r.nriServer.Start(); err != nil { - // if NRI is not enabled or container runtime not support NRI, we just skip NRI server start - klog.Errorf("nri mode runtime hook server start failed: %v", err) - } else { - klog.V(4).Infof("nri mode runtime hook server has started") - } + go func() { + if err := r.nriServer.Start(); err != nil { + // if NRI is not enabled or container runtime not support NRI, we just skip NRI server start + klog.Warningf("nri mode runtime hook server start failed: %v", err) + } else { + klog.V(4).Infof("nri mode runtime hook server has started") + } + }() } if err := r.reconciler.Run(stopCh); err != nil { return err @@ -101,13 +104,22 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, Executor: e, } + backOff := wait.Backoff{ + Duration: cfg.RuntimeHooksNRIReconnectInitInterval, + Factor: cfg.RuntimeHooksNRIReconnectMul, + Jitter: 0.1, + Steps: cfg.RuntimeHooksNRIReconnectLimit, + Cap: cfg.RuntimeHooksNRIReconnectMaxDuration, + } var nriServer *nri.NriServer if cfg.RuntimeHooksNRI { nriServerOptions := nri.Options{ NriSocketPath: cfg.RuntimeHooksNRISocketPath, + NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout, PluginFailurePolicy: pluginFailurePolicy, DisableStages: getDisableStagesMap(cfg.RuntimeHookDisableStages), Executor: e, + BackOff: backOff, } nriServer, err = nri.NewNriServer(nriServerOptions) if err != nil {