From a83f9b045202eacc4cd7ae20d4854184e447d5a2 Mon Sep 17 00:00:00 2001 From: Denis Tingaikin Date: Mon, 3 Jul 2023 18:26:34 +0300 Subject: [PATCH 1/2] fix: nsmgr should keep fowarder while conn is fine Signed-off-by: Denis Tingaikin --- .../chains/nsmgr/select_forwarder_test.go | 110 ++++++++++++++++-- .../common/discoverforwarder/metadata.go | 36 ++++++ .../common/discoverforwarder/server.go | 43 ++++--- pkg/networkservice/common/heal/eventloop.go | 1 - 4 files changed, 162 insertions(+), 28 deletions(-) create mode 100644 pkg/networkservice/common/discoverforwarder/metadata.go diff --git a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go index 68c8a0cf6..13cbdd454 100644 --- a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go +++ b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go @@ -19,16 +19,19 @@ package nsmgr_test import ( "context" "fmt" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/registry" nsclient "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror" "github.com/networkservicemesh/sdk/pkg/tools/sandbox" @@ -141,7 +144,7 @@ func Test_DiscoverForwarder_ChangeForwarderOnClose(t *testing.T) { require.Equal(t, skipCount+1, counter.UniqueRequests()) require.Equal(t, skipCount+1, counter.Requests()) - selectedFwd := conn.GetPath().GetPathSegments()[2].Name + selectedForwarder := conn.GetPath().GetPathSegments()[2].Name requestsCount := counter.Requests() for i := 0; i < reselectCount; i++ { @@ -149,18 +152,18 @@ func Test_DiscoverForwarder_ChangeForwarderOnClose(t *testing.T) { require.NoError(t, err) // check that we select a different forwarder - selectedFwd = conn.GetPath().GetPathSegments()[2].Name + selectedForwarder = conn.GetPath().GetPathSegments()[2].Name request.Connection = conn conn, err = nsc.Request(ctx, request.Clone()) require.NoError(t, err) require.Equal(t, skipCount+1, counter.UniqueRequests()) require.Equal(t, requestsCount+3, counter.Requests()) requestsCount = counter.Requests() - if selectedFwd != conn.GetPath().GetPathSegments()[2].Name { + if selectedForwarder != conn.GetPath().GetPathSegments()[2].Name { break } } - require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) + require.NotEqual(t, selectedForwarder, conn.GetPath().GetPathSegments()[2].Name) } func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) { @@ -210,9 +213,9 @@ func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) { require.Equal(t, 1, counter.UniqueRequests()) require.Equal(t, 1, counter.Requests()) - selectedFwd := conn.GetPath().GetPathSegments()[2].Name + selectedForwarder := conn.GetPath().GetPathSegments()[2].Name - domain.Nodes[0].Forwarders[selectedFwd].Cancel() + domain.Nodes[0].Forwarders[selectedForwarder].Cancel() require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick) require.Equal(t, 1, counter.UniqueRequests()) @@ -226,7 +229,7 @@ func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) { require.Equal(t, 1, counter.UniqueRequests()) require.Equal(t, 3, counter.Requests()) require.Equal(t, 1, counter.Closes()) - require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) + require.NotEqual(t, selectedForwarder, conn.GetPath().GetPathSegments()[2].Name) } func Test_DiscoverForwarder_ChangeRemoteForwarderOnDeath(t *testing.T) { @@ -281,11 +284,11 @@ func Test_DiscoverForwarder_ChangeRemoteForwarderOnDeath(t *testing.T) { require.Equal(t, 1, counter.UniqueRequests()) require.Equal(t, 1, counter.Requests()) - selectedFwd := conn.GetPath().GetPathSegments()[4].Name + selectedForwarder := conn.GetPath().GetPathSegments()[4].Name domain.Registry.Restart() - domain.Nodes[1].Forwarders[selectedFwd].Cancel() + domain.Nodes[1].Forwarders[selectedForwarder].Cancel() require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick) require.Equal(t, 1, counter.UniqueRequests()) @@ -299,5 +302,92 @@ func Test_DiscoverForwarder_ChangeRemoteForwarderOnDeath(t *testing.T) { require.Equal(t, 1, counter.UniqueRequests()) require.Equal(t, 3, counter.Requests()) require.Equal(t, 1, counter.Closes()) - require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[4].Name) + require.NotEqual(t, selectedForwarder, conn.GetPath().GetPathSegments()[4].Name) +} + +func Test_DiscoverForwarder_Should_KeepSelectedForwarderWhileConnectionIsFine(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + + defer cancel() + domain := sandbox.NewBuilder(ctx, t). + SetNodesCount(1). + SetNSMgrProxySupplier(nil). + SetRegistryProxySupplier(nil). + SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) { + node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer) + }). + Build() + + const fwdCount = 10 + for i := 0; i < fwdCount; i++ { + domain.Nodes[0].NewForwarder(ctx, ®istry.NetworkServiceEndpoint{ + Name: sandbox.UniqueName("forwarder-" + fmt.Sprint(i)), + NetworkServiceNames: []string{"forwarder"}, + }, sandbox.GenerateTestToken) + } + + nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) + + nsReg := defaultRegistryService(t.Name()) + nsReg, err := nsRegistryClient.Register(ctx, nsReg) + require.NoError(t, err) + + nseReg := defaultRegistryEndpoint(nsReg.Name) + + counter := new(count.Server) + domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + + request := defaultRequest(nsReg.Name) + + var livenessValue atomic.Value + livenessValue.Store(true) + + var selectedForwarder string + + var livenessChecker = func(deadlineCtx context.Context, conn *networkservice.Connection) bool { + if v := livenessValue.Load().(bool); !v { + return conn.GetPath().GetPathSegments()[2].Name != selectedForwarder + } + return true + } + + nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, + nsclient.WithHealClient(heal.NewClient(ctx, + heal.WithLivenessCheck(livenessChecker)))) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 1, counter.Requests()) + + selectedForwarder = conn.GetPath().GetPathSegments()[2].Name + + domain.Registry.Restart() + + domain.Nodes[0].NSMgr.Restart() + + require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 2, counter.Requests()) + require.Equal(t, 0, counter.Closes()) + + request.Connection = conn + conn, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 0, counter.Closes()) + require.Equal(t, selectedForwarder, conn.GetPath().GetPathSegments()[2].Name) + + // datapath is down + livenessValue.Store(false) + domain.Nodes[0].Forwarders[selectedForwarder].Cancel() + + request.Connection = conn + conn, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.Equal(t, 1, counter.UniqueRequests()) + require.Greater(t, counter.Closes(), 0) + require.NotEqual(t, selectedForwarder, conn.GetPath().GetPathSegments()[2].Name) + } diff --git a/pkg/networkservice/common/discoverforwarder/metadata.go b/pkg/networkservice/common/discoverforwarder/metadata.go new file mode 100644 index 000000000..e0cf8e43d --- /dev/null +++ b/pkg/networkservice/common/discoverforwarder/metadata.go @@ -0,0 +1,36 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discoverforwarder + +import ( + "context" + + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" +) + +type forwarderNameMetadataKey struct{} + +func loadForwarderName(ctx context.Context) string { + if v, ok := metadata.Map(ctx, false).Load(forwarderNameMetadataKey{}); ok { + return v.(string) + } + return "" +} + +func storeForwarderName(ctx context.Context, name string) { + metadata.Map(ctx, false).Store(forwarderNameMetadataKey{}, name) +} diff --git a/pkg/networkservice/common/discoverforwarder/server.go b/pkg/networkservice/common/discoverforwarder/server.go index cda87759a..a9d84ba49 100644 --- a/pkg/networkservice/common/discoverforwarder/server.go +++ b/pkg/networkservice/common/discoverforwarder/server.go @@ -65,16 +65,8 @@ func NewServer(nsClient registry.NetworkServiceRegistryClient, nseClient registr return result } -func (d *discoverForwarderServer) forwarderName(conn *networkservice.Connection) string { - var segments = conn.GetPath().GetPathSegments() - if pathIndex := int(conn.GetPath().Index); len(conn.GetPath().PathSegments) > pathIndex+1 { - return segments[pathIndex+1].Name - } - return "" -} - func (d *discoverForwarderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - var forwarderName = d.forwarderName(request.GetConnection()) + var forwarderName = loadForwarderName(ctx) var logger = log.FromContext(ctx).WithField("discoverForwarderServer", "request") ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload()) @@ -87,7 +79,8 @@ func (d *discoverForwarderServer) Request(ctx context.Context, request *networks NetworkServiceNames: []string{ d.forwarderServiceName, }, - Url: d.nsmgrURL, + Name: forwarderName, + Url: d.nsmgrURL, }, }) if err != nil { @@ -97,15 +90,20 @@ func (d *discoverForwarderServer) Request(ctx context.Context, request *networks nses := d.matchForwarders(request.Connection.GetLabels(), ns, registry.ReadNetworkServiceEndpointList(stream)) if len(nses) == 0 { + if forwarderName != "" { + return nil, errors.Errorf("forwarder %v is not available", forwarderName) + } return nil, errors.New("no candidates found") } - segments := request.Connection.GetPath().GetPathSegments() - if pathIndex := int(request.Connection.GetPath().Index); len(segments) > pathIndex+1 { - for i, candidate := range nses { - if candidate.Name == forwarderName { - nses[0], nses[i] = nses[i], nses[0] - break + if forwarderName == "" { + segments := request.Connection.GetPath().GetPathSegments() + if pathIndex := int(request.Connection.GetPath().Index); len(segments) > pathIndex+1 { + for i, candidate := range nses { + if candidate.Name == segments[pathIndex+1].GetName() { + nses[0], nses[i] = nses[i], nses[0] + break + } } } } @@ -123,6 +121,9 @@ func (d *discoverForwarderServer) Request(ctx context.Context, request *networks resp, err := next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request.Clone()) if err == nil { + if forwarderName == "" { + storeForwarderName(ctx, candidate.GetName()) + } return resp, nil } logger.Errorf("forwarder=%v url=%v returned error=%v", candidate.Name, candidate.Url, err.Error()) @@ -136,7 +137,15 @@ func (d *discoverForwarderServer) Close(ctx context.Context, conn *networkservic // Unlike Request, Close method should always call next element in chain // to make sure we clear resources in the current app. - var forwarderName = d.forwarderName(conn) + var forwarderName = loadForwarderName(ctx) + + if forwarderName == "" { + segments := conn.GetPath().GetPathSegments() + if pathIndex := int(conn.GetPath().Index); len(segments) > pathIndex+1 { + forwarderName = segments[pathIndex+1].GetName() + } + } + var logger = log.FromContext(ctx).WithField("discoverForwarderServer", "request") if forwarderName == "" { logger.Error("connection doesn't have forwarder") diff --git a/pkg/networkservice/common/heal/eventloop.go b/pkg/networkservice/common/heal/eventloop.go index ba5b6c9c0..dbe858313 100644 --- a/pkg/networkservice/common/heal/eventloop.go +++ b/pkg/networkservice/common/heal/eventloop.go @@ -155,7 +155,6 @@ func (cev *eventLoop) waitForEvents() (canceled, reselect bool) { return true, false } cev.logger.Warnf("Data plane is down") - reselect = true cev.healingStartedCh <- true return false, true case <-cev.chainCtx.Done(): From 042a3ee5f07a74ac97574e8b28fcd77006a539f5 Mon Sep 17 00:00:00 2001 From: Denis Tingaikin Date: Mon, 3 Jul 2023 18:51:12 +0300 Subject: [PATCH 2/2] fix critical linter issue Signed-off-by: Denis Tingaikin --- pkg/networkservice/chains/nsmgr/select_forwarder_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go index 13cbdd454..6a966e38a 100644 --- a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go +++ b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go @@ -389,5 +389,4 @@ func Test_DiscoverForwarder_Should_KeepSelectedForwarderWhileConnectionIsFine(t require.Equal(t, 1, counter.UniqueRequests()) require.Greater(t, counter.Closes(), 0) require.NotEqual(t, selectedForwarder, conn.GetPath().GetPathSegments()[2].Name) - }