Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: add NRI reconnect #1972

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package runtimehooks

import (
"flag"
"math"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -113,6 +114,11 @@ type Config struct {
RuntimeHookHostEndpoint string
RuntimeHookDisableStages []string
RuntimeHooksNRI bool
RuntimeHooksNRIConnectTimeout time.Duration
RuntimeHooksNRIBackOffDuration time.Duration
RuntimeHooksNRIBackOffCap time.Duration
RuntimeHooksNRIBackOffFactor float64
RuntimeHooksNRIBackOffSteps int
RuntimeHooksNRISocketPath string
RuntimeHookReconcileInterval time.Duration
}
Expand All @@ -127,6 +133,11 @@ func NewDefaultConfig() *Config {
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIConnectTimeout: 6 * time.Second,
RuntimeHooksNRIBackOffDuration: 1 * time.Second,
RuntimeHooksNRIBackOffCap: 1<<62 - 1,
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
Expand All @@ -139,6 +150,11 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.RuntimeHooksPluginFailurePolicy, "runtime-hooks-plugin-failure-policy", c.RuntimeHooksPluginFailurePolicy, "stop running other hooks once someone failed")
fs.StringVar(&c.RuntimeHookConfigFilePath, "runtime-hooks-config-path", c.RuntimeHookConfigFilePath, "config file path for runtime hooks")
fs.StringVar(&c.RuntimeHookHostEndpoint, "runtime-hooks-host-endpoint", c.RuntimeHookHostEndpoint, "host endpoint of runtime proxy")
fs.DurationVar(&c.RuntimeHooksNRIConnectTimeout, "runtime-hooks-nri-connect-timeout", c.RuntimeHooksNRIConnectTimeout, "nri server connect time out, it should be a little more than default plugin registration timeout(5 seconds) which is defined in containerd config")
fs.DurationVar(&c.RuntimeHooksNRIBackOffDuration, "runtime-hooks-nri-backoff-duration", c.RuntimeHooksNRIBackOffDuration, "nri server backoff duration")
fs.DurationVar(&c.RuntimeHooksNRIBackOffCap, "runtime-hooks-nri-backoff-cap", c.RuntimeHooksNRIBackOffCap, "nri server backoff cap")
fs.IntVar(&c.RuntimeHooksNRIBackOffSteps, "runtime-hooks-nri-backoff-steps", c.RuntimeHooksNRIBackOffSteps, "nri server backoff steps")
fs.Float64Var(&c.RuntimeHooksNRIBackOffFactor, "runtime-hooks-nri-backoff-factor", c.RuntimeHooksNRIBackOffFactor, "nri server reconnect backoff factor")
fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks")
fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode")
fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins")
Expand Down
6 changes: 6 additions & 0 deletions pkg/koordlet/runtimehooks/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package runtimehooks

import (
"flag"
"math"
"testing"
"time"

Expand All @@ -36,6 +37,11 @@ func Test_NewDefaultConfig(t *testing.T) {
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIConnectTimeout: 6 * time.Second,
RuntimeHooksNRIBackOffDuration: 1 * time.Second,
RuntimeHooksNRIBackOffCap: 1<<62 - 1,
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
Expand Down
73 changes: 60 additions & 13 deletions pkg/koordlet/runtimehooks/nri/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

Expand All @@ -39,12 +41,14 @@ type nriConfig struct {
}

type Options struct {
NriSocketPath string
NriSocketPath string
NriConnectTimeout time.Duration
// support stop running other hooks once someone failed
PluginFailurePolicy rmconfig.FailurePolicyType
// todo: add support for disable stages
DisableStages map[string]struct{}
Executor resourceexecutor.ResourceUpdateExecutor
BackOff wait.Backoff
}

func (o Options) Validate() error {
Expand All @@ -71,11 +75,12 @@ const (
)

var (
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
opts []stub.Option
)

func NewNriServer(opt Options) (*NriServer, error) {
Expand All @@ -84,7 +89,6 @@ func NewNriServer(opt Options) (*NriServer, error) {
return nil, fmt.Errorf("failed to validate nri server, err: %w", err)
}

var opts []stub.Option
opts = append(opts, stub.WithPluginName(pluginName))
opts = append(opts, stub.WithPluginIdx(pluginIdx))
opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)))
Expand All @@ -104,19 +108,34 @@ func NewNriServer(opt Options) (*NriServer, error) {
}

func (p *NriServer) Start() error {
go func() {
kangclzjc marked this conversation as resolved.
Show resolved Hide resolved
err := p.options.Validate()
if err != nil {
return err
}
success := time.After(p.options.NriConnectTimeout)
kangclzjc marked this conversation as resolved.
Show resolved Hide resolved
errorChan := make(chan error)

go func(chan error) {
if p.stub != nil {
err := p.stub.Run(context.Background())
if err != nil {
klog.Errorf("nri server exited with error: %v", err)
errorChan <- err
} else {
klog.V(4).Info("nri server started")
}
} else {
klog.V(4).Info("nri stub is nil")
err := fmt.Errorf("nri stub is nil")
errorChan <- err
}
}()
return nil
}(errorChan)

select {
case <-success:
return nil
case <-errorChan:
return fmt.Errorf("nri start fail")
}
}

func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) {
Expand Down Expand Up @@ -209,6 +228,34 @@ func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Containe
}

func (p *NriServer) onClose() {
p.stub.Stop()
klog.V(6).Infof("NRI server closes")
//TODO: consider the pod status during restart
retryFunc := func() (bool, error) {
stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...)
if err != nil {
klog.Errorf("failed to create plugin stub: %v", err)
return false, nil
}

p.stub = stub
err = p.Start()
if err != nil {
completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath)
targetErr := fmt.Errorf("nri socket path %q does not exist", completeNriSocketPath)
if err.Error() == targetErr.Error() {
return false, err
}
//TODO: check the error type, if nri server disable nri, we should also break backoff
klog.Warningf("nri reconnect failed")
return false, nil
} else {
klog.V(4).Info("nri server restart success")
return true, nil
}
}

