Skip to content

Commit

Permalink
koordlet: add NRI reconnect (#1972)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhang Kang <kang.zhang@intel.com>
  • Loading branch information
kangclzjc authored May 10, 2024
1 parent fb33c30 commit afa430a
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 21 deletions.
16 changes: 16 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package runtimehooks

import (
"flag"
"math"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -113,6 +114,11 @@ type Config struct {
RuntimeHookHostEndpoint string
RuntimeHookDisableStages []string
RuntimeHooksNRI bool
RuntimeHooksNRIConnectTimeout time.Duration
RuntimeHooksNRIBackOffDuration time.Duration
RuntimeHooksNRIBackOffCap time.Duration
RuntimeHooksNRIBackOffFactor float64
RuntimeHooksNRIBackOffSteps int
RuntimeHooksNRISocketPath string
RuntimeHookReconcileInterval time.Duration
}
Expand All @@ -127,6 +133,11 @@ func NewDefaultConfig() *Config {
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIConnectTimeout: 6 * time.Second,
RuntimeHooksNRIBackOffDuration: 1 * time.Second,
RuntimeHooksNRIBackOffCap: 1<<62 - 1,
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
Expand All @@ -139,6 +150,11 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.RuntimeHooksPluginFailurePolicy, "runtime-hooks-plugin-failure-policy", c.RuntimeHooksPluginFailurePolicy, "stop running other hooks once someone failed")
fs.StringVar(&c.RuntimeHookConfigFilePath, "runtime-hooks-config-path", c.RuntimeHookConfigFilePath, "config file path for runtime hooks")
fs.StringVar(&c.RuntimeHookHostEndpoint, "runtime-hooks-host-endpoint", c.RuntimeHookHostEndpoint, "host endpoint of runtime proxy")
fs.DurationVar(&c.RuntimeHooksNRIConnectTimeout, "runtime-hooks-nri-connect-timeout", c.RuntimeHooksNRIConnectTimeout, "nri server connect time out, it should be a little more than default plugin registration timeout(5 seconds) which is defined in containerd config")
fs.DurationVar(&c.RuntimeHooksNRIBackOffDuration, "runtime-hooks-nri-backoff-duration", c.RuntimeHooksNRIBackOffDuration, "nri server backoff duration")
fs.DurationVar(&c.RuntimeHooksNRIBackOffCap, "runtime-hooks-nri-backoff-cap", c.RuntimeHooksNRIBackOffCap, "nri server backoff cap")
fs.IntVar(&c.RuntimeHooksNRIBackOffSteps, "runtime-hooks-nri-backoff-steps", c.RuntimeHooksNRIBackOffSteps, "nri server backoff steps")
fs.Float64Var(&c.RuntimeHooksNRIBackOffFactor, "runtime-hooks-nri-backoff-factor", c.RuntimeHooksNRIBackOffFactor, "nri server reconnect backoff factor")
fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks")
fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode")
fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins")
Expand Down
6 changes: 6 additions & 0 deletions pkg/koordlet/runtimehooks/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package runtimehooks

import (
"flag"
"math"
"testing"
"time"

Expand All @@ -36,6 +37,11 @@ func Test_NewDefaultConfig(t *testing.T) {
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIConnectTimeout: 6 * time.Second,
RuntimeHooksNRIBackOffDuration: 1 * time.Second,
RuntimeHooksNRIBackOffCap: 1<<62 - 1,
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
Expand Down
73 changes: 60 additions & 13 deletions pkg/koordlet/runtimehooks/nri/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

Expand All @@ -39,12 +41,14 @@ type nriConfig struct {
}

type Options struct {
NriSocketPath string
NriSocketPath string
NriConnectTimeout time.Duration
// support stop running other hooks once someone failed
PluginFailurePolicy rmconfig.FailurePolicyType
// todo: add support for disable stages
DisableStages map[string]struct{}
Executor resourceexecutor.ResourceUpdateExecutor
BackOff wait.Backoff
}

func (o Options) Validate() error {
Expand All @@ -71,11 +75,12 @@ const (
)

var (
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
opts []stub.Option
)

func NewNriServer(opt Options) (*NriServer, error) {
Expand All @@ -84,7 +89,6 @@ func NewNriServer(opt Options) (*NriServer, error) {
return nil, fmt.Errorf("failed to validate nri server, err: %w", err)
}

var opts []stub.Option
opts = append(opts, stub.WithPluginName(pluginName))
opts = append(opts, stub.WithPluginIdx(pluginIdx))
opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)))
Expand All @@ -104,19 +108,34 @@ func NewNriServer(opt Options) (*NriServer, error) {
}

func (p *NriServer) Start() error {
go func() {
err := p.options.Validate()
if err != nil {
return err
}
success := time.After(p.options.NriConnectTimeout)
errorChan := make(chan error)

go func(chan error) {
if p.stub != nil {
err := p.stub.Run(context.Background())
if err != nil {
klog.Errorf("nri server exited with error: %v", err)
errorChan <- err
} else {
klog.V(4).Info("nri server started")
}
} else {
klog.V(4).Info("nri stub is nil")
err := fmt.Errorf("nri stub is nil")
errorChan <- err
}
}()
return nil
}(errorChan)

select {
case <-success:
return nil
case <-errorChan:
return fmt.Errorf("nri start fail")
}
}

func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) {
Expand Down Expand Up @@ -209,6 +228,34 @@ func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Containe
}

func (p *NriServer) onClose() {
p.stub.Stop()
klog.V(6).Infof("NRI server closes")
//TODO: consider the pod status during restart
retryFunc := func() (bool, error) {
stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...)
if err != nil {
klog.Errorf("failed to create plugin stub: %v", err)
return false, nil
}

p.stub = stub
err = p.Start()
if err != nil {
completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath)
targetErr := fmt.Errorf("nri socket path %q does not exist", completeNriSocketPath)
if err.Error() == targetErr.Error() {
return false, err
}
//TODO: check the error type, if nri server disable nri, we should also break backoff
klog.Warningf("nri reconnect failed")
return false, nil
} else {
klog.V(4).Info("nri server restart success")
return true, nil
}
}

// TODO: high version wait not support BackoffUntil with BackOffManger as parameters, when updated to v0.27.0 version wait, we can refine ExponentialBackoff.
err := wait.ExponentialBackoff(p.options.BackOff, retryFunc)
if err != nil {
klog.Errorf("nri server restart failed after several times retry")
}
}
5 changes: 3 additions & 2 deletions pkg/koordlet/runtimehooks/nri/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nri
import (
"reflect"
"testing"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
Expand Down Expand Up @@ -58,19 +59,19 @@ func TestNriServer_Start(t *testing.T) {
stub: nil,
mask: api.EventMask(1),
options: Options{
NriConnectTimeout: time.Second,
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Executor: nil,
},
},
wantErr: false,
wantErr: true,
},
{
fields: fields{
stub: nil,
},
},
{},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
24 changes: 18 additions & 6 deletions pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runtimehooks
import (
"fmt"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/features"
Expand Down Expand Up @@ -57,12 +58,14 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error {
return err
}
if r.nriServer != nil {
if err := r.nriServer.Start(); err != nil {
// if NRI is not enabled or container runtime not support NRI, we just skip NRI server start
klog.Errorf("nri mode runtime hook server start failed: %v", err)
} else {
klog.V(4).Infof("nri mode runtime hook server has started")
}
go func() {
if err := r.nriServer.Start(); err != nil {
// if NRI is not enabled or container runtime not support NRI, we just skip NRI server start
klog.Warningf("nri mode runtime hook server start failed: %v", err)
} else {
klog.V(4).Infof("nri mode runtime hook server has started")
}
}()
}
if err := r.reconciler.Run(stopCh); err != nil {
return err
Expand Down Expand Up @@ -101,13 +104,22 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
Executor: e,
}

backOff := wait.Backoff{
Duration: cfg.RuntimeHooksNRIBackOffDuration,
Factor: cfg.RuntimeHooksNRIBackOffFactor,
Jitter: 0.1,
Steps: cfg.RuntimeHooksNRIBackOffSteps,
Cap: cfg.RuntimeHooksNRIBackOffCap,
}
var nriServer *nri.NriServer
if cfg.RuntimeHooksNRI {
nriServerOptions := nri.Options{
NriSocketPath: cfg.RuntimeHooksNRISocketPath,
NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout,
PluginFailurePolicy: pluginFailurePolicy,
DisableStages: getDisableStagesMap(cfg.RuntimeHookDisableStages),
Executor: e,
BackOff: backOff,
}
nriServer, err = nri.NewNriServer(nriServerOptions)
if err != nil {
Expand Down

0 comments on commit afa430a

Please sign in to comment.