From b5f788e11dc0d5a6a3720b9d81697266bce0ea02 Mon Sep 17 00:00:00 2001 From: saintube Date: Mon, 27 May 2024 20:01:07 +0800 Subject: [PATCH] koordlet: fix nri reconnect params Signed-off-by: saintube --- pkg/koordlet/runtimehooks/config.go | 7 +++ pkg/koordlet/runtimehooks/config_test.go | 2 + pkg/koordlet/runtimehooks/nri/server.go | 63 +++++++++++++------- pkg/koordlet/runtimehooks/nri/server_test.go | 30 +++++++++- pkg/koordlet/runtimehooks/runtimehooks.go | 2 + 5 files changed, 79 insertions(+), 25 deletions(-) diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index c716f3dd3..62314d71d 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -120,6 +120,8 @@ type Config struct { RuntimeHooksNRIBackOffFactor float64 RuntimeHooksNRIBackOffSteps int RuntimeHooksNRISocketPath string + RuntimeHooksNRIPluginName string + RuntimeHooksNRIPluginIndex string RuntimeHookReconcileInterval time.Duration } @@ -139,6 +141,8 @@ func NewDefaultConfig() *Config { RuntimeHooksNRIBackOffSteps: math.MaxInt32, RuntimeHooksNRIBackOffFactor: 2, RuntimeHooksNRISocketPath: "nri/nri.sock", + RuntimeHooksNRIPluginName: "koordlet_nri", + RuntimeHooksNRIPluginIndex: "00", RuntimeHookReconcileInterval: 10 * time.Second, } } @@ -155,6 +159,9 @@ func (c *Config) InitFlags(fs *flag.FlagSet) { fs.DurationVar(&c.RuntimeHooksNRIBackOffCap, "runtime-hooks-nri-backoff-cap", c.RuntimeHooksNRIBackOffCap, "nri server backoff cap") fs.IntVar(&c.RuntimeHooksNRIBackOffSteps, "runtime-hooks-nri-backoff-steps", c.RuntimeHooksNRIBackOffSteps, "nri server backoff steps") fs.Float64Var(&c.RuntimeHooksNRIBackOffFactor, "runtime-hooks-nri-backoff-factor", c.RuntimeHooksNRIBackOffFactor, "nri server reconnect backoff factor") + fs.StringVar(&c.RuntimeHooksNRISocketPath, "runtime-hooks-nri-socket-path", c.RuntimeHooksNRISocketPath, "nri server socket path") + fs.StringVar(&c.RuntimeHooksNRIPluginName, "runtime-hooks-nri-plugin-name", c.RuntimeHooksNRISocketPath, "nri plugin name of the koordlet runtime hooks") + fs.StringVar(&c.RuntimeHooksNRIPluginIndex, "runtime-hooks-nri-plugin-index", c.RuntimeHooksNRIPluginIndex, "nri plugin index of the koordlet runtime hooks") fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks") fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode") fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins") diff --git a/pkg/koordlet/runtimehooks/config_test.go b/pkg/koordlet/runtimehooks/config_test.go index 551a94bc2..853defea2 100644 --- a/pkg/koordlet/runtimehooks/config_test.go +++ b/pkg/koordlet/runtimehooks/config_test.go @@ -43,6 +43,8 @@ func Test_NewDefaultConfig(t *testing.T) { RuntimeHooksNRIBackOffSteps: math.MaxInt32, RuntimeHooksNRIBackOffFactor: 2, RuntimeHooksNRISocketPath: "nri/nri.sock", + RuntimeHooksNRIPluginName: "koordlet_nri", + RuntimeHooksNRIPluginIndex: "00", RuntimeHookReconcileInterval: 10 * time.Second, } defaultConfig := NewDefaultConfig() diff --git a/pkg/koordlet/runtimehooks/nri/server.go b/pkg/koordlet/runtimehooks/nri/server.go index 65e6c222b..243b8b2d5 100644 --- a/pkg/koordlet/runtimehooks/nri/server.go +++ b/pkg/koordlet/runtimehooks/nri/server.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" + "go.uber.org/atomic" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "sigs.k8s.io/yaml" @@ -41,6 +42,8 @@ type nriConfig struct { } type Options struct { + NriPluginName string + NriPluginIdx string NriSocketPath string NriConnectTimeout time.Duration // support stop running other hooks once someone failed @@ -62,26 +65,25 @@ func (o Options) Validate() error { } type NriServer struct { - cfg nriConfig - stub stub.Stub - mask stub.EventMask - options Options // server options + cfg nriConfig + stub stub.Stub + mask stub.EventMask + options Options // server options + stubOpts []stub.Option // nri stub options + stopped *atomic.Bool // if false, the stub will try to reconnect when stub.OnClose is invoked } const ( - events = "RunPodSandbox,RemovePodSandbox,CreateContainer,UpdateContainer" - pluginName = "koordlet_nri" - pluginIdx = "00" + events = "RunPodSandbox,RemovePodSandbox,CreateContainer,UpdateContainer" ) var ( - _ = stub.ConfigureInterface(&NriServer{}) - _ = stub.SynchronizeInterface(&NriServer{}) - _ = stub.RunPodInterface(&NriServer{}) - _ = stub.RemovePodInterface(&NriServer{}) - _ = stub.CreateContainerInterface(&NriServer{}) - _ = stub.UpdateContainerInterface(&NriServer{}) - opts []stub.Option + _ = stub.ConfigureInterface(&NriServer{}) + _ = stub.SynchronizeInterface(&NriServer{}) + _ = stub.RunPodInterface(&NriServer{}) + _ = stub.RemovePodInterface(&NriServer{}) + _ = stub.CreateContainerInterface(&NriServer{}) + _ = stub.UpdateContainerInterface(&NriServer{}) ) func NewNriServer(opt Options) (*NriServer, error) { @@ -90,17 +92,23 @@ func NewNriServer(opt Options) (*NriServer, error) { return nil, fmt.Errorf("failed to validate nri server, err: %w", err) } - opts = append(opts, stub.WithPluginName(pluginName)) - opts = append(opts, stub.WithPluginIdx(pluginIdx)) - opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath))) - p := &NriServer{options: opt} + stubOpts := []stub.Option{ + stub.WithPluginName(opt.NriPluginName), + stub.WithPluginIdx(opt.NriPluginIdx), + stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)), + } + p := &NriServer{ + options: opt, + stubOpts: stubOpts, + stopped: atomic.NewBool(false), + } if p.mask, err = api.ParseEventMask(events); err != nil { klog.Errorf("failed to parse events %v", err) return p, err } p.cfg.Events = strings.Split(events, ",") - if p.stub, err = stub.New(p, append(opts, stub.WithOnClose(p.onClose))...); err != nil { + if p.stub, err = stub.New(p, append(p.stubOpts, stub.WithOnClose(p.onClose))...); err != nil { klog.Errorf("failed to create plugin stub: %v", err) return nil, err } @@ -135,10 +143,15 @@ func (p *NriServer) Start() error { case <-success: return nil case <-errorChan: - return fmt.Errorf("nri start fail") + return fmt.Errorf("nri start fail, err: %w", err) } } +func (p *NriServer) Stop() { + p.stopped.Store(true) + p.stub.Stop() +} + func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) { klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version) if config == "" { @@ -248,13 +261,17 @@ func (p *NriServer) RemovePodSandbox(pod *api.PodSandbox) error { func (p *NriServer) onClose() { //TODO: consider the pod status during restart retryFunc := func() (bool, error) { - stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...) + if p.stopped.Load() { // if set to stopped, no longer reconnect + return true, nil + } + + newStub, err := stub.New(p, append(p.stubOpts, stub.WithOnClose(p.onClose))...) if err != nil { klog.Errorf("failed to create plugin stub: %v", err) return false, nil } - p.stub = stub + p.stub = newStub err = p.Start() if err != nil { completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath) @@ -263,7 +280,7 @@ func (p *NriServer) onClose() { return false, err } //TODO: check the error type, if nri server disable nri, we should also break backoff - klog.Warningf("nri reconnect failed") + klog.Warningf("nri reconnect failed, err: %s", err) return false, nil } else { klog.V(4).Info("nri server restart success") diff --git a/pkg/koordlet/runtimehooks/nri/server_test.go b/pkg/koordlet/runtimehooks/nri/server_test.go index 2bc307a98..cb0785708 100644 --- a/pkg/koordlet/runtimehooks/nri/server_test.go +++ b/pkg/koordlet/runtimehooks/nri/server_test.go @@ -67,11 +67,12 @@ func TestNriServer_Start(t *testing.T) { wantErr bool }{ { - name: "stub is nil", + name: "nri socket not found", fields: fields{ stub: nil, mask: api.EventMask(1), options: Options{ + NriSocketPath: "nri/nri.sock", NriConnectTimeout: time.Second, PluginFailurePolicy: "Ignore", DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}), @@ -81,19 +82,34 @@ func TestNriServer_Start(t *testing.T) { wantErr: true, }, { + name: "stub is nil", fields: fields{ stub: nil, + mask: api.EventMask(1), + options: Options{ + NriSocketPath: "", + NriConnectTimeout: time.Second, + PluginFailurePolicy: "Ignore", + DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}), + Executor: nil, + }, }, + wantErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + helper := system.NewFileTestUtil(t) + defer helper.Cleanup() s := &NriServer{ stub: tt.fields.stub, mask: tt.fields.mask, options: tt.fields.options, } + if s.stub != nil { + defer s.Stop() + } if err := s.Start(); (err != nil) != tt.wantErr { t.Errorf("Start() error = %v, wantErr %v", err, tt.wantErr) @@ -122,6 +138,8 @@ func TestNewNriServer(t *testing.T) { isNriSocketExist: false, }, args: args{opt: Options{ + NriPluginName: "test_newNriServer_0", + NriPluginIdx: "00", NriSocketPath: "nri/nri.sock", PluginFailurePolicy: "Ignore", DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}), @@ -135,6 +153,8 @@ func TestNewNriServer(t *testing.T) { isNriSocketExist: true, }, args: args{opt: Options{ + NriPluginName: "test_newNriServer_1", + NriPluginIdx: "01", NriSocketPath: "nri/nri.sock", PluginFailurePolicy: "Ignore", DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}), @@ -150,7 +170,10 @@ func TestNewNriServer(t *testing.T) { helper.WriteFileContents("nri/nri.sock", "") } - _, err := NewNriServer(tt.args.opt) + s, err := NewNriServer(tt.args.opt) + if s != nil && s.stub != nil { + defer s.Stop() + } if (err != nil) != tt.wantErr { t.Errorf("NewNriServer() error = %v, wantErr %v", err, tt.wantErr) return @@ -597,6 +620,9 @@ func TestNriServer_RemovePodSandbox(t *testing.T) { mask: tt.fields.mask, options: tt.fields.options, } + if p.stub != nil { + defer p.Stop() + } if tt.fields.plugin != nil { tt.fields.plugin.Register(hooks.Options{}) } diff --git a/pkg/koordlet/runtimehooks/runtimehooks.go b/pkg/koordlet/runtimehooks/runtimehooks.go index 8a8cdb95b..019d4e413 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks.go +++ b/pkg/koordlet/runtimehooks/runtimehooks.go @@ -114,6 +114,8 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, var nriServer *nri.NriServer if cfg.RuntimeHooksNRI { nriServerOptions := nri.Options{ + NriPluginName: cfg.RuntimeHooksNRIPluginName, + NriPluginIdx: cfg.RuntimeHooksNRIPluginIndex, NriSocketPath: cfg.RuntimeHooksNRISocketPath, NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout, PluginFailurePolicy: pluginFailurePolicy,