Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: fix nri reconnect params #2067

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type Config struct {
RuntimeHooksNRIBackOffFactor float64
RuntimeHooksNRIBackOffSteps int
RuntimeHooksNRISocketPath string
RuntimeHooksNRIPluginName string
RuntimeHooksNRIPluginIndex string
RuntimeHookReconcileInterval time.Duration
}

Expand All @@ -139,6 +141,8 @@ func NewDefaultConfig() *Config {
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHooksNRIPluginName: "koordlet_nri",
RuntimeHooksNRIPluginIndex: "00",
RuntimeHookReconcileInterval: 10 * time.Second,
}
}
Expand All @@ -155,6 +159,9 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.DurationVar(&c.RuntimeHooksNRIBackOffCap, "runtime-hooks-nri-backoff-cap", c.RuntimeHooksNRIBackOffCap, "nri server backoff cap")
fs.IntVar(&c.RuntimeHooksNRIBackOffSteps, "runtime-hooks-nri-backoff-steps", c.RuntimeHooksNRIBackOffSteps, "nri server backoff steps")
fs.Float64Var(&c.RuntimeHooksNRIBackOffFactor, "runtime-hooks-nri-backoff-factor", c.RuntimeHooksNRIBackOffFactor, "nri server reconnect backoff factor")
fs.StringVar(&c.RuntimeHooksNRISocketPath, "runtime-hooks-nri-socket-path", c.RuntimeHooksNRISocketPath, "nri server socket path")
fs.StringVar(&c.RuntimeHooksNRIPluginName, "runtime-hooks-nri-plugin-name", c.RuntimeHooksNRISocketPath, "nri plugin name of the koordlet runtime hooks")
fs.StringVar(&c.RuntimeHooksNRIPluginIndex, "runtime-hooks-nri-plugin-index", c.RuntimeHooksNRIPluginIndex, "nri plugin index of the koordlet runtime hooks")
fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks")
fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode")
fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins")
Expand Down
2 changes: 2 additions & 0 deletions pkg/koordlet/runtimehooks/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func Test_NewDefaultConfig(t *testing.T) {
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHooksNRIPluginName: "koordlet_nri",
RuntimeHooksNRIPluginIndex: "00",
RuntimeHookReconcileInterval: 10 * time.Second,
}
defaultConfig := NewDefaultConfig()
Expand Down
63 changes: 40 additions & 23 deletions pkg/koordlet/runtimehooks/nri/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"
Expand All @@ -41,6 +42,8 @@ type nriConfig struct {
}

type Options struct {
NriPluginName string
NriPluginIdx string
NriSocketPath string
NriConnectTimeout time.Duration
// support stop running other hooks once someone failed
Expand All @@ -62,26 +65,25 @@ func (o Options) Validate() error {
}

type NriServer struct {
cfg nriConfig
stub stub.Stub
mask stub.EventMask
options Options // server options
cfg nriConfig
stub stub.Stub
mask stub.EventMask
options Options // server options
stubOpts []stub.Option // nri stub options
stopped *atomic.Bool // if false, the stub will try to reconnect when stub.OnClose is invoked
}

const (
events = "RunPodSandbox,RemovePodSandbox,CreateContainer,UpdateContainer"
pluginName = "koordlet_nri"
pluginIdx = "00"
events = "RunPodSandbox,RemovePodSandbox,CreateContainer,UpdateContainer"
)

var (
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.RemovePodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
opts []stub.Option
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.RemovePodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
)

func NewNriServer(opt Options) (*NriServer, error) {
Expand All @@ -90,17 +92,23 @@ func NewNriServer(opt Options) (*NriServer, error) {
return nil, fmt.Errorf("failed to validate nri server, err: %w", err)
}

opts = append(opts, stub.WithPluginName(pluginName))
opts = append(opts, stub.WithPluginIdx(pluginIdx))
opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)))
p := &NriServer{options: opt}
stubOpts := []stub.Option{
stub.WithPluginName(opt.NriPluginName),
stub.WithPluginIdx(opt.NriPluginIdx),
stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)),
}
p := &NriServer{
options: opt,
stubOpts: stubOpts,
stopped: atomic.NewBool(false),
}
if p.mask, err = api.ParseEventMask(events); err != nil {
klog.Errorf("failed to parse events %v", err)
return p, err
}
p.cfg.Events = strings.Split(events, ",")

