Skip to content

Commit

Permalink
Update forwarder to use it with an external vpp
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
nsmbot authored and glazychev-art committed Mar 18, 2022
1 parent 16b654b commit 9701306
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 29 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/edwarnicke/grpcfd v1.1.2
github.com/edwarnicke/vpphelper v0.0.0-20210512223648-f914b171f679
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/networkservicemesh/api v1.2.1-0.20220315001249-f33f8c3f2feb
github.com/networkservicemesh/sdk v0.5.1-0.20220316105041-b88289b9022e
Expand Down
4 changes: 2 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type Config struct {

TunnelIP net.IP `desc:"IP to use for tunnels" split_words:"true"`
VxlanPort uint16 `default:"0" desc:"VXLAN port to use" split_words:"true"`
VppAPISocket string `default:"" desc:"filename of socket to connect to existing VPP instance. If empty a VPP instance is run in forwarder" split_words:"true"`
VppInit vppinit.Func `default:"AF_PACKET" desc:"type of VPP initialization. Must be AF_PACKET or NONE" split_words:"true"`
VppAPISocket string `default:"/var/run/vpp-ext/vpp-api.sock" desc:"filename of socket to connect to existing VPP instance. If empty a VPP instance is run in forwarder" split_words:"true"`
VppInit vppinit.Func `default:"NONE" desc:"type of VPP initialization. Must be NONE or AF_PACKET" split_words:"true"`

ResourcePollTimeout time.Duration `default:"30s" desc:"device plugin polling timeout" split_words:"true"`
DevicePluginPath string `default:"/var/lib/kubelet/device-plugins/" desc:"path to the device plugin directory" split_words:"true"`
Expand Down
4 changes: 4 additions & 0 deletions internal/imports/imports_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 113 additions & 0 deletions internal/xconnectns/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build linux

package xconnectns

import (
"net/url"
"time"

"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/cleanup"

"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/vxlan"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/stats"
)

type xconnOptions struct {
name string
authorizeServer networkservice.NetworkServiceServer
clientURL *url.URL
dialTimeout time.Duration
domain2Device map[string]string
statsOpts []stats.Option
cleanupOpts []cleanup.Option
vxlanOpts []vxlan.Option
dialOpts []grpc.DialOption
}

// Option is an option pattern for forwarder
type Option func(o *xconnOptions)

// WithName - set a forwarder name
func WithName(name string) Option {
return func(o *xconnOptions) {
o.name = name
}
}

// WithAuthorizeServer sets authorization server chain element
func WithAuthorizeServer(authorizeServer networkservice.NetworkServiceServer) Option {
if authorizeServer == nil {
panic("Authorize server cannot be nil")
}
return func(o *xconnOptions) {
o.authorizeServer = authorizeServer
}
}

// WithClientURL sets clientURL.
func WithClientURL(clientURL *url.URL) Option {
return func(c *xconnOptions) {
c.clientURL = clientURL
}
}

// WithDialTimeout sets dial timeout for the client
func WithDialTimeout(dialTimeout time.Duration) Option {
return func(o *xconnOptions) {
o.dialTimeout = dialTimeout
}
}

// WithVlanDomain2Device sets vlan option
func WithVlanDomain2Device(domain2Device map[string]string) Option {
return func(o *xconnOptions) {
o.domain2Device = domain2Device
}
}

// WithStatsOptions sets stats options
func WithStatsOptions(opts ...stats.Option) Option {
return func(o *xconnOptions) {
o.statsOpts = opts
}
}

// WithCleanupOptions sets cleanup options
func WithCleanupOptions(opts ...cleanup.Option) Option {
return func(o *xconnOptions) {
o.cleanupOpts = opts
}
}

// WithVxlanOptions sets vxlan options
func WithVxlanOptions(opts ...vxlan.Option) Option {
return func(o *xconnOptions) {
o.vxlanOpts = opts
}
}

// WithDialOptions sets dial options
func WithDialOptions(opts ...grpc.DialOption) Option {
return func(o *xconnOptions) {
o.dialOpts = opts
}
}
50 changes: 39 additions & 11 deletions internal/xconnectns/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright (c) 2021 Doc.ai and/or its affiliates.
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -25,7 +27,7 @@ import (
"net/url"
"time"

"google.golang.org/grpc"
"github.com/google/uuid"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
Expand All @@ -40,6 +42,7 @@ import (
sriovtokens "github.com/networkservicemesh/sdk-sriov/pkg/tools/tokens"
vppforwarder "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/chains/forwarder"
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/switchcase"
"github.com/networkservicemesh/sdk/pkg/tools/token"
Expand All @@ -48,22 +51,36 @@ import (
// NewServer - returns an implementation of the xconnectns network service
func NewServer(
ctx context.Context,
name string,
authzServer networkservice.NetworkServiceServer,
tokenGenerator token.GeneratorFunc,
vppConn vppforwarder.Connection,
tunnelIP net.IP,
tunnelPort uint16,
pciPool resourcepool.PCIPool,
resourcePool resourcepool.ResourcePool,
sriovConfig *sriovconfig.Config,
deviceMap map[string]string,
vfioDir, cgroupBaseDir string,
clientURL *url.URL,
dialTimeout time.Duration,
clientDialOptions ...grpc.DialOption,
options ...Option,
) endpoint.Endpoint {
vppForwarder := vppforwarder.NewServer(ctx, name, authzServer, tokenGenerator, clientURL, vppConn, tunnelIP, tunnelPort, dialTimeout, deviceMap, clientDialOptions...)
xconnOpts := &xconnOptions{
name: "forwarder-" + uuid.New().String(),
authorizeServer: authorize.NewServer(authorize.Any()),
clientURL: &url.URL{Scheme: "unix", Host: "connect.to.socket"},
dialTimeout: time.Millisecond * 200,
domain2Device: make(map[string]string),
}
for _, opt := range options {
opt(xconnOpts)
}

vppForwarder := vppforwarder.NewServer(ctx, tokenGenerator, vppConn, tunnelIP,
vppforwarder.WithName(xconnOpts.name),
vppforwarder.WithAuthorizeServer(xconnOpts.authorizeServer),
vppforwarder.WithClientURL(xconnOpts.clientURL),
vppforwarder.WithDialTimeout(xconnOpts.dialTimeout),
vppforwarder.WithVlanDomain2Device(xconnOpts.domain2Device),
vppforwarder.WithCleanupOptions(xconnOpts.cleanupOpts...),
vppforwarder.WithStatsOptions(xconnOpts.statsOpts...),
vppforwarder.WithVxlanOptions(xconnOpts.vxlanOpts...),
vppforwarder.WithDialOptions(xconnOpts.dialOpts...))
if sriovConfig == nil {
return vppForwarder
}
Expand Down Expand Up @@ -92,6 +109,17 @@ func NewServer(
})
},
vppForwarder,
sriovforwarder.NewServer(ctx, name, authzServer, tokenGenerator, pciPool, resourcePool, sriovConfig, vfioDir, cgroupBaseDir, clientURL, dialTimeout, clientDialOptions...),
sriovforwarder.NewServer(ctx,
xconnOpts.name,
xconnOpts.authorizeServer,
tokenGenerator,
pciPool,
resourcePool,
sriovConfig,
vfioDir,
cgroupBaseDir,
xconnOpts.clientURL,
xconnOpts.dialTimeout,
xconnOpts.dialOpts...),
)
}
68 changes: 52 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"os"
"os/signal"
"path"
"syscall"
"time"

Expand All @@ -45,6 +46,7 @@ import (
"github.com/networkservicemesh/sdk-sriov/pkg/sriov/resource"
sriovtoken "github.com/networkservicemesh/sdk-sriov/pkg/sriov/token"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/cleanup"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/registry/common/sendfd"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
Expand All @@ -55,6 +57,9 @@ import (
"github.com/networkservicemesh/sdk/pkg/tools/token"
"github.com/networkservicemesh/sdk/pkg/tools/tracing"

"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/vxlan"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/stats"

"github.com/networkservicemesh/cmd-forwarder-vpp/internal/config"
"github.com/networkservicemesh/cmd-forwarder-vpp/internal/devicecfg"
"github.com/networkservicemesh/cmd-forwarder-vpp/internal/vppinit"
Expand Down Expand Up @@ -146,14 +151,30 @@ func main() {

var vppConn vpphelper.Connection
var vppErrCh <-chan error
if cfg.VppAPISocket != "" { // If we have a VppAPISocket, use that
var statsOpts []stats.Option
cleanupDoneCh := make(chan struct{})
cleanupOpts := []cleanup.Option{
cleanup.WithoutGRPCCall(),
}

if fileExists(cfg.VppAPISocket) { // If we have an external VppAPISocket, use that
vppConn = vpphelper.DialContext(ctx, cfg.VppAPISocket)
errCh := make(chan error)
close(errCh)
vppErrCh = errCh
dir, _ := path.Split(cfg.VppAPISocket)
statsOpts = append(statsOpts, stats.WithSocket(path.Join(dir, "stats.sock")))
cleanupOpts = append(cleanupOpts, cleanup.WithDoneChan(cleanupDoneCh))

log.FromContext(ctx).Info("external vpp is being used")
} else { // If we don't have a VPPAPISocket, start VPP and use that
if err = cfg.VppInit.Decode("AF_PACKET"); err != nil {
log.FromContext(ctx).Fatalf("VppInit.Decode error: %v", err)
}
vppConn, vppErrCh = vpphelper.StartAndDialContext(ctx)
exitOnErrCh(ctx, cancel, vppErrCh)
close(cleanupDoneCh)
log.FromContext(ctx).Info("local vpp is being used")
}

log.FromContext(ctx).WithField("duration", time.Since(now)).Info("completed phase 2: run vpp and get a connection to it")
Expand Down Expand Up @@ -192,29 +213,35 @@ func main() {
// ********************************************************************************
now = time.Now()

dialOptions := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(
grpcfd.TransportCredentials(credentials.NewTLS(tlsconfig.MTLSClientConfig(source, source, tlsconfig.AuthorizeAny())))),
grpc.WithDefaultCallOptions(
grpc.PerRPCCredentials(token.NewPerRPCCredentials(spiffejwt.TokenGeneratorFunc(source, cfg.MaxTokenLifetime))),
),
grpcfd.WithChainStreamInterceptor(),
grpcfd.WithChainUnaryInterceptor(),
}

endpoint := xconnectns.NewServer(
ctx,
cfg.Name,
authorize.NewServer(),
spiffejwt.TokenGeneratorFunc(source, cfg.MaxTokenLifetime),
vppConn,
vppinit.Must(cfg.VppInit.Execute(ctx, vppConn, cfg.TunnelIP)),
cfg.VxlanPort,
pciPool,
resourcePool,
sriovConfig,
deviceMap,
cfg.VFIOPath, cfg.CgroupPath,
&cfg.ConnectTo,
cfg.DialTimeout,
grpc.WithBlock(),
grpc.WithTransportCredentials(
grpcfd.TransportCredentials(credentials.NewTLS(tlsconfig.MTLSClientConfig(source, source, tlsconfig.AuthorizeAny())))),
grpc.WithDefaultCallOptions(
grpc.PerRPCCredentials(token.NewPerRPCCredentials(spiffejwt.TokenGeneratorFunc(source, cfg.MaxTokenLifetime))),
),
grpcfd.WithChainStreamInterceptor(),
grpcfd.WithChainUnaryInterceptor(),
xconnectns.WithName(cfg.Name),
xconnectns.WithAuthorizeServer(authorize.NewServer()),
xconnectns.WithVlanDomain2Device(deviceMap),
xconnectns.WithClientURL(&cfg.ConnectTo),
xconnectns.WithDialTimeout(cfg.DialTimeout),
xconnectns.WithStatsOptions(statsOpts...),
xconnectns.WithCleanupOptions(cleanupOpts...),
xconnectns.WithVxlanOptions(vxlan.WithPort(cfg.VxlanPort)),
xconnectns.WithDialOptions(dialOptions...),
)

log.FromContext(ctx).WithField("duration", time.Since(now)).Info("completed phase 7: create xconnect network service endpoint")
Expand Down Expand Up @@ -276,10 +303,11 @@ func main() {

log.FromContext(ctx).Infof("Startup completed in %v", time.Since(starttime))

// TODO - cleaner shutdown across these three channels
// TODO - cleaner shutdown across these channels
<-ctx.Done()
<-srvErrCh
<-vppErrCh
<-cleanupDoneCh
}

func setupDeviceMap(ctx context.Context, cfg *config.Config) map[string]string {
Expand Down Expand Up @@ -383,3 +411,11 @@ func exitOnErrCh(ctx context.Context, cancel context.CancelFunc, errCh <-chan er
cancel()
}(ctx, errCh)
}

func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}

0 comments on commit 9701306

Please sign in to comment.