Skip to content

Commit

Permalink
add netsvcmonitor chain element
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin committed Sep 13, 2023
1 parent f96fdf6 commit 45a8479
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 42 deletions.
12 changes: 12 additions & 0 deletions pkg/networkservice/chains/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type serverOptions struct {
authorizeServer networkservice.NetworkServiceServer
authorizeMonitorConnectionServer networkservice.MonitorConnectionServer
additionalFunctionality []networkservice.NetworkServiceServer
monitorConnectionServerPtr *networkservice.MonitorConnectionServer
}

// Option modifies server option value
Expand Down Expand Up @@ -105,6 +106,12 @@ func WithAdditionalFunctionality(additionalFunctionality ...networkservice.Netwo
}
}

func WithGettingMonitorConnectionServer(mcs *networkservice.MonitorConnectionServer) Option {
return func(o *serverOptions) {
o.monitorConnectionServerPtr = mcs
}
}

// NewServer - returns a NetworkServiceMesh client as a chain of the standard Client pieces plus whatever
func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options ...Option) Endpoint {
opts := &serverOptions{
Expand All @@ -130,6 +137,11 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
trimpath.NewServer(),
}, opts.additionalFunctionality...)...)
rv.MonitorConnectionServer = next.NewMonitorConnectionServer(opts.authorizeMonitorConnectionServer, mcsPtr)

if opts.monitorConnectionServerPtr != nil {
*opts.monitorConnectionServerPtr = mcsPtr
}

return rv
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/nsmgr/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func requireIPv4Lookup(ctx context.Context, t *testing.T, r *net.Resolver, host,
func Test_DNSUsecase(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*200)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
Expand Down
10 changes: 9 additions & 1 deletion pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/recvfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/metrics"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/netsvcmonitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry"
registryauthorize "github.com/networkservicemesh/sdk/pkg/registry/common/authorize"
Expand Down Expand Up @@ -291,6 +292,8 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
remoteOrLocalRegistry,
)

var mcs = new(networkservice.MonitorConnectionServer)

// Construct Endpoint
rv.Endpoint = endpoint.NewServer(ctx, tokenGenerator,
endpoint.WithName(opts.name),
Expand All @@ -304,6 +307,11 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
discoverforwarder.WithForwarderServiceName(opts.forwarderServiceName),
discoverforwarder.WithNSMgrURL(opts.url),
),
netsvcmonitor.NewServer(ctx,
registryadapter.NetworkServiceServerToClient(nsRegistry),
registryadapter.NetworkServiceEndpointServerToClient(remoteOrLocalRegistry),
mcs,
),
excludedprefixes.NewServer(ctx),
recvfd.NewServer(), // Receive any files passed
metrics.NewServer(),
Expand All @@ -321,7 +329,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
),
),
sendfd.NewServer()),
)
endpoint.WithGettingMonitorConnectionServer(mcs))

rv.Registry = registry.NewServer(
nsRegistry,
Expand Down
39 changes: 0 additions & 39 deletions pkg/networkservice/common/discover/match_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,8 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/matchutils"
)

