Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BGP: Make TCP listener VRF aware #407

Merged
merged 1 commit into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions cmd/bio-rd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,17 @@ func main() {
os.Exit(1)
}

bgpSrv = bgpserver.NewBGPServer(
startCfg.RoutingOptions.RouterIDUint32,
[]string{
listenAddrsByVRF := map[string][]string{
vrf.DefaultVRFName: {
"[::]:179",
"0.0.0.0:179",
},
}

bgpSrv = bgpserver.NewBGPServer(
startCfg.RoutingOptions.RouterIDUint32,
vrfReg.CreateVRFIfNotExists(vrf.DefaultVRFName, 0),
listenAddrsByVRF,
)

err = bgpSrv.Start()
Expand All @@ -75,13 +80,11 @@ func main() {
os.Exit(1)
}

vrfReg.CreateVRFIfNotExists(vrf.DefaultVRFName, 0)

go configReloader()
sigHUP <- syscall.SIGHUP
installSignalHandler()

s := bgpserver.NewBGPAPIServer(bgpSrv)
s := bgpserver.NewBGPAPIServer(bgpSrv, vrfReg)
isisAPISrv := isisserver.NewISISAPIServer(isisSrv)
unaryInterceptors := []grpc.UnaryServerInterceptor{}
streamInterceptors := []grpc.StreamServerInterceptor{}
Expand Down Expand Up @@ -135,7 +138,6 @@ func configReloader() {
}

func loadConfig(cfg *config.Config) error {

for _, ri := range cfg.RoutingInstances {
err := configureRoutingInstance(ri)
_ = err
Expand Down Expand Up @@ -167,41 +169,41 @@ func configureProtocolsBGP(bgp *config.BGP) error {
found := false
for _, g := range bgp.Groups {
for _, n := range g.Neighbors {
if n.PeerAddressIP == p {
if n.PeerAddressIP == p.Addr() && p.VRF() == bgpSrv.GetDefaultVRF() {
found = true
break
}
}
}

if !found {
bgpSrv.DisposePeer(p)
bgpSrv.DisposePeer(bgpSrv.GetDefaultVRF(), p.Addr())
}
}

// Tear down peers that need new sessions as they changed too significantly
for _, g := range bgp.Groups {
for _, n := range g.Neighbors {
newCfg := BGPPeerConfig(n, vrfReg.GetVRFByName(vrf.DefaultVRFName))
oldCfg := bgpSrv.GetPeerConfig(n.PeerAddressIP)
newCfg := BGPPeerConfig(n, bgpSrv.GetDefaultVRF())
oldCfg := bgpSrv.GetPeerConfig(bgpSrv.GetDefaultVRF(), n.PeerAddressIP)
if oldCfg == nil {
continue
}

if !oldCfg.NeedsRestart(newCfg) {
bgpSrv.ReplaceImportFilterChain(n.PeerAddressIP, newCfg.IPv4.ImportFilterChain)
bgpSrv.ReplaceExportFilterChain(n.PeerAddressIP, newCfg.IPv4.ExportFilterChain)
bgpSrv.ReplaceImportFilterChain(bgpSrv.GetDefaultVRF(), n.PeerAddressIP, newCfg.IPv4.ImportFilterChain)
bgpSrv.ReplaceExportFilterChain(bgpSrv.GetDefaultVRF(), n.PeerAddressIP, newCfg.IPv4.ExportFilterChain)
continue
}

bgpSrv.DisposePeer(oldCfg.PeerAddress)
bgpSrv.DisposePeer(bgpSrv.GetDefaultVRF(), oldCfg.PeerAddress)
}
}

// Turn up all sessions that are missing
for _, g := range bgp.Groups {
for _, n := range g.Neighbors {
if bgpSrv.GetPeerConfig(n.PeerAddressIP) != nil {
if bgpSrv.GetPeerConfig(bgpSrv.GetDefaultVRF(), n.PeerAddressIP) != nil {
continue
}

Expand Down
11 changes: 6 additions & 5 deletions examples/bgp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ func main() {
listen = append(listen, "[::]:179")
}

b := server.NewBGPServer(0, listen)
v, err := vrf.New("master", 0)
v, err := vrf.New(vrf.DefaultVRFName, 0)
if err != nil {
logrus.Fatal(err)
}

b := server.NewBGPServer(0, v, map[string][]string{vrf.DefaultVRFName: listen})

go startMetricsEndpoint(b)
go startAPIEndpoint(b)
go startAPIEndpoint(b, vrf.GetGlobalRegistry())

if err := b.Start(); err != nil {
log.Fatalf("Unable to start BGP server: %v", err)
Expand All @@ -69,8 +70,8 @@ func startMetricsEndpoint(server server.BGPServer) {
logrus.Error(http.ListenAndServe(":8080", nil))
}

func startAPIEndpoint(b server.BGPServer) {
apiSrv := server.NewBGPAPIServer(b)
func startAPIEndpoint(b server.BGPServer, vrfReg *vrf.VRFRegistry) {
apiSrv := server.NewBGPAPIServer(b, vrfReg)

lis, err := net.Listen("tcp", ":1337")
if err != nil {
Expand Down
65 changes: 62 additions & 3 deletions net/tcp/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,33 @@ import (
"fmt"
"net"

"github.com/bio-routing/bio-rd/routingtable/vrf"
"golang.org/x/sys/unix"
)

type ListenerFactoryI interface {
NewListener(v *vrf.VRF, laddr *net.TCPAddr, ttl uint8) (ListenerI, error)
}

type ListenerFactory struct{}

func NewListenerFactory() *ListenerFactory {
return &ListenerFactory{}
}

type ListenerI interface {
SetTCPMD5(peerAddr net.IP, secret string) error
AcceptTCP() (ConnI, error)
}

// Listener listens for TCP clients
type Listener struct {
fd int
laddr *net.TCPAddr
}

// Listen starts a TCPListener
func Listen(laddr *net.TCPAddr, ttl uint8) (*Listener, error) {
// NewListener starts a TCPListener
func (lf *ListenerFactory) NewListener(v *vrf.VRF, laddr *net.TCPAddr, ttl uint8) (ListenerI, error) {
l := &Listener{
laddr: laddr,
}
Expand Down Expand Up @@ -52,6 +68,14 @@ func Listen(laddr *net.TCPAddr, ttl uint8) (*Listener, error) {
}
}

if v.Name() != vrf.DefaultVRFName {
err = unix.SetsockoptString(fd, SOL_IP, unix.SO_BINDTODEVICE, v.Name())
if err != nil {
unix.Close(fd)
return nil, fmt.Errorf("unable to set SO_BINDTODEVICE (%s): %v", v.Name(), err)
}
}

if laddr.IP.To4() != nil {
err = unix.Bind(fd, &unix.SockaddrInet4{
Port: laddr.Port,
Expand Down Expand Up @@ -93,7 +117,7 @@ func (l *Listener) SetTCPMD5(peerAddr net.IP, secret string) error {
}

// AcceptTCP accepts a new TCP connection
func (l *Listener) AcceptTCP() (*Conn, error) {
func (l *Listener) AcceptTCP() (ConnI, error) {
fd, sa, err := unix.Accept(l.fd)
if err != nil {
return nil, err
Expand All @@ -120,3 +144,38 @@ func (l *Listener) AcceptTCP() (*Conn, error) {
raddr: raddr,
}, nil
}

type MockListenerFactory struct{}

func NewMockListenerFactory() *MockListenerFactory {
return &MockListenerFactory{}
}

type MockListener struct {
localAddr net.IP
localPort uint16
connCh chan *MockConn
}

func (lf *MockListenerFactory) NewMockListener(localAddr net.IP, localPort uint16) *MockListener {
return &MockListener{
localAddr: localAddr,
localPort: localPort,
connCh: make(chan *MockConn, 100),
}
}

func (l *MockListener) SetTCPMD5(peerAddr net.IP, secret string) error {
return nil
}

// AcceptTCP accepts a new TCP connection
func (l *MockListener) AcceptTCP() (ConnI, error) {
return <-l.connCh, nil
}

func (l *MockListener) Connect(addr net.IP, port uint16) *MockConn {
mc := NewMockConn(l.localAddr, l.localPort, addr, port)
l.connCh <- mc
return mc
}
145 changes: 145 additions & 0 deletions net/tcp/listener_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package tcp

import (
"fmt"
"net"
"sync"

"github.com/bio-routing/bio-rd/routingtable/vrf"
"github.com/bio-routing/bio-rd/util/log"
)

type ConnWithVRF struct {
Conn net.Conn
VRF *vrf.VRF
}

type ListenerManagerI interface {
ListenAddrsPerVRF(vrf *vrf.VRF) []string
GetListeners(v *vrf.VRF) []ListenerI
CreateListenersIfNotExists(v *vrf.VRF) error
AcceptCh() chan ConnWithVRF
}

type ListenerManager struct {
listenAddrsByVRF map[string][]string
listenersByVRF map[string][]ListenerI
listenersByVRFmu sync.RWMutex
acceptCh chan ConnWithVRF
listenerFactory ListenerFactoryI
}

func NewListenerManager(listenAddrsByVRF map[string][]string) *ListenerManager {
return &ListenerManager{
listenAddrsByVRF: listenAddrsByVRF,
listenersByVRF: make(map[string][]ListenerI),
listenerFactory: NewListenerFactory(),
}
}

func (lm *ListenerManager) ListenAddrsPerVRF(vrf *vrf.VRF) []string {
return lm.listenAddrsByVRF[vrf.Name()]
BarbarossaTM marked this conversation as resolved.
Show resolved Hide resolved
}

func (lm *ListenerManager) GetListeners(v *vrf.VRF) []ListenerI {
lm.listenersByVRFmu.Lock()
defer lm.listenersByVRFmu.Unlock()

ret := make([]ListenerI, 0)
for _, l := range lm.listenersByVRF[v.Name()] {
ret = append(ret, l)
}

return ret
}

func (lm *ListenerManager) CreateListenersIfNotExists(v *vrf.VRF) error {
lm.listenersByVRFmu.Lock()
defer lm.listenersByVRFmu.Unlock()

if _, exists := lm.listenersByVRF[v.Name()]; exists {
return nil
}

err := lm._createListeners(v)
if err != nil {
return fmt.Errorf("unable to create listeners: %v", err)
}

return nil
}

func (lm *ListenerManager) _createListeners(v *vrf.VRF) error {
for _, addr := range lm.ListenAddrsPerVRF(v) {
err := lm._addListener(v, addr, lm.acceptCh)
if err != nil {
return fmt.Errorf("unable to create TCP listener %q vrf %s: %v", addr, v.Name(), err)
}
}

return nil
}

// newListener creates a new Listener
func (lm *ListenerManager) _addListener(vrf *vrf.VRF, addr string, ch chan ConnWithVRF) error {
tcpaddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return err
}

log.Infof("Listener manager: Starting TCP listener on %s in VRF %s", addr, vrf.Name())
l, err := lm.listenerFactory.NewListener(vrf, tcpaddr, 255)
if err != nil {
return err
}

lm._add(vrf, l)

go func(tl ListenerI) error {
defer lm.dropListener(vrf, tl)

for {
conn, err := l.AcceptTCP()
if err != nil {
log.WithError(err).WithFields(log.Fields{
"Topic": "Peer",
}).Error("Failed to AcceptTCP")
return err
}

ch <- ConnWithVRF{
Conn: conn,
VRF: vrf,
}
}
}(l)

return nil
}

// _add is to be called with the mutex acquired
func (lm *ListenerManager) _add(vrf *vrf.VRF, l ListenerI) {
if _, exists := lm.listenersByVRF[vrf.Name()]; !exists {
lm.listenersByVRF[vrf.Name()] = make([]ListenerI, 0)
}

lm.listenersByVRF[vrf.Name()] = append(lm.listenersByVRF[vrf.Name()], l)
}

func (lm *ListenerManager) dropListener(vrf *vrf.VRF, l ListenerI) {
lm.listenersByVRFmu.Lock()
defer lm.listenersByVRFmu.Unlock()

vrfName := vrf.Name()
listeners := lm.listenersByVRF[vrfName]
for i, x := range listeners {
if x == l {
lm.listenersByVRF[vrfName] = append(listeners[:i], listeners[i+1:]...)
return
}
}
}

func (lm *ListenerManager) AcceptCh() chan ConnWithVRF {
return lm.acceptCh
}
3 changes: 3 additions & 0 deletions net/tcp/ports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package tcp

const BGPPORT = 179
Loading