Skip to content

Commit

Permalink
Close with eventFactory from begin.Server in case of RESELECT_REQUEST…
Browse files Browse the repository at this point in the history
…ED (#1480)

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art authored Jul 4, 2023
1 parent 0ba94ea commit f349173
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 25 deletions.
25 changes: 15 additions & 10 deletions pkg/networkservice/chains/nsmgr/reselect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func testReselectWithNsmgrRestart(t *testing.T, nodeNum int, restartLocal, resta
// in this test we add counters to apps in chain
// to make sure that in each app Close call goes through the whole chain,
// without stopping on an error mid-chain
var counterFwd []*count.Server
var counterFwd []*count.Client
for i := 0; i < nodeNum; i++ {
counterFwd = append(counterFwd, new(count.Server))
counterFwd = append(counterFwd, new(count.Client))
}

defer cancel()
Expand All @@ -92,7 +92,7 @@ func testReselectWithNsmgrRestart(t *testing.T, nodeNum int, restartLocal, resta
node.NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder"),
NetworkServiceNames: []string{"forwarder"},
}, sandbox.GenerateTestToken, counterFwd[i])
}, sandbox.GenerateTestToken, sandbox.WithForwarderAdditionalFunctionalityClient(counterFwd[i]))
}).
Build()

Expand Down Expand Up @@ -191,9 +191,9 @@ func testReselectWithLocalForwarderRestart(t *testing.T, nodeNum int) {
// in this test we add counters to apps in chain
// to make sure that in each app Close call goes through the whole chain,
// without stopping on an error mid-chain
var counterFwd []*count.Server
var counterFwd []*count.Client
for i := 0; i < nodeNum; i++ {
counterFwd = append(counterFwd, new(count.Server))
counterFwd = append(counterFwd, new(count.Client))
}

defer cancel()
Expand All @@ -206,7 +206,7 @@ func testReselectWithLocalForwarderRestart(t *testing.T, nodeNum int) {
node.NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder"),
NetworkServiceNames: []string{"forwarder"},
}, sandbox.GenerateTestToken, counterFwd[i])
}, sandbox.GenerateTestToken, sandbox.WithForwarderAdditionalFunctionalityClient(counterFwd[i]))
}).
Build()

Expand All @@ -228,16 +228,21 @@ func testReselectWithLocalForwarderRestart(t *testing.T, nodeNum int) {
conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)

// Kill a local forwarder and NSE
for _, fwd := range domain.Nodes[0].Forwarders {
fwd.Restart()
fwd.Cancel()
}

nse.Cancel()

// Restart NSE and local forwarder
nseReg2 := defaultRegistryEndpoint(nsReg.Name)
nseReg2.Name += "-2"
domain.Nodes[nodeNum-1].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, counterNse)

for _, fwd := range domain.Nodes[0].Forwarders {
fwd.Restart()
}

// Wait for heal to finish successfully
require.Eventually(t, checkSecondRequestsReceived(counterNse.UniqueRequests), timeout, tick)
// Client should try to close connection before reselect
Expand Down Expand Up @@ -282,7 +287,7 @@ func TestReselect_Close_RegistryDied(t *testing.T) {
// in this test we add counters to apps in chain
// to make sure that in each app Close call goes through the whole chain,
// without stopping on an error mid-chain
counterFwd := new(count.Server)
counterFwd := new(count.Client)

defer cancel()
domain := sandbox.NewBuilder(ctx, t).
Expand All @@ -294,7 +299,7 @@ func TestReselect_Close_RegistryDied(t *testing.T) {
node.NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder"),
NetworkServiceNames: []string{"forwarder"},
}, sandbox.GenerateTestToken, counterFwd)
}, sandbox.GenerateTestToken, sandbox.WithForwarderAdditionalFunctionalityClient(counterFwd))
}).
Build()