func matchEndpoint(clockTime clock.Clock, nsLabels map[string]string, ns *registry.NetworkService, nses ...*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint {
validNetworkServiceEndpoints := validateExpirationTime(clockTime, nses)
// Iterate through the matches
for _, match := range ns.GetMatches() {
// All match source selector labels should be present in the requested labels map
if !matchutils.IsSubset(nsLabels, match.GetSourceSelector(), nsLabels) {
continue
}
nseCandidates := make([]*registry.NetworkServiceEndpoint, 0)
// Check all Destinations in that match
for _, destination := range match.GetRoutes() {
// Each NSE should be matched against that destination
for _, nse := range validNetworkServiceEndpoints {
var candidateNetworkServiceLabels = nse.GetNetworkServiceLabels()[ns.GetName()]
var labels map[string]string
if candidateNetworkServiceLabels != nil {
labels = candidateNetworkServiceLabels.Labels
}
if matchutils.IsSubset(labels, destination.GetDestinationSelector(), nsLabels) {
nseCandidates = append(nseCandidates, nse)
}
}
}

if match.Fallthrough && len(nseCandidates) == 0 {
continue
}

if match.GetMetadata() != nil && len(match.Routes) == 0 && len(nseCandidates) == 0 {
break
}

return nseCandidates
}

return validNetworkServiceEndpoints
}

func validateExpirationTime(clockTime clock.Clock, nses []*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint {
var validNetworkServiceEndpoints []*registry.NetworkServiceEndpoint
for _, nse := range nses {
Expand Down
3 changes: 2 additions & 1 deletion pkg/networkservice/common/discover/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/tools/clienturlctx"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/matchutils"
)

type discoverCandidatesServer struct {
Expand Down Expand Up @@ -148,7 +149,7 @@ func (d *discoverCandidatesServer) discoverNetworkServiceEndpoints(ctx context.C
}
nseList := registry.ReadNetworkServiceEndpointList(nseRespStream)

result := matchEndpoint(clockTime, nsLabels, ns, nseList...)
result := matchutils.MatchEndpoint(nsLabels, ns, validateExpirationTime(clockTime, nseList)...)
if len(result) != 0 {
return result, nil
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/networkservice/common/netsvcmonitor/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package netsvcmonitor

import (
"context"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

type cancelFunctionKey struct{}

func storeCancelFunction(ctx context.Context, cancel func()) {
metadata.Map(ctx, false).Store(cancelFunctionKey{}, cancel)
}
func loadCancelFunction(ctx context.Context) (func(), bool) {
v, ok := metadata.Map(ctx, false).Load(cancelFunctionKey{})
if ok {
return v.(func()), true
}
return nil, false
}
164 changes: 164 additions & 0 deletions pkg/networkservice/common/netsvcmonitor/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) 2020-2022 Cisco Systems, Inc.
//
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package monitor provides a NetworkServiceServer chain element to provide a monitor server that reflects
// the connections actually in the NetworkServiceServer
package netsvcmonitor

import (
"context"
"time"

"github.com/golang/protobuf/ptypes/empty"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/matchutils"
)

type Sender interface {
Send(event *networkservice.ConnectionEvent) (err error)
}

type monitorServer struct {
chainCtx context.Context
s *networkservice.MonitorConnectionServer
nsClient registry.NetworkServiceRegistryClient
nseClient registry.NetworkServiceEndpointRegistryClient
}

func NewServer(chainCtx context.Context, nsClient registry.NetworkServiceRegistryClient, nseClient registry.NetworkServiceEndpointRegistryClient, s *networkservice.MonitorConnectionServer) networkservice.NetworkServiceServer {
return &monitorServer{
chainCtx: chainCtx,
nsClient: nsClient,
nseClient: nseClient,
s: s,
}
}

func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
if cancel, ok := loadCancelFunction(ctx); ok {
cancel()
}

resp, err := next.Server(ctx).Request(ctx, request)

if err != nil {
return resp, err
}

var monitorCtx, cancel = context.WithCancel(context.Background())

storeCancelFunction(ctx, cancel)

var logger = log.FromContext(ctx).WithField("monitorServer", "Find")

var monitorNetworkServiceGoroutine func()

monitorNetworkServiceGoroutine = func() {
var errorsLimit = 100
var stream registry.NetworkServiceRegistry_FindClient
var err error
for errorsLimit > 0 {
if monitorCtx.Err() != nil {
return
}
stream, err = m.nsClient.Find(monitorCtx, &registry.NetworkServiceQuery{
Watch: true,
NetworkService: &registry.NetworkService{
Name: resp.GetNetworkService(),
},
})

if err != nil {
logger.Errorf("an error happened during finding network service: %v", err.Error())
errorsLimit--
time.Sleep(time.Millisecond * 100)
continue
}
break
}
if err != nil {
return
}

var networkServiceCh = registry.ReadNetworkServiceChannel(stream)

for {
select {
case <-monitorCtx.Done():
return
case netsvc, ok := <-networkServiceCh:
if !ok {
if monitorCtx.Err() == nil {
//go monitorNetworkServiceGoroutine()
}
return
}
if monitorCtx.Err() != nil {
return
}

nseStream, err := m.nseClient.Find(monitorCtx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: resp.GetNetworkServiceEndpointName(),
},
})
if err != nil {
logger.Errorf("an error happened during finding nse: %v", err.Error())
if monitorCtx.Err() == nil {
//go monitorNetworkServiceGoroutine()
}
return
}

var nses = registry.ReadNetworkServiceEndpointList(nseStream)

if len(matchutils.MatchEndpoint(resp.GetLabels(), netsvc.NetworkService, nses...)) == 0 {
if v, ok := (*m.s).(Sender); ok {
v.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_DELETE,
Connections: map[string]*networkservice.Connection{
resp.GetId(): request.GetConnection(),
},
})
}

}
}
time.Sleep(time.Millisecond * 50)
}

}

go monitorNetworkServiceGoroutine()

return resp, err

}

func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
if cancel, ok := loadCancelFunction(ctx); ok {
cancel()
}

return next.Server(ctx).Close(ctx, conn)
}
36 changes: 36 additions & 0 deletions pkg/tools/matchutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,42 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"
)

func MatchEndpoint(nsLabels map[string]string, ns *registry.NetworkService, nses ...*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint {
for _, match := range ns.GetMatches() {
// All match source selector labels should be present in the requested labels map
if !IsSubset(nsLabels, match.GetSourceSelector(), nsLabels) {
continue
}
nseCandidates := make([]*registry.NetworkServiceEndpoint, 0)
// Check all Destinations in that match
for _, destination := range match.GetRoutes() {
// Each NSE should be matched against that destination
for _, nse := range nses {
var candidateNetworkServiceLabels = nse.GetNetworkServiceLabels()[ns.GetName()]
var labels map[string]string
if candidateNetworkServiceLabels != nil {
labels = candidateNetworkServiceLabels.Labels
}
if IsSubset(labels, destination.GetDestinationSelector(), nsLabels) {
nseCandidates = append(nseCandidates, nse)
}
}
}

if match.Fallthrough && len(nseCandidates) == 0 {
continue
}

if match.GetMetadata() != nil && len(match.Routes) == 0 && len(nseCandidates) == 0 {
break
}

return nseCandidates
}

return nses
}

// MatchNetworkServices returns true if two network services are matched
func MatchNetworkServices(left, right *registry.NetworkService) bool {
return (left.Name == "" || strings.Contains(right.Name, left.Name)) &&
Expand Down

0 comments on commit 45a8479

Please sign in to comment.