Skip to content

Commit

Permalink
add nri reconnect
Browse files Browse the repository at this point in the history
Signed-off-by: Zhang Kang <kang.zhang@intel.com>
  • Loading branch information
kangclzjc committed Apr 26, 2024
1 parent af5f951 commit 1e9b75b
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 51 deletions.
55 changes: 35 additions & 20 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,30 +105,40 @@ var (
)

type Config struct {
RuntimeHooksNetwork string
RuntimeHooksAddr string
RuntimeHooksFailurePolicy string
RuntimeHooksPluginFailurePolicy string
RuntimeHookConfigFilePath string
RuntimeHookHostEndpoint string
RuntimeHookDisableStages []string
RuntimeHooksNRI bool
RuntimeHooksNRISocketPath string
RuntimeHookReconcileInterval time.Duration
RuntimeHooksNetwork string
RuntimeHooksAddr string
RuntimeHooksFailurePolicy string
RuntimeHooksPluginFailurePolicy string
RuntimeHookConfigFilePath string
RuntimeHookHostEndpoint string
RuntimeHookDisableStages []string
RuntimeHooksNRI bool
RuntimeHooksNRIConnectTimeout time.Duration
RuntimeHooksNRIReconnectInitInterval time.Duration
RuntimeHooksNRIReconnectMaxDuration time.Duration
RuntimeHooksNRIReconnectMul float64
RuntimeHooksNRIReconnectLimit int
RuntimeHooksNRISocketPath string
RuntimeHookReconcileInterval time.Duration
}

func NewDefaultConfig() *Config {
return &Config{
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIConnectTimeout: 2 * time.Second,
RuntimeHooksNRIReconnectInitInterval: 1 * time.Second,
RuntimeHooksNRIReconnectMaxDuration: 10 * time.Minute,
RuntimeHooksNRIReconnectLimit: 100,
RuntimeHooksNRIReconnectMul: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
}

Expand All @@ -139,6 +149,11 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.RuntimeHooksPluginFailurePolicy, "runtime-hooks-plugin-failure-policy", c.RuntimeHooksPluginFailurePolicy, "stop running other hooks once someone failed")
fs.StringVar(&c.RuntimeHookConfigFilePath, "runtime-hooks-config-path", c.RuntimeHookConfigFilePath, "config file path for runtime hooks")
fs.StringVar(&c.RuntimeHookHostEndpoint, "runtime-hooks-host-endpoint", c.RuntimeHookHostEndpoint, "host endpoint of runtime proxy")
fs.DurationVar(&c.RuntimeHooksNRIConnectTimeout, "runtime-hooks-nri-connect-timeout", c.RuntimeHooksNRIConnectTimeout, "nri server connect time out")
fs.DurationVar(&c.RuntimeHooksNRIReconnectInitInterval, "runtime-hooks-nri-reconnect-init-interval", c.RuntimeHooksNRIReconnectInitInterval, "nri server reconnection init interval")
fs.DurationVar(&c.RuntimeHooksNRIReconnectMaxDuration, "runtime-hooks-nri-reconnect-max-duration", c.RuntimeHooksNRIReconnectMaxDuration, "nri server reconnection max duration, when hit the limit, will not reconnect")
fs.IntVar(&c.RuntimeHooksNRIReconnectLimit, "runtime-hooks-nri-reconnect-max-times", c.RuntimeHooksNRIReconnectLimit, "nri server reconnect max limit")
fs.Float64Var(&c.RuntimeHooksNRIReconnectMul, "runtime-hooks-nri-reconnect-mul", c.RuntimeHooksNRIReconnectMul, "nri server reconnect backoff mul")
fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks")
fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode")
fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins")
Expand Down
25 changes: 15 additions & 10 deletions pkg/koordlet/runtimehooks/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,21 @@ import (

func Test_NewDefaultConfig(t *testing.T) {
expectConfig := &Config{
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIConnectTimeout: 2 * time.Second,
RuntimeHooksNRIReconnectInitInterval: 1 * time.Second,
RuntimeHooksNRIReconnectMaxDuration: 10 * time.Minute,
RuntimeHooksNRIReconnectLimit: 100,
RuntimeHooksNRIReconnectMul: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
defaultConfig := NewDefaultConfig()
assert.Equal(t, expectConfig, defaultConfig)
Expand Down
72 changes: 59 additions & 13 deletions pkg/koordlet/runtimehooks/nri/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

Expand All @@ -39,12 +41,14 @@ type nriConfig struct {
}

type Options struct {
NriSocketPath string
NriSocketPath string
NriConnectTimeout time.Duration
// support stop running other hooks once someone failed
PluginFailurePolicy rmconfig.FailurePolicyType
// todo: add support for disable stages
DisableStages map[string]struct{}
Executor resourceexecutor.ResourceUpdateExecutor
BackOff wait.Backoff
}

func (o Options) Validate() error {
Expand All @@ -71,11 +75,12 @@ const (
)

var (
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
opts []stub.Option
)

func NewNriServer(opt Options) (*NriServer, error) {
Expand All @@ -84,7 +89,6 @@ func NewNriServer(opt Options) (*NriServer, error) {
return nil, fmt.Errorf("failed to validate nri server, err: %w", err)
}

var opts []stub.Option
opts = append(opts, stub.WithPluginName(pluginName))
opts = append(opts, stub.WithPluginIdx(pluginIdx))
opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)))
Expand All @@ -104,19 +108,34 @@ func NewNriServer(opt Options) (*NriServer, error) {
}

func (p *NriServer) Start() error {
go func() {
err := p.options.Validate()
if err != nil {
return err
}
success := time.After(p.options.NriConnectTimeout)
errorChan := make(chan error)

go func(chan error) {
if p.stub != nil {
err := p.stub.Run(context.Background())
if err != nil {
klog.Errorf("nri server exited with error: %v", err)
errorChan <- err
} else {
klog.V(4).Info("nri server started")
}
} else {
klog.V(4).Info("nri stub is nil")
err := fmt.Errorf("nri stub is nil")
errorChan <- err
}
}()
return nil
}(errorChan)

select {
case <-success:
return nil
case <-errorChan:
return fmt.Errorf("nri start fail")
}
}

func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) {
Expand Down Expand Up @@ -209,6 +228,33 @@ func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Containe
}

func (p *NriServer) onClose() {
p.stub.Stop()
klog.V(6).Infof("NRI server closes")
//TODO: consider the pod status during restart
retryFunc := func() (bool, error) {
stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...)
if err != nil {
klog.Errorf("failed to create plugin stub: %v", err)
return false, nil
}

p.stub = stub
err = p.Start()
if err != nil {
completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath)
targetErr := fmt.Errorf("nri socket path %q does not exist", completeNriSocketPath)
if err.Error() == targetErr.Error() {
return false, err
}
//TODO: check the error type, if nri server disable nri, we should also break backoff
klog.Warningf("nri reconnect failed")
return false, nil
} else {
klog.V(4).Info("nri server restart success")
return true, nil
}
}

