Skip to content

Commit

Permalink
add nri reconnect
Browse files Browse the repository at this point in the history
Signed-off-by: Zhang Kang <kang.zhang@intel.com>
  • Loading branch information
kangclzjc committed Apr 12, 2024
1 parent af5f951 commit fa835ea
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 51 deletions.
52 changes: 32 additions & 20 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,30 +105,38 @@ var (
)

type Config struct {
RuntimeHooksNetwork string
RuntimeHooksAddr string
RuntimeHooksFailurePolicy string
RuntimeHooksPluginFailurePolicy string
RuntimeHookConfigFilePath string
RuntimeHookHostEndpoint string
RuntimeHookDisableStages []string
RuntimeHooksNRI bool
RuntimeHooksNRISocketPath string
RuntimeHookReconcileInterval time.Duration
RuntimeHooksNetwork string
RuntimeHooksAddr string
RuntimeHooksFailurePolicy string
RuntimeHooksPluginFailurePolicy string
RuntimeHookConfigFilePath string
RuntimeHookHostEndpoint string
RuntimeHookDisableStages []string
RuntimeHooksNRI bool
RuntimeHooksNRIReconnectInitInterval time.Duration
RuntimeHooksNRIReconnectMaxInterval time.Duration
RuntimeHooksNRIReconnectMul float64
RuntimeHooksNRIReconnectLimit int
RuntimeHooksNRISocketPath string
RuntimeHookReconcileInterval time.Duration
}

