Skip to content

Commit

Permalink
BGP: Make TCP listener VRF aware (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
taktv6 authored Jan 9, 2023
1 parent 8269abc commit 133fb10
Show file tree
Hide file tree
Showing 18 changed files with 549 additions and 216 deletions.
32 changes: 17 additions & 15 deletions cmd/bio-rd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,17 @@ func main() {
os.Exit(1)
}

bgpSrv = bgpserver.NewBGPServer(
startCfg.RoutingOptions.RouterIDUint32,
[]string{
listenAddrsByVRF := map[string][]string{
vrf.DefaultVRFName: {
"[::]:179",
"0.0.0.0:179",
},
}

bgpSrv = bgpserver.NewBGPServer(
startCfg.RoutingOptions.RouterIDUint32,
vrfReg.CreateVRFIfNotExists(vrf.DefaultVRFName, 0),
listenAddrsByVRF,
)

err = bgpSrv.Start()
Expand All @@ -75,13 +80,11 @@ func main() {
os.Exit(1)
}

vrfReg.CreateVRFIfNotExists(vrf.DefaultVRFName, 0)

go configReloader()
sigHUP <- syscall.SIGHUP
installSignalHandler()

s := bgpserver.NewBGPAPIServer(bgpSrv)
s := bgpserver.NewBGPAPIServer(bgpSrv, vrfReg)
isisAPISrv := isisserver.NewISISAPIServer(isisSrv)
unaryInterceptors := []grpc.UnaryServerInterceptor{}
streamInterceptors := []grpc.StreamServerInterceptor{}
Expand Down Expand Up @@ -135,7 +138,6 @@ func configReloader() {
}

func loadConfig(cfg *config.Config) error {

for _, ri := range cfg.RoutingInstances {
err := configureRoutingInstance(ri)
_ = err
Expand Down Expand Up @@ -167,41 +169,41 @@ func configureProtocolsBGP(bgp *config.BGP) error {
found := false
for _, g := range bgp.Groups {
for _, n := range g.Neighbors {
if n.PeerAddressIP == p {
if n.PeerAddressIP == p.Addr() && p.VRF() == bgpSrv.GetDefaultVRF() {
found = true
break
}
}
}

if !found {
bgpSrv.DisposePeer(p)
bgpSrv.DisposePeer(bgpSrv.GetDefaultVRF(), p.Addr())
}
}

// Tear down peers that need new sessions as they changed too significantly
for _, g := range bgp.Groups {
for _, n := range g.Neighbors {
newCfg := BGPPeerConfig(n, vrfReg.GetVRFByName(vrf.DefaultVRFName))
oldCfg := bgpSrv.GetPeerConfig(n.PeerAddressIP)
newCfg := BGPPeerConfig(n, bgpSrv.GetDefaultVRF())
oldCfg := bgpSrv.GetPeerConfig(bgpSrv.GetDefaultVRF(), n.PeerAddressIP)
if oldCfg == nil {
continue
}

if !oldCfg.NeedsRestart(newCfg) {
bgpSrv.ReplaceImportFilterChain(n.PeerAddressIP, newCfg.IPv4.ImportFilterChain)
bgpSrv.ReplaceExportFilterChain(n.PeerAddressIP, newCfg.IPv4.ExportFilterChain)
bgpSrv.ReplaceImportFilterChain(bgpSrv.GetDefaultVRF(), n.PeerAddressIP, newCfg.IPv4.ImportFilterChain)
bgpSrv.ReplaceExportFilterChain(bgpSrv.GetDefaultVRF(), n.PeerAddressIP, newCfg.IPv4.ExportFilterChain)
continue
}

bgpSrv.DisposePeer(oldCfg.PeerAddress)
bgpSrv.DisposePeer(bgpSrv.GetDefaultVRF(), oldCfg.PeerAddress)
}
}

// Turn up all sessions that are missing
for _, g := range bgp.Groups {
for _, n := range g.Neighbors {
if bgpSrv.GetPeerConfig(n.PeerAddressIP) != nil {
if bgpSrv.GetPeerConfig(bgpSrv.GetDefaultVRF(), n.PeerAddressIP) != nil {
continue
}

Expand Down
11 changes: 6 additions & 5 deletions examples/bgp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ func main() {
listen = append(listen, "[::]:179")
}

b := server.NewBGPServer(0, listen)
v, err := vrf.New("master", 0)
v, err := vrf.New(vrf.DefaultVRFName, 0)
if err != nil {
logrus.Fatal(err)
}

b := server.NewBGPServer(0, v, map[string][]string{vrf.DefaultVRFName: listen})

go startMetricsEndpoint(b)
go startAPIEndpoint(b)
go startAPIEndpoint(b, vrf.GetGlobalRegistry())

if err := b.Start(); err != nil {
log.Fatalf("Unable to start BGP server: %v", err)
Expand All @@ -69,8 +70,8 @@ func startMetricsEndpoint(server server.BGPServer) {
logrus.Error(http.ListenAndServe(":8080", nil))
}

func startAPIEndpoint(b server.BGPServer) {
apiSrv := server.NewBGPAPIServer(b)
func startAPIEndpoint(b server.BGPServer, vrfReg *vrf.VRFRegistry) {
apiSrv := server.NewBGPAPIServer(b, vrfReg)

lis, err := net.Listen("tcp", ":1337")
if err != nil {
Expand Down
65 changes: 62 additions & 3 deletions net/tcp/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,33 @@ import (
"fmt"
"net"

"github.com/bio-routing/bio-rd/routingtable/vrf"
"golang.org/x/sys/unix"
)

type ListenerFactoryI interface {
NewListener(v *vrf.VRF, laddr *net.TCPAddr, ttl uint8) (ListenerI, error)
}

type ListenerFactory struct{}

func NewListenerFactory() *ListenerFactory {
return &ListenerFactory{}
}

type ListenerI interface {
SetTCPMD5(peerAddr net.IP, secret string) error
AcceptTCP() (ConnI, error)
}

// Listener listens for TCP clients
type Listener struct {
fd int
laddr *net.TCPAddr
}

// Listen starts a TCPListener
func Listen(laddr *net.TCPAddr, ttl uint8) (*Listener, error) {
// NewListener starts a TCPListener
func (lf *ListenerFactory) NewListener(v *vrf.VRF, laddr *net.TCPAddr, ttl uint8) (ListenerI, error) {
l := &Listener{
laddr: laddr,
}
Expand Down Expand Up @@ -52,6 +68,14 @@ func Listen(laddr *net.TCPAddr, ttl uint8) (*Listener, error) {
}
}

if v.Name() != vrf.DefaultVRFName {
err = unix.SetsockoptString(fd, SOL_IP, unix.SO_BINDTODEVICE, v.Name())
if err != nil {
unix.Close(fd)
return nil, fmt.Errorf("unable to set SO_BINDTODEVICE (%s): %v", v.Name(), err)
}
}

if laddr.IP.To4() != nil {
err = unix.Bind(fd, &unix.SockaddrInet4{
Port: laddr.Port,
Expand Down Expand Up @@ -93,7 +117,7 @@ func (l *Listener) SetTCPMD5(peerAddr net.IP, secret string) error {
}

// AcceptTCP accepts a new TCP connection
func (l *Listener) AcceptTCP() (*Conn, error) {
func (l *Listener) AcceptTCP() (ConnI, error) {
fd, sa, err := unix.Accept(l.fd)
if err != nil {
return nil, err
Expand All @@ -120,3 +144,38 @@ func (l *Listener) AcceptTCP() (*Conn, error) {
raddr: raddr,
}, nil
}

type MockListenerFactory struct{}

func NewMockListenerFactory() *MockListenerFactory {
return &MockListenerFactory{}
}

type MockListener struct {
localAddr net.IP
localPort uint16
connCh chan *MockConn
}

func (lf *MockListenerFactory) NewMockListener(localAddr net.IP, localPort uint16) *MockListener {
return &MockListener{
localAddr: localAddr,
localPort: localPort,
connCh: make(chan *MockConn, 100),
}
}

func (l *MockListener) SetTCPMD5(peerAddr net.IP, secret string) error {
return nil
}

// AcceptTCP accepts a new TCP connection
func (l *MockListener) AcceptTCP() (ConnI, error) {
return <-l.connCh, nil
}

func (l *MockListener) Connect(addr net.IP, port uint16) *MockConn {
mc := NewMockConn(l.localAddr, l.localPort, addr, port)
l.connCh <- mc
return mc
}
145 changes: 145 additions & 0 deletions net/tcp/listener_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package tcp

import (
"fmt"
"net"
"sync"

"github.com/bio-routing/bio-rd/routingtable/vrf"
"github.com/bio-routing/bio-rd/util/log"
)

type ConnWithVRF struct {
Conn net.Conn
VRF *vrf.VRF
}

type ListenerManagerI interface {
ListenAddrsPerVRF(vrf *vrf.VRF) []string
GetListeners(v *vrf.VRF) []ListenerI
CreateListenersIfNotExists(v *vrf.VRF) error
AcceptCh() chan ConnWithVRF
}

type ListenerManager struct {
listenAddrsByVRF map[string][]string
listenersByVRF map[string][]ListenerI
listenersByVRFmu sync.RWMutex
acceptCh chan ConnWithVRF
listenerFactory ListenerFactoryI
}

func NewListenerManager(listenAddrsByVRF map[string][]string) *ListenerManager {
return &ListenerManager{
listenAddrsByVRF: listenAddrsByVRF,
listenersByVRF: make(map[string][]ListenerI),
listenerFactory: NewListenerFactory(),
}
}

func (lm *ListenerManager) ListenAddrsPerVRF(vrf *vrf.VRF) []string {
return lm.listenAddrsByVRF[vrf.Name()]
}

func (lm *ListenerManager) GetListeners(v *vrf.VRF) []ListenerI {
lm.listenersByVRFmu.Lock()
defer lm.listenersByVRFmu.Unlock()

ret := make([]ListenerI, 0)
for _, l := range lm.listenersByVRF[v.Name()] {
ret = append(ret, l)
}

return ret
}

func (lm *ListenerManager) CreateListenersIfNotExists(v *vrf.VRF) error {
lm.listenersByVRFmu.Lock()
defer lm.listenersByVRFmu.Unlock()

if _, exists := lm.listenersByVRF[v.Name()]; exists {
return nil
}

err := lm._createListeners(v)
if err != nil {
return fmt.Errorf("unable to create listeners: %v", err)
}

return nil
}

func (lm *ListenerManager) _createListeners(v *vrf.VRF) error {
for _, addr := range lm.ListenAddrsPerVRF(v) {
err := lm._addListener(v, addr, lm.acceptCh)
if err != nil {
return fmt.Errorf("unable to create TCP listener %q vrf %s: %v", addr, v.Name(), err)
}
}

return nil
}

// newListener creates a new Listener
func (lm *ListenerManager) _addListener(vrf *vrf.VRF, addr string, ch chan ConnWithVRF) error {
tcpaddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return err
}

log.Infof("Listener manager: Starting TCP listener on %s in VRF %s", addr, vrf.Name())
l, err := lm.listenerFactory.NewListener(vrf, tcpaddr, 255)
if err != nil {
return err
}

lm._add(vrf, l)

go func(tl ListenerI) error {
defer lm.dropListener(vrf, tl)

for {
conn, err := l.AcceptTCP()
if err != nil {
log.WithError(err).WithFields(log.Fields{
"Topic": "Peer",
}).Error("Failed to AcceptTCP")
return err
}

ch <- ConnWithVRF{
Conn: conn,
VRF: vrf,
}
}
}(l)

return nil
}

// _add is to be called with the mutex acquired
func (lm *ListenerManager) _add(vrf *vrf.VRF, l ListenerI) {
if _, exists := lm.listenersByVRF[vrf.Name()]; !exists {
lm.listenersByVRF[vrf.Name()] = make([]ListenerI, 0)
}

lm.listenersByVRF[vrf.Name()] = append(lm.listenersByVRF[vrf.Name()], l)
}

func (lm *ListenerManager) dropListener(vrf *vrf.VRF, l ListenerI) {
lm.listenersByVRFmu.Lock()
defer lm.listenersByVRFmu.Unlock()

vrfName := vrf.Name()
listeners := lm.listenersByVRF[vrfName]
for i, x := range listeners {
if x == l {
lm.listenersByVRF[vrfName] = append(listeners[:i], listeners[i+1:]...)
return
}
}
}

func (lm *ListenerManager) AcceptCh() chan ConnWithVRF {
return lm.acceptCh
}
3 changes: 3 additions & 0 deletions net/tcp/ports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package tcp

const BGPPORT = 179
Loading

0 comments on commit 133fb10

Please sign in to comment.