err := wait.ExponentialBackoff(p.options.BackOff, retryFunc)
if err != nil {
klog.Errorf("nri server restart failed after several times retry")
}
}
5 changes: 3 additions & 2 deletions pkg/koordlet/runtimehooks/nri/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nri
import (
"reflect"
"testing"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
Expand Down Expand Up @@ -58,19 +59,19 @@ func TestNriServer_Start(t *testing.T) {
stub: nil,
mask: api.EventMask(1),
options: Options{
NriConnectTimeout: time.Second,
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Executor: nil,
},
},
wantErr: false,
wantErr: true,
},
{
fields: fields{
stub: nil,
},
},
{},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
24 changes: 18 additions & 6 deletions pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runtimehooks
import (
"fmt"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/features"
Expand Down Expand Up @@ -57,12 +58,14 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error {
return err
}
if r.nriServer != nil {
if err := r.nriServer.Start(); err != nil {
// if NRI is not enabled or container runtime not support NRI, we just skip NRI server start
klog.Errorf("nri mode runtime hook server start failed: %v", err)
} else {
klog.V(4).Infof("nri mode runtime hook server has started")
}
go func() {
if err := r.nriServer.Start(); err != nil {
// if NRI is not enabled or container runtime not support NRI, we just skip NRI server start
klog.Warningf("nri mode runtime hook server start failed: %v", err)
} else {
klog.V(4).Infof("nri mode runtime hook server has started")
}
}()
}
if err := r.reconciler.Run(stopCh); err != nil {
return err
Expand Down Expand Up @@ -101,13 +104,22 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
Executor: e,
}

backOff := wait.Backoff{
Duration: cfg.RuntimeHooksNRIReconnectInitInterval,
Factor: cfg.RuntimeHooksNRIReconnectMul,
Jitter: 0.1,
Steps: cfg.RuntimeHooksNRIReconnectLimit,
Cap: cfg.RuntimeHooksNRIReconnectMaxDuration,
}
var nriServer *nri.NriServer
if cfg.RuntimeHooksNRI {
nriServerOptions := nri.Options{
NriSocketPath: cfg.RuntimeHooksNRISocketPath,
NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout,
PluginFailurePolicy: pluginFailurePolicy,
DisableStages: getDisableStagesMap(cfg.RuntimeHookDisableStages),
Executor: e,
BackOff: backOff,
}
nriServer, err = nri.NewNriServer(nriServerOptions)
if err != nil {
Expand Down

0 comments on commit 1e9b75b

Please sign in to comment.