func NewDefaultConfig() *Config {
return &Config{
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIReconnectInitInterval: 1 * time.Second,
RuntimeHooksNRIReconnectMaxInterval: 1 * time.Minute,
RuntimeHooksNRIReconnectLimit: 100,
RuntimeHooksNRIReconnectMul: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
}

Expand All @@ -139,6 +147,10 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.RuntimeHooksPluginFailurePolicy, "runtime-hooks-plugin-failure-policy", c.RuntimeHooksPluginFailurePolicy, "stop running other hooks once someone failed")
fs.StringVar(&c.RuntimeHookConfigFilePath, "runtime-hooks-config-path", c.RuntimeHookConfigFilePath, "config file path for runtime hooks")
fs.StringVar(&c.RuntimeHookHostEndpoint, "runtime-hooks-host-endpoint", c.RuntimeHookHostEndpoint, "host endpoint of runtime proxy")
fs.DurationVar(&c.RuntimeHooksNRIReconnectInitInterval, "runtime-hooks-nri-reconnect-init-interval", c.RuntimeHooksNRIReconnectInitInterval, "nri server reconnection init interval")
fs.DurationVar(&c.RuntimeHooksNRIReconnectMaxInterval, "runtime-hooks-nri-reconnect-max-interval", c.RuntimeHooksNRIReconnectMaxInterval, "nri server reconnection max interval")
fs.IntVar(&c.RuntimeHooksNRIReconnectLimit, "runtime-hooks-nri-reconnect-max-times", c.RuntimeHooksNRIReconnectLimit, "nri server reconnect max limit")
fs.Float64Var(&c.RuntimeHooksNRIReconnectMul, "runtime-hooks-nri-reconnect-mul", c.RuntimeHooksNRIReconnectMul, "nri server reconnect backoff mul")
fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks")
fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode")
fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins")
Expand Down
24 changes: 14 additions & 10 deletions pkg/koordlet/runtimehooks/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@ import (

func Test_NewDefaultConfig(t *testing.T) {
expectConfig := &Config{
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
RuntimeHooksNetwork: "unix",
RuntimeHooksAddr: "/host-var-run-koordlet/koordlet.sock",
RuntimeHooksFailurePolicy: "Ignore",
RuntimeHooksPluginFailurePolicy: "Ignore",
RuntimeHookConfigFilePath: system.Conf.RuntimeHooksConfigDir,
RuntimeHookHostEndpoint: "/var/run/koordlet/koordlet.sock",
RuntimeHookDisableStages: []string{},
RuntimeHooksNRI: true,
RuntimeHooksNRIReconnectInitInterval: 10 * time.Second,
RuntimeHooksNRIReconnectMaxInterval: 5 * time.Minute,
RuntimeHooksNRIReconnectLimit: 100,
RuntimeHooksNRIReconnectMul: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHookReconcileInterval: 10 * time.Second,
}
defaultConfig := NewDefaultConfig()
assert.Equal(t, expectConfig, defaultConfig)
Expand Down
41 changes: 41 additions & 0 deletions pkg/koordlet/runtimehooks/nri/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nri

import "time"

// Backoff computes the wait time between successive retry attempts. The wait
// grows linearly with the attempt number and never exceeds maxInterval.
type Backoff struct {
	// initInterval is the base wait that every attempt scales.
	initInterval time.Duration
	// maxInterval is the upper bound of any wait returned by NextInterval.
	maxInterval time.Duration
	// multiplier is the growth factor applied per attempt.
	multiplier float64
}

// NewBackOff returns a Backoff that starts from initInterval, grows by mul for
// every attempt and is capped at maxInterval.
func NewBackOff(initInterval, maxInterval time.Duration, mul float64) Backoff {
	return Backoff{
		initInterval: initInterval,
		maxInterval:  maxInterval,
		multiplier:   mul,
	}
}

// NextInterval returns the wait to apply before the given attempt, computed as
// initInterval * attempt * multiplier and clamped to [0, maxInterval].
//
// Inputs that would produce a non-positive or NaN wait (attempt <= 0, a
// non-positive multiplier, ...) yield 0 instead of a negative duration.
func (b Backoff) NextInterval(attempt int) time.Duration {
	backoff := float64(b.initInterval) * (float64(attempt) * b.multiplier)

	// Written as a negated comparison so that NaN is rejected as well.
	if !(backoff > 0) {
		return 0
	}

	if backoff >= float64(b.maxInterval) {
		if b.maxInterval < 0 {
			return 0
		}
		// Return the cap itself: converting a float64 at or beyond the int64
		// range back to time.Duration is implementation-defined.
		return b.maxInterval
	}

	return time.Duration(backoff)
}
95 changes: 95 additions & 0 deletions pkg/koordlet/runtimehooks/nri/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nri

import (
"reflect"
"testing"
"time"
)

// TestBackoff_NextInterval verifies that NextInterval scales the initial
// interval by attempt and multiplier, and that the result is capped at the
// configured maximum interval.
func TestBackoff_NextInterval(t *testing.T) {
	cases := []struct {
		name    string
		backoff Backoff
		attempt int
		want    time.Duration
	}{
		{
			name:    "Test NextInterval backoff smaller than maxInterval",
			backoff: Backoff{initInterval: 10, maxInterval: 100, multiplier: 2},
			attempt: 2,
			want:    time.Duration(40),
		},
		{
			name:    "Test NextInterval backoff larger than maxInterval",
			backoff: Backoff{initInterval: 10, maxInterval: 100, multiplier: 2},
			attempt: 10,
			want:    time.Duration(100),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := tc.backoff.NextInterval(tc.attempt)
			if got == tc.want {
				return
			}
			t.Errorf("NextInterval() = %v, want %v", got, tc.want)
		})
	}
}

// TestNewBackOff verifies that NewBackOff stores its three arguments in the
// matching Backoff fields without altering them.
func TestNewBackOff(t *testing.T) {
	type args struct {
		initInterval time.Duration
		maxInterval  time.Duration
		mul          float64
	}
	tests := []struct {
		name string
		args args
		want Backoff
	}{
		{
			name: "arguments are stored in the matching fields",
			args: args{
				initInterval: time.Second,
				maxInterval:  time.Minute,
				mul:          2,
			},
			want: Backoff{
				initInterval: time.Second,
				maxInterval:  time.Minute,
				multiplier:   2,
			},
		},
		{
			name: "fractional multiplier is kept as is",
			args: args{
				initInterval: 10,
				maxInterval:  100,
				mul:          1.5,
			},
			want: Backoff{
				initInterval: 10,
				maxInterval:  100,
				multiplier:   1.5,
			},
		},
		{
			name: "zero arguments give the zero value",
			args: args{},
			want: Backoff{},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := NewBackOff(tt.args.initInterval, tt.args.maxInterval, tt.args.mul); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("NewBackOff() = %v, want %v", got, tt.want)
			}
		})
	}
}
63 changes: 48 additions & 15 deletions pkg/koordlet/runtimehooks/nri/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"