if p.stub, err = stub.New(p, append(opts, stub.WithOnClose(p.onClose))...); err != nil {
if p.stub, err = stub.New(p, append(p.stubOpts, stub.WithOnClose(p.onClose))...); err != nil {
klog.Errorf("failed to create plugin stub: %v", err)
return nil, err
}
Expand Down Expand Up @@ -135,10 +143,15 @@ func (p *NriServer) Start() error {
case <-success:
return nil
case <-errorChan:
return fmt.Errorf("nri start fail")
return fmt.Errorf("nri start fail, err: %w", err)
}
}

func (p *NriServer) Stop() {
p.stopped.Store(true)
p.stub.Stop()
}

func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) {
klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version)
if config == "" {
Expand Down Expand Up @@ -248,13 +261,17 @@ func (p *NriServer) RemovePodSandbox(pod *api.PodSandbox) error {
func (p *NriServer) onClose() {
//TODO: consider the pod status during restart
retryFunc := func() (bool, error) {
stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...)
if p.stopped.Load() { // if set to stopped, no longer reconnect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, currently, nri server never stop, right ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Currently, this method is used by the testing.

return true, nil
}

newStub, err := stub.New(p, append(p.stubOpts, stub.WithOnClose(p.onClose))...)
if err != nil {
klog.Errorf("failed to create plugin stub: %v", err)
return false, nil
}

p.stub = stub
p.stub = newStub
err = p.Start()
if err != nil {
completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath)
Expand All @@ -263,7 +280,7 @@ func (p *NriServer) onClose() {
return false, err
}
//TODO: check the error type, if nri server disable nri, we should also break backoff
klog.Warningf("nri reconnect failed")
klog.Warningf("nri reconnect failed, err: %s", err)
return false, nil
} else {
klog.V(4).Info("nri server restart success")
Expand Down
30 changes: 28 additions & 2 deletions pkg/koordlet/runtimehooks/nri/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ func TestNriServer_Start(t *testing.T) {
wantErr bool
}{
{
name: "stub is nil",
name: "nri socket not found",
fields: fields{
stub: nil,
mask: api.EventMask(1),
options: Options{
NriSocketPath: "nri/nri.sock",
NriConnectTimeout: time.Second,
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Expand All @@ -81,19 +82,34 @@ func TestNriServer_Start(t *testing.T) {
wantErr: true,
},
{
name: "stub is nil",
fields: fields{
stub: nil,
mask: api.EventMask(1),
options: Options{
NriSocketPath: "",
NriConnectTimeout: time.Second,
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Executor: nil,
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper := system.NewFileTestUtil(t)
defer helper.Cleanup()

s := &NriServer{
stub: tt.fields.stub,
mask: tt.fields.mask,
options: tt.fields.options,
}
if s.stub != nil {
defer s.Stop()
}

if err := s.Start(); (err != nil) != tt.wantErr {
t.Errorf("Start() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -122,6 +138,8 @@ func TestNewNriServer(t *testing.T) {
isNriSocketExist: false,
},
args: args{opt: Options{
NriPluginName: "test_newNriServer_0",
NriPluginIdx: "00",
NriSocketPath: "nri/nri.sock",
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Expand All @@ -135,6 +153,8 @@ func TestNewNriServer(t *testing.T) {
isNriSocketExist: true,
},
args: args{opt: Options{
NriPluginName: "test_newNriServer_1",
NriPluginIdx: "01",
NriSocketPath: "nri/nri.sock",
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Expand All @@ -150,7 +170,10 @@ func TestNewNriServer(t *testing.T) {
helper.WriteFileContents("nri/nri.sock", "")
}

_, err := NewNriServer(tt.args.opt)
s, err := NewNriServer(tt.args.opt)
if s != nil && s.stub != nil {
defer s.Stop()
}
if (err != nil) != tt.wantErr {
t.Errorf("NewNriServer() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -597,6 +620,9 @@ func TestNriServer_RemovePodSandbox(t *testing.T) {
mask: tt.fields.mask,
options: tt.fields.options,
}
if p.stub != nil {
defer p.Stop()
}
if tt.fields.plugin != nil {
tt.fields.plugin.Register(hooks.Options{})
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
var nriServer *nri.NriServer
if cfg.RuntimeHooksNRI {
nriServerOptions := nri.Options{
NriPluginName: cfg.RuntimeHooksNRIPluginName,
NriPluginIdx: cfg.RuntimeHooksNRIPluginIndex,
NriSocketPath: cfg.RuntimeHooksNRISocketPath,
NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout,
PluginFailurePolicy: pluginFailurePolicy,
Expand Down