// TODO: high version wait not support BackoffUntil with BackOffManger as parameters, when updated to v0.27.0 version wait, we can refine ExponentialBackoff.
err := wait.ExponentialBackoff(p.options.BackOff, retryFunc)
if err != nil {
klog.Errorf("nri server restart failed after several times retry")
}
}
5 changes: 3 additions & 2 deletions pkg/koordlet/runtimehooks/nri/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nri
import (
"reflect"
"testing"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
Expand Down Expand Up @@ -58,19 +59,19 @@ func TestNriServer_Start(t *testing.T) {
stub: nil,
mask: api.EventMask(1),
options: Options{
NriConnectTimeout: time.Second,
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Executor: nil,
},
},
wantErr: false,
wantErr: true,
},
{
fields: fields{
stub: nil,
},
},
{},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
24 changes: 18 additions & 6 deletions pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runtimehooks
import (
"fmt"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/features"
Expand Down Expand Up @@ -57,12 +58,14 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error {
return err
}
if r.nriServer != nil {
if err := r.nriServer.Start(); err != nil {
// if NRI is not enabled or container runtime not support NRI, we just skip NRI server start
klog.Errorf("nri mode runtime hook server start failed: %v", err)
} else {
klog.V(4).Infof("nri mode runtime hook server has started")
}
go func() {
if err := r.nriServer.Start(); err != nil {
// if NRI is not enabled or container runtime not support NRI, we just skip NRI server start
klog.Warningf("nri mode runtime hook server start failed: %v", err)
} else {
klog.V(4).Infof("nri mode runtime hook server has started")
}
}()
}
if err := r.reconciler.Run(stopCh); err != nil {
return err
Expand Down Expand Up @@ -101,13 +104,22 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
Executor: e,
}

backOff := wait.Backoff{
Duration: cfg.RuntimeHooksNRIBackOffDuration,
Factor: cfg.RuntimeHooksNRIBackOffFactor,
Jitter: 0.1,
Steps: cfg.RuntimeHooksNRIBackOffSteps,
Cap: cfg.RuntimeHooksNRIBackOffCap,
}
var nriServer *nri.NriServer
if cfg.RuntimeHooksNRI {
nriServerOptions := nri.Options{
NriSocketPath: cfg.RuntimeHooksNRISocketPath,
NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout,
PluginFailurePolicy: pluginFailurePolicy,
DisableStages: getDisableStagesMap(cfg.RuntimeHookDisableStages),
Executor: e,
BackOff: backOff,
}
nriServer, err = nri.NewNriServer(nriServerOptions)
if err != nil {
Expand Down