Check failure on line 30 in pkg/koordlet/runtimehooks/nri/server.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `goimports`-ed with -local github.com/koordinator-sh/koordinator (goimports)
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
Expand All @@ -38,13 +39,19 @@ type nriConfig struct {
Events []string `json:"events"`
}

// ReconnectionOption controls how the NRI server tries to re-establish its
// connection after the stub has been closed.
type ReconnectionOption struct {
	// LimitTimes is the upper bound on the number of reconnect attempts made
	// by onClose before it gives up.
	LimitTimes int
	// Backoff yields the wait between two consecutive reconnect attempts.
	Backoff Backoff
}

type Options struct {
NriSocketPath string
// support stop running other hooks once someone failed
PluginFailurePolicy rmconfig.FailurePolicyType
// todo: add support for disable stages
DisableStages map[string]struct{}
Executor resourceexecutor.ResourceUpdateExecutor
DisableStages map[string]struct{}
Executor resourceexecutor.ResourceUpdateExecutor
ReconnectionOption ReconnectionOption
}

func (o Options) Validate() error {
Expand All @@ -71,11 +78,12 @@ const (
)

var (
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
opts []stub.Option
)

func NewNriServer(opt Options) (*NriServer, error) {
Expand All @@ -84,7 +92,6 @@ func NewNriServer(opt Options) (*NriServer, error) {
return nil, fmt.Errorf("failed to validate nri server, err: %w", err)
}

var opts []stub.Option
opts = append(opts, stub.WithPluginName(pluginName))
opts = append(opts, stub.WithPluginIdx(pluginIdx))
opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)))
Expand All @@ -104,19 +111,30 @@ func NewNriServer(opt Options) (*NriServer, error) {
}

func (p *NriServer) Start() error {
go func() {
success := time.After(2 * time.Second)
errorChan := make(chan error)

go func(chan error) {
if p.stub != nil {
err := p.stub.Run(context.Background())
if err != nil {
klog.Errorf("nri server exited with error: %v", err)
} else {
klog.V(4).Info("nri server started")
}
errorChan <- err
} else {
klog.V(4).Info("nri stub is nil")
}
}()
return nil
}(errorChan)

select {
case <-success:
klog.Infof("nri start successfully.")
return nil
case <-errorChan:
return fmt.Errorf("nri start fail")
}
}

func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) {
Expand Down Expand Up @@ -209,6 +227,21 @@ func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Containe
}

// onClose is registered as the stub's close callback. It stops the current
// stub and then tries to bring the NRI server back up, making at most
// ReconnectionOption.LimitTimes attempts and waiting for the interval given by
// ReconnectionOption.Backoff after every failed attempt.
func (p *NriServer) onClose() {
	p.stub.Stop()
	klog.V(6).Infof("NRI server closes")

	reconnect := p.options.ReconnectionOption
	for attempt := 1; attempt <= reconnect.LimitTimes; attempt++ {
		// Named newStub so the local does not shadow the stub package.
		newStub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...)
		if err != nil {
			// Do not install or start a stub that failed to build: Start only
			// reports errors coming from a running stub, so a nil stub would be
			// reported as a successful restart and end the retry loop.
			klog.Errorf("failed to create plugin stub: %v", err)
		} else {
			p.stub = newStub
			err = p.Start()
		}

		if err == nil {
			klog.V(5).Infof("nri server restart success")
			return
		}

		interval := reconnect.Backoff.NextInterval(attempt)
		klog.Infof("NRI reconnect after %v seconds", interval.Seconds())
		time.Sleep(interval)
	}
	klog.Warningf("nri server restart failed after %v times retry", reconnect.LimitTimes)
}
Loading

0 comments on commit fa835ea

Please sign in to comment.