diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index c9f344b70..8c1f434e8 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2021 Cisco Systems, Inc. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -20,9 +22,9 @@ package client import ( "context" + "github.com/google/uuid" "google.golang.org/grpc" - "github.com/google/uuid" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" @@ -37,7 +39,6 @@ import ( type clientOptions struct { name string - onHeal *networkservice.NetworkServiceClient additionalFunctionality []networkservice.NetworkServiceClient authorizeClient networkservice.NetworkServiceClient } @@ -45,13 +46,6 @@ type clientOptions struct { // Option modifies default client chain values. type Option func(c *clientOptions) -// WithHeal sets heal for the client. -func WithHeal(onHeal *networkservice.NetworkServiceClient) Option { - return Option(func(c *clientOptions) { - c.onHeal = onHeal - }) -} - // WithName sets name for the client. 
func WithName(name string) Option { return Option(func(c *clientOptions) { @@ -84,7 +78,6 @@ func NewClient(ctx context.Context, cc grpc.ClientConnInterface, clientOpts ...O var opts = &clientOptions{ name: "client-" + uuid.New().String(), authorizeClient: null.NewClient(), - onHeal: &rv, } for _, opt := range clientOpts { opt(opts) @@ -94,7 +87,7 @@ func NewClient(ctx context.Context, cc grpc.ClientConnInterface, clientOpts ...O append([]networkservice.NetworkServiceClient{ updatepath.NewClient(opts.name), serialize.NewClient(), - heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), opts.onHeal), + heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), refresh.NewClient(ctx), metadata.NewClient(), }, opts.additionalFunctionality...), diff --git a/pkg/networkservice/chains/nsmgr/server.go b/pkg/networkservice/chains/nsmgr/server.go index 9bd4c29b0..fb69ed2ae 100644 --- a/pkg/networkservice/chains/nsmgr/server.go +++ b/pkg/networkservice/chains/nsmgr/server.go @@ -34,6 +34,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/discover" "github.com/networkservicemesh/sdk/pkg/networkservice/common/excludedprefixes" "github.com/networkservicemesh/sdk/pkg/networkservice/common/filtermechanisms" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/interpose" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/recvfd" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd" @@ -122,10 +123,10 @@ func NewServer(ctx context.Context, nsmRegistration *registryapi.NetworkServiceE recvfd.NewServer(), // Receive any files passed interpose.NewServer(&interposeRegistryServer), filtermechanisms.NewServer(&urlsRegistryServer), + heal.NewServer(ctx, addressof.NetworkServiceClient(adapters.NewServerToClient(rv))), connect.NewServer(ctx, client.NewClientFactory( client.WithName(nsmRegistration.Name), - 
client.WithHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(rv))), client.WithAdditionalFunctionality( recvfd.NewClient(), sendfd.NewClient(), diff --git a/pkg/networkservice/chains/nsmgr/server_heal_test.go b/pkg/networkservice/chains/nsmgr/server_heal_test.go new file mode 100644 index 000000000..81029d8c5 --- /dev/null +++ b/pkg/networkservice/chains/nsmgr/server_heal_test.go @@ -0,0 +1,473 @@ +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package nsmgr_test define a tests for NSMGR chain element. 
+package nsmgr_test + +import ( + "context" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" + kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/api/pkg/api/registry" + + "github.com/networkservicemesh/sdk/pkg/tools/sandbox" +) + +const ( + tick = 10 * time.Millisecond + timeout = 10 * time.Second +) + +func TestNSMGR_HealEndpoint(t *testing.T) { + testNSMGRHealEndpoint(t, false) +} + +func TestNSMGR_HealEndpointRestored(t *testing.T) { + testNSMGRHealEndpoint(t, true) +} + +func testNSMGRHealEndpoint(t *testing.T, restored bool) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + + defer cancel() + domain := sandbox.NewBuilder(t). + SetNodesCount(2). + SetRegistryProxySupplier(nil). + SetContext(ctx). 
+ Build() + defer domain.Cleanup() + + expireTime := timestamppb.New(time.Now().Add(time.Second)) + nseReg := &registry.NetworkServiceEndpoint{ + Name: "final-endpoint", + NetworkServiceNames: []string{"my-service-remote"}, + ExpirationTime: expireTime, + } + + counter := &counterServer{} + nseCtx, nseCtxCancel := context.WithTimeout(context.Background(), time.Second) + defer nseCtxCancel() + + nse, err := domain.Nodes[0].NewEndpoint(nseCtx, nseReg, sandbox.GenerateExpiringToken(time.Second), counter) + require.NoError(t, err) + + request := &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, + }, + Connection: &networkservice.Connection{ + Id: "1", + NetworkService: "my-service-remote", + Context: &networkservice.ConnectionContext{}, + }, + } + + nsc := domain.Nodes[1].NewClient(ctx, sandbox.GenerateTestToken) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.NotNil(t, conn) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 8, len(conn.Path.PathSegments)) + + nseCtxCancel() + + // Wait grpc unblock the port + require.Eventually(t, checkURLFree(nse.URL.Host), timeout, tick) + + nseReg2 := &registry.NetworkServiceEndpoint{ + Name: "final-endpoint2", + NetworkServiceNames: []string{"my-service-remote"}, + } + if restored { + nseReg2.Url = nse.URL.String() + } + _, err = domain.Nodes[0].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, counter) + require.NoError(t, err) + + // Wait NSE expired and reconnecting to the new NSE + require.Eventually(t, checkSecondRequestsReceived(counter.UniqueRequests), timeout, tick) + require.Equal(t, 2, counter.UniqueRequests()) + + // Check refresh + request.Connection = conn + _, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + + // Close. 
+ e, err := nsc.Close(ctx, conn) + require.NoError(t, err) + require.NotNil(t, e) + require.Equal(t, 2, counter.UniqueRequests()) + require.Equal(t, 1, counter.UniqueCloses()) +} + +func TestNSMGR_HealLocalForwarder(t *testing.T) { + forwarderCtx, forwarderCtxCancel := context.WithTimeout(context.Background(), time.Second) + defer forwarderCtxCancel() + + customConfig := []*sandbox.NodeConfig{ + nil, + { + ForwarderCtx: forwarderCtx, + ForwarderGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + }, + } + + testNSMGRHealForwarder(t, 1, false, customConfig, forwarderCtxCancel) +} + +func TestNSMGR_HealLocalForwarderRestored(t *testing.T) { + forwarderCtx, forwarderCtxCancel := context.WithTimeout(context.Background(), time.Second) + defer forwarderCtxCancel() + + customConfig := []*sandbox.NodeConfig{ + nil, + { + ForwarderCtx: forwarderCtx, + ForwarderGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + }, + } + + testNSMGRHealForwarder(t, 1, true, customConfig, forwarderCtxCancel) +} + +func TestNSMGR_HealRemoteForwarder(t *testing.T) { + forwarderCtx, forwarderCtxCancel := context.WithTimeout(context.Background(), time.Second) + defer forwarderCtxCancel() + + customConfig := []*sandbox.NodeConfig{ + { + ForwarderCtx: forwarderCtx, + ForwarderGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + }, + } + + testNSMGRHealForwarder(t, 0, false, customConfig, forwarderCtxCancel) +} + +func TestNSMGR_HealRemoteForwarderRestored(t *testing.T) { + forwarderCtx, forwarderCtxCancel := context.WithTimeout(context.Background(), time.Second) + defer forwarderCtxCancel() + + customConfig := []*sandbox.NodeConfig{ + { + ForwarderCtx: forwarderCtx, + ForwarderGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + }, + } + + testNSMGRHealForwarder(t, 0, true, customConfig, forwarderCtxCancel) +} + +func testNSMGRHealForwarder(t *testing.T, nodeNum int, restored bool, customConfig []*sandbox.NodeConfig, forwarderCtxCancel 
context.CancelFunc) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*100) + defer cancel() + + builder := sandbox.NewBuilder(t) + domain := builder. + SetNodesCount(2). + SetRegistryProxySupplier(nil). + SetContext(ctx). + SetCustomConfig(customConfig). + Build() + defer domain.Cleanup() + + nseReg := &registry.NetworkServiceEndpoint{ + Name: "final-endpoint", + NetworkServiceNames: []string{"my-service-remote"}, + } + + counter := &counterServer{} + _, err := domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + require.NoError(t, err) + + request := &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, + }, + Connection: &networkservice.Connection{ + Id: "1", + NetworkService: "my-service-remote", + Context: &networkservice.ConnectionContext{}, + }, + } + + nsc := domain.Nodes[1].NewClient(ctx, sandbox.GenerateTestToken) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.NotNil(t, conn) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 8, len(conn.Path.PathSegments)) + + forwarderCtxCancel() + + // Wait grpc unblock the port + require.GreaterOrEqual(t, 1, len(domain.Nodes[nodeNum].Forwarder)) + require.Eventually(t, checkURLFree(domain.Nodes[nodeNum].Forwarder[0].URL.Host), timeout, tick) + + nseReg.Name = "cross-nse-restored" + forwarderReg := &registry.NetworkServiceEndpoint{ + Name: "forwarder-restored", + } + if restored { + forwarderReg.Url = domain.Nodes[nodeNum].Forwarder[0].URL.String() + } + _, err = domain.Nodes[nodeNum].NewForwarder(ctx, forwarderReg, sandbox.GenerateTestToken) + require.NoError(t, err) + + // Wait Cross NSE expired and reconnecting through the new Cross NSE + if restored { + require.Eventually(t, checkSecondRequestsReceived(func() int { + return int(atomic.LoadInt32(&counter.Requests)) + }), timeout, 
tick) + } else { + require.Eventually(t, checkSecondRequestsReceived(counter.UniqueRequests), timeout, tick) + } + if restored { + require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) + } else { + require.Equal(t, 2, counter.UniqueRequests()) + } + + // Check refresh + request.Connection = conn + _, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + + // Close. + closes := atomic.LoadInt32(&counter.Closes) + e, err := nsc.Close(ctx, conn) + require.NoError(t, err) + require.NotNil(t, e) + + if restored { + require.Equal(t, int32(3), atomic.LoadInt32(&counter.Requests)) + require.Equal(t, closes+1, atomic.LoadInt32(&counter.Closes)) + } else { + require.Equal(t, 2, counter.UniqueRequests()) + require.Equal(t, 2, counter.UniqueCloses()) + } +} + +func TestNSMGR_HealRemoteNSMgrRestored(t *testing.T) { + t.Skip("https://github.com/networkservicemesh/sdk/issues/770") + + nsmgrCtx, nsmgrCtxCancel := context.WithTimeout(context.Background(), time.Second) + defer nsmgrCtxCancel() + + customConfig := []*sandbox.NodeConfig{ + { + NsmgrCtx: nsmgrCtx, + NsmgrGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + ForwarderCtx: nsmgrCtx, + ForwarderGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + }, + } + + testNSMGRHealNSMgr(t, 0, customConfig, nsmgrCtxCancel) +} + +func testNSMGRHealNSMgr(t *testing.T, nodeNum int, customConfig []*sandbox.NodeConfig, nsmgrCtxCancel context.CancelFunc) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + builder := sandbox.NewBuilder(t) + domain := builder. + SetNodesCount(2). + SetRegistryProxySupplier(nil). + SetContext(ctx). + SetCustomConfig(customConfig). 
+ Build() + defer domain.Cleanup() + + nseReg := &registry.NetworkServiceEndpoint{ + Name: "final-endpoint", + NetworkServiceNames: []string{"my-service-remote"}, + } + + counter := &counterServer{} + nse, err := domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + require.NoError(t, err) + + request := &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, + }, + Connection: &networkservice.Connection{ + Id: "1", + NetworkService: "my-service-remote", + Context: &networkservice.ConnectionContext{}, + }, + } + + nsc := domain.Nodes[1].NewClient(ctx, sandbox.GenerateTestToken) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.NotNil(t, conn) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 8, len(conn.Path.PathSegments)) + + nsmgrCtxCancel() + require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests)) + require.Equal(t, int32(0), atomic.LoadInt32(&counter.Closes)) + + require.Eventually(t, checkURLFree(domain.Nodes[nodeNum].NSMgr.URL.Host), timeout, tick) + + restoredNSMgrEntry, restoredNSMgrResources := builder.NewNSMgr(ctx, domain.Nodes[nodeNum], domain.Nodes[nodeNum].NSMgr.URL.Host, domain.Registry.URL, sandbox.GenerateTestToken) + domain.Nodes[nodeNum].NSMgr = restoredNSMgrEntry + domain.AddResources(restoredNSMgrResources) + + forwarderReg := &registry.NetworkServiceEndpoint{ + Name: "forwarder-restored", + } + _, err = domain.Nodes[nodeNum].NewForwarder(ctx, forwarderReg, sandbox.GenerateTestToken) + require.NoError(t, err) + + nseReg.Url = nse.URL.String() + err = domain.Nodes[nodeNum].RegisterEndpoint(ctx, nseReg) + require.NoError(t, err) + + // Wait Cross NSE expired and reconnecting through the new Cross NSE + require.Eventually(t, checkSecondRequestsReceived(func() int { + return int(atomic.LoadInt32(&counter.Requests)) + }), timeout, tick) + + // Check refresh + request.Connection = conn + 
_, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + + // Close. + closes := atomic.LoadInt32(&counter.Closes) + e, err := nsc.Close(ctx, conn) + require.NoError(t, err) + require.NotNil(t, e) + require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) + require.Equal(t, closes+1, atomic.LoadInt32(&counter.Closes)) +} + +func TestNSMGR_HealRemoteNSMgr(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + nsmgrCtx, nsmgrCtxCancel := context.WithTimeout(context.Background(), time.Second) + defer nsmgrCtxCancel() + customConfig := []*sandbox.NodeConfig{ + { + NsmgrCtx: nsmgrCtx, + NsmgrGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + ForwarderCtx: nsmgrCtx, + ForwarderGenerateTokenFunc: sandbox.GenerateExpiringToken(time.Second), + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + builder := sandbox.NewBuilder(t) + domain := builder. + SetNodesCount(3). + SetRegistryProxySupplier(nil). + SetContext(ctx). + SetCustomConfig(customConfig). 
+ Build() + defer domain.Cleanup() + + nseReg := &registry.NetworkServiceEndpoint{ + Name: "final-endpoint", + NetworkServiceNames: []string{"my-service-remote"}, + } + + counter := &counterServer{} + _, err := domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + require.NoError(t, err) + + request := &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, + }, + Connection: &networkservice.Connection{ + Id: "1", + NetworkService: "my-service-remote", + Context: &networkservice.ConnectionContext{}, + }, + } + + nsc := domain.Nodes[1].NewClient(ctx, sandbox.GenerateTestToken) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.NotNil(t, conn) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 8, len(conn.Path.PathSegments)) + + nsmgrCtxCancel() + + nseReg2 := &registry.NetworkServiceEndpoint{ + Name: "final-endpoint-2", + NetworkServiceNames: []string{"my-service-remote"}, + } + _, err = domain.Nodes[2].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, counter) + require.NoError(t, err) + + // Wait Cross NSE expired and reconnecting through the new Cross NSE + require.Eventually(t, checkSecondRequestsReceived(counter.UniqueRequests), timeout, tick) + require.Equal(t, 2, counter.UniqueRequests()) + + // Check refresh + request.Connection = conn + _, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + + // Close. 
+ e, err := nsc.Close(ctx, conn) + require.NoError(t, err) + require.NotNil(t, e) + require.Equal(t, 2, counter.UniqueRequests()) + require.Equal(t, 2, counter.UniqueCloses()) +} + +func checkSecondRequestsReceived(requestsDone func() int) func() bool { + return func() bool { + return requestsDone() == 2 + } +} + +func checkURLFree(url string) func() bool { + return func() bool { + ln, err := net.Listen("tcp", url) + if err != nil { + return false + } + err = ln.Close() + return err == nil + } +} diff --git a/pkg/networkservice/chains/nsmgr/server_test.go b/pkg/networkservice/chains/nsmgr/server_test.go index 21519b5bb..661265393 100644 --- a/pkg/networkservice/chains/nsmgr/server_test.go +++ b/pkg/networkservice/chains/nsmgr/server_test.go @@ -743,15 +743,72 @@ func (c *passThroughClient) Close(ctx context.Context, conn *networkservice.Conn type counterServer struct { Requests, Closes int32 + requests map[string]int32 + closes map[string]int32 + mu sync.Mutex } func (c *counterServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + c.mu.Lock() + defer c.mu.Unlock() + atomic.AddInt32(&c.Requests, 1) + if c.requests == nil { + c.requests = make(map[string]int32) + } + c.requests[request.GetConnection().GetId()]++ + return next.Server(ctx).Request(ctx, request) } func (c *counterServer) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) { + c.mu.Lock() + defer c.mu.Unlock() + atomic.AddInt32(&c.Closes, 1) + if c.closes == nil { + c.closes = make(map[string]int32) + } + c.closes[connection.GetId()]++ + + return next.Server(ctx).Close(ctx, connection) +} + +func (c *counterServer) UniqueRequests() int { + c.mu.Lock() + defer c.mu.Unlock() + + if c.requests == nil { + return 0 + } + return len(c.requests) +} + +func (c *counterServer) UniqueCloses() int { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closes == nil { + return 0 + } + return len(c.closes) +} + +type 
restartingEndpoint struct { + startTime time.Time +} + +func (c *restartingEndpoint) Request(ctx context.Context, req *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + if time.Now().Before(c.startTime) { + return nil, errors.New("endpoint is restarting") + } + return next.Server(ctx).Request(ctx, req) +} + +func (c *restartingEndpoint) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) { + if time.Now().Before(c.startTime) { + return nil, errors.New("endpoint is restarting") + } return next.Server(ctx).Close(ctx, connection) } diff --git a/pkg/networkservice/chains/nsmgrproxy/server.go b/pkg/networkservice/chains/nsmgrproxy/server.go index 3c393add0..f1fe737fa 100644 --- a/pkg/networkservice/chains/nsmgrproxy/server.go +++ b/pkg/networkservice/chains/nsmgrproxy/server.go @@ -30,8 +30,11 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/networkservice/common/externalips" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/interdomainurl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/swapip" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" + "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/sdk/pkg/tools/token" ) @@ -71,6 +74,11 @@ func WithDialOptions(options ...grpc.DialOption) Option { // NewServer creates new proxy NSMgr func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options ...Option) endpoint.Endpoint { + type nsmgrProxyServer struct { + endpoint.Endpoint + } + rv := nsmgrProxyServer{} + opts := &serverOptions{ name: "nsmgr-proxy-" + uuid.New().String(), authorizeServer: authorize.NewServer(authorize.Any()), @@ -79,17 +87,19 @@ func NewServer(ctx context.Context, tokenGenerator 
token.GeneratorFunc, options opt(opts) } - return endpoint.NewServer(ctx, tokenGenerator, + rv.Endpoint = endpoint.NewServer(ctx, tokenGenerator, endpoint.WithName(opts.name), endpoint.WithAuthorizeServer(opts.authorizeServer), endpoint.WithAdditionalFunctionality( interdomainurl.NewServer(), externalips.NewServer(ctx), swapip.NewServer(), + heal.NewServer(ctx, addressof.NetworkServiceClient(adapters.NewServerToClient(rv))), connect.NewServer(ctx, client.NewClientFactory(client.WithName(opts.name)), connect.WithDialOptions(opts.dialOptions...), ), ), ) + return rv } diff --git a/pkg/networkservice/common/discover/match_selector.go b/pkg/networkservice/common/discover/match_selector.go index df8ac5ed9..9c354da73 100644 --- a/pkg/networkservice/common/discover/match_selector.go +++ b/pkg/networkservice/common/discover/match_selector.go @@ -1,5 +1,7 @@ // Copyright (c) 2018-2020 VMware, Inc. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,6 +21,7 @@ package discover import ( "bytes" "text/template" + "time" "github.com/networkservicemesh/api/pkg/api/registry" ) @@ -40,6 +43,13 @@ func isSubset(a, b, nsLabels map[string]string) bool { } func matchEndpoint(nsLabels map[string]string, ns *registry.NetworkService, networkServiceEndpoints ...*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint { + var validNetworkServiceEndpoints []*registry.NetworkServiceEndpoint + for _, nse := range networkServiceEndpoints { + if nse.GetExpirationTime() == nil || nse.GetExpirationTime().AsTime().After(time.Now()) { + validNetworkServiceEndpoints = append(validNetworkServiceEndpoints, nse) + } + } + // Iterate through the matches for _, match := range ns.GetMatches() { // All match source selector labels should be present in the requested labels map @@ -50,7 +60,7 @@ func matchEndpoint(nsLabels map[string]string, ns *registry.NetworkService, netw 
// Check all Destinations in that match for _, destination := range match.GetRoutes() { // Each NSE should be matched against that destination - for _, nse := range networkServiceEndpoints { + for _, nse := range validNetworkServiceEndpoints { if isSubset(nse.GetNetworkServiceLabels()[ns.Name].Labels, destination.GetDestinationSelector(), nsLabels) { nseCandidates = append(nseCandidates, nse) } @@ -58,7 +68,8 @@ func matchEndpoint(nsLabels map[string]string, ns *registry.NetworkService, netw } return nseCandidates } - return networkServiceEndpoints + + return validNetworkServiceEndpoints } // ProcessLabels generates matches based on destination label selectors that specify templating. diff --git a/pkg/networkservice/common/heal/client.go b/pkg/networkservice/common/heal/client.go index 9603f77c6..59c6ba8fd 100644 --- a/pkg/networkservice/common/heal/client.go +++ b/pkg/networkservice/common/heal/client.go @@ -1,5 +1,3 @@ -// Copyright (c) 2020 Cisco Systems, Inc. -// // Copyright (c) 2021 Doc.ai and/or its affiliates. 
// // SPDX-License-Identifier: Apache-2.0 @@ -21,250 +19,41 @@ package heal import ( "context" - "runtime" - "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" "google.golang.org/grpc" "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/edwarnicke/serialize" - - "github.com/networkservicemesh/sdk/pkg/tools/addressof" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" ) +// RegisterClientFunc - required to inform heal server about new client connection and assign it to the connection ID +type RegisterClientFunc func(context.Context, *networkservice.Connection, networkservice.MonitorConnectionClient) + type healClient struct { - ctx context.Context - client networkservice.MonitorConnectionClient - onHeal *networkservice.NetworkServiceClient - cancelHealMap map[string]func() <-chan error - cancelHealMapExecutor serialize.Executor + ctx context.Context + cc networkservice.MonitorConnectionClient } -// NewClient - creates a new networkservice.NetworkServiceClient chain element that implements the healing algorithm -// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. -// - client - networkservice.MonitorConnectionClient that can be used to call MonitorConnection against the endpoint -// - onHeal - *networkservice.NetworkServiceClient. Since networkservice.NetworkServiceClient is an interface -// (and thus a pointer) *networkservice.NetworkServiceClient is a double pointer. Meaning it -// points to a place that points to a place that implements networkservice.NetworkServiceClient -// This is done because when we use heal.NewClient as part of a chain, we may not *have* -// a pointer to this -// client used 'onHeal'. If we detect we need to heal, onHeal.Request is used to heal. 
-// If onHeal is nil, then we simply set onHeal to this client chain element -// If we are part of a larger chain or a server, we should pass the resulting chain into -// this constructor before we actually have a pointer to it. -// If onHeal nil, onHeal will be pointed to the returned networkservice.NetworkServiceClient -func NewClient(ctx context.Context, client networkservice.MonitorConnectionClient, onHeal *networkservice.NetworkServiceClient) networkservice.NetworkServiceClient { - rv := &healClient{ - ctx: ctx, - client: client, - onHeal: onHeal, - cancelHealMap: make(map[string]func() <-chan error), +// NewClient - creates a new networkservice.NetworkServiceClient chain element that inform healServer about new client connection +func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient) networkservice.NetworkServiceClient { + return &healClient{ + ctx: ctx, + cc: cc, } - - if rv.onHeal == nil { - rv.onHeal = addressof.NetworkServiceClient(rv) - } - - return rv } -func (f *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { +func (u *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { conn, err := next.Client(ctx).Request(ctx, request, opts...) - if err != nil { - return nil, err - } - err = f.startHeal(request.Clone().SetRequestConnection(conn.Clone()), opts...) - if err != nil { - return nil, err - } - return conn, nil -} -func (f *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - f.stopHeal(conn) - rv, err := next.Client(ctx).Close(ctx, conn, opts...) 
- if err != nil { - return nil, err + registerClient := registerClientFunc(ctx) + if err == nil && registerClient != nil { + registerClient(u.ctx, conn, u.cc) } - return rv, nil -} - -func (f *healClient) stopHeal(conn *networkservice.Connection) { - var cancelHeal func() <-chan error - <-f.cancelHealMapExecutor.AsyncExec(func() { - cancelHeal = f.cancelHealMap[conn.GetId()] - }) - <-cancelHeal() -} - -// startHeal - start a healAsNeeded using the request as the request for re-request if healing is needed. -func (f *healClient) startHeal(request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) error { - errCh := make(chan error, 1) - go f.healAsNeeded(request, errCh, opts...) - return <-errCh + return conn, err } -// healAsNeeded - heal the connection found in request. Will immediately do a monitor to make sure the server has the -// expected connection and it is sane, returning an error via errCh if there is an issue, and nil via errCh if there is -// not. You will only *ever* receive one real error via the errCh. errCh will be closed when healAsNeeded is finished -// allowing it to double as a 'done' channel we can use when we stopHealing in f.Close. 
-// healAsNeeded will then continue to monitor the servers opinions about the state of the connection until either -// expireTime has passed or stopHeal is called (as in Close) or a different pathSegment is found via monitoring -// indicating that a later Request has occurred and in doing so created its own healAsNeeded and so we can stop this one -func (f *healClient) healAsNeeded(request *networkservice.NetworkServiceRequest, errCh chan error, opts ...grpc.CallOption) { - // When we are done, close the errCh - defer close(errCh) - - pathSegment := request.GetConnection().GetNextPathSegment() - - // Make sure we have a valid expireTime to work with - expireTime, err := ptypes.Timestamp(pathSegment.GetExpires()) - if err != nil { - errCh <- errors.Wrapf(err, "error converting pathSegment.GetExpires() to time.Time: %+v", pathSegment.GetExpires()) - return - } - - // Set the ctx Deadline to expireTime based on the heal servers context - ctx, cancel := context.WithDeadline(f.ctx, expireTime) - defer cancel() - id := request.GetConnection().GetId() - f.cancelHealMapExecutor.AsyncExec(func() { - if cancel, ok := f.cancelHealMap[id]; ok { - go cancel() // TODO - what to do with the errCh here? 
- } - f.cancelHealMap[id] = func() <-chan error { - cancel() - return errCh - } - }) - - // Monitor the pathSegment for the first time, so we can pass back an error - // if we can't confirm via monitor the other side has the expected state - recv, err := f.initialMonitorSegment(ctx, pathSegment) - if err != nil { - errCh <- errors.Wrapf(err, "error calling MonitorConnection_MonitorConnectionsClient.Recv to get initial confirmation server has connection: %+v", request.GetConnection()) - return - } - - // Tell the caller all is well by sending them a nil err so the call can continue - errCh <- nil - - // Start looping over events - for { - select { - case <-ctx.Done(): - return - default: - } - event, err := recv.Recv() - if err != nil { - // If we get an error, try to get a new recv ... if that fails, loop around and try again until - // we succeed or the ctx is canceled or expires - newRecv, newRecvErr := f.client.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{ - PathSegments: []*networkservice.PathSegment{ - pathSegment, - }, - }) - if newRecvErr == nil { - recv = newRecv - } - runtime.Gosched() - continue - } - select { - case <-ctx.Done(): - return - default: - } - if err := f.processEvent(ctx, request, event, opts...); err != nil { - if err != nil { - return - } - } - } -} - -// initialMonitorSegment - monitors for pathSegment and returns a recv and an error if the server does not have -// a record for the connection matching our expectations -func (f *healClient) initialMonitorSegment(ctx context.Context, pathSegment *networkservice.PathSegment) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { - // If pathSegment is nil, the server is very very screwed up - if pathSegment == nil { - return nil, errors.New("pathSegment for server connection must not be nil") - } - - // Monitor *just* this connection - recv, err := f.client.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{ - PathSegments: 
[]*networkservice.PathSegment{ - pathSegment, - }, - }) - if err != nil { - return nil, errors.Wrap(err, "error when attempting to MonitorConnections") - } - - // Get an initial event to make sure we have the expected connection - event, err := recv.Recv() - if err != nil { - return nil, err - } - // If we didn't get an event something very bad has happened - if event.Connections == nil || event.Connections[pathSegment.GetId()] == nil { - return nil, errors.Errorf("connection with id %s not found in MonitorConnections event as expected: event: %+v", pathSegment.Id, event) - } - // If its not *our* connection something's gone wrong like a later Request succeeding - if !pathSegment.Equal(event.GetConnections()[pathSegment.GetId()].GetCurrentPathSegment()) { - return nil, errors.Errorf("server reports a different connection for this id, pathSegments do not match. Expected: %+v Received %+v", pathSegment, event.GetConnections()[pathSegment.GetId()].GetCurrentPathSegment()) - } - return recv, nil -} - -// processEvent - process event, calling (*f.OnHeal).Request(ctx,request,opts...) if the server does not have our connection. -// returns a non-nil error if the event is such that we should no longer to continue to attempt to heal. -func (f *healClient) processEvent(ctx context.Context, request *networkservice.NetworkServiceRequest, event *networkservice.ConnectionEvent, opts ...grpc.CallOption) error { - pathSegment := request.GetConnection().GetNextPathSegment() - switch event.GetType() { - case networkservice.ConnectionEventType_UPDATE: - // We should never receive an UPDATE that isn't ours, but in case we do... - if event.Connections == nil || event.Connections[pathSegment.GetId()] == nil { - break - } - fallthrough - case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER: - if event.Connections != nil && event.Connections[pathSegment.GetId()] != nil { - // If the server has a pathSegment for this Connection.Id, but its not the one we - // got back from it... 
// Close hands the Close call straight to the next client in the chain. The heal client
// does no work of its own here; the returned values are exactly those of the next client.
func (u *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
	return next.Client(ctx).Close(ctx, conn, opts...)
}
+func _() { + // An "cannot convert clientConnMap literal (type clientConnMap) to type sync.Map" compiler error signifies that the base type have changed. + // Re-run the go-syncmap command to generate them again. + _ = (sync.Map)(clientConnMap{}) +} + +var _nil_clientConnMap_clientConnInfo_value = func() (val clientConnInfo) { return }() + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *clientConnMap) Load(key string) (clientConnInfo, bool) { + value, ok := (*sync.Map)(m).Load(key) + if value == nil { + return _nil_clientConnMap_clientConnInfo_value, ok + } + return value.(clientConnInfo), ok +} + +// Store sets the value for a key. +func (m *clientConnMap) Store(key string, value clientConnInfo) { + (*sync.Map)(m).Store(key, value) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *clientConnMap) LoadOrStore(key string, value clientConnInfo) (clientConnInfo, bool) { + actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) + if actual == nil { + return _nil_clientConnMap_clientConnInfo_value, loaded + } + return actual.(clientConnInfo), loaded +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *clientConnMap) LoadAndDelete(key string) (value clientConnInfo, loaded bool) { + actual, loaded := (*sync.Map)(m).LoadAndDelete(key) + if actual == nil { + return _nil_clientConnMap_clientConnInfo_value, loaded + } + return actual.(clientConnInfo), loaded +} + +// Delete deletes the value for a key. +func (m *clientConnMap) Delete(key string) { + (*sync.Map)(m).Delete(key) +} + +// Range calls f sequentially for each key and value present in the map. 
+// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *clientConnMap) Range(f func(key string, value clientConnInfo) bool) { + (*sync.Map)(m).Range(func(key, value interface{}) bool { + return f(key.(string), value.(clientConnInfo)) + }) +} diff --git a/pkg/networkservice/common/heal/connection_map.gen.go b/pkg/networkservice/common/heal/connection_map.gen.go new file mode 100644 index 000000000..8cb97a2e8 --- /dev/null +++ b/pkg/networkservice/common/heal/connection_map.gen.go @@ -0,0 +1,73 @@ +// Code generated by "-output connection_map.gen.go -type connectionMap -output connection_map.gen.go -type connectionMap"; DO NOT EDIT. +package heal + +import ( + "sync" // Used by sync.Map. +) + +// Generate code that will fail if the constants change value. +func _() { + // An "cannot convert connectionMap literal (type connectionMap) to type sync.Map" compiler error signifies that the base type have changed. + // Re-run the go-syncmap command to generate them again. + _ = (sync.Map)(connectionMap{}) +} + +var _nil_connectionMap_connection_value = func() (val connection) { return }() + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *connectionMap) Load(key string) (connection, bool) { + value, ok := (*sync.Map)(m).Load(key) + if value == nil { + return _nil_connectionMap_connection_value, ok + } + return value.(connection), ok +} + +// Store sets the value for a key. 
+func (m *connectionMap) Store(key string, value connection) { + (*sync.Map)(m).Store(key, value) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *connectionMap) LoadOrStore(key string, value connection) (connection, bool) { + actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) + if actual == nil { + return _nil_connectionMap_connection_value, loaded + } + return actual.(connection), loaded +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *connectionMap) LoadAndDelete(key string) (value connection, loaded bool) { + actual, loaded := (*sync.Map)(m).LoadAndDelete(key) + if actual == nil { + return _nil_connectionMap_connection_value, loaded + } + return actual.(connection), loaded +} + +// Delete deletes the value for a key. +func (m *connectionMap) Delete(key string) { + (*sync.Map)(m).Delete(key) +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. 
+func (m *connectionMap) Range(f func(key string, value connection) bool) { + (*sync.Map)(m).Range(func(key, value interface{}) bool { + return f(key.(string), value.(connection)) + }) +} diff --git a/pkg/networkservice/common/heal/context.go b/pkg/networkservice/common/heal/context.go new file mode 100644 index 000000000..dfc9a4b81 --- /dev/null +++ b/pkg/networkservice/common/heal/context.go @@ -0,0 +1,41 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package heal + +import ( + "context" +) + +const ( + registerClientFuncKey contextKeyType = "RegisterFunc" +) + +type contextKeyType string + +func withRegisterClientFunc(parent context.Context, registerClientFunc RegisterClientFunc) context.Context { + if parent == nil { + panic("cannot create context from nil parent") + } + return context.WithValue(parent, registerClientFuncKey, registerClientFunc) +} + +func registerClientFunc(ctx context.Context) RegisterClientFunc { + if rv, ok := ctx.Value(registerClientFuncKey).(RegisterClientFunc); ok { + return rv + } + return nil +} diff --git a/pkg/networkservice/common/heal/gen.go b/pkg/networkservice/common/heal/gen.go new file mode 100644 index 000000000..2e012277b --- /dev/null +++ b/pkg/networkservice/common/heal/gen.go @@ -0,0 +1,29 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// Copyright (c) 2021 Doc.ai and/or its affiliates. 
+// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package heal + +import ( + "sync" +) + +//go:generate go-syncmap -output connection_map.gen.go -type connectionMap +//go:generate go-syncmap -output client_connection_map.gen.go -type clientConnMap + +type connectionMap sync.Map +type clientConnMap sync.Map diff --git a/pkg/networkservice/common/heal/server.go b/pkg/networkservice/common/heal/server.go new file mode 100644 index 000000000..4755856c6 --- /dev/null +++ b/pkg/networkservice/common/heal/server.go @@ -0,0 +1,437 @@ +// Copyright (c) 2020 Cisco Systems, Inc. +// +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// ctxWrapper pairs a context with the function that cancels it. One is kept per
// connection id in healServer.cancelHealMap.
type ctxWrapper struct {
	ctx    context.Context
	cancel func()
}