Expand Down
6 changes: 4 additions & 2 deletions pkg/networkservice/chains/nsmgr/unix_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -108,7 +110,7 @@ func Test_MultiForwarderSendfd(t *testing.T) {
},
},
},
}, sandbox.GenerateTestToken, errorServer, recvfd.NewServer())
}, sandbox.GenerateTestToken, sandbox.WithForwarderAdditionalFunctionalityServer(errorServer, recvfd.NewServer()))
node.NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: "forwarder-2",
NetworkServiceNames: []string{"forwarder"},
Expand All @@ -119,7 +121,7 @@ func Test_MultiForwarderSendfd(t *testing.T) {
},
},
},
}, sandbox.GenerateTestToken, errorServer, recvfd.NewServer())
}, sandbox.GenerateTestToken, sandbox.WithForwarderAdditionalFunctionalityServer(errorServer, recvfd.NewServer()))
}).
Build()

Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/begin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
request.GetConnection().GetState() == networkservice.State_RESELECT_REQUESTED &&
eventFactoryServer.request != nil && eventFactoryServer.request.Connection != nil {
log.FromContext(ctx).Info("Closing connection due to RESELECT_REQUESTED state")
_, closeErr := next.Server(ctx).Close(ctx, eventFactoryServer.request.Connection)
_, closeErr := next.Server(withEventFactoryCtx).Close(withEventFactoryCtx, eventFactoryServer.request.Connection)
if closeErr != nil {
log.FromContext(ctx).Errorf("Can't close old connection: %v", closeErr)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/networkservice/utils/inject/injecterror/error_supplier.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -16,9 +18,11 @@

package injecterror

import "go.uber.org/atomic"

type errorSupplier struct {
err error
count int
count atomic.Int32
errorTimes []int
}

Expand All @@ -27,13 +31,14 @@ type errorSupplier struct {
// * [-1] - will return an error on all requests
// * [1, 4, -1] - will return an error on 0 time and on all times starting from 4
func (e *errorSupplier) supply() error {
defer func() { e.count++ }()
defer func() { e.count.Inc() }()

count := int(e.count.Load())
for _, errorTime := range e.errorTimes {
if errorTime > e.count {
if errorTime > count {
break
}
if errorTime == e.count || errorTime == -1 {
if errorTime == count || errorTime == -1 {
return e.err
}
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/registry/utils/inject/injecterror/error_supplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package injecterror

import "github.com/pkg/errors"
import (
"github.com/pkg/errors"
"go.uber.org/atomic"
)

type errorSupplier struct {
err error
count int
count atomic.Int32
errorTimes []int
}

Expand All @@ -31,13 +34,14 @@ type errorSupplier struct {
// * [-1] - will return an error on all requests
// * [1, 4, -1] - will return an error on 0 time and on all times starting from 4
func (e *errorSupplier) supply() error {
defer func() { e.count++ }()
defer func() { e.count.Inc() }()

count := int(e.count.Load())
for _, errorTime := range e.errorTimes {
if errorTime > e.count {
if errorTime > count {
break
}
if errorTime == e.count || errorTime == -1 {
if errorTime == count || errorTime == -1 {
return errors.WithStack(e.err)
}
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/tools/sandbox/node.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -104,7 +106,7 @@ func (n *Node) NewForwarder(
ctx context.Context,
nse *registryapi.NetworkServiceEndpoint,
generatorFunc token.GeneratorFunc,
additionalFunctionality ...networkservice.NetworkServiceServer,
opts ...ForwarderOption,
) *EndpointEntry {
var serveURL *url.URL
var err error
Expand All @@ -116,6 +118,11 @@ func (n *Node) NewForwarder(
require.NoError(n.t, err)
}

var serverOptions = &forwarderOptions{}
for _, opt := range opts {
opt(serverOptions)
}

nseClone := nse.Clone()
dialOptions := DialOptions(WithTokenGenerator(generatorFunc))

Expand All @@ -141,13 +148,15 @@ func (n *Node) NewForwarder(
append([]networkservice.NetworkServiceServer{
discover.NewServer(nsClient, nseClient),
roundrobin.NewServer(),
}, additionalFunctionality...),
}, serverOptions.additionalFunctionalityServer...),
connect.NewServer(
client.NewClient(
ctx,
client.WithName(entry.Name),
client.WithAdditionalFunctionality(
mechanismtranslation.NewClient(),
append([]networkservice.NetworkServiceClient{
mechanismtranslation.NewClient(),
}, serverOptions.additionalFunctionalityClient...)...,
),
client.WithDialOptions(dialOptions...),
client.WithDialTimeout(DialTimeout),
Expand Down
43 changes: 43 additions & 0 deletions pkg/tools/sandbox/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sandbox

import (
"github.com/networkservicemesh/api/pkg/api/networkservice"
)

type forwarderOptions struct {
additionalFunctionalityServer []networkservice.NetworkServiceServer
additionalFunctionalityClient []networkservice.NetworkServiceClient
}

// ForwarderOption is an option to configure a forwarder for sandbox
type ForwarderOption func(*forwarderOptions)

// WithForwarderAdditionalFunctionalityServer adds an additionalFunctionality to server chain
func WithForwarderAdditionalFunctionalityServer(a ...networkservice.NetworkServiceServer) ForwarderOption {
return func(o *forwarderOptions) {
o.additionalFunctionalityServer = a
}
}

// WithForwarderAdditionalFunctionalityClient adds an additionalFunctionality to client chain
func WithForwarderAdditionalFunctionalityClient(a ...networkservice.NetworkServiceClient) ForwarderOption {
return func(o *forwarderOptions) {
o.additionalFunctionalityClient = a
}
}

0 comments on commit f349173

Please sign in to comment.