diff --git a/cmd/bio-rd/main.go b/cmd/bio-rd/main.go index 082992e4..0127d5da 100644 --- a/cmd/bio-rd/main.go +++ b/cmd/bio-rd/main.go @@ -61,12 +61,17 @@ func main() { os.Exit(1) } - bgpSrv = bgpserver.NewBGPServer( - startCfg.RoutingOptions.RouterIDUint32, - []string{ + listenAddrsByVRF := map[string][]string{ + vrf.DefaultVRFName: { "[::]:179", "0.0.0.0:179", }, + } + + bgpSrv = bgpserver.NewBGPServer( + startCfg.RoutingOptions.RouterIDUint32, + vrfReg.CreateVRFIfNotExists(vrf.DefaultVRFName, 0), + listenAddrsByVRF, ) err = bgpSrv.Start() @@ -75,13 +80,11 @@ func main() { os.Exit(1) } - vrfReg.CreateVRFIfNotExists(vrf.DefaultVRFName, 0) - go configReloader() sigHUP <- syscall.SIGHUP installSignalHandler() - s := bgpserver.NewBGPAPIServer(bgpSrv) + s := bgpserver.NewBGPAPIServer(bgpSrv, vrfReg) isisAPISrv := isisserver.NewISISAPIServer(isisSrv) unaryInterceptors := []grpc.UnaryServerInterceptor{} streamInterceptors := []grpc.StreamServerInterceptor{} @@ -135,7 +138,6 @@ func configReloader() { } func loadConfig(cfg *config.Config) error { - for _, ri := range cfg.RoutingInstances { err := configureRoutingInstance(ri) _ = err @@ -167,7 +169,7 @@ func configureProtocolsBGP(bgp *config.BGP) error { found := false for _, g := range bgp.Groups { for _, n := range g.Neighbors { - if n.PeerAddressIP == p { + if n.PeerAddressIP == p.Addr() && p.VRF() == bgpSrv.GetDefaultVRF() { found = true break } @@ -175,33 +177,33 @@ func configureProtocolsBGP(bgp *config.BGP) error { } if !found { - bgpSrv.DisposePeer(p) + bgpSrv.DisposePeer(bgpSrv.GetDefaultVRF(), p.Addr()) } } // Tear down peers that need new sessions as they changed too significantly for _, g := range bgp.Groups { for _, n := range g.Neighbors { - newCfg := BGPPeerConfig(n, vrfReg.GetVRFByName(vrf.DefaultVRFName)) - oldCfg := bgpSrv.GetPeerConfig(n.PeerAddressIP) + newCfg := BGPPeerConfig(n, bgpSrv.GetDefaultVRF()) + oldCfg := bgpSrv.GetPeerConfig(bgpSrv.GetDefaultVRF(), n.PeerAddressIP) if oldCfg == nil { continue } if !oldCfg.NeedsRestart(newCfg) { - bgpSrv.ReplaceImportFilterChain(n.PeerAddressIP, newCfg.IPv4.ImportFilterChain) - bgpSrv.ReplaceExportFilterChain(n.PeerAddressIP, newCfg.IPv4.ExportFilterChain) + bgpSrv.ReplaceImportFilterChain(bgpSrv.GetDefaultVRF(), n.PeerAddressIP, newCfg.IPv4.ImportFilterChain) + bgpSrv.ReplaceExportFilterChain(bgpSrv.GetDefaultVRF(), n.PeerAddressIP, newCfg.IPv4.ExportFilterChain) continue } - bgpSrv.DisposePeer(oldCfg.PeerAddress) + bgpSrv.DisposePeer(bgpSrv.GetDefaultVRF(), oldCfg.PeerAddress) } } // Turn up all sessions that are missing for _, g := range bgp.Groups { for _, n := range g.Neighbors { - if bgpSrv.GetPeerConfig(n.PeerAddressIP) != nil { + if bgpSrv.GetPeerConfig(bgpSrv.GetDefaultVRF(), n.PeerAddressIP) != nil { continue } diff --git a/examples/bgp/main.go b/examples/bgp/main.go index 0f06247a..32adf9fc 100644 --- a/examples/bgp/main.go +++ b/examples/bgp/main.go @@ -35,14 +35,15 @@ func main() { listen = append(listen, "[::]:179") } - b := server.NewBGPServer(0, listen) - v, err := vrf.New("master", 0) + v, err := vrf.New(vrf.DefaultVRFName, 0) if err != nil { logrus.Fatal(err) } + b := server.NewBGPServer(0, v, map[string][]string{vrf.DefaultVRFName: listen}) + go startMetricsEndpoint(b) - go startAPIEndpoint(b) + go startAPIEndpoint(b, vrf.GetGlobalRegistry()) if err := b.Start(); err != nil { log.Fatalf("Unable to start BGP server: %v", err) @@ -69,8 +70,8 @@ func startMetricsEndpoint(server server.BGPServer) { logrus.Error(http.ListenAndServe(":8080", nil)) } -func startAPIEndpoint(b server.BGPServer) { - apiSrv := server.NewBGPAPIServer(b) +func startAPIEndpoint(b server.BGPServer, vrfReg *vrf.VRFRegistry) { + apiSrv := server.NewBGPAPIServer(b, vrfReg) lis, err := net.Listen("tcp", ":1337") if err != nil { diff --git a/net/tcp/listen.go b/net/tcp/listen.go index f6d88d3c..f91c0e84 100644 --- a/net/tcp/listen.go +++ b/net/tcp/listen.go @@ -4,17 +4,33 @@ import ( "fmt" "net" + "github.com/bio-routing/bio-rd/routingtable/vrf" "golang.org/x/sys/unix" ) +type ListenerFactoryI interface { + NewListener(v *vrf.VRF, laddr *net.TCPAddr, ttl uint8) (ListenerI, error) +} + +type ListenerFactory struct{} + +func NewListenerFactory() *ListenerFactory { + return &ListenerFactory{} +} + +type ListenerI interface { + SetTCPMD5(peerAddr net.IP, secret string) error + AcceptTCP() (ConnI, error) +} + // Listener listens for TCP clients type Listener struct { fd int laddr *net.TCPAddr } -// Listen starts a TCPListener -func Listen(laddr *net.TCPAddr, ttl uint8) (*Listener, error) { +// NewListener starts a TCPListener +func (lf *ListenerFactory) NewListener(v *vrf.VRF, laddr *net.TCPAddr, ttl uint8) (ListenerI, error) { l := &Listener{ laddr: laddr, } @@ -52,6 +68,14 @@ func Listen(laddr *net.TCPAddr, ttl uint8) (*Listener, error) { } } + if v.Name() != vrf.DefaultVRFName { + err = unix.SetsockoptString(fd, SOL_IP, unix.SO_BINDTODEVICE, v.Name()) + if err != nil { + unix.Close(fd) + return nil, fmt.Errorf("unable to set SO_BINDTODEVICE (%s): %v", v.Name(), err) + } + } + if laddr.IP.To4() != nil { err = unix.Bind(fd, &unix.SockaddrInet4{ Port: laddr.Port, @@ -93,7 +117,7 @@ func (l *Listener) SetTCPMD5(peerAddr net.IP, secret string) error { } // AcceptTCP accepts a new TCP connection -func (l *Listener) AcceptTCP() (*Conn, error) { +func (l *Listener) AcceptTCP() (ConnI, error) { fd, sa, err := unix.Accept(l.fd) if err != nil { return nil, err @@ -120,3 +144,38 @@ func (l *Listener) AcceptTCP() (*Conn, error) { raddr: raddr, }, nil } + +type MockListenerFactory struct{} + +func NewMockListenerFactory() *MockListenerFactory { + return &MockListenerFactory{} +} + +type MockListener struct { + localAddr net.IP + localPort uint16 + connCh chan *MockConn +} + +func (lf *MockListenerFactory) NewMockListener(localAddr net.IP, localPort uint16) *MockListener { + return &MockListener{ + localAddr: localAddr, + localPort: localPort, + connCh: make(chan *MockConn, 100), + } +} + +func (l *MockListener) SetTCPMD5(peerAddr net.IP, secret string) error { + return nil +} + +// AcceptTCP accepts a new TCP connection +func (l *MockListener) AcceptTCP() (ConnI, error) { + return <-l.connCh, nil +} + +func (l *MockListener) Connect(addr net.IP, port uint16) *MockConn { + mc := NewMockConn(l.localAddr, l.localPort, addr, port) + l.connCh <- mc + return mc +} diff --git a/net/tcp/listener_manager.go b/net/tcp/listener_manager.go new file mode 100644 index 00000000..8dcc8750 --- /dev/null +++ b/net/tcp/listener_manager.go @@ -0,0 +1,145 @@ +package tcp + +import ( + "fmt" + "net" + "sync" + + "github.com/bio-routing/bio-rd/routingtable/vrf" + "github.com/bio-routing/bio-rd/util/log" +) + +type ConnWithVRF struct { + Conn net.Conn + VRF *vrf.VRF +} + +type ListenerManagerI interface { + ListenAddrsPerVRF(vrf *vrf.VRF) []string + GetListeners(v *vrf.VRF) []ListenerI + CreateListenersIfNotExists(v *vrf.VRF) error + AcceptCh() chan ConnWithVRF +} + +type ListenerManager struct { + listenAddrsByVRF map[string][]string + listenersByVRF map[string][]ListenerI + listenersByVRFmu sync.RWMutex + acceptCh chan ConnWithVRF + listenerFactory ListenerFactoryI +} + +func NewListenerManager(listenAddrsByVRF map[string][]string) *ListenerManager { + return &ListenerManager{ + listenAddrsByVRF: listenAddrsByVRF, + listenersByVRF: make(map[string][]ListenerI), + listenerFactory: NewListenerFactory(), + } +} + +func (lm *ListenerManager) ListenAddrsPerVRF(vrf *vrf.VRF) []string { + return lm.listenAddrsByVRF[vrf.Name()] +} + +func (lm *ListenerManager) GetListeners(v *vrf.VRF) []ListenerI { + lm.listenersByVRFmu.Lock() + defer lm.listenersByVRFmu.Unlock() + + ret := make([]ListenerI, 0) + for _, l := range lm.listenersByVRF[v.Name()] { + ret = append(ret, l) + } + + return ret +} + +func (lm *ListenerManager) CreateListenersIfNotExists(v *vrf.VRF) error { + lm.listenersByVRFmu.Lock() + defer lm.listenersByVRFmu.Unlock() + + if _, exists := lm.listenersByVRF[v.Name()]; exists { + return nil + } + + err := lm._createListeners(v) + if err != nil { + return fmt.Errorf("unable to create listeners: %v", err) + } + + return nil +} + +func (lm *ListenerManager) _createListeners(v *vrf.VRF) error { + for _, addr := range lm.ListenAddrsPerVRF(v) { + err := lm._addListener(v, addr, lm.acceptCh) + if err != nil { + return fmt.Errorf("unable to create TCP listener %q vrf %s: %v", addr, v.Name(), err) + } + } + + return nil +} + +// newListener creates a new Listener +func (lm *ListenerManager) _addListener(vrf *vrf.VRF, addr string, ch chan ConnWithVRF) error { + tcpaddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return err + } + + log.Infof("Listener manager: Starting TCP listener on %s in VRF %s", addr, vrf.Name()) + l, err := lm.listenerFactory.NewListener(vrf, tcpaddr, 255) + if err != nil { + return err + } + + lm._add(vrf, l) + + go func(tl ListenerI) error { + defer lm.dropListener(vrf, tl) + + for { + conn, err := l.AcceptTCP() + if err != nil { + log.WithError(err).WithFields(log.Fields{ + "Topic": "Peer", + }).Error("Failed to AcceptTCP") + return err + } + + ch <- ConnWithVRF{ + Conn: conn, + VRF: vrf, + } + } + }(l) + + return nil +} + +// _add is to be called with the mutex acquired +func (lm *ListenerManager) _add(vrf *vrf.VRF, l ListenerI) { + if _, exists := lm.listenersByVRF[vrf.Name()]; !exists { + lm.listenersByVRF[vrf.Name()] = make([]ListenerI, 0) + } + + lm.listenersByVRF[vrf.Name()] = append(lm.listenersByVRF[vrf.Name()], l) +} + +func (lm *ListenerManager) dropListener(vrf *vrf.VRF, l ListenerI) { + lm.listenersByVRFmu.Lock() + defer lm.listenersByVRFmu.Unlock() + + vrfName := vrf.Name() + listeners := lm.listenersByVRF[vrfName] + for i, x := range listeners { + if x == l { + lm.listenersByVRF[vrfName] = append(listeners[:i], listeners[i+1:]...) + return + } + } +} + +func (lm *ListenerManager) AcceptCh() chan ConnWithVRF { + return lm.acceptCh +} diff --git a/net/tcp/ports.go b/net/tcp/ports.go new file mode 100644 index 00000000..76b328bb --- /dev/null +++ b/net/tcp/ports.go @@ -0,0 +1,3 @@ +package tcp + +const BGPPORT = 179 diff --git a/net/tcp/tcp.go b/net/tcp/tcp.go index f8dde7d0..771c0dc1 100644 --- a/net/tcp/tcp.go +++ b/net/tcp/tcp.go @@ -16,6 +16,21 @@ const ( SOL_IPV6 = 0x29 ) +type ConnI interface { + Write(b []byte) (n int, err error) + Read(b []byte) (n int, err error) + Close() error + LocalAddr() net.Addr + RemoteAddr() net.Addr + SetDeadline(t time.Time) error + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error + SetTTL(ttl uint8) error + SetDontRoute() error + SetNoDelay() error + SetBindToDev(devName string) error +} + // Conn is TCP connection type Conn struct { fd int @@ -117,3 +132,102 @@ func (c *Conn) SetNoDelay() error { func (c *Conn) SetBindToDev(devName string) error { return unix.SetsockoptString(c.fd, unix.IPPROTO_TCP, unix.SO_BINDTODEVICE, devName) } + +// MockConn is mocked TCP connection +type MockConn struct { + chOut chan []byte + chIn chan byte + laddr *net.TCPAddr + raddr *net.TCPAddr + closed bool +} + +func NewMockConn(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16) *MockConn { + return &MockConn{ + chOut: make(chan []byte), + chIn: make(chan byte), + laddr: &net.TCPAddr{ + IP: srcIP, + Port: int(srcPort), + }, + raddr: &net.TCPAddr{ + IP: dstIP, + Port: int(dstPort), + }, + } +} + +// Write writes to a TCP connection +func (c *MockConn) Write(b []byte) (n int, err error) { + if c.closed { + return 0, fmt.Errorf("connection is closed") + } + + c.chOut <- b + return len(b), nil +} + +// Read reads from a TCP connection +func (c *MockConn) Read(b []byte) (n int, err error) { + if c.closed { + return 0, fmt.Errorf("connection is closed") + } + + for i := range b { + b[i] = <-c.chIn + } + + return len(b), nil +} + +func (c *MockConn) WriteFromOtherEnd(b []byte) { + for _, x := range b { + c.chIn <- x + } +} + +func (c *MockConn) ReadFromOtherEnd() []byte { + return <-c.chOut +} + +func (c *MockConn) Close() error { + c.closed = true + return nil +} + +func (c *MockConn) LocalAddr() net.Addr { + return c.laddr +} + +func (c *MockConn) RemoteAddr() net.Addr { + return c.raddr +} + +// SetDeadline is here to fulfill net.Conn interface +func (c *MockConn) SetDeadline(t time.Time) error { + return nil +} + +func (c *MockConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *MockConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func (c *MockConn) SetTTL(ttl uint8) error { + return nil +} + +func (c *MockConn) SetDontRoute() error { + return nil +} + +func (c *MockConn) SetNoDelay() error { + return nil +} + +func (c *MockConn) SetBindToDev(devName string) error { + return nil +} diff --git a/protocols/bgp/api/bgp.pb.go b/protocols/bgp/api/bgp.pb.go index 1f4e1e81..1006ae18 100644 --- a/protocols/bgp/api/bgp.pb.go +++ b/protocols/bgp/api/bgp.pb.go @@ -176,9 +176,10 @@ type DumpRIBRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Peer *api.IP `protobuf:"bytes,1,opt,name=peer,proto3" json:"peer,omitempty"` - Afi uint32 `protobuf:"varint,2,opt,name=afi,proto3" json:"afi,omitempty"` - Safi uint32 `protobuf:"varint,3,opt,name=safi,proto3" json:"safi,omitempty"` + Peer *api.IP `protobuf:"bytes,1,opt,name=peer,proto3" json:"peer,omitempty"` + Afi uint32 `protobuf:"varint,2,opt,name=afi,proto3" json:"afi,omitempty"` + Safi uint32 `protobuf:"varint,3,opt,name=safi,proto3" json:"safi,omitempty"` + VrfName string `protobuf:"bytes,4,opt,name=vrf_name,json=vrfName,proto3" json:"vrf_name,omitempty"` } func (x *DumpRIBRequest) Reset() { @@ -234,6 +235,13 @@ func (x *DumpRIBRequest) GetSafi() uint32 { return 0 } +func (x *DumpRIBRequest) GetVrfName() string { + if x != nil { + return x.VrfName + } + return "" +} + var File_protocols_bgp_api_bgp_proto protoreflect.FileDescriptor var file_protocols_bgp_api_bgp_proto_rawDesc = []byte{ @@ -258,30 +266,31 @@ var file_protocols_bgp_api_bgp_proto_rawDesc = []byte{ 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x08, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x62, 0x67, 0x70, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x08, - 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x57, 0x0a, 0x0e, 0x44, 0x75, 0x6d, 0x70, + 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x72, 0x0a, 0x0e, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x49, 0x42, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x04, 0x70, 0x65, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x6e, 0x65, 0x74, 0x2e, 0x49, 0x50, 0x52, 0x04, 0x70, 0x65, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x66, 0x69, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x61, 0x66, 0x69, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x61, 0x66, 0x69, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x73, 0x61, 0x66, - 0x69, 0x32, 0xd4, 0x01, 0x0a, 0x0a, 0x42, 0x67, 0x70, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x4d, 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x1c, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x62, 0x67, 0x70, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, - 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x62, 0x67, 0x70, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x3a, 0x0a, 0x09, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x49, 0x42, 0x49, 0x6e, 0x12, 0x17, 0x2e, 0x62, - 0x69, 0x6f, 0x2e, 0x62, 0x67, 0x70, 0x2e, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x49, 0x42, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x72, 0x6f, 0x75, 0x74, - 0x65, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a, 0x0a, 0x44, - 0x75, 0x6d, 0x70, 0x52, 0x49, 0x42, 0x4f, 0x75, 0x74, 0x12, 0x17, 0x2e, 0x62, 0x69, 0x6f, 0x2e, - 0x62, 0x67, 0x70, 0x2e, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x49, 0x42, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x52, - 0x6f, 0x75, 0x74, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x69, 0x6f, 0x2d, 0x72, 0x6f, 0x75, 0x74, 0x69, - 0x6e, 0x67, 0x2f, 0x62, 0x69, 0x6f, 0x2d, 0x72, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, - 0x6f, 0x6c, 0x73, 0x2f, 0x62, 0x67, 0x70, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x69, 0x12, 0x19, 0x0a, 0x08, 0x76, 0x72, 0x66, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x72, 0x66, 0x4e, 0x61, 0x6d, 0x65, 0x32, 0xd4, 0x01, 0x0a, + 0x0a, 0x42, 0x67, 0x70, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4d, 0x0a, 0x0c, 0x4c, + 0x69, 0x73, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1c, 0x2e, 0x62, 0x69, + 0x6f, 0x2e, 0x62, 0x67, 0x70, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x69, 0x6f, 0x2e, + 0x62, 0x67, 0x70, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x09, 0x44, 0x75, + 0x6d, 0x70, 0x52, 0x49, 0x42, 0x49, 0x6e, 0x12, 0x17, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x62, 0x67, + 0x70, 0x2e, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x49, 0x42, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x10, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x52, 0x6f, 0x75, + 0x74, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a, 0x0a, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x49, + 0x42, 0x4f, 0x75, 0x74, 0x12, 0x17, 0x2e, 0x62, 0x69, 0x6f, 0x2e, 0x62, 0x67, 0x70, 0x2e, 0x44, + 0x75, 0x6d, 0x70, 0x52, 0x49, 0x42, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, + 0x62, 0x69, 0x6f, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x22, + 0x00, 0x30, 0x01, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x62, 0x69, 0x6f, 0x2d, 0x72, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2f, 0x62, 0x69, + 0x6f, 0x2d, 0x72, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x2f, 0x62, + 0x67, 0x70, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/protocols/bgp/api/bgp.proto b/protocols/bgp/api/bgp.proto index 6ebbd314..39857923 100644 --- a/protocols/bgp/api/bgp.proto +++ b/protocols/bgp/api/bgp.proto @@ -24,6 +24,7 @@ message DumpRIBRequest { bio.net.IP peer = 1; uint32 afi = 2; uint32 safi = 3; + string vrf_name = 4; } service BgpService { diff --git a/protocols/bgp/server/bgp_api.go b/protocols/bgp/server/bgp_api.go index 4d2a41a3..54596d3b 100644 --- a/protocols/bgp/server/bgp_api.go +++ b/protocols/bgp/server/bgp_api.go @@ -5,19 +5,22 @@ import ( "fmt" "github.com/bio-routing/bio-rd/protocols/bgp/api" + "github.com/bio-routing/bio-rd/routingtable/vrf" bnet "github.com/bio-routing/bio-rd/net" ) type BGPAPIServer struct { api.UnimplementedBgpServiceServer - srv BGPServer + srv BGPServer + vrfReg *vrf.VRFRegistry } // NewBGPAPIServer creates a new BGP API Server -func NewBGPAPIServer(s BGPServer) *BGPAPIServer { +func NewBGPAPIServer(s BGPServer, vrfReg *vrf.VRFRegistry) *BGPAPIServer { return &BGPAPIServer{ - srv: s, + srv: s, + vrfReg: vrfReg, } } @@ -27,7 +30,12 @@ func (s *BGPAPIServer) ListSessions(ctx context.Context, in *api.ListSessionsReq // DumpRIBIn dumps the RIB in of a peer for a given AFI/SAFI func (s *BGPAPIServer) DumpRIBIn(in *api.DumpRIBRequest, stream api.BgpService_DumpRIBInServer) error { - r := s.srv.GetRIBIn(bnet.IPFromProtoIP(in.Peer).Ptr(), uint16(in.Afi), uint8(in.Safi)) + v := s.getVRF(in) + if v == nil { + return fmt.Errorf("unable to find vrf %q", in.VrfName) + } + + r := s.srv.GetRIBIn(v, bnet.IPFromProtoIP(in.Peer).Ptr(), uint16(in.Afi), uint8(in.Safi)) if r == nil { return fmt.Errorf("unable to get AdjRIBIn") } @@ -45,7 +53,12 @@ func (s *BGPAPIServer) DumpRIBIn(in *api.DumpRIBRequest, stream api.BgpService_D // DumpRIBOut dumps the RIB out of a peer for a given AFI/SAFI func (s *BGPAPIServer) DumpRIBOut(in *api.DumpRIBRequest, stream api.BgpService_DumpRIBOutServer) error { - r := s.srv.GetRIBOut(bnet.IPFromProtoIP(in.Peer).Ptr(), uint16(in.Afi), uint8(in.Safi)) + v := s.getVRF(in) + if v == nil { + return fmt.Errorf("unable to find vrf %q", in.VrfName) + } + + r := s.srv.GetRIBOut(v, bnet.IPFromProtoIP(in.Peer).Ptr(), uint16(in.Afi), uint8(in.Safi)) if r == nil { return fmt.Errorf("unable to get AdjRIBOut") } @@ -60,3 +73,11 @@ func (s *BGPAPIServer) DumpRIBOut(in *api.DumpRIBRequest, stream api.BgpService_ return nil } + +func (s *BGPAPIServer) getVRF(in *api.DumpRIBRequest) *vrf.VRF { + if in.VrfName == "" { + in.VrfName = vrf.DefaultVRFName + } + + return s.vrfReg.GetVRFByName(in.VrfName) +} diff --git a/protocols/bgp/server/bgp_api_test.go b/protocols/bgp/server/bgp_api_test.go index b7c28249..f952002a 100644 --- a/protocols/bgp/server/bgp_api_test.go +++ b/protocols/bgp/server/bgp_api_test.go @@ -2,6 +2,7 @@ package server import ( "context" + "io" "net" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" "github.com/bio-routing/bio-rd/routingtable/adjRIBOut" "github.com/bio-routing/bio-rd/routingtable/filter" + "github.com/bio-routing/bio-rd/routingtable/vrf" "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/test/bufconn" @@ -23,6 +25,8 @@ import ( ) func TestDumpRIBInOut(t *testing.T) { + vrf.GetGlobalRegistry().CreateVRFIfNotExists(vrf.DefaultVRFName, 0) + sessionAttrs := routingtable.SessionAttrs{ RouterID: 0, ClusterID: 0, @@ -43,9 +47,10 @@ func TestDumpRIBInOut(t *testing.T) { apisrv: &BGPAPIServer{ srv: &bgpServer{ peers: &peerManager{ - peers: map[bnet.IP]*peer{}, + peers: map[PeerKey]*peer{}, }, }, + vrfReg: vrf.GetGlobalRegistry(), }, addRoutes: []*route.Route{}, req: &api.DumpRIBRequest{ @@ -54,15 +59,18 @@ func TestDumpRIBInOut(t *testing.T) { Safi: packet.SAFIUnicast, }, expected: []*routeapi.Route{}, - wantFail: false, + wantFail: true, }, { name: "Test #1: No routes given", apisrv: &BGPAPIServer{ srv: &bgpServer{ peers: &peerManager{ - peers: map[bnet.IP]*peer{ - bnet.IPv4FromOctets(10, 0, 0, 0): { + peers: map[PeerKey]*peer{ + { + vrf: vrf.GetGlobalRegistry().GetVRFByName(vrf.DefaultVRFName), + neighborIP: bnet.IPv4FromOctets(10, 0, 0, 0).Dedup(), + }: { fsms: []*FSM{ 0: { ipv4Unicast: &fsmAddressFamily{ @@ -75,6 +83,7 @@ func TestDumpRIBInOut(t *testing.T) { }, }, }, + vrfReg: vrf.GetGlobalRegistry(), }, addRoutes: []*route.Route{}, req: &api.DumpRIBRequest{ @@ -90,8 +99,11 @@ func TestDumpRIBInOut(t *testing.T) { apisrv: &BGPAPIServer{ srv: &bgpServer{ peers: &peerManager{ - peers: map[bnet.IP]*peer{ - bnet.IPv4FromOctets(10, 0, 0, 0): { + peers: map[PeerKey]*peer{ + { + vrf: vrf.GetGlobalRegistry().GetVRFByName(vrf.DefaultVRFName), + neighborIP: bnet.IPv4FromOctets(10, 0, 0, 0).Dedup(), + }: { addr: bnet.IPv4(123).Ptr(), fsms: []*FSM{ 0: { @@ -105,6 +117,7 @@ func TestDumpRIBInOut(t *testing.T) { }, }, }, + vrfReg: vrf.GetGlobalRegistry(), }, addRoutes: []*route.Route{ route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(20, 0, 0, 0), 16).Ptr(), &route.Path{ @@ -119,9 +132,10 @@ func TestDumpRIBInOut(t *testing.T) { }), }, req: &api.DumpRIBRequest{ - Peer: bnet.IPv4FromOctets(10, 0, 0, 0).ToProto(), - Afi: packet.AFIIPv4, - Safi: packet.SAFIUnicast, + Peer: bnet.IPv4FromOctets(10, 0, 0, 0).ToProto(), + Afi: packet.AFIIPv4, + Safi: packet.SAFIUnicast, + VrfName: vrf.DefaultVRFName, }, expected: []*routeapi.Route{ { @@ -150,8 +164,11 @@ func TestDumpRIBInOut(t *testing.T) { apisrv: &BGPAPIServer{ srv: &bgpServer{ peers: &peerManager{ - peers: map[bnet.IP]*peer{ - bnet.IPv4FromOctets(10, 0, 0, 0): { + peers: map[PeerKey]*peer{ + { + vrf: vrf.GetGlobalRegistry().GetVRFByName(vrf.DefaultVRFName), + neighborIP: bnet.IPv4FromOctets(10, 0, 0, 0).Dedup(), + }: { addr: bnet.IPv4(123).Ptr(), fsms: []*FSM{ 0: { @@ -165,6 +182,7 @@ func TestDumpRIBInOut(t *testing.T) { }, }, }, + vrfReg: vrf.GetGlobalRegistry(), }, addRoutes: []*route.Route{ route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(20, 0, 0, 0), 16).Ptr(), &route.Path{ @@ -258,7 +276,11 @@ func TestDumpRIBInOut(t *testing.T) { for _, test := range tests { for _, r := range test.addRoutes { for _, p := range r.Paths() { - test.apisrv.srv.(*bgpServer).peers.peers[bnet.IPv4FromOctets(10, 0, 0, 0)].fsms[0].ipv4Unicast.adjRIBIn.AddPath(r.Prefix(), p) + pk := PeerKey{ + vrf: vrf.GetGlobalRegistry().GetVRFByName(vrf.DefaultVRFName), + neighborIP: bnet.IPv4FromOctets(10, 0, 0, 0).Dedup(), + } + test.apisrv.srv.(*bgpServer).peers.peers[pk].fsms[0].ipv4Unicast.adjRIBIn.AddPath(r.Prefix(), p) } } @@ -291,6 +313,10 @@ func TestDumpRIBInOut(t *testing.T) { for { r, err := streamClient.Recv() if err != nil { + if err != io.EOF && !test.wantFail { + t.Fatalf("stream RPC ended with non EOF error for test %q: %v", test.name, err) + } + break } @@ -313,7 +339,11 @@ func TestDumpRIBInOut(t *testing.T) { for _, test := range tests { for _, r := range test.addRoutes { for _, p := range r.Paths() { - test.apisrv.srv.(*bgpServer).peers.peers[bnet.IPv4FromOctets(10, 0, 0, 0)].fsms[0].ipv4Unicast.adjRIBOut.AddPath(r.Prefix(), p) + pk := PeerKey{ + vrf: vrf.GetGlobalRegistry().GetVRFByName(vrf.DefaultVRFName), + neighborIP: bnet.IPv4FromOctets(10, 0, 0, 0).Dedup(), + } + test.apisrv.srv.(*bgpServer).peers.peers[pk].fsms[0].ipv4Unicast.adjRIBOut.AddPath(r.Prefix(), p) } } diff --git a/protocols/bgp/server/fsm.go b/protocols/bgp/server/fsm.go index 317efaa5..2f319041 100644 --- a/protocols/bgp/server/fsm.go +++ b/protocols/bgp/server/fsm.go @@ -279,7 +279,7 @@ func (fsm *FSM) tcpConnector(ctx context.Context) { } rAddr := net.TCPAddr{ IP: fsm.peer.addr.ToNetIP(), - Port: BGPPORT, + Port: tcp.BGPPORT, } c, err := tcp.Dial(&lAddr, &rAddr, fsm.peer.ttl, fsm.peer.config.AuthenticationKey, fsm.peer.ttl == 0, fsm.peer.getBindDev()) diff --git a/protocols/bgp/server/metrics_service_test.go b/protocols/bgp/server/metrics_service_test.go index 5219515b..2a427ff4 100644 --- a/protocols/bgp/server/metrics_service_test.go +++ b/protocols/bgp/server/metrics_service_test.go @@ -242,7 +242,7 @@ func TestMetrics(t *testing.T) { fsm.establishedTime = establishedTime } - s := newBGPServer(0, nil) + s := newBGPServer(0, vrf, nil) s.peers.add(test.peer) actual, err := s.Metrics() diff --git a/protocols/bgp/server/peer.go b/protocols/bgp/server/peer.go index 96fb22d1..432a7b5e 100644 --- a/protocols/bgp/server/peer.go +++ b/protocols/bgp/server/peer.go @@ -462,3 +462,10 @@ func (p *peer) getBindDev() string { return "" } + +func (p *peer) peerKey() PeerKey { + return PeerKey{ + vrf: p.vrf, + neighborIP: p.addr.Dedup(), + } +} diff --git a/protocols/bgp/server/peer_manager.go b/protocols/bgp/server/peer_manager.go index 468aeabd..ba4ed7eb 100644 --- a/protocols/bgp/server/peer_manager.go +++ b/protocols/bgp/server/peer_manager.go @@ -4,16 +4,30 @@ import ( "sync" bnet "github.com/bio-routing/bio-rd/net" + "github.com/bio-routing/bio-rd/routingtable/vrf" ) +type PeerKey struct { + vrf *vrf.VRF + neighborIP *bnet.IP +} + +func (pk PeerKey) VRF() *vrf.VRF { + return pk.vrf +} + +func (pk PeerKey) Addr() *bnet.IP { + return pk.neighborIP +} + type peerManager struct { - peers map[bnet.IP]*peer + peers map[PeerKey]*peer peersMu sync.RWMutex } func newPeerManager() *peerManager { return &peerManager{ - peers: make(map[bnet.IP]*peer), + peers: make(map[PeerKey]*peer), } } @@ -21,21 +35,24 @@ func (m *peerManager) add(p *peer) { m.peersMu.Lock() defer m.peersMu.Unlock() - m.peers[*p.GetAddr()] = p + m.peers[p.peerKey()] = p } -func (m *peerManager) remove(neighborIP *bnet.IP) { +func (m *peerManager) remove(pk PeerKey) { m.peersMu.Lock() defer m.peersMu.Unlock() - delete(m.peers, *neighborIP) + delete(m.peers, pk) } -func (m *peerManager) get(neighborIP *bnet.IP) *peer { +func (m *peerManager) get(vrf *vrf.VRF, neighborIP *bnet.IP) *peer { m.peersMu.RLock() defer m.peersMu.RUnlock() - p := m.peers[*neighborIP] + p := m.peers[PeerKey{ + vrf: vrf, + neighborIP: neighborIP.Dedup(), + }] return p } diff --git a/protocols/bgp/server/peer_manager_test.go b/protocols/bgp/server/peer_manager_test.go index 0ef1d192..ed122874 100644 --- a/protocols/bgp/server/peer_manager_test.go +++ b/protocols/bgp/server/peer_manager_test.go @@ -17,7 +17,7 @@ func TestAdd(t *testing.T) { m := newPeerManager() m.add(p) - found := m.peers[*ip] + found := m.peers[p.peerKey()] assert.Exactly(t, p, found) } @@ -28,9 +28,9 @@ func TestRemove(t *testing.T) { } m := newPeerManager() - m.peers[*ip] = p + m.peers[p.peerKey()] = p - m.remove(ip) + m.remove(p.peerKey()) assert.Empty(t, m.peers) } @@ -42,9 +42,9 @@ func TestGet(t *testing.T) { } m := newPeerManager() - m.peers[*ip] = p + m.peers[p.peerKey()] = p - found := m.get(ip) + found := m.get(nil, ip) assert.Exactly(t, p, found) } @@ -57,8 +57,8 @@ func TestList(t *testing.T) { } m := newPeerManager() - m.peers[*p1.GetAddr()] = p1 - m.peers[*p2.GetAddr()] = p2 + m.peers[p1.peerKey()] = p1 + m.peers[p2.peerKey()] = p2 list := m.list() assert.Contains(t, list, p1) diff --git a/protocols/bgp/server/server.go b/protocols/bgp/server/server.go index 0df87d54..9f0cfd9c 100644 --- a/protocols/bgp/server/server.go +++ b/protocols/bgp/server/server.go @@ -2,10 +2,11 @@ package server import ( "fmt" - "net" + "github.com/bio-routing/bio-rd/net/tcp" "github.com/bio-routing/bio-rd/routingtable/adjRIBOut" "github.com/bio-routing/bio-rd/routingtable/filter" + "github.com/bio-routing/bio-rd/routingtable/vrf" "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" @@ -20,81 +21,67 @@ const ( ) type bgpServer struct { - listenAddrs []string - listeners []*TCPListener - acceptCh chan net.Conn - peers *peerManager - routerID uint32 - metrics *metricsService + listenerManager tcp.ListenerManagerI + defaultVRF *vrf.VRF + peers *peerManager + routerID uint32 + metrics *metricsService } type BGPServer interface { RouterID() uint32 Start() error AddPeer(PeerConfig) error - GetPeerConfig(*bnet.IP) *PeerConfig - DisposePeer(*bnet.IP) - GetPeers() []*bnet.IP + GetPeerConfig(*vrf.VRF, *bnet.IP) *PeerConfig + DisposePeer(*vrf.VRF, *bnet.IP) + GetPeers() []PeerKey Metrics() (*metrics.BGPMetrics, error) - GetRIBIn(peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBIn.AdjRIBIn - GetRIBOut(peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBOut.AdjRIBOut - ConnectMockPeer(peer PeerConfig, con net.Conn) - ReplaceImportFilterChain(peer *bnet.IP, c filter.Chain) error - ReplaceExportFilterChain(peer *bnet.IP, c filter.Chain) error + GetRIBIn(vrf *vrf.VRF, peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBIn.AdjRIBIn + GetRIBOut(vrf *vrf.VRF, peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBOut.AdjRIBOut + ReplaceImportFilterChain(vrf *vrf.VRF, peer *bnet.IP, c filter.Chain) error + ReplaceExportFilterChain(vrf *vrf.VRF, peer *bnet.IP, c filter.Chain) error + GetDefaultVRF() *vrf.VRF } // NewBGPServer creates a new instance of bgpServer -func NewBGPServer(routerID uint32, addrs []string) BGPServer { - return newBGPServer(routerID, addrs) +func NewBGPServer(routerID uint32, defaultVRF *vrf.VRF, listenAddrsByVRF map[string][]string) BGPServer { + return newBGPServer(routerID, defaultVRF, listenAddrsByVRF) } -func newBGPServer(routerID uint32, addrs []string) *bgpServer { +func newBGPServer(routerID uint32, defaultVRF *vrf.VRF, listenAddrsByVRF map[string][]string) *bgpServer { server := &bgpServer{ - peers: newPeerManager(), - routerID: routerID, - listenAddrs: addrs, + peers: newPeerManager(), + routerID: routerID, + listenerManager: tcp.NewListenerManager(listenAddrsByVRF), + defaultVRF: defaultVRF, } server.metrics = &metricsService{server} return server } +func (b *bgpServer) GetDefaultVRF() *vrf.VRF { + return b.defaultVRF +} + func (b *bgpServer) RouterID() uint32 { return b.routerID } // GetPeers gets a list of all peers -func (b *bgpServer) GetPeers() []*bnet.IP { - ret := make([]*bnet.IP, 0) +func (b *bgpServer) GetPeers() []PeerKey { + ret := make([]PeerKey, 0) for _, p := range b.peers.list() { - ret = append(ret, p.addr) + ret = append(ret, p.peerKey()) } return ret } -func (b *bgpServer) Start() error { - if len(b.listenAddrs) > 0 { - acceptCh := make(chan net.Conn, 4096) - for _, addr := range b.listenAddrs { - l, err := NewTCPListener(addr, acceptCh) - if err != nil { - return fmt.Errorf("failed to start TCPListener for %s: %w", addr, err) - } - b.listeners = append(b.listeners, l) - } - b.acceptCh = acceptCh - - go b.incomingConnectionWorker() - } - - return nil -} - // ReplaceImportFilterChain replaces a peers import filter -func (b *bgpServer) ReplaceImportFilterChain(peerIP *bnet.IP, c filter.Chain) error { - p := b.peers.get(peerIP) +func (b *bgpServer) ReplaceImportFilterChain(vrf *vrf.VRF, peerIP *bnet.IP, c filter.Chain) error { + p := b.peers.get(vrf, peerIP) if p == nil { return fmt.Errorf("peer %q not found", peerIP.String()) } @@ -104,8 +91,8 @@ func (b *bgpServer) ReplaceImportFilterChain(peerIP *bnet.IP, c filter.Chain) er } // ReplaceExportFilterChain replaces a peers import filter -func (b *bgpServer) ReplaceExportFilterChain(peerIP *bnet.IP, c filter.Chain) error { - p := b.peers.get(peerIP) +func (b *bgpServer) ReplaceExportFilterChain(vrf *vrf.VRF, peerIP *bnet.IP, c filter.Chain) error { + p := b.peers.get(vrf, peerIP) if p == nil { return fmt.Errorf("peer %q not found", peerIP.String()) } @@ -114,8 +101,8 @@ func (b *bgpServer) ReplaceExportFilterChain(peerIP *bnet.IP, c filter.Chain) er return nil } -func (b *bgpServer) GetRIBIn(peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBIn.AdjRIBIn { - p := b.peers.get(peerIP) +func (b *bgpServer) GetRIBIn(vrf *vrf.VRF, peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBIn.AdjRIBIn { + p := b.peers.get(vrf, peerIP) if p == nil { return nil } @@ -133,8 +120,8 @@ func (b *bgpServer) GetRIBIn(peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBIn. return f.adjRIBIn.(*adjRIBIn.AdjRIBIn) } -func (b *bgpServer) GetRIBOut(peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBOut.AdjRIBOut { - p := b.peers.get(peerIP) +func (b *bgpServer) GetRIBOut(vrf *vrf.VRF, peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBOut.AdjRIBOut { + p := b.peers.get(vrf, peerIP) if p == nil { return nil } @@ -154,20 +141,20 @@ func (b *bgpServer) GetRIBOut(peerIP *bnet.IP, afi uint16, safi uint8) *adjRIBOu func (b *bgpServer) incomingConnectionWorker() { for { - c := <-b.acceptCh + c := <-b.listenerManager.AcceptCh() - peerAddr, _ := bnetutils.BIONetIPFromAddr(c.RemoteAddr().String()) - peer := b.peers.get(peerAddr.Dedup()) + peerAddr, _ := bnetutils.BIONetIPFromAddr(c.Conn.RemoteAddr().String()) + peer := b.peers.get(c.VRF, peerAddr.Dedup()) if peer == nil { - c.Close() + c.Conn.Close() log.WithFields(log.Fields{ - "source": c.RemoteAddr(), + "source": c.Conn.RemoteAddr(), }).Info("TCP connection from unknown source") continue } log.WithFields(log.Fields{ - "source": c.RemoteAddr(), + "source": c.Conn.RemoteAddr(), }).Info("Incoming TCP connection") log.WithFields(log.Fields{ @@ -182,16 +169,14 @@ func (b *bgpServer) incomingConnectionWorker() { peer.fsmsMu.Unlock() go fsm.run() - fsm.conCh <- c + fsm.conCh <- c.Conn } } -func (b *bgpServer) ConnectMockPeer(peer PeerConfig, con net.Conn) { - acceptCh := make(chan net.Conn, 4096) - b.acceptCh = acceptCh +func (b *bgpServer) Start() error { go b.incomingConnectionWorker() - b.acceptCh <- con + return nil } func (b *bgpServer) AddPeer(c PeerConfig) error { @@ -203,9 +188,14 @@ func (b *bgpServer) AddPeer(c PeerConfig) error { return err } + err = b.listenerManager.CreateListenersIfNotExists(c.VRF) + if err != nil { + return err + } + if c.AuthenticationKey != "" { - for _, l := range b.listeners { - err = l.setTCPMD5(c.PeerAddress.ToNetIP(), c.AuthenticationKey) + for _, l := range b.listenerManager.GetListeners(c.VRF) { + err = l.SetTCPMD5(c.PeerAddress.ToNetIP(), c.AuthenticationKey) if err != nil { return fmt.Errorf("unable to set TCP MD5 secret: %w", err) } @@ -229,8 +219,8 @@ func (b *bgpServer) AddPeer(c PeerConfig) error { } // GetPeerConfig gets a BGP peer by its address -func (b *bgpServer) GetPeerConfig(addr *bnet.IP) *PeerConfig { - p := b.peers.get(addr) +func (b *bgpServer) GetPeerConfig(vrf *vrf.VRF, addr *bnet.IP) *PeerConfig { + p := b.peers.get(vrf, addr) if p != nil { return p.config } @@ -238,15 +228,18 @@ func (b *bgpServer) GetPeerConfig(addr *bnet.IP) *PeerConfig { return nil } -func (b *bgpServer) DisposePeer(addr *bnet.IP) { - p := b.peers.get(addr) +func (b *bgpServer) DisposePeer(vrf *vrf.VRF, addr *bnet.IP) { + p := b.peers.get(vrf, addr) if p == nil { return } log.Infof("disposing BGP session with %s", addr.String()) p.stop() - b.peers.remove(addr) + b.peers.remove(PeerKey{ + vrf: vrf, + neighborIP: addr, + }) } func (b *bgpServer) Metrics() (*metrics.BGPMetrics, error) { diff --git a/protocols/bgp/server/tcplistener.go b/protocols/bgp/server/tcplistener.go deleted file mode 100644 index 0b8a03e4..00000000 --- a/protocols/bgp/server/tcplistener.go +++ /dev/null @@ -1,57 +0,0 @@ -package server - -import ( - "net" - - "github.com/bio-routing/bio-rd/net/tcp" - "github.com/bio-routing/bio-rd/util/log" -) - -const ( - // BGPPORT is the port of the BGP protocol - BGPPORT = 179 -) - -// TCPListener is a TCP listen wrapper -type TCPListener struct { - l *tcp.Listener - closeCh chan struct{} -} - -// NewTCPListener creates a new TCPListener -func NewTCPListener(addr string, ch chan net.Conn) (*TCPListener, error) { - tcpaddr, err := net.ResolveTCPAddr("tcp", addr) - if err != nil { - return nil, err - } - - l, err := tcp.Listen(tcpaddr, 255) - if err != nil { - return nil, err - } - - tl := &TCPListener{ - l: l, - closeCh: make(chan struct{}), - } - - go func(tl *TCPListener) error { - for { - conn, err := tl.l.AcceptTCP() - if err != nil { - close(tl.closeCh) - log.WithError(err).WithFields(log.Fields{ - "Topic": "Peer", - }).Error("Failed to AcceptTCP") - return err - } - ch <- conn - } - }(tl) - - return tl, nil -} - -func (t *TCPListener) setTCPMD5(addr net.IP, secret string) error { - return t.l.SetTCPMD5(addr, secret) -} diff --git a/routingtable/vrf/vrf_registry.go b/routingtable/vrf/vrf_registry.go index 8fe1a305..38f75e29 100644 --- a/routingtable/vrf/vrf_registry.go +++ b/routingtable/vrf/vrf_registry.go @@ -92,18 +92,6 @@ func GetGlobalRegistry() *VRFRegistry { return globalRegistry } -// GetVRFByRD gets a VRF by route distinguisher -/*func (r *VRFRegistry) GetVRFByRD(rd uint64) *VRF { - r.mu.Lock() - defer r.mu.Unlock() - - if _, ok := r.vrfs[rd]; ok { - return r.vrfs[rd] - } - - return nil -}*/ - // GetVRFByRD gets a VRF by route distinguisher func (r *VRFRegistry) GetVRFByName(name string) *VRF { r.mu.RLock()