// clientConnInfo is what RegisterClient records for a connection: the context it was
// called with and the MonitorConnectionClient used to monitor that connection.
type clientConnInfo struct {
	ctx context.Context
	cc  networkservice.MonitorConnectionClient
}

// connection is a stored copy of a connection returned by Request together with the
// context that Request finished with.
type connection struct {
	*networkservice.Connection
	ctx context.Context
}

// healServer is the heal chain element.
type healServer struct {
	// ctx is the lifecycle context given to NewServer; monitor and heal contexts derive from it.
	ctx context.Context
	// clients maps connection id -> clientConnInfo (filled by RegisterClient, cleared by Close).
	clients clientConnMap
	// onHeal is used to re-Request or Close a connection when healing is needed.
	onHeal *networkservice.NetworkServiceClient
	// cancelHealMap maps connection id -> monitor context/cancel; it is only touched inside
	// cancelHealMapExecutor.
	cancelHealMap         map[string]*ctxWrapper
	cancelHealMapExecutor serialize.Executor
	// conns maps connection id -> last connection stored by Request (removed by stopHeal).
	conns connectionMap
}

// NewServer - creates a new networkservice.NetworkServiceServer chain element that implements the healing algorithm
//   - ctx    - context for the lifecycle of the heal server itself. The monitoring and healing contexts created
//     by the server are derived from it, so cancelling it stops that work.
//   - onHeal - *networkservice.NetworkServiceClient. Since networkservice.NetworkServiceClient is an interface
//     (and thus a pointer) *networkservice.NetworkServiceClient is a double pointer. Meaning it
//     points to a place that points to a place that implements networkservice.NetworkServiceClient.
//     This allows passing the address of a chain that is only assigned after this constructor
//     has run. If we detect we need to heal, (*onHeal).Request is used to heal and
//     (*onHeal).Close is used to close a connection that cannot be healed from here.
//     If onHeal is nil, it is pointed at the returned server itself, adapted to a client with
//     adapters.NewServerToClient.
func NewServer(ctx context.Context, onHeal *networkservice.NetworkServiceClient) networkservice.NetworkServiceServer {
	rv := &healServer{
		ctx:           ctx,
		onHeal:        onHeal,
		cancelHealMap: make(map[string]*ctxWrapper),
	}

	if rv.onHeal == nil {
		rv.onHeal = addressof.NetworkServiceClient(adapters.NewServerToClient(rv))
	}

	return rv
}

// Request passes the request down the chain and, on success, starts monitoring the
// resulting connection so it can be healed.
//
// The context handed to the next element carries f.RegisterClient so that an element
// further down can register the monitor client for the connection. If no client was
// registered for the returned connection id, an error is logged and the connection is
// returned without any monitoring. Otherwise startHeal is called and its error, if any,
// is returned instead of the connection.
//
// NOTE(review): when startHeal fails the connection has already been established by the
// next element and nothing here closes it - confirm callers handle that.
func (f *healServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
	// Replace path within connection to the healed one
	f.replaceConnectionPath(request.GetConnection())

	ctx = withRegisterClientFunc(ctx, f.RegisterClient)
	conn, err := next.Server(ctx).Request(ctx, request)
	if err != nil {
		return nil, err
	}

	// Check heal client is in the chain and connection was added
	if _, ok := f.clients.Load(conn.GetId()); !ok {
		log.FromContext(ctx).WithField("healServer", "Request").Errorf("Heal server ignored connection %s: heal client is not active", conn.GetId())
		return conn, nil
	}

	// Reuse the candidates remembered from an earlier Request for this id when the
	// current context has none.
	ctx = f.applyStoredCandidates(ctx, conn)

	// Blocks until the initial monitor check has either confirmed the connection or failed.
	err = f.startHeal(ctx, request.Clone().SetRequestConnection(conn.Clone()))
	if err != nil {
		return nil, err
	}

	// Remember a copy of the connection and the context for later path replacement
	// and candidate lookup.
	f.conns.Store(conn.GetId(), connection{
		Connection: conn.Clone(),
		ctx:        ctx,
	})

	return conn, nil
}

// Close stops monitoring the connection, forgets its registered monitor client and then
// passes the Close down the chain. An error from the next element is returned with a nil
// result.
func (f *healServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
	// Replace path within connection to the healed one
	f.replaceConnectionPath(conn)

	// Monitoring is stopped before the Close is forwarded.
	f.stopHeal(conn)
	f.clients.Delete(conn.Id)

	rv, err := next.Server(ctx).Close(ctx, conn)
	if err != nil {
		return nil, err
	}
	return rv, nil
}

// RegisterClient records, under conn.Id, the context and the MonitorConnectionClient to
// use when monitoring conn. Request exposes this method to the rest of the chain through
// the request context.
func (f *healServer) RegisterClient(ctx context.Context, conn *networkservice.Connection, cc networkservice.MonitorConnectionClient) {
	f.clients.Store(conn.Id, clientConnInfo{
		ctx: ctx,
		cc:  cc,
	})
}

// stopHeal removes the monitor entry stored for conn in cancelHealMap, cancels its
// context if there was one, and drops the stored copy of the connection.
func (f *healServer) stopHeal(conn *networkservice.Connection) {
	var cancelHeal func()
	// The receive waits for the executor to run the closure, so cancelHeal is set
	// (or left nil) before it is read below.
	<-f.cancelHealMapExecutor.AsyncExec(func() {
		if cancelHealEntry, ok := f.cancelHealMap[conn.GetId()]; ok {
			cancelHeal = cancelHealEntry.cancel
			delete(f.cancelHealMap, conn.GetId())
		}
	})
	// The cancel itself is called outside of the executor.
	if cancelHeal != nil {
		cancelHeal()
	}
	f.conns.Delete(conn.GetId())
}

// startHeal - start a healAsNeeded using the request as the request for re-request if healing is needed.
// It blocks until healAsNeeded reports the result of its initial check on errCh and returns
// that result: nil once the connection was confirmed, the error otherwise. The monitoring
// goroutine keeps running after a nil result.
func (f *healServer) startHeal(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) error {
	// Buffered so healAsNeeded never blocks on reporting its single result.
	errCh := make(chan error, 1)
	go f.healAsNeeded(ctx, request, errCh, opts...)
	return <-errCh
}
// healAsNeeded - heal the connection found in request. Will immediately do a monitor to make sure the server has the
// expected connection and it is sane, and reports the outcome of that initial check exactly once via errCh:
//   - an error is sent if the expire time of the next path segment cannot be converted or if the initial monitor
//     check fails (errCh is not closed on these paths);
//   - errCh is closed, so the receiver reads a nil error, as soon as the initial check has succeeded.
//
// healAsNeeded will then continue to monitor the servers opinions about the state of the connection until either
// expireTime has passed or stopHeal is called (as in Close) or f.ctx is done or processEvent returns an error.
// If the monitor stream fails and cannot be re-established, restoreConnection is called and monitoring ends.
//
// The monitoring context is derived from f.ctx, not from baseCtx; baseCtx is only handed on to processEvent and
// restoreConnection.
//
// NOTE(review): the cancelHealMap entry written here is not removed when this function returns; only stopHeal
// deletes it.
func (f *healServer) healAsNeeded(baseCtx context.Context, request *networkservice.NetworkServiceRequest, errCh chan error, opts ...grpc.CallOption) {
	pathSegment := request.GetConnection().GetNextPathSegment()

	// Make sure we have a valid expireTime to work with
	expireTime, err := ptypes.Timestamp(pathSegment.GetExpires())
	if err != nil {
		errCh <- errors.Wrapf(err, "error converting pathSegment.GetExpires() to time.Time: %+v", pathSegment.GetExpires())
		return
	}

	// Set the ctx Deadline to expireTime based on the heal servers context
	ctx, cancel := context.WithDeadline(f.ctx, expireTime)
	defer cancel()
	id := request.GetConnection().GetId()
	// Replace any monitor already registered for this id with this one. The receive waits
	// until the executor has run the closure.
	<-f.cancelHealMapExecutor.AsyncExec(func() {
		if entry, ok := f.cancelHealMap[id]; ok {
			go entry.cancel() // TODO - what to do with the errCh here?
		}
		f.cancelHealMap[id] = &ctxWrapper{
			ctx:    ctx,
			cancel: cancel,
		}
	})

	// Monitor the pathSegment for the first time, so we can pass back an error
	// if we can't confirm via monitor the other side has the expected state
	recv, err := f.initialMonitorSegment(ctx, request.GetConnection(), time.Until(expireTime))
	if err != nil {
		errCh <- errors.Wrapf(err, "error calling MonitorConnection_MonitorConnectionsClient.Recv to get initial confirmation server has connection: %+v", request.GetConnection())
		return
	}

	// Tell the caller all is well: closing errCh makes its receive yield a nil err so the call can continue
	close(errCh)

	// Start looping over events
	for {
		if ctx.Err() != nil {
			return
		}
		event, err := recv.Recv()
		if err != nil {
			// The stream broke: try to re-establish it for at most one minute, but never
			// beyond the expire time of the connection.
			deadline := time.Now().Add(time.Minute)
			if deadline.After(expireTime) {
				deadline = expireTime
			}
			newRecv, newRecvErr := f.initialMonitorSegment(ctx, request.GetConnection(), time.Until(deadline))
			if newRecvErr == nil {
				recv = newRecv
			} else {
				// Monitoring could not be resumed; try to restore the connection and stop.
				f.restoreConnection(ctx, baseCtx, request, opts...)
				return
			}
			runtime.Gosched()
			continue
		}
		// Recv may have blocked for a long time; re-check both contexts before acting on the event.
		if ctx.Err() != nil || f.ctx.Err() != nil {
			return
		}
		if err := f.processEvent(baseCtx, request, event, opts...); err != nil {
			return
		}
	}
}

// initialMonitorSegment - monitors for pathSegment and returns a recv and an error if the server does not have
// a record for the connection matching our expectations.
//
// The monitor client registered for conn (see RegisterClient) is used to open a MonitorConnections stream scoped
// to the next path segment of conn, and the first event is checked to contain that segment unchanged. The work
// runs in a goroutine; if it has not reported within timeout, the stream context is cancelled and the result the
// goroutine then reports is returned.
//
// The stream is opened on a context derived from ctx that is intentionally not cancelled on the success path
// (hence the nolint directives): the returned recv keeps using it.
// NOTE(review): that derived context is also left uncancelled when the goroutine reports an error before the
// timeout; it is then only released when ctx itself is done.
func (f *healServer) initialMonitorSegment(ctx context.Context, conn *networkservice.Connection, timeout time.Duration) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
	// Buffered so the goroutine can always deliver its single result and exit.
	errCh := make(chan error, 1)
	var recv networkservice.MonitorConnection_MonitorConnectionsClient
	pathSegment := conn.GetNextPathSegment()

	// nolint:govet
	initialCtx, cancel := context.WithCancel(ctx)

	go func() {
		// If pathSegment is nil, the server is very very screwed up
		if pathSegment == nil {
			errCh <- errors.New("pathSegment for server connection must not be nil")
			return
		}

		// Get connection client by connection
		client, ok := f.clients.Load(conn.GetId())
		if !ok {
			errCh <- errors.Errorf("error when attempting to MonitorConnections")
			return
		}

		// Monitor *just* this connection
		var err error
		recv, err = client.cc.MonitorConnections(initialCtx, &networkservice.MonitorScopeSelector{
			PathSegments: []*networkservice.PathSegment{
				pathSegment,
			},
		})
		if err != nil {
			errCh <- errors.Wrap(err, "error when attempting to MonitorConnections")
			return
		}

		// Get an initial event to make sure we have the expected connection
		event, err := recv.Recv()
		if err != nil {
			errCh <- err
			return
		}
		// If we didn't get an event something very bad has happened
		if event.Connections == nil || event.Connections[pathSegment.GetId()] == nil {
			errCh <- errors.Errorf("connection with id %s not found in MonitorConnections event as expected: event: %+v", pathSegment.Id, event)
			return
		}
		// If its not *our* connection something's gone wrong like a later Request succeeding
		if !pathSegment.Equal(event.GetConnections()[pathSegment.GetId()].GetCurrentPathSegment()) {
			errCh <- errors.Errorf("server reports a different connection for this id, pathSegments do not match. Expected: %+v Received %+v", pathSegment, event.GetConnections()[pathSegment.GetId()].GetCurrentPathSegment())
			return
		}
		errCh <- nil
	}()

	// recv is written by the goroutine before it sends on errCh, so it is safe to read
	// after a receive from errCh.
	select {
	case err := <-errCh:
		// nolint:govet
		return recv, err
	case <-time.After(timeout):
		// Timed out: cancel the stream context and wait for the goroutine's result.
		cancel()
		err := <-errCh
		return recv, err
	}
}
// restoreConnection tries to re-establish the connection described by request after its
// monitor stream could not be resumed.
//   - ctx     - the monitoring context of the calling healAsNeeded; used only to check that the
//     caller is still the monitor registered in cancelHealMap for this connection id
//   - baseCtx - handed on to processHeal when restoring fails
//
// Nothing is done if another monitor has replaced the caller, if f.ctx is done, or if the
// expire time of the next path segment cannot be converted. Otherwise (*f.onHeal).Request is
// retried with a clone of request until it succeeds, f.ctx is done or the deadline (one
// minute from now, capped at the expire time) has passed. If the last attempt failed,
// processHeal is called.
func (f *healServer) restoreConnection(ctx, baseCtx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) {
	id := request.GetConnection().GetId()

	var healMapCtx context.Context
	<-f.cancelHealMapExecutor.AsyncExec(func() {
		if entry, ok := f.cancelHealMap[id]; ok {
			healMapCtx = entry.ctx
		}
	})
	// Only the monitor currently registered for this id may restore the connection.
	if healMapCtx != ctx || f.ctx.Err() != nil {
		return
	}

	// Make sure we have a valid expireTime to work with
	expires := request.GetConnection().GetNextPathSegment().GetExpires()
	expireTime, err := ptypes.Timestamp(expires)
	if err != nil {
		return
	}

	deadline := time.Now().Add(time.Minute)
	if deadline.After(expireTime) {
		deadline = expireTime
	}
	requestCtx, requestCancel := context.WithDeadline(f.ctx, deadline)
	defer requestCancel()

	// NOTE(review): the loop retries immediately, without any back-off between attempts.
	for f.ctx.Err() == nil {
		_, err = (*f.onHeal).Request(requestCtx, request.Clone(), opts...)
		// NOTE(review): healMapCtx and ctx are locals that are not reassigned inside the loop and were
		// already compared above, so this condition cannot become true here. If the intent is to notice
		// a newer monitor taking over, the map entry would have to be re-read - confirm.
		if healMapCtx != ctx {
			return
		}
		if time.Now().After(deadline) || err == nil {
			break
		}
	}

	if err != nil {
		f.processHeal(baseCtx, request.Clone(), opts...)
	}
}

// processHeal either re-requests the connection through (*f.onHeal).Request or closes it
// through (*f.onHeal).Close.
//
// A re-request is made when baseCtx carries discover candidates or when the connection's path
// index is 0. The re-request is a clone of request with the endpoint name cleared and the path
// cut after the current index, and it is repeated until it succeeds or f.ctx is done.
// In all other cases the connection is closed with a one second timeout.
// Failures are only logged.
//
// NOTE(review): the retry loop has no delay between attempts. It also reads Path.PathSegments without
// a nil check on Path; GetPath().GetIndex() == 0 could in principle be satisfied by a nil path - confirm
// callers always provide one.
func (f *healServer) processHeal(baseCtx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) {
	logEntry := log.FromContext(f.ctx).WithField("healServer", "processHeal")
	conn := request.GetConnection()

	var err error
	candidates := discover.Candidates(baseCtx)
	if candidates != nil || conn.GetPath().GetIndex() == 0 {
		logEntry.Infof("Starting heal process for %s", conn.GetId())

		healCtx, healCancel := context.WithCancel(f.ctx)
		defer healCancel()

		// Drop the selected endpoint and everything after our own path segment.
		reRequest := request.Clone()
		reRequest.GetConnection().NetworkServiceEndpointName = ""
		path := reRequest.GetConnection().Path
		reRequest.GetConnection().Path.PathSegments = path.PathSegments[0 : path.Index+1]

		for f.ctx.Err() == nil {
			_, err = (*f.onHeal).Request(healCtx, reRequest, opts...)
			if err != nil {
				logEntry.Errorf("Failed to heal connection %s: %v", conn.GetId(), err)
			} else {
				logEntry.Infof("Finished heal process for %s", conn.GetId())
				break
			}
		}
	} else {
		// Huge timeout is not required to close connection on a current path segment
		closeCtx, closeCancel := context.WithTimeout(f.ctx, 1*time.Second)
		defer closeCancel()

		// This err shadows the one declared at the top of the function.
		_, err := (*f.onHeal).Close(closeCtx, request.GetConnection().Clone(), opts...)
		if err != nil {
			logEntry.Errorf("Failed to close connection %s: %v", request.GetConnection().GetId(), err)
		}
	}
}

// replaceConnectionPath overwrites, in place, the part of conn's path that follows the
// current index with the corresponding part of the connection stored for the same id, and
// copies the stored endpoint name. Nothing happens when conn has no path, when the path has
// no segments after the current index, or when no connection is stored for the id.
//
// NOTE(review): the stored path is sliced at path.Index+1 without checking its length;
// presumably the stored path is always at least that long - confirm.
func (f *healServer) replaceConnectionPath(conn *networkservice.Connection) {
	path := conn.GetPath()
	if path != nil && int(path.Index) < len(path.PathSegments)-1 {
		if storedConn, ok := f.conns.Load(conn.GetId()); ok {
			path.PathSegments = path.PathSegments[:path.Index+1]
			path.PathSegments = append(path.PathSegments, storedConn.Path.PathSegments[path.Index+1:]...)
			conn.NetworkServiceEndpointName = storedConn.NetworkServiceEndpointName
		}
	}
}

// applyStoredCandidates returns ctx unchanged when it already carries a non-empty list of
// discover candidates. Otherwise, if a connection is stored for conn.Id and the context
// stored with it carries candidates, a context with those candidates added is returned.
func (f *healServer) applyStoredCandidates(ctx context.Context, conn *networkservice.Connection) context.Context {
	if candidates := discover.Candidates(ctx); candidates != nil && len(candidates.Endpoints) > 0 {
		return ctx
	}

	if info, ok := f.conns.Load(conn.Id); ok {
		if candidates := discover.Candidates(info.ctx); candidates != nil {
			ctx = discover.WithCandidates(ctx, candidates.Endpoints, candidates.NetworkService)
		}
	}
	return ctx
}
updatetoken.NewServer(sandbox.GenerateTestToken), ) + healServer := heal.NewServer(ctx, addressof.NetworkServiceClient(onHeal)) client := chain.NewNetworkServiceClient( updatepath.NewClient("testClient"), - heal.NewClient(ctx, adapters.NewMonitorServerToClient(monitorServer), addressof.NetworkServiceClient(onHeal)), + adapters.NewServerToClient(healServer), + heal.NewClient(ctx, adapters.NewMonitorServerToClient(monitorServer)), adapters.NewServerToClient(updatetoken.NewServer(sandbox.GenerateTestToken)), adapters.NewServerToClient(server), ) @@ -138,9 +140,12 @@ func TestHealClient_EmptyInit(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() + + healServer := heal.NewServer(ctx, addressof.NetworkServiceClient(onHeal)) client := chain.NewNetworkServiceClient( updatepath.NewClient("testClient"), - heal.NewClient(ctx, eventchannel.NewMonitorConnectionClient(eventCh), addressof.NetworkServiceClient(onHeal)), + adapters.NewServerToClient(healServer), + heal.NewClient(ctx, eventchannel.NewMonitorConnectionClient(eventCh)), adapters.NewServerToClient(updatetoken.NewServer(sandbox.GenerateTestToken)), updatepath.NewClient("testServer"), eventTrigger, diff --git a/pkg/networkservice/common/interpose/server.go b/pkg/networkservice/common/interpose/server.go index f84d26477..bbf092fc8 100644 --- a/pkg/networkservice/common/interpose/server.go +++ b/pkg/networkservice/common/interpose/server.go @@ -71,13 +71,13 @@ func (l *interposeServer) Request(ctx context.Context, request *networkservice.N return nil, errors.Errorf("path segment doesn't have a client or cross connect nse identity") } // We need to find an Id from path to match active connection request. - activeConnID := l.getConnectionID(conn) + activeConnID, fromPath := l.getConnectionID(conn) // We came from client, so select cross nse and go to it. 
clientURL := clienturlctx.ClientURL(ctx) connInfo, ok := l.activeConnection.Load(activeConnID) - if ok { + if ok && (fromPath || int(conn.Path.Index) < len(conn.Path.PathSegments)-1) { if connID != activeConnID { l.activeConnection.Store(connID, connInfo) } @@ -91,7 +91,7 @@ func (l *interposeServer) Request(ctx context.Context, request *networkservice.N crossCTX := clienturlctx.WithClientURL(ctx, crossNSEURL) // Store client connection and selected cross connection URL. - connInfo, _ = l.activeConnection.LoadOrStore(activeConnID, connectionInfo{ + l.activeConnection.Store(activeConnID, connectionInfo{ clientConnID: activeConnID, endpointURL: clientURL, interposeNSEURL: crossNSEURL, @@ -127,18 +127,18 @@ func (l *interposeServer) Request(ctx context.Context, request *networkservice.N return next.Server(crossCTX).Request(crossCTX, request) } -func (l *interposeServer) getConnectionID(conn *networkservice.Connection) string { +func (l *interposeServer) getConnectionID(conn *networkservice.Connection) (string, bool) { id := conn.Id - for i := conn.GetPath().GetIndex(); i > 0; i-- { + for i := conn.GetPath().GetIndex() - 1; i > 0; i-- { activeConnID := conn.GetPath().GetPathSegments()[i].Id if connInfo, ok := l.activeConnection.Load(activeConnID); ok { if activeConnID == connInfo.clientConnID { - id = activeConnID + return activeConnID, true } break } } - return id + return id, false } func (l *interposeServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index 2bdad2917..d899c754b 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -63,11 +63,16 @@ func (m *monitorServer) MonitorConnections(selector *networkservice.MonitorScope m.executor.AsyncExec(func() { monitor := newMonitorFilter(selector, srv) m.monitors = append(m.monitors, monitor) - + connections := 
make(map[string]*networkservice.Connection) + for _, ps := range selector.PathSegments { + if conn, ok := m.connections[ps.GetId()]; ok { + connections[ps.GetId()] = conn + } + } // Send initial transfer of all data available _ = monitor.Send(&networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, - Connections: m.connections, + Connections: connections, }) }) select { @@ -83,6 +88,7 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net if err == nil { m.executor.AsyncExec(func() { m.connections[eventConn.GetId()] = eventConn + // Send update event event := &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_UPDATE, @@ -97,11 +103,19 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net } func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - eventConn := conn.Clone() + <-m.executor.AsyncExec(func() { + if c, ok := m.connections[conn.GetId()]; ok { + conn = c.Clone() + } + }) + _, closeErr := next.Server(ctx).Close(ctx, conn) + // Remove connection object we have and send DELETE + eventConn := conn.Clone() m.executor.AsyncExec(func() { delete(m.connections, eventConn.GetId()) + event := &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_DELETE, Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn}, @@ -126,6 +140,7 @@ func (m *monitorServer) send(ctx context.Context, event *networkservice.Connecti newMonitors = append(newMonitors, filter) } } + m.monitors = newMonitors return err } diff --git a/pkg/registry/common/clienturl/nse_client.go b/pkg/registry/common/clienturl/nse_client.go index e7601749f..006d56c73 100644 --- a/pkg/registry/common/clienturl/nse_client.go +++ b/pkg/registry/common/clienturl/nse_client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -20,15 +20,15 @@ import ( "context" "sync" - "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" - - "github.com/networkservicemesh/api/pkg/api/registry" - "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + + "github.com/networkservicemesh/api/pkg/api/registry" "github.com/networkservicemesh/sdk/pkg/registry/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" ) @@ -99,11 +99,22 @@ func (u *nseRegistryURLClient) init() error { if u.dialErr != nil { return } + u.client = u.clientFactory(u.ctx, cc) go func() { - <-u.ctx.Done() - _ = cc.Close() + defer func() { + _ = cc.Close() + }() + for cc.WaitForStateChange(u.ctx, cc.GetState()) { + switch cc.GetState() { + case connectivity.Connecting, connectivity.Idle, connectivity.Ready: + continue + default: + return + } + } }() }) + return u.dialErr } diff --git a/pkg/registry/common/querycache/nse_client.go b/pkg/registry/common/querycache/nse_client.go index 37aef2b19..b35481a6f 100644 --- a/pkg/registry/common/querycache/nse_client.go +++ b/pkg/registry/common/querycache/nse_client.go @@ -79,7 +79,7 @@ func (q *queryCacheNSEClient) findInCache(ctx context.Context, key string) (regi } resultCh := make(chan *registry.NetworkServiceEndpoint, 1) - resultCh <- nse + resultCh <- nse.Clone() close(resultCh) return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), true diff --git a/pkg/tools/sandbox/builder.go b/pkg/tools/sandbox/builder.go index 3ebae4bf9..0e357d74b 100644 --- a/pkg/tools/sandbox/builder.go +++ b/pkg/tools/sandbox/builder.go @@ -47,6 +47,7 @@ type Builder struct { require *require.Assertions resources []context.CancelFunc nodesCount int + nodesConfig []*NodeConfig DNSDomainName string Resolver dnsresolve.Resolver supplyNSMgr SupplyNSMgrFunc @@ -100,7 +101,7 @@ func (b *Builder) Build() 
*Domain { } for i := 0; i < b.nodesCount; i++ { - domain.Nodes = append(domain.Nodes, b.newNode(ctx, domain.Registry.URL)) + domain.Nodes = append(domain.Nodes, b.newNode(ctx, domain.Registry.URL, b.nodesConfig[i])) } domain.resources, b.resources = b.resources, nil @@ -108,15 +109,62 @@ func (b *Builder) Build() *Domain { return domain } -// SetContext sets context for all chains +// SetContext sets default context for all chains func (b *Builder) SetContext(ctx context.Context) *Builder { b.ctx = ctx + b.SetCustomConfig([]*NodeConfig{}) + return b +} + +// SetCustomConfig sets custom configuration for nodes: fields set in config[i] override node i, unset fields keep the node's previous value or fall back to the builder defaults +func (b *Builder) SetCustomConfig(config []*NodeConfig) *Builder { + oldConfig := b.nodesConfig + b.nodesConfig = nil + + for i := 0; i < b.nodesCount; i++ { + nodeConfig := &NodeConfig{} + if i < len(oldConfig) && oldConfig[i] != nil { + *nodeConfig = *oldConfig[i] + } + + customConfig := &NodeConfig{} + if i < len(config) && config[i] != nil { + *customConfig = *config[i] + } + + if customConfig.NsmgrCtx != nil { + nodeConfig.NsmgrCtx = customConfig.NsmgrCtx + } else if nodeConfig.NsmgrCtx == nil { + nodeConfig.NsmgrCtx = b.ctx + } + + if customConfig.NsmgrGenerateTokenFunc != nil { + nodeConfig.NsmgrGenerateTokenFunc = customConfig.NsmgrGenerateTokenFunc + } else if nodeConfig.NsmgrGenerateTokenFunc == nil { + nodeConfig.NsmgrGenerateTokenFunc = b.generateTokenFunc + } + + if customConfig.ForwarderCtx != nil { + nodeConfig.ForwarderCtx = customConfig.ForwarderCtx + } else if nodeConfig.ForwarderCtx == nil { + nodeConfig.ForwarderCtx = b.ctx + } + + if customConfig.ForwarderGenerateTokenFunc != nil { + nodeConfig.ForwarderGenerateTokenFunc = customConfig.ForwarderGenerateTokenFunc + } else if nodeConfig.ForwarderGenerateTokenFunc == nil { + nodeConfig.ForwarderGenerateTokenFunc = b.generateTokenFunc + } + + b.nodesConfig = append(b.nodesConfig, nodeConfig) + } return b } // SetNodesCount sets nodes count func (b *Builder) SetNodesCount(nodesCount int) *Builder { 
b.nodesCount = nodesCount + b.SetCustomConfig([]*NodeConfig{}) return b } @@ -200,7 +248,20 @@ func (b *Builder) newNSMgrProxy(ctx context.Context) *EndpointEntry { } } -func (b *Builder) newNSMgr(ctx context.Context, registryURL *url.URL) *NSMgrEntry { +// NewNSMgr - starts new Network Service Manager +func (b *Builder) NewNSMgr(ctx context.Context, node *Node, address string, registryURL *url.URL, generateTokenFunc token.GeneratorFunc) (entry *NSMgrEntry, resources []context.CancelFunc) { + nsmgrCtx, nsmgrCancel := context.WithCancel(ctx) + b.resources = append(b.resources, nsmgrCancel) + + entry = b.newNSMgr(nsmgrCtx, address, registryURL, generateTokenFunc) + + b.SetupRegistryClients(nsmgrCtx, node) + + resources, b.resources = b.resources, nil + return +} + +func (b *Builder) newNSMgr(ctx context.Context, address string, registryURL *url.URL, generateTokenFunc token.GeneratorFunc) *NSMgrEntry { if b.supplyNSMgr == nil { panic("nodes without managers are not supported") } @@ -208,7 +269,7 @@ func (b *Builder) newNSMgr(ctx context.Context, registryURL *url.URL) *NSMgrEntr if registryURL != nil { registryCC = b.dialContext(ctx, registryURL) } - listener, err := net.Listen("tcp", "127.0.0.1:0") + listener, err := net.Listen("tcp", address) b.require.NoError(err) serveURL := grpcutils.AddressToURL(listener.Addr()) b.require.NoError(listener.Close()) @@ -218,7 +279,7 @@ func (b *Builder) newNSMgr(ctx context.Context, registryURL *url.URL) *NSMgrEntr Url: serveURL.String(), } - mgr := b.supplyNSMgr(ctx, nsmgrReg, authorize.NewServer(authorize.Any()), b.generateTokenFunc, registryCC, DefaultDialOptions(b.generateTokenFunc)...) + mgr := b.supplyNSMgr(ctx, nsmgrReg, authorize.NewServer(authorize.Any()), generateTokenFunc, registryCC, DefaultDialOptions(generateTokenFunc)...) 
serve(ctx, serveURL, mgr.Register) log.FromContext(ctx).Infof("%v listen on: %v", nsmgrReg.Name, serveURL) @@ -273,31 +334,38 @@ func (b *Builder) newRegistry(ctx context.Context, proxyRegistryURL *url.URL) *R } } -func (b *Builder) newNode(ctx context.Context, registryURL *url.URL) *Node { - nsmgrEntry := b.newNSMgr(ctx, registryURL) - nsmgrCC := b.dialContext(ctx, nsmgrEntry.URL) +func (b *Builder) newNode(ctx context.Context, registryURL *url.URL, nodeConfig *NodeConfig) *Node { + nsmgrEntry := b.newNSMgr(nodeConfig.NsmgrCtx, "127.0.0.1:0", registryURL, nodeConfig.NsmgrGenerateTokenFunc) node := &Node{ - ctx: b.ctx, - NSMgr: nsmgrEntry, - ForwarderRegistryClient: client.NewNetworkServiceEndpointRegistryInterposeClient(ctx, nsmgrCC), - EndpointRegistryClient: client.NewNetworkServiceEndpointRegistryClient(ctx, nsmgrCC), - NSRegistryClient: client.NewNetworkServiceRegistryClient(nsmgrCC), + ctx: b.ctx, + NSMgr: nsmgrEntry, } + b.SetupRegistryClients(nodeConfig.NsmgrCtx, node) + if b.setupNode != nil { - b.setupNode(ctx, node) + b.setupNode(ctx, node, nodeConfig) } return node } +// SetupRegistryClients - creates Network Service Registry Clients +func (b *Builder) SetupRegistryClients(ctx context.Context, node *Node) { + nsmgrCC := b.dialContext(ctx, node.NSMgr.URL) + + node.ForwarderRegistryClient = client.NewNetworkServiceEndpointRegistryInterposeClient(ctx, nsmgrCC) + node.EndpointRegistryClient = client.NewNetworkServiceEndpointRegistryClient(ctx, nsmgrCC) + node.NSRegistryClient = client.NewNetworkServiceRegistryClient(nsmgrCC) +} + func defaultSetupNode(t *testing.T) SetupNodeFunc { - return func(ctx context.Context, node *Node) { + return func(ctx context.Context, node *Node, nodeConfig *NodeConfig) { nseReg := &registryapi.NetworkServiceEndpoint{ - Name: uuid.New().String(), + Name: "forwarder-" + uuid.New().String(), } - _, err := node.NewForwarder(ctx, nseReg, GenerateTestToken) + _, err := node.NewForwarder(nodeConfig.ForwarderCtx, nseReg, 
nodeConfig.ForwarderGenerateTokenFunc) require.NoError(t, err) } } diff --git a/pkg/tools/sandbox/node.go b/pkg/tools/sandbox/node.go index 7592f8a8e..6b4e008dd 100644 --- a/pkg/tools/sandbox/node.go +++ b/pkg/tools/sandbox/node.go @@ -31,6 +31,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" @@ -42,6 +43,7 @@ import ( type Node struct { ctx context.Context NSMgr *NSMgrEntry + Forwarder []*EndpointEntry ForwarderRegistryClient registryapi.NetworkServiceEndpointRegistryClient EndpointRegistryClient registryapi.NetworkServiceEndpointRegistryClient NSRegistryClient registryapi.NetworkServiceRegistryClient @@ -57,11 +59,11 @@ func (n *Node) NewForwarder( ep := new(EndpointEntry) additionalFunctionality = append(additionalFunctionality, clienturl.NewServer(n.NSMgr.URL), + heal.NewServer(ctx, addressof.NetworkServiceClient(adapters.NewServerToClient(ep))), connect.NewServer(ctx, client.NewCrossConnectClientFactory( client.WithName(nse.Name), - // What to call onHeal - client.WithHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(ep)))), + ), connect.WithDialOptions(DefaultDialOptions(generatorFunc)...), ), ) @@ -71,7 +73,7 @@ func (n *Node) NewForwarder( return nil, err } *ep = *entry - + n.Forwarder = append(n.Forwarder, ep) return ep, nil } @@ -113,27 +115,41 @@ func (n *Node) newEndpoint( nse.Url = u.String() // 3. 
Register with the node registry client + err = n.registerEndpoint(ctx, nse, registryClient) + if err != nil { + return nil, err + } + + log.FromContext(ctx).Infof("Started listen endpoint %s on %s.", nse.Name, u.String()) + + return &EndpointEntry{Endpoint: ep, URL: u}, nil +} + +// RegisterEndpoint - registers endpoint in the registry client +func (n *Node) RegisterEndpoint(ctx context.Context, nse *registryapi.NetworkServiceEndpoint) error { + return n.registerEndpoint(ctx, nse, n.EndpointRegistryClient) +} +func (n *Node) registerEndpoint(ctx context.Context, nse *registryapi.NetworkServiceEndpoint, registryClient registryapi.NetworkServiceEndpointRegistryClient) error { + var err error for _, nsName := range nse.NetworkServiceNames { if _, err = n.NSRegistryClient.Register(ctx, &registryapi.NetworkService{ Name: nsName, Payload: payload.IP, }); err != nil { - return nil, err + return err } } var reg *registryapi.NetworkServiceEndpoint if reg, err = registryClient.Register(ctx, nse); err != nil { - return nil, err + return err } nse.Name = reg.Name nse.ExpirationTime = reg.ExpirationTime - log.FromContext(ctx).Infof("Started listen endpoint %s on %s.", nse.Name, u.String()) - - return &EndpointEntry{Endpoint: ep, URL: u}, nil + return nil } // NewClient starts a new client and connects it to the node NSMgr diff --git a/pkg/tools/sandbox/types.go b/pkg/tools/sandbox/types.go index c5800f368..640aa7e45 100644 --- a/pkg/tools/sandbox/types.go +++ b/pkg/tools/sandbox/types.go @@ -46,7 +46,7 @@ type SupplyRegistryFunc func(ctx context.Context, expiryDuration time.Duration, type SupplyRegistryProxyFunc func(ctx context.Context, dnsResolver dnsresolve.Resolver, handlingDNSDomain string, proxyNSMgrURL *url.URL, options ...grpc.DialOption) registry.Registry // SetupNodeFunc setups each node on Builder.Build() stage -type SetupNodeFunc func(ctx context.Context, node *Node) +type SetupNodeFunc func(ctx context.Context, node *Node, config *NodeConfig) // RegistryEntry is pair 
of registry.Registry and url.URL type RegistryEntry struct { @@ -77,6 +77,19 @@ type Domain struct { resources []context.CancelFunc } +// NodeConfig keeps custom node configuration parameters +type NodeConfig struct { + NsmgrCtx context.Context + NsmgrGenerateTokenFunc token.GeneratorFunc + ForwarderCtx context.Context + ForwarderGenerateTokenFunc token.GeneratorFunc +} + +// AddResources appends resources to the Domain to close it later +func (d *Domain) AddResources(resources []context.CancelFunc) { + d.resources = append(d.resources, resources...) +} + // Cleanup frees all resources related to the domain func (d *Domain) Cleanup() { for _, r := range d.resources {