From 97fb90c37af733c8cadd77c638261f4d029badb6 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Wed, 10 Jun 2015 08:01:37 +0100 Subject: [PATCH 01/15] Always include local peer in Peers --- router/peers.go | 4 +++- router/peers_test.go | 1 - router/router.go | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/router/peers.go b/router/peers.go index 144bc1a48c..b3db672687 100644 --- a/router/peers.go +++ b/router/peers.go @@ -32,7 +32,9 @@ type ConnectionSummary struct { } func NewPeers(ourself *LocalPeer) *Peers { - return &Peers{ourself: ourself, table: make(map[PeerName]*Peer)} + peers := &Peers{ourself: ourself, table: make(map[PeerName]*Peer)} + peers.FetchWithDefault(ourself.Peer) + return peers } func (peers *Peers) OnGC(callback func(*Peer)) { diff --git a/router/peers_test.go b/router/peers_test.go index 698502addd..3d7ab95ec7 100644 --- a/router/peers_test.go +++ b/router/peers_test.go @@ -20,7 +20,6 @@ import ( func newNode(name PeerName) (*Peer, *Peers) { peer := NewLocalPeer(name, "", nil) peers := NewPeers(peer) - peers.FetchWithDefault(peer.Peer) return peer.Peer, peers } diff --git a/router/router.go b/router/router.go index 6b0d27a3bf..12cbf008ce 100644 --- a/router/router.go +++ b/router/router.go @@ -73,7 +73,6 @@ func NewRouter(config Config, name PeerName, nickName string) *Router { router.Macs = NewMacCache(macMaxAge, onMacExpiry) router.Peers = NewPeers(router.Ourself) router.Peers.OnGC(onPeerGC) - router.Peers.FetchWithDefault(router.Ourself.Peer) router.Routes = NewRoutes(router.Ourself, router.Peers, router.Overlay.InvalidateRoutes) router.ConnectionMaker = NewConnectionMaker(router.Ourself, router.Peers, router.Port, router.PeerDiscovery) router.TopologyGossip = router.NewGossip("topology", router) From c6edf8421f876d11da5677fb666257edebf896e2 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Mon, 27 Jul 2015 16:33:21 +0100 Subject: [PATCH 02/15] Handle the not-found case implicitly in Broadcast and BroadcastAll --- 
router/routes.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/router/routes.go b/router/routes.go index b463b658e6..5070563586 100644 --- a/router/routes.go +++ b/router/routes.go @@ -65,21 +65,13 @@ func (routes *Routes) UnicastAll(name PeerName) (PeerName, bool) { func (routes *Routes) Broadcast(name PeerName) []PeerName { routes.RLock() defer routes.RUnlock() - hops, found := routes.broadcast[name] - if !found { - return []PeerName{} - } - return hops + return routes.broadcast[name] } func (routes *Routes) BroadcastAll(name PeerName) []PeerName { routes.RLock() defer routes.RUnlock() - hops, found := routes.broadcastAll[name] - if !found { - return []PeerName{} - } - return hops + return routes.broadcastAll[name] } // Choose min(log2(n_peers), n_neighbouring_peers) neighbours, with a From 0841f4a27d53b0fb3136ea38559a2181a6d318c0 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Tue, 28 Jul 2015 15:28:18 +0100 Subject: [PATCH 03/15] Tidy broadcastPeerUpdate --- router/local_peer.go | 9 +++++++-- router/router.go | 8 ++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/router/local_peer.go b/router/local_peer.go index 1e8cd96111..15076963a1 100644 --- a/router/local_peer.go +++ b/router/local_peer.go @@ -160,7 +160,10 @@ func (peer *LocalPeer) handleAddConnection(conn Connection) error { conn.Log("connection added (new peer)") peer.router.SendAllGossipDown(conn) } + + peer.router.Routes.Recalculate() peer.broadcastPeerUpdate(conn.Remote()) + return nil } @@ -174,6 +177,8 @@ func (peer *LocalPeer) handleConnectionEstablished(conn Connection) { } peer.connectionEstablished(conn) conn.Log("connection fully established") + + peer.router.Routes.Recalculate() peer.broadcastPeerUpdate() } @@ -193,14 +198,14 @@ func (peer *LocalPeer) handleDeleteConnection(conn Connection) { // Must do garbage collection first to ensure we don't send out an // update with unreachable peers (can cause looping) 
peer.router.Peers.GarbageCollect() + peer.router.Routes.Recalculate() peer.broadcastPeerUpdate() } // helpers func (peer *LocalPeer) broadcastPeerUpdate(peers ...*Peer) { - peer.router.Routes.Recalculate() - peer.router.TopologyGossip.GossipBroadcast(NewTopologyGossipData(peer.router.Peers, append(peers, peer.Peer)...)) + peer.router.BroadcastTopologyUpdate(append(peers, peer.Peer)) } func (peer *LocalPeer) checkConnectionLimit() error { diff --git a/router/router.go b/router/router.go index 12cbf008ce..93dc923f89 100644 --- a/router/router.go +++ b/router/router.go @@ -260,12 +260,16 @@ type TopologyGossipData struct { update PeerNameSet } -func NewTopologyGossipData(peers *Peers, update ...*Peer) *TopologyGossipData { +func (router *Router) BroadcastTopologyUpdate(update []*Peer) { names := make(PeerNameSet) for _, p := range update { names[p.Name] = void } - return &TopologyGossipData{peers: peers, update: names} + + router.TopologyGossip.GossipBroadcast(&TopologyGossipData{ + peers: router.Peers, + update: names, + }) } func (d *TopologyGossipData) Merge(other GossipData) { From 67bfa5bc900c9074351b942875bb7a1e222e944e Mon Sep 17 00:00:00 2001 From: David Wragg Date: Fri, 24 Jul 2015 13:30:01 +0100 Subject: [PATCH 04/15] applyUpdate can return a PeerNameSet --- router/peers.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/router/peers.go b/router/peers.go index b3db672687..6f898aae9f 100644 --- a/router/peers.go +++ b/router/peers.go @@ -130,13 +130,19 @@ func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, PeerNameSet, error) for _, peer := range decodedUpdate { updateNames[peer.Name] = void } - return updateNames, setFromPeersMap(newUpdate), nil + + return updateNames, newUpdate, nil } func (peers *Peers) Names() PeerNameSet { peers.RLock() defer peers.RUnlock() - return setFromPeersMap(peers.table) + + names := make(PeerNameSet) + for name := range peers.table { + names[name] = void + } + return names } 
func (peers *Peers) EncodePeers(names PeerNameSet) []byte { @@ -178,14 +184,6 @@ func (peers *Peers) garbageCollect() []*Peer { return removed } -func setFromPeersMap(peers map[PeerName]*Peer) PeerNameSet { - names := make(PeerNameSet) - for name := range peers { - names[name] = void - } - return names -} - func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, decodedUpdate []*Peer, decodedConns [][]ConnectionSummary, err error) { newPeers = make(map[PeerName]*Peer) decodedUpdate = []*Peer{} @@ -231,8 +229,8 @@ func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, de return } -func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]ConnectionSummary) map[PeerName]*Peer { - newUpdate := make(map[PeerName]*Peer) +func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]ConnectionSummary) PeerNameSet { + newUpdate := make(PeerNameSet) for idx, newPeer := range decodedUpdate { connSummaries := decodedConns[idx] name := newPeer.Name @@ -257,7 +255,7 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti // the router.Peers, so there can be no race here. 
peer.Version = newPeer.Version peer.connections = makeConnsMap(peer, connSummaries, peers.table) - newUpdate[name] = peer + newUpdate[name] = void } return newUpdate } From 91dda0816520bd35a084c3f9c613de7b7d610203 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Fri, 24 Jul 2015 13:57:34 +0100 Subject: [PATCH 05/15] Rename Peers "table" field to "byName" --- router/mocks_test.go | 2 +- router/peers.go | 38 +++++++++++++++++++------------------- router/routes.go | 2 +- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/router/mocks_test.go b/router/mocks_test.go index dcc5abebcf..064850dfe0 100644 --- a/router/mocks_test.go +++ b/router/mocks_test.go @@ -70,7 +70,7 @@ func checkEqualConns(t *testing.T, ourName PeerName, got, wanted map[PeerName]Co // Get all the peers from a Peers in a slice func (peers *Peers) allPeers() []*Peer { var res []*Peer - for _, peer := range peers.table { + for _, peer := range peers.byName { res = append(res, peer) } return res diff --git a/router/peers.go b/router/peers.go index 6f898aae9f..9c9f07d222 100644 --- a/router/peers.go +++ b/router/peers.go @@ -10,7 +10,7 @@ import ( type Peers struct { sync.RWMutex ourself *LocalPeer - table map[PeerName]*Peer + byName map[PeerName]*Peer onGC []func(*Peer) } @@ -32,7 +32,7 @@ type ConnectionSummary struct { } func NewPeers(ourself *LocalPeer) *Peers { - peers := &Peers{ourself: ourself, table: make(map[PeerName]*Peer)} + peers := &Peers{ourself: ourself, byName: make(map[PeerName]*Peer)} peers.FetchWithDefault(ourself.Peer) return peers } @@ -54,11 +54,11 @@ func (peers *Peers) invokeOnGCCallbacks(removed []*Peer) { func (peers *Peers) FetchWithDefault(peer *Peer) *Peer { peers.Lock() defer peers.Unlock() - if existingPeer, found := peers.table[peer.Name]; found { + if existingPeer, found := peers.byName[peer.Name]; found { existingPeer.localRefCount++ return existingPeer } - peers.table[peer.Name] = peer + peers.byName[peer.Name] = peer peer.localRefCount++ return peer } @@ 
-66,13 +66,13 @@ func (peers *Peers) FetchWithDefault(peer *Peer) *Peer { func (peers *Peers) Fetch(name PeerName) *Peer { peers.RLock() defer peers.RUnlock() - return peers.table[name] + return peers.byName[name] } func (peers *Peers) FetchAndAddRef(name PeerName) *Peer { peers.Lock() defer peers.Unlock() - peer := peers.table[name] + peer := peers.byName[name] if peer != nil { peer.localRefCount++ } @@ -88,7 +88,7 @@ func (peers *Peers) Dereference(peer *Peer) { func (peers *Peers) ForEach(fun func(*Peer)) { peers.RLock() defer peers.RUnlock() - for _, peer := range peers.table { + for _, peer := range peers.byName { fun(peer) } } @@ -112,7 +112,7 @@ func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, PeerNameSet, error) // have no knowledge of. We can now apply the update. Start by // adding in any new peers into the cache. for name, newPeer := range newPeers { - peers.table[name] = newPeer + peers.byName[name] = newPeer } // Now apply the updates @@ -139,7 +139,7 @@ func (peers *Peers) Names() PeerNameSet { defer peers.RUnlock() names := make(PeerNameSet) - for name := range peers.table { + for name := range peers.byName { names[name] = void } return names @@ -151,7 +151,7 @@ func (peers *Peers) EncodePeers(names PeerNameSet) []byte { peers.RLock() defer peers.RUnlock() for name := range names { - if peer, found := peers.table[name]; found { + if peer, found := peers.byName[name]; found { if peer == peers.ourself.Peer { peers.ourself.Encode(enc) } else { @@ -175,9 +175,9 @@ func (peers *Peers) garbageCollect() []*Peer { peers.ourself.RLock() _, reached := peers.ourself.Routes(nil, false) peers.ourself.RUnlock() - for name, peer := range peers.table { + for name, peer := range peers.byName { if _, found := reached[peer.Name]; !found && peer.localRefCount == 0 { - delete(peers.table, name) + delete(peers.byName, name) removed = append(removed, peer) } } @@ -202,7 +202,7 @@ func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, de 
newPeer := NewPeerFromSummary(peerSummary) decodedUpdate = append(decodedUpdate, newPeer) decodedConns = append(decodedConns, connSummaries) - existingPeer, found := peers.table[newPeer.Name] + existingPeer, found := peers.byName[newPeer.Name] if !found { newPeers[newPeer.Name] = newPeer } else if existingPeer.UID != newPeer.UID { @@ -217,7 +217,7 @@ func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, de if _, found := newPeers[remoteName]; found { continue } - if _, found := peers.table[remoteName]; found { + if _, found := peers.byName[remoteName]; found { continue } // Update refers to a peer which we have no knowledge @@ -234,8 +234,8 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti for idx, newPeer := range decodedUpdate { connSummaries := decodedConns[idx] name := newPeer.Name - // guaranteed to find peer in the peers.table - peer := peers.table[name] + // guaranteed to find peer in the peers.byName + peer := peers.byName[name] if peer != newPeer && (peer == peers.ourself.Peer || peer.Version >= newPeer.Version) { // Nobody but us updates us. And if we know more about a @@ -254,7 +254,7 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti // router.Peers.ApplyUpdate. But ApplyUpdate takes the Lock on // the router.Peers, so there can be no race here. 
peer.Version = newPeer.Version - peer.connections = makeConnsMap(peer, connSummaries, peers.table) + peer.connections = makeConnsMap(peer, connSummaries, peers.byName) newUpdate[name] = void } return newUpdate @@ -292,11 +292,11 @@ func decodePeer(dec *gob.Decoder) (peerSummary PeerSummary, connSummaries []Conn return } -func makeConnsMap(peer *Peer, connSummaries []ConnectionSummary, table map[PeerName]*Peer) map[PeerName]Connection { +func makeConnsMap(peer *Peer, connSummaries []ConnectionSummary, byName map[PeerName]*Peer) map[PeerName]Connection { conns := make(map[PeerName]Connection) for _, connSummary := range connSummaries { name := PeerNameFromBin(connSummary.NameByte) - remotePeer := table[name] + remotePeer := byName[name] conn := NewRemoteConnection(peer, remotePeer, connSummary.RemoteTCPAddr, connSummary.Outbound, connSummary.Established) conns[name] = conn } diff --git a/router/routes.go b/router/routes.go index 5070563586..b7ef0f3099 100644 --- a/router/routes.go +++ b/router/routes.go @@ -200,7 +200,7 @@ func (routes *Routes) calculateUnicast(establishedAndSymmetric bool) unicastRout // where <= is the subset relationship on keys of the returned map. 
func (routes *Routes) calculateBroadcast(establishedAndSymmetric bool) broadcastRoutes { broadcast := make(broadcastRoutes) - for _, peer := range routes.peers.table { + for _, peer := range routes.peers.byName { hops := []PeerName{} if found, reached := peer.Routes(routes.ourself.Peer, establishedAndSymmetric); found { routes.ourself.ForEachConnectedPeer(establishedAndSymmetric, reached, From 51317dca708e250f69d237063879455aadbaab89 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Tue, 8 Sep 2015 11:21:27 +0100 Subject: [PATCH 06/15] Introduce PeersPendingNotifications --- router/peers.go | 51 +++++++++++++++++++++++++++----------------- router/peers_test.go | 17 ++++++++++----- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/router/peers.go b/router/peers.go index 9c9f07d222..6287d49d3d 100644 --- a/router/peers.go +++ b/router/peers.go @@ -31,6 +31,13 @@ type ConnectionSummary struct { Established bool } +// Pending notifications due to changes to Peers that need to be sent +// out once the Peers is unlocked. +type PeersPendingNotifications struct { + // Peers that have been GCed + removed []*Peer +} + func NewPeers(ourself *LocalPeer) *Peers { peers := &Peers{ourself: ourself, byName: make(map[PeerName]*Peer)} peers.FetchWithDefault(ourself.Peer) @@ -40,13 +47,22 @@ func NewPeers(ourself *LocalPeer) *Peers { func (peers *Peers) OnGC(callback func(*Peer)) { peers.Lock() defer peers.Unlock() + + // Although the array underlying peers.onGC might be accessed + // without holding the lock in unlockAndNotify, we don't + // support removing callbacks, so a simple append here is 
peers.onGC = append(peers.onGC, callback) } -func (peers *Peers) invokeOnGCCallbacks(removed []*Peer) { - for _, callback := range peers.onGC { - for _, peer := range removed { - callback(peer) +func (peers *Peers) unlockAndNotify(pending *PeersPendingNotifications) { + onGC := peers.onGC + peers.Unlock() + if pending.removed != nil { + for _, callback := range onGC { + for _, peer := range pending.removed { + callback(peer) + } } } } @@ -101,10 +117,11 @@ func (peers *Peers) ForEach(fun func(*Peer)) { // "improved" update containing just these new/updated elements. func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, PeerNameSet, error) { peers.Lock() + var pending PeersPendingNotifications + defer peers.unlockAndNotify(&pending) newPeers, decodedUpdate, decodedConns, err := peers.decodeUpdate(update) if err != nil { - peers.Unlock() return nil, nil, err } @@ -117,15 +134,11 @@ func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, PeerNameSet, error) // Now apply the updates newUpdate := peers.applyUpdate(decodedUpdate, decodedConns) - removed := peers.garbageCollect() - for _, peerRemoved := range removed { + peers.garbageCollect(&pending) + for _, peerRemoved := range pending.removed { delete(newUpdate, peerRemoved.Name) } - // Don't need to hold peers lock any longer - peers.Unlock() - peers.invokeOnGCCallbacks(removed) - updateNames := make(PeerNameSet) for _, peer := range decodedUpdate { updateNames[peer.Name] = void @@ -162,26 +175,24 @@ func (peers *Peers) EncodePeers(names PeerNameSet) []byte { return buf.Bytes() } -func (peers *Peers) GarbageCollect() []*Peer { +func (peers *Peers) GarbageCollect() { peers.Lock() - removed := peers.garbageCollect() - peers.Unlock() - peers.invokeOnGCCallbacks(removed) - return removed + var pending PeersPendingNotifications + defer peers.unlockAndNotify(&pending) + + peers.garbageCollect(&pending) } -func (peers *Peers) garbageCollect() []*Peer { - removed := []*Peer{} +func (peers *Peers) 
garbageCollect(pending *PeersPendingNotifications) { peers.ourself.RLock() _, reached := peers.ourself.Routes(nil, false) peers.ourself.RUnlock() for name, peer := range peers.byName { if _, found := reached[peer.Name]; !found && peer.localRefCount == 0 { delete(peers.byName, name) - removed = append(removed, peer) + pending.removed = append(pending.removed, peer) } } - return removed } func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, decodedUpdate []*Peer, decodedConns [][]ConnectionSummary, err error) { diff --git a/router/peers_test.go b/router/peers_test.go index 3d7ab95ec7..d24aabb5dc 100644 --- a/router/peers_test.go +++ b/router/peers_test.go @@ -71,6 +71,13 @@ func TestPeersEncoding(t *testing.T) { } } +func garbageCollect(peers *Peers) []*Peer { + var removed []*Peer + peers.OnGC(func(peer *Peer) { removed = append(removed, peer) }) + peers.GarbageCollect() + return removed +} + func TestPeersGarbageCollection(t *testing.T) { const ( peer1NameString = "01:00:00:01:00:00" @@ -97,17 +104,17 @@ func TestPeersGarbageCollection(t *testing.T) { ps2.AddTestRemoteConnection(p3, p1) // Every peer is referenced, so nothing should be dropped - require.Empty(t, ps1.GarbageCollect(), "peers removed") - require.Empty(t, ps2.GarbageCollect(), "peers removed") - require.Empty(t, ps3.GarbageCollect(), "peers removed") + require.Empty(t, garbageCollect(ps1), "peers removed") + require.Empty(t, garbageCollect(ps2), "peers removed") + require.Empty(t, garbageCollect(ps3), "peers removed") // Drop the connection from 2 to 3, and 3 isn't garbage-collected // because 1 has a connection to 3 ps2.DeleteTestConnection(p3) - require.Empty(t, ps2.GarbageCollect(), "peers removed") + require.Empty(t, garbageCollect(ps2), "peers removed") // Drop the connection from 1 to 3, and 3 will get removed by // garbage-collection ps1.DeleteTestConnection(p3) - checkPeerArray(t, ps1.GarbageCollect(), p3) + checkPeerArray(t, garbageCollect(ps1), p3) } From 
971511e1c5a6e1cfbd5d526340d4ca18fbda33e7 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Fri, 24 Jul 2015 13:57:03 +0100 Subject: [PATCH 07/15] Introduce PeerShortIDs These are needed to support vxlan encapsulation. In a vxlan packet, the only place we can straightforwardly hold the source and destination peer identifiers used by the weave router is in the 24-bit VNI (vxlan network identifier) field. This gives us 12 bits to indicate a peer. The resulting limit of 4096 peers on a weave network does not seem like a problem in the near future. But it does mean that decentralized allocation makes collisions fairly likely even in weave networks of sizes that are common today. Fortunately, the problem is much more tractable than IPAM because we can detect when collisions occur and take action to resolve them. --- router/connection.go | 11 +- router/local_peer.go | 18 ++- router/overlay.go | 6 + router/peer.go | 16 ++- router/peers.go | 239 ++++++++++++++++++++++++++++++++++++++-- router/peers_test.go | 256 +++++++++++++++++++++++++++++++++++++++++++ router/router.go | 19 ++-- router/sleeve.go | 4 + router/utils.go | 19 ++-- 9 files changed, 558 insertions(+), 30 deletions(-) diff --git a/router/connection.go b/router/connection.go index c5cae87a8c..99577e5782 100644 --- a/router/connection.go +++ b/router/connection.go @@ -255,6 +255,7 @@ func (conn *LocalConnection) makeFeatures() map[string]string { "PeerNameFlavour": PeerNameFlavour, "Name": conn.local.Name.String(), "NickName": conn.local.NickName, + "ShortID": fmt.Sprint(conn.local.ShortID), "UID": fmt.Sprint(conn.local.UID), "ConnID": fmt.Sprint(conn.uid), } @@ -276,7 +277,7 @@ func (features features) Get(key string) string { } func (conn *LocalConnection) parseFeatures(features features) (*Peer, error) { - if err := features.MustHave([]string{"PeerNameFlavour", "Name", "NickName", "UID", "ConnID"}); err != nil { + if err := features.MustHave([]string{"PeerNameFlavour", "Name", "NickName", "ShortID", "UID", 
"ConnID"}); err != nil { return nil, err } @@ -292,6 +293,12 @@ func (conn *LocalConnection) parseFeatures(features features) (*Peer, error) { nickName := features.Get("NickName") + shortID, err := strconv.ParseUint(features.Get("ShortID"), 10, + PeerShortIDBits) + if err != nil { + return nil, err + } + uid, err := ParsePeerUID(features.Get("UID")) if err != nil { return nil, err @@ -303,7 +310,7 @@ func (conn *LocalConnection) parseFeatures(features features) (*Peer, error) { } conn.uid ^= remoteConnID - return NewPeer(name, nickName, uid, 0), nil + return NewPeer(name, nickName, uid, 0, PeerShortID(shortID)), nil } func (conn *LocalConnection) registerRemote(remote *Peer, acceptNewPeer bool) error { diff --git a/router/local_peer.go b/router/local_peer.go index 15076963a1..1d15922168 100644 --- a/router/local_peer.go +++ b/router/local_peer.go @@ -19,7 +19,8 @@ type LocalPeerAction func() func NewLocalPeer(name PeerName, nickName string, router *Router) *LocalPeer { actionChan := make(chan LocalPeerAction, ChannelSize) peer := &LocalPeer{ - Peer: NewPeer(name, nickName, randomPeerUID(), 0), + Peer: NewPeer(name, nickName, randomPeerUID(), 0, + randomPeerShortID()), router: router, actionChan: actionChan, } @@ -205,7 +206,13 @@ func (peer *LocalPeer) handleDeleteConnection(conn Connection) { // helpers func (peer *LocalPeer) broadcastPeerUpdate(peers ...*Peer) { - peer.router.BroadcastTopologyUpdate(append(peers, peer.Peer)) + // Some tests run without a router. This should be fixed so + // that the relevant part of Router can be easily run in the + // context of a test, but that will involve significant + // reworking of tests. 
+ if peer.router != nil { + peer.router.BroadcastTopologyUpdate(append(peers, peer.Peer)) + } } func (peer *LocalPeer) checkConnectionLimit() error { @@ -241,3 +248,10 @@ func (peer *LocalPeer) connectionCount() int { defer peer.RUnlock() return len(peer.connections) } + +func (peer *LocalPeer) setShortID(shortID PeerShortID) { + peer.Lock() + defer peer.Unlock() + peer.ShortID = shortID + peer.Version++ +} diff --git a/router/overlay.go b/router/overlay.go index b84db7d6a8..b552e17956 100644 --- a/router/overlay.go +++ b/router/overlay.go @@ -15,6 +15,9 @@ type Overlay interface { // The routes have changed, so any cached information should // be discarded. InvalidateRoutes() + + // A mapping of a short id to a peer has changed + InvalidateShortIDs() } type ForwarderParams struct { @@ -90,6 +93,9 @@ func (NullOverlay) MakeForwarder(ForwarderParams) (OverlayForwarder, error) { func (NullOverlay) InvalidateRoutes() { } +func (NullOverlay) InvalidateShortIDs() { +} + func (NullOverlay) SetListener(OverlayForwarderListener) { } diff --git a/router/peer.go b/router/peer.go index 7dfd08578d..82ae22011e 100644 --- a/router/peer.go +++ b/router/peer.go @@ -17,11 +17,24 @@ func ParsePeerUID(s string) (PeerUID, error) { return PeerUID(uid), err } +// Short IDs exist for the sake of fast datapath. They are 12 bits, +// randomly assigned, but we detect and recover from collisions. This +// does limit us to 4096 peers, but that should be sufficient for a +// while. 
+type PeerShortID uint16 + +const PeerShortIDBits = 12 + +func randomPeerShortID() PeerShortID { + return PeerShortID(randUint16() & (1< entry.peer.Name) + } + } + + // Check that every peer was seen + for _, n := range counts { + require.Equal(t, 1, n) + } + + // Delete all the peers + shuffle() + for _, p := range ps { + peers.deleteByShortID(p, &pending) + } + + for _, entry := range peers.byShortID { + if entry.peer != peers.ourself.Peer { + require.Nil(t, entry.peer) + } + + require.Empty(t, entry.others) + } +} + +// Test the easy case of short id reassignment, when few short ids are taken +func TestShortIDReassignmentEasy(t *testing.T) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + _, peers := newNode(PeerName(0)) + + for i := 1; i <= 10; i++ { + peers.FetchWithDefault(NewPeer(PeerName(i), "", PeerUID(i), 0, + PeerShortID(rng.Intn(1< Date: Thu, 27 Aug 2015 17:26:53 +0100 Subject: [PATCH 08/15] Replace OverlayForwarderListener with channels It's more robust to signal the conditions asynchronously via channels rather than callbacks. --- router/connection.go | 53 +++++++++++++++++-------------------------- router/overlay.go | 40 ++++++++++++++++++++------------ router/sleeve.go | 54 ++++++++++++++++++++------------------------ 3 files changed, 72 insertions(+), 75 deletions(-) diff --git a/router/connection.go b/router/connection.go index 99577e5782..1090aa894d 100644 --- a/router/connection.go +++ b/router/connection.go @@ -115,8 +115,6 @@ func (conn *LocalConnection) BreakTie(dupConn Connection) ConnectionTieBreak { } func (conn *LocalConnection) Established() bool { - conn.RLock() - defer conn.RUnlock() return conn.established } @@ -214,17 +212,16 @@ func (conn *LocalConnection) run(actionChan <-chan ConnectionAction, finished ch } conn.Router.ConnectionMaker.ConnectionCreated(conn) - // SetListener has the side-effect of telling the forwarder - // that the connection is confirmed. 
This comes after - // AddConnection, because only after that completes do we know - // the connection is valid: in particular that it is not a - // duplicate connection to the same peer. Sending heartbeats - // on a duplicate connection can trip up crypto at the other - // end, since the associated UDP packets may get decoded by - // the other connection. It is also generally wasteful to - // engage in any interaction with the remote on a connection - // that turns out to be invalid. - conn.forwarder.SetListener(ConnectionAsForwarderListener{conn}) + // Forwarder confirmation comes after AddConnection, because + // only after that completes do we know the connection is + // valid: in particular that it is not a duplicate connection + // to the same peer. Sending heartbeats on a duplicate + // connection can trip up crypto at the other end, since the + // associated UDP packets may get decoded by the other + // connection. It is also generally wasteful to engage in any + // interaction with the remote on a connection that turns out + // to be invalid. + conn.forwarder.Confirm() // receiveTCP must follow also AddConnection. 
In the absence // of any indirect connectivity to the remote peer, the first @@ -335,12 +332,23 @@ func (conn *LocalConnection) registerRemote(remote *Peer, acceptNewPeer bool) er } func (conn *LocalConnection) actorLoop(actionChan <-chan ConnectionAction) (err error) { + fwdErrorChan := conn.forwarder.ErrorChannel() + fwdEstablishedChan := conn.forwarder.EstablishedChannel() + for err == nil { select { case action := <-actionChan: err = action() + case <-conn.heartbeatTCP.C: err = conn.sendSimpleProtocolMsg(ProtocolHeartbeat) + + case <-fwdEstablishedChan: + conn.established = true + fwdEstablishedChan = nil + conn.Router.Ourself.ConnectionEstablished(conn) + + case err = <-fwdErrorChan: } } return @@ -388,25 +396,6 @@ func (conn *LocalConnection) sendOverlayControlMessage(tag ProtocolTag, msg []by return conn.sendProtocolMsg(ProtocolMsg{tag, msg}) } -type ConnectionAsForwarderListener struct{ conn *LocalConnection } - -func (l ConnectionAsForwarderListener) Established() { - l.conn.sendAction(func() error { - old := l.conn.established - l.conn.Lock() - l.conn.established = true - l.conn.Unlock() - if !old { - l.conn.Router.Ourself.ConnectionEstablished(l.conn) - } - return nil - }) -} - -func (l ConnectionAsForwarderListener) Error(err error) { - l.conn.sendAction(func() error { return err }) -} - // Helpers func (conn *LocalConnection) sendSimpleProtocolMsg(tag ProtocolTag) error { diff --git a/router/overlay.go b/router/overlay.go index b552e17956..9da023beab 100644 --- a/router/overlay.go +++ b/router/overlay.go @@ -57,16 +57,25 @@ type OverlayCrypto struct { // All of the machinery to forward packets to a particular peer type OverlayForwarder interface { - // Register a callback for forwarder state changes. - // side-effect, calling this confirms that the connection is - // really wanted, and so the provider should activate it. - // However, Forward might be called before this is called - // (e.g. on another thread). 
- SetListener(OverlayForwarderListener) - - // Forward a packet across the connection. + // Forward a packet across the connection. May be called as + // soon as the forwarder is created, in particular before + // Confirm(). Forward(ForwardPacketKey) FlowOp + // Confirm that the connection is really wanted, and so the + // Overlay should begin heartbeats etc. to verify the + // operation of the forwarder. + Confirm() + + // A channel indicating that the forwarder is established, + // i.e. its operation has been confirmed. + EstablishedChannel() <-chan struct{} + + // A channel indicating an error from the forwarder. The + // forwarder is not expected to be operational after the first + // error, so the channel only needs to buffer a single error. + ErrorChannel() <-chan error + Stop() // Handle a message from the peer. 'tag' exists for @@ -75,11 +84,6 @@ type OverlayForwarder interface { ControlMessage(tag ProtocolTag, msg []byte) } -type OverlayForwarderListener interface { - Established() - Error(error) -} - type NullOverlay struct{} func (NullOverlay) StartConsumingPackets(*Peer, *Peers, OverlayConsumer) error { @@ -96,7 +100,15 @@ func (NullOverlay) InvalidateRoutes() { func (NullOverlay) InvalidateShortIDs() { } -func (NullOverlay) SetListener(OverlayForwarderListener) { +func (NullOverlay) Confirm() { +} + +func (NullOverlay) EstablishedChannel() <-chan struct{} { + return nil +} + +func (NullOverlay) ErrorChannel() <-chan error { + return nil } func (NullOverlay) Forward(ForwardPacketKey) FlowOp { diff --git a/router/sleeve.go b/router/sleeve.go index 957ab9dd8d..ed08d44e49 100644 --- a/router/sleeve.go +++ b/router/sleeve.go @@ -292,9 +292,12 @@ type sleeveForwarder struct { confirmedChan chan<- struct{} finishedChan <-chan struct{} + // listener channels + establishedChan chan struct{} + errorChan chan error + // Explicitly locked state lock sync.RWMutex - listener OverlayForwarderListener remoteAddr *net.UDPAddr // These fields are accessed and updated 
independently, so no @@ -375,6 +378,8 @@ func (sleeve *SleeveOverlay) MakeForwarder(params ForwarderParams) (OverlayForwa controlMsgChan: controlMsgChan, confirmedChan: confirmedChan, finishedChan: finishedChan, + establishedChan: make(chan struct{}), + errorChan: make(chan error, 1), remoteAddr: params.RemoteAddr, mtu: DefaultMTU, crypto: crypto, @@ -400,23 +405,23 @@ func (fwd *sleeveForwarder) logPrefix() string { return fwd.logPrefixFor(remoteAddr) } -func (fwd *sleeveForwarder) SetListener(listener OverlayForwarderListener) { - log.Debug(fwd.logPrefix(), "SetListener ", listener) - - fwd.lock.Lock() - fwd.listener = listener - fwd.lock.Unlock() +func (fwd *sleeveForwarder) Confirm() { + log.Debug(fwd.logPrefix(), "Confirm") - // Setting the listener confirms that the forwarder is really - // wanted - if listener != nil { - select { - case fwd.confirmedChan <- struct{}{}: - case <-fwd.finishedChan: - } + select { + case fwd.confirmedChan <- struct{}{}: + case <-fwd.finishedChan: } } +func (fwd *sleeveForwarder) EstablishedChannel() <-chan struct{} { + return fwd.establishedChan +} + +func (fwd *sleeveForwarder) ErrorChannel() <-chan error { + return fwd.errorChan +} + type curriedForward struct { fwd *sleeveForwarder key ForwardPacketKey @@ -590,7 +595,6 @@ func (fwd *sleeveForwarder) ControlMessage(tag ProtocolTag, msg []byte) { func (fwd *sleeveForwarder) Stop() { fwd.sleeve.removeForwarder(fwd.remotePeer.Name, fwd) - fwd.SetListener(nil) // Tell the forwarder goroutine to finish. We don't need to // wait for it. 
@@ -664,9 +668,9 @@ loop: fwd.lock.RLock() defer fwd.lock.RUnlock() - if fwd.listener != nil { - fwd.listener.Error(err) - } + + // this is the only place we send an error to errorChan + fwd.errorChan <- err } func (fwd *sleeveForwarder) aggregateAndSend(frame aggregatorFrame, @@ -857,14 +861,14 @@ func (fwd *sleeveForwarder) setRemoteAddr(addr *net.UDPAddr) { func (fwd *sleeveForwarder) handleHeartbeatAck() error { log.Debug(fwd.logPrefix(), "handleHeartbeatAck") - // The connection is now regarded as established - fwd.notifyEstablished() - if fwd.heartbeatInterval != SlowHeartbeat { fwd.heartbeatInterval = SlowHeartbeat if fwd.heartbeatTimer != nil { fwd.heartbeatTimer.Reset(fwd.heartbeatInterval) } + + // The connection is now regarded as established + close(fwd.establishedChan) } fwd.fragTestTicker = time.NewTicker(FragTestInterval) @@ -879,14 +883,6 @@ func (fwd *sleeveForwarder) handleHeartbeatAck() error { make([]byte, PMTUDiscoverySize)) } -func (fwd *sleeveForwarder) notifyEstablished() { - fwd.lock.RLock() - defer fwd.lock.RUnlock() - if fwd.listener != nil { - fwd.listener.Established() - } -} - func (fwd *sleeveForwarder) sendFragTest() error { log.Debug(fwd.logPrefix(), "sendFragTest") fwd.stackFrag = false From f56457271da73bed3971df056e63e4809a85b601 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Wed, 9 Sep 2015 11:55:09 +0100 Subject: [PATCH 09/15] OverlaySwitch --- router/connection.go | 11 +- router/overlay.go | 15 +- router/overlay_switch.go | 395 +++++++++++++++++++++++++++++++++++++++ router/protocol.go | 2 +- router/sleeve.go | 10 +- 5 files changed, 422 insertions(+), 11 deletions(-) create mode 100644 router/overlay_switch.go diff --git a/router/connection.go b/router/connection.go index 1090aa894d..926d1392ca 100644 --- a/router/connection.go +++ b/router/connection.go @@ -199,6 +199,7 @@ func (conn *LocalConnection) run(actionChan <-chan ConnectionAction, finished ch ConnUID: conn.uid, Crypto: conn.forwarderCrypto(), SendControlMessage: 
conn.sendOverlayControlMessage, + Features: intro.Features, } if conn.forwarder, err = conn.Router.Overlay.MakeForwarder(params); err != nil { return @@ -248,7 +249,7 @@ func (conn *LocalConnection) run(actionChan <-chan ConnectionAction, finished ch } func (conn *LocalConnection) makeFeatures() map[string]string { - return map[string]string{ + features := map[string]string{ "PeerNameFlavour": PeerNameFlavour, "Name": conn.local.Name.String(), "NickName": conn.local.NickName, @@ -256,6 +257,8 @@ func (conn *LocalConnection) makeFeatures() map[string]string { "UID": fmt.Sprint(conn.local.UID), "ConnID": fmt.Sprint(conn.uid), } + conn.Router.Overlay.AddFeaturesTo(features) + return features } type features map[string]string @@ -392,8 +395,8 @@ func (conn *LocalConnection) forwarderCrypto() *OverlayCrypto { } } -func (conn *LocalConnection) sendOverlayControlMessage(tag ProtocolTag, msg []byte) error { - return conn.sendProtocolMsg(ProtocolMsg{tag, msg}) +func (conn *LocalConnection) sendOverlayControlMessage(tag byte, msg []byte) error { + return conn.sendProtocolMsg(ProtocolMsg{ProtocolTag(tag), msg}) } // Helpers @@ -430,7 +433,7 @@ func (conn *LocalConnection) handleProtocolMsg(tag ProtocolTag, payload []byte) switch tag { case ProtocolHeartbeat: case ProtocolConnectionEstablished, ProtocolFragmentationReceived, ProtocolPMTUVerified, ProtocolOverlayControlMsg: - conn.forwarder.ControlMessage(tag, payload) + conn.forwarder.ControlMessage(byte(tag), payload) case ProtocolGossipUnicast, ProtocolGossipBroadcast, ProtocolGossip: return conn.Router.handleGossip(tag, payload) default: diff --git a/router/overlay.go b/router/overlay.go index 9da023beab..e8b44ab1e6 100644 --- a/router/overlay.go +++ b/router/overlay.go @@ -18,6 +18,9 @@ type Overlay interface { // A mapping of a short id to a peer has changed InvalidateShortIDs() + + // Enhance a features map with overlay-related features + AddFeaturesTo(map[string]string) } type ForwarderParams struct { @@ -41,7 +44,10 @@ 
type ForwarderParams struct { // Function to send a control message to the counterpart // forwarder. - SendControlMessage func(tag ProtocolTag, msg []byte) error + SendControlMessage func(tag byte, msg []byte) error + + // Features passed at connection initiation + Features map[string]string } // When a consumer is called, the decoder will already have been used @@ -81,7 +87,7 @@ type OverlayForwarder interface { // Handle a message from the peer. 'tag' exists for // compatibility, and should always be // ProtocolOverlayControlMessage for non-sleeve overlays. - ControlMessage(tag ProtocolTag, msg []byte) + ControlMessage(tag byte, msg []byte) } type NullOverlay struct{} @@ -111,6 +117,9 @@ func (NullOverlay) ErrorChannel() <-chan error { return nil } +func (NullOverlay) AddFeaturesTo(map[string]string) { +} + func (NullOverlay) Forward(ForwardPacketKey) FlowOp { return nil } @@ -118,5 +127,5 @@ func (NullOverlay) Forward(ForwardPacketKey) FlowOp { func (NullOverlay) Stop() { } -func (NullOverlay) ControlMessage(ProtocolTag, []byte) { +func (NullOverlay) ControlMessage(byte, []byte) { } diff --git a/router/overlay_switch.go b/router/overlay_switch.go new file mode 100644 index 0000000000..b2ae6af31d --- /dev/null +++ b/router/overlay_switch.go @@ -0,0 +1,395 @@ +package router + +import ( + "fmt" + "strings" + "sync" +) + +// OverlaySwitch selects which overlay to use, from a set of +// subsidiary overlays. First, it passes a list of supported overlays +// in the connection features, and uses that to determine which +// overlays are in common. Then it tries those common overlays, and +// uses the best one that seems to be working. 
+ +type OverlaySwitch struct { + overlays map[string]Overlay + overlayNames []string +} + +func NewOverlaySwitch() *OverlaySwitch { + return &OverlaySwitch{overlays: make(map[string]Overlay)} +} + +func (osw *OverlaySwitch) Add(name string, overlay Overlay) { + // check for repeated names + if _, present := osw.overlays[name]; present { + log.Fatal("OverlaySwitch: repeated overlay name") + } + + osw.overlays[name] = overlay + osw.overlayNames = append(osw.overlayNames, name) +} + +func (osw *OverlaySwitch) AddFeaturesTo(features map[string]string) { + features["Overlays"] = strings.Join(osw.overlayNames, " ") +} + +func (osw *OverlaySwitch) InvalidateRoutes() { + for _, overlay := range osw.overlays { + overlay.InvalidateRoutes() + } +} + +func (osw *OverlaySwitch) InvalidateShortIDs() { + for _, overlay := range osw.overlays { + overlay.InvalidateShortIDs() + } +} + +func (osw *OverlaySwitch) StartConsumingPackets(localPeer *Peer, peers *Peers, + consumer OverlayConsumer) error { + for _, overlay := range osw.overlays { + if err := overlay.StartConsumingPackets(localPeer, peers, + consumer); err != nil { + return err + } + } + return nil +} + +type namedOverlay struct { + Overlay + name string +} + +// Find the common set of overlays supported by both sides, with the +// ordering being the same on both sides too. 
+func (osw *OverlaySwitch) commonOverlays(params ForwarderParams) ([]namedOverlay, error) { + var peerOverlays []string + if overlaysFeature, present := params.Features["Overlays"]; present { + peerOverlays = strings.Split(overlaysFeature, " ") + } + + common := make(map[string]Overlay) + for _, name := range peerOverlays { + if overlay := osw.overlays[name]; overlay != nil { + common[name] = overlay + } + } + + if len(common) == 0 { + return nil, fmt.Errorf("no overlays in common with peer") + } + + // we order them according to the connecting node + ordering := osw.overlayNames + if params.RemoteAddr == nil { + // we are the connectee + ordering = peerOverlays + } + + res := make([]namedOverlay, 0, len(common)) + for _, name := range ordering { + overlay := common[name] + if overlay != nil { + res = append(res, namedOverlay{overlay, name}) + } + } + + // we use bytes to represent forwarder indices in control + // messages, so just in case: + if len(res) > 256 { + res = res[:256] + } + + return res, nil +} + +type overlaySwitchForwarder struct { + remotePeer *Peer + + lock sync.Mutex + + // the forwarder to send on + best OverlayForwarder + + // the subsidiary forwarders + forwarders []subForwarder + + // closed to tell the main goroutine to stop + stopChan chan<- struct{} + + alreadyEstablished bool + establishedChan chan struct{} + errorChan chan error +} + +// A subsidiary forwarder +type subForwarder struct { + fwd OverlayForwarder + overlayName string + + // Has the forwarder signalled that it is established? + established bool + + // closed to tell the forwarder monitor goroutine to stop + stopChan chan<- struct{} +} + +// An event from a subsidiary forwarder +type subForwarderEvent struct { + // the index of the forwarder + index int + + // is this an "established" event? + established bool + + // is this an error event? 
+ err error +} + +func (osw *OverlaySwitch) MakeForwarder( + params ForwarderParams) (OverlayForwarder, error) { + overlays, err := osw.commonOverlays(params) + if err != nil { + return nil, err + } + + // channel to carry events from the subforwarder monitors to + // the main goroutine + eventsChan := make(chan subForwarderEvent) + + // channel to stop the main goroutine + stopChan := make(chan struct{}) + + fwd := &overlaySwitchForwarder{ + remotePeer: params.RemotePeer, + + forwarders: make([]subForwarder, len(overlays)), + stopChan: stopChan, + + establishedChan: make(chan struct{}), + errorChan: make(chan error, 1), + } + + origSendControlMessage := params.SendControlMessage + for i, overlay := range overlays { + // Prefix control messages to indicate the relevant forwarder + index := i + params.SendControlMessage = func(tag byte, msg []byte) error { + xmsg := make([]byte, len(msg)+2) + xmsg[0] = byte(index) + xmsg[1] = tag + copy(xmsg[2:], msg) + return origSendControlMessage(ProtocolOverlayControlMsg, + xmsg) + } + + subFwd, err := overlay.MakeForwarder(params) + if err != nil { + fwd.stopFrom(0) + return nil, err + } + + subStopChan := make(chan struct{}) + go monitorForwarder(i, eventsChan, subStopChan, subFwd) + fwd.forwarders[i] = subForwarder{ + fwd: subFwd, + overlayName: overlay.name, + stopChan: subStopChan, + } + } + + fwd.chooseBest() + go fwd.run(eventsChan, stopChan) + return fwd, nil +} + +func monitorForwarder(index int, eventsChan chan<- subForwarderEvent, + stopChan <-chan struct{}, fwd OverlayForwarder) { + establishedChan := fwd.EstablishedChannel() +loop: + for { + e := subForwarderEvent{index: index} + + select { + case <-establishedChan: + e.established = true + establishedChan = nil + + case err := <-fwd.ErrorChannel(): + e.err = err + + case <-stopChan: + break loop + } + + select { + case eventsChan <- e: + case <-stopChan: + break loop + } + + if e.err != nil { + break loop + } + } + + fwd.Stop() +} + +func (fwd 
*overlaySwitchForwarder) run(eventsChan <-chan subForwarderEvent, + stopChan <-chan struct{}) { +loop: + for { + select { + case <-stopChan: + break loop + + case e := <-eventsChan: + switch { + case e.established: + fwd.established(e.index) + case e.err != nil: + fwd.error(e.index, e.err) + } + } + } + + fwd.lock.Lock() + defer fwd.lock.Unlock() + fwd.stopFrom(0) +} + +func (fwd *overlaySwitchForwarder) established(index int) { + fwd.lock.Lock() + defer fwd.lock.Unlock() + + fwd.forwarders[index].established = true + + // less preferred forwarders are no longer needed + fwd.stopFrom(index + 1) + + if !fwd.alreadyEstablished { + fwd.alreadyEstablished = true + close(fwd.establishedChan) + } + + fwd.chooseBest() +} + +func (fwd *overlaySwitchForwarder) logPrefix() string { + return fmt.Sprintf("overlay_switch ->[%s] ", fwd.remotePeer) +} + +func (fwd *overlaySwitchForwarder) error(index int, err error) { + fwd.lock.Lock() + defer fwd.lock.Unlock() + + log.Info(fwd.logPrefix(), fwd.forwarders[index].overlayName, " ", err) + fwd.forwarders[index].fwd = nil + fwd.chooseBest() +} + +func (fwd *overlaySwitchForwarder) stopFrom(index int) { + for index < len(fwd.forwarders) { + subFwd := &fwd.forwarders[index] + if subFwd.fwd != nil { + subFwd.fwd = nil + close(subFwd.stopChan) + } + index++ + } +} + +func (fwd *overlaySwitchForwarder) chooseBest() { + // the most preferred established forwarder is the best + // otherwise, the most preferred working forwarder is the best + var bestEstablished, bestWorking *subForwarder + + for i := range fwd.forwarders { + subFwd := &fwd.forwarders[i] + if subFwd.fwd == nil { + continue + } + + if bestWorking == nil { + bestWorking = subFwd + } + + if bestEstablished == nil && subFwd.established { + bestEstablished = subFwd + } + } + + best := bestEstablished + if best == nil { + if bestWorking == nil { + select { + case fwd.errorChan <- fmt.Errorf("no working forwarders to %s", fwd.remotePeer): + default: + } + + return + } + + best = 
bestWorking + } + + if fwd.best != best.fwd { + fwd.best = best.fwd + log.Info(fwd.logPrefix(), "using ", best.overlayName) + } +} + +func (fwd *overlaySwitchForwarder) Confirm() { + var forwarders []OverlayForwarder + + fwd.lock.Lock() + for _, subFwd := range fwd.forwarders { + if subFwd.fwd != nil { + forwarders = append(forwarders, subFwd.fwd) + } + } + fwd.lock.Unlock() + + for _, subFwd := range forwarders { + subFwd.Confirm() + } +} + +func (fwd *overlaySwitchForwarder) Forward(pk ForwardPacketKey) FlowOp { + fwd.lock.Lock() + best := fwd.best + fwd.lock.Unlock() + + if best == nil { + return nil + } + + return best.Forward(pk) +} + +func (fwd *overlaySwitchForwarder) EstablishedChannel() <-chan struct{} { + return fwd.establishedChan +} + +func (fwd *overlaySwitchForwarder) ErrorChannel() <-chan error { + return fwd.errorChan +} + +func (fwd *overlaySwitchForwarder) Stop() { + fwd.lock.Lock() + defer fwd.lock.Unlock() + fwd.stopFrom(0) +} + +func (fwd *overlaySwitchForwarder) ControlMessage(tag byte, msg []byte) { + fwd.lock.Lock() + subFwd := fwd.forwarders[msg[0]].fwd + fwd.lock.Unlock() + if subFwd != nil { + subFwd.ControlMessage(msg[1], msg[2:]) + } +} diff --git a/router/protocol.go b/router/protocol.go index f19593738c..add17a179a 100644 --- a/router/protocol.go +++ b/router/protocol.go @@ -328,7 +328,7 @@ func (res *ProtocolIntroResults) setupCrypto(params ProtocolIntroParams, remoteP type ProtocolTag byte const ( - ProtocolHeartbeat ProtocolTag = iota + ProtocolHeartbeat = iota ProtocolConnectionEstablished ProtocolFragmentationReceived ProtocolPMTUVerified diff --git a/router/sleeve.go b/router/sleeve.go index ed08d44e49..7ed39f342b 100644 --- a/router/sleeve.go +++ b/router/sleeve.go @@ -133,6 +133,10 @@ func (*SleeveOverlay) InvalidateShortIDs() { // no cached information, so nothing to do } +func (*SleeveOverlay) AddFeaturesTo(map[string]string) { + // No features to be provided, to facilitate compatibility +} + func (sleeve *SleeveOverlay) 
lookupForwarder(peer PeerName) *sleeveForwarder { sleeve.lock.Lock() defer sleeve.lock.Unlock() @@ -281,7 +285,7 @@ type sleeveForwarder struct { sleeve *SleeveOverlay remotePeer *Peer remotePeerBin []byte - sendControlMsg func(ProtocolTag, []byte) error + sendControlMsg func(byte, []byte) error connUID uint64 // Channels to communicate with the aggregator goroutine @@ -342,7 +346,7 @@ type specialFrame struct { // A control message type controlMessage struct { - tag ProtocolTag + tag byte msg []byte } @@ -586,7 +590,7 @@ func frameTooBig(frame []byte, mtu int) bool { return len(frame) > mtu+EthernetOverhead } -func (fwd *sleeveForwarder) ControlMessage(tag ProtocolTag, msg []byte) { +func (fwd *sleeveForwarder) ControlMessage(tag byte, msg []byte) { select { case fwd.controlMsgChan <- controlMessage{tag, msg}: case <-fwd.finishedChan: From bc026ab49a39e3476453ac1f12609873f9d9d8ed Mon Sep 17 00:00:00 2001 From: David Wragg Date: Wed, 9 Sep 2015 11:50:44 +0100 Subject: [PATCH 10/15] Show the overlay name in sleeve log messsages --- router/sleeve.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/sleeve.go b/router/sleeve.go index 7ed39f342b..c60e6702b2 100644 --- a/router/sleeve.go +++ b/router/sleeve.go @@ -399,7 +399,7 @@ func (sleeve *SleeveOverlay) MakeForwarder(params ForwarderParams) (OverlayForwa } func (fwd *sleeveForwarder) logPrefixFor(sender *net.UDPAddr) string { - return fmt.Sprintf("->[%s|%s]: ", sender, fwd.remotePeer) + return fmt.Sprintf("sleeve ->[%s|%s]: ", sender, fwd.remotePeer) } func (fwd *sleeveForwarder) logPrefix() string { From e8f975e44f8d0d03daffa900631c3c2c1c950a16 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Fri, 24 Jul 2015 16:14:14 +0100 Subject: [PATCH 11/15] Introduce checkFatal in prog/weaver/main.go --- prog/weaver/main.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/prog/weaver/main.go b/prog/weaver/main.go index b778b34983..eb02e1ea33 100644 --- 
a/prog/weaver/main.go +++ b/prog/weaver/main.go @@ -116,15 +116,11 @@ func main() { if ifaceName != "" { iface, err := weavenet.EnsureInterface(ifaceName) - if err != nil { - Log.Fatal(err) - } + checkFatal(err) // bufsz flag is in MB config.Bridge, err = weave.NewPcap(iface, bufSzMB*1024*1024) - if err != nil { - Log.Fatal(err) - } + checkFatal(err) } if routerName == "" { @@ -135,15 +131,11 @@ func main() { } name, err := weave.PeerNameFromUserInput(routerName) - if err != nil { - Log.Fatal(err) - } + checkFatal(err) if nickName == "" { nickName, err = os.Hostname() - if err != nil { - Log.Fatal(err) - } + checkFatal(err) } if password == "" { @@ -285,9 +277,8 @@ func (nopPacketLogging) LogForwardPacket(string, weave.ForwardPacketKey) { func parseAndCheckCIDR(cidrStr string) address.CIDR { _, cidr, err := address.ParseCIDR(cidrStr) - if err != nil { - Log.Fatal(err) - } + checkFatal(err) + if cidr.Size() < ipam.MinSubnetSize { Log.Fatalf("Allocation range smaller than minimum size %d: %s", ipam.MinSubnetSize, cidrStr) } @@ -346,3 +337,9 @@ func listenAndServeHTTP(httpAddr string, muxRouter *mux.Router) { Log.Fatal("Unable to create http server", err) } } + +func checkFatal(e error) { + if e != nil { + Log.Fatal(e) + } +} From 295876e1d113017fe3d37d4e50aa56a530a03327 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Wed, 9 Sep 2015 11:55:39 +0100 Subject: [PATCH 12/15] Fast datapath --- prog/weaver/main.go | 93 +++- router/consts.go | 20 +- router/fastdp.go | 1115 +++++++++++++++++++++++++++++++++++++++++++ router/flow.go | 32 +- router/odp.go | 60 +++ router/sleeve.go | 20 +- 6 files changed, 1303 insertions(+), 37 deletions(-) create mode 100644 router/fastdp.go create mode 100644 router/odp.go diff --git a/prog/weaver/main.go b/prog/weaver/main.go index eb02e1ea33..5599728d6f 100644 --- a/prog/weaver/main.go +++ b/prog/weaver/main.go @@ -13,6 +13,7 @@ import ( "github.com/davecheney/profile" "github.com/docker/docker/pkg/mflag" "github.com/gorilla/mux" + 
"github.com/weaveworks/go-odp/odp" . "github.com/weaveworks/weave/common" "github.com/weaveworks/weave/common/docker" @@ -35,8 +36,13 @@ func main() { runtime.GOMAXPROCS(procs) var ( + // flags that cause immediate exit + justVersion bool + createDatapath bool + deleteDatapath bool + addDatapathInterface string + config weave.Config - justVersion bool protocolMinVersion int ifaceName string routerName string @@ -60,9 +66,14 @@ func main() { dnsClientTimeout time.Duration dnsEffectiveListenAddress string iface *net.Interface + datapathName string ) mflag.BoolVar(&justVersion, []string{"#version", "-version"}, false, "print version and exit") + mflag.BoolVar(&createDatapath, []string{"-create-datapath"}, false, "create ODP datapath and exit") + mflag.BoolVar(&deleteDatapath, []string{"-delete-datapath"}, false, "delete ODP datapath and exit") + mflag.StringVar(&addDatapathInterface, []string{"-add-datapath-iface"}, "", "add a network interface to the ODP datapath and exit") + mflag.IntVar(&config.Port, []string{"#port", "-port"}, weave.Port, "router port") mflag.IntVar(&protocolMinVersion, []string{"-min-protocol-version"}, weave.ProtocolMinVersion, "minimum weave protocol version") mflag.StringVar(&ifaceName, []string{"#iface", "-iface"}, "", "name of interface to capture/inject from (disabled if blank)") @@ -86,6 +97,7 @@ func main() { mflag.IntVar(&dnsTTL, []string{"-dns-ttl"}, nameserver.DefaultTTL, "TTL for DNS request from our domain") mflag.DurationVar(&dnsClientTimeout, []string{"-dns-fallback-timeout"}, nameserver.DefaultClientTimeout, "timeout for fallback DNS requests") mflag.StringVar(&dnsEffectiveListenAddress, []string{"-dns-effective-listen-address"}, "", "address DNS will actually be listening, after Docker port mapping") + mflag.StringVar(&datapathName, []string{"-datapath"}, "", "ODP datapath name") // crude way of detecting that we probably have been started in a // container, with `weave launch` --> suppress misleading paths in @@ -94,14 +106,37 
@@ func main() { os.Args[0] = "weave" mflag.CommandLine.Init("weave", mflag.ExitOnError) } + mflag.Parse() peers = mflag.Args() SetLogLevel(logLevel) - if justVersion { + + switch { + case justVersion: fmt.Printf("weave router %s\n", version) os.Exit(0) + + case createDatapath: + err := weave.CreateDatapath(datapathName) + if odp.IsKernelLacksODPError(err) { + // When the kernel lacks ODP support, exit + // with a special status to distinguish it for + // the weave script. + os.Exit(17) + } + + checkFatal(err) + os.Exit(0) + + case deleteDatapath: + checkFatal(weave.DeleteDatapath(datapathName)) + os.Exit(0) + + case addDatapathInterface != "": + checkFatal(weave.AddDatapathInterface(datapathName, addDatapathInterface)) + os.Exit(0) } Log.Println("Command line options:", options()) @@ -112,9 +147,29 @@ func main() { } config.ProtocolMinVersion = byte(protocolMinVersion) - var err error + var fastDPOverlay weave.Overlay + if datapathName != "" { + // A datapath name implies that "Bridge" and "Overlay" + // packet handling use fast datapath, although other + // options can override that below. Even if both + // things are overridden, we might need bridging on + // the datapath. + fastdp, err := weave.NewFastDatapath(weave.FastDatapathConfig{ + DatapathName: datapathName, + Port: config.Port, + }) + + checkFatal(err) + config.Bridge = fastdp.Bridge() + fastDPOverlay = fastdp.Overlay() + } if ifaceName != "" { + // -iface can coexist with -datapath, because + // pcap-based packet capture is a bit more efficient + // than capture via ODP misses, even when using an + // ODP-based bridge. So when using weave encryption, + // it's preferable to use -iface. 
iface, err := weavenet.EnsureInterface(ifaceName) checkFatal(err) @@ -123,6 +178,27 @@ func main() { checkFatal(err) } + if password == "" { + password = os.Getenv("WEAVE_PASSWORD") + } + + if password == "" { + Log.Println("Communication between peers is unencrypted.") + } else { + config.Password = []byte(password) + Log.Println("Communication between peers is encrypted.") + + // fastdp doesn't support encryption + fastDPOverlay = nil + } + + overlays := weave.NewOverlaySwitch() + if fastDPOverlay != nil { + overlays.Add("fastdp", fastDPOverlay) + } + overlays.Add("sleeve", weave.NewSleeveOverlay(config.Port)) + config.Overlay = overlays + if routerName == "" { if iface == nil { Log.Fatal("Either an interface must be specified with --iface or a name with -name") @@ -138,16 +214,6 @@ func main() { checkFatal(err) } - if password == "" { - password = os.Getenv("WEAVE_PASSWORD") - } - if password == "" { - Log.Println("Communication between peers is unencrypted.") - } else { - config.Password = []byte(password) - Log.Println("Communication between peers is encrypted.") - } - if prof != "" { p := *profile.CPUProfile p.ProfilePath = prof @@ -156,7 +222,6 @@ func main() { } config.PeerDiscovery = !noDiscovery - config.Overlay = weave.NewSleeveOverlay(config.Port) if pktdebug { config.PacketLogging = packetLogging{} diff --git a/router/consts.go b/router/consts.go index f8b718c2b7..6fd115885a 100644 --- a/router/consts.go +++ b/router/consts.go @@ -6,12 +6,16 @@ import ( ) const ( - Port = 6783 - HTTPPort = Port + 1 - MaxUDPPacketSize = 65535 - ChannelSize = 16 - TCPHeartbeat = 30 * time.Second - GossipInterval = 30 * time.Second - MaxDuration = time.Duration(math.MaxInt64) - MaxTCPMsgSize = 10 * 1024 * 1024 + Port = 6783 + HTTPPort = Port + 1 + MaxUDPPacketSize = 65535 + ChannelSize = 16 + TCPHeartbeat = 30 * time.Second + GossipInterval = 30 * time.Second + MaxDuration = time.Duration(math.MaxInt64) + MaxTCPMsgSize = 10 * 1024 * 1024 + FastHeartbeat = 500 * 
time.Millisecond + SlowHeartbeat = 10 * time.Second + MaxMissedHeartbeats = 6 + HeartbeatTimeout = MaxMissedHeartbeats * SlowHeartbeat ) diff --git a/router/fastdp.go b/router/fastdp.go new file mode 100644 index 0000000000..7208094a79 --- /dev/null +++ b/router/fastdp.go @@ -0,0 +1,1115 @@ +package router + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + "time" + + "github.com/weaveworks/go-odp/odp" +) + +// The virtual bridge accepts packets from ODP vports and the router +// port (i.e. InjectPacket). We need a map key to index those +// possibilities: +type bridgePortID struct { + vport odp.VportID + router bool +} + +// A bridgeSender sends out a packet from the virtual bridge +type bridgeSender func(key PacketKey, lock *fastDatapathLock) FlowOp + +// A missHandler handles an ODP miss +type missHandler func(fks odp.FlowKeys, lock *fastDatapathLock) FlowOp + +type FastDatapath struct { + dpname string + + // The mtu from the datapath netdev (which should match the + // mtus on all the veths hooked up to the datapath). We + // validate that we are able to support that mtu. + mtu int + + // The lock guards the FastDatapath state, and also + // synchronizes use of the dpif + lock sync.Mutex + dpif *odp.Dpif + dp odp.DatapathHandle + deleteFlowsCount uint64 + missHandlers map[odp.VportID]missHandler + localPeer *Peer + peers *Peers + overlayConsumer OverlayConsumer + + // Bridge state: How to send to the given bridge port + sendToPort map[bridgePortID]bridgeSender + + // How to send to a given destination MAC + sendToMAC map[MAC]bridgeSender + + // MACs seen on the bridge recently + seenMACs map[MAC]struct{} + + // vxlan vports associated with the given UDP ports + vxlanVportIDs map[int]odp.VportID + mainVxlanVportID odp.VportID + + // A singleton pool for the occasions when we need to decode + // the packet. 
+ dec *EthernetDecoder + + // forwarders by remote peer + forwarders map[PeerName]*fastDatapathForwarder +} + +type FastDatapathConfig struct { + DatapathName string + Port int + ExpireFlowsInterval time.Duration + ExpireMACsInterval time.Duration +} + +func NewFastDatapath(config FastDatapathConfig) (*FastDatapath, error) { + dpif, err := odp.NewDpif() + if err != nil { + return nil, err + } + + success := false + defer func() { + if !success { + dpif.Close() + } + }() + + dp, err := dpif.LookupDatapath(config.DatapathName) + if err != nil { + return nil, err + } + + iface, err := net.InterfaceByName(config.DatapathName) + if err != nil { + return nil, err + } + + fastdp := &FastDatapath{ + dpname: config.DatapathName, + mtu: iface.MTU, + dpif: dpif, + dp: dp, + missHandlers: make(map[odp.VportID]missHandler), + sendToPort: nil, + sendToMAC: make(map[MAC]bridgeSender), + seenMACs: make(map[MAC]struct{}), + vxlanVportIDs: make(map[int]odp.VportID), + forwarders: make(map[PeerName]*fastDatapathForwarder), + } + + if err := fastdp.deleteVxlanVports(); err != nil { + return nil, err + } + + if err := fastdp.deleteFlows(); err != nil { + return nil, err + } + + // We use the weave port number plus 1 for vxlan. Hard-coding + // this relationship may seem dubious, but there is no moral + // difference between this and requiring that the sleeve UDP + // port number is the same as the TCP port number. The hard + // part would be not adding weaver flags to allow the port + // numbers to be independent, but working out how to specify + // them on the connecting side. So we can wait to find out if + // anyone wants that. 
+ fastdp.mainVxlanVportID, err = fastdp.getVxlanVportID(config.Port + 1) + if err != nil { + return nil, err + } + + // need to lock before we might receive events + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + + if _, err := dp.ConsumeMisses(fastdp); err != nil { + return nil, err + } + + if _, err := dp.ConsumeVportEvents(fastdp); err != nil { + return nil, err + } + + vports, err := dp.EnumerateVports() + if err != nil { + return nil, err + } + + for _, vport := range vports { + fastdp.makeBridgeVport(vport) + } + + success = true + go fastdp.run() + return fastdp, nil +} + +func (fastdp *FastDatapath) Close() error { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + err := fastdp.dpif.Close() + fastdp.dpif = nil + return err +} + +// While processing a packet, we can potentially acquire and drop the +// FastDatapath lock many times (acquiring it to access FastDatapath +// state, and invoke ODP operations; dropping it to invoke callbacks +// that may re-enter the FastDatapath). A fastDatapathLock +// coordinates this process. +type fastDatapathLock struct { + fastdp *FastDatapath + locked bool + + // While the lock is dropped, deleteFlows could be called. We + // need to detect when this happens and avoid creating flows, + // because they may be based on stale information. 
+ deleteFlowsCount uint64 +} + +func (fastdp *FastDatapath) startLock() fastDatapathLock { + fastdp.lock.Lock() + return fastDatapathLock{ + fastdp: fastdp, + locked: true, + deleteFlowsCount: fastdp.deleteFlowsCount, + } +} + +func (lock *fastDatapathLock) unlock() { + if lock.locked { + lock.fastdp.lock.Unlock() + lock.locked = false + } +} + +func (lock *fastDatapathLock) relock() { + if !lock.locked { + lock.fastdp.lock.Lock() + lock.locked = true + } +} + +// Bridge bits + +type fastDatapathBridge struct { + *FastDatapath +} + +func (fastdp *FastDatapath) Bridge() Bridge { + return fastDatapathBridge{fastdp} +} + +func (fastdp fastDatapathBridge) String() string { + return fmt.Sprint(fastdp.dpname, " (via ODP)") +} + +func (fastDatapathBridge) Stats() map[string]int { + return nil +} + +var routerBridgePortID = bridgePortID{router: true} + +func (fastdp fastDatapathBridge) StartConsumingPackets(consumer BridgeConsumer) error { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + + if fastdp.sendToPort[routerBridgePortID] != nil { + return fmt.Errorf("FastDatapath already has a BridgeConsumer") + } + + // set up delivery to the weave router port on the bridge + fastdp.addSendToPort(routerBridgePortID, + func(key PacketKey, lock *fastDatapathLock) FlowOp { + // drop the FastDatapath lock in order to call + // the consumer + lock.unlock() + return consumer(key) + }) + return nil +} + +func (fastdp fastDatapathBridge) InjectPacket(key PacketKey) FlowOp { + lock := fastdp.startLock() + defer lock.unlock() + return fastdp.bridge(routerBridgePortID, key, &lock) +} + +// Ethernet bridge implementation + +func (fastdp *FastDatapath) bridge(ingress bridgePortID, + key PacketKey, lock *fastDatapathLock) FlowOp { + lock.relock() + if fastdp.sendToMAC[key.SrcMAC] == nil { + // Learn the source MAC + fastdp.sendToMAC[key.SrcMAC] = fastdp.sendToPort[ingress] + fastdp.seenMACs[key.SrcMAC] = struct{}{} + } + + // If we know about the destination MAC, deliver it to the + // 
associated port. + if sender := fastdp.sendToMAC[key.DstMAC]; sender != nil { + return NewMultiFlowOp(false, odpEthernetFlowKey(key), + sender(key, lock)) + } + + // Otherwise, it might be a real broadcast, or it might + // be for a MAC we don't know about yet. Either way, we'll + // broadcast it. + mfop := NewMultiFlowOp(false) + + if (key.DstMAC[0] & 1) == 0 { + // Not a real broadcast, so don't create a flow rule. + // If we did, we'd need to delete the flows every time + // we learned a new MAC address, or have a more + // complicated selective invalidation scheme. + mfop.Add(vetoFlowCreationFlowOp{}) + } else { + // A real broadcast + mfop.Add(odpEthernetFlowKey(key)) + } + + // Send to all ports except the one it came in on. The + // sendToPort map is immutable, so it is safe to iterate over + // it even though the sender functions can drop the + // fastDatapathLock + for id, sender := range fastdp.sendToPort { + if id != ingress { + mfop.Add(sender(key, lock)) + } + } + + return mfop +} + +// Overlay bits + +type fastDatapathOverlay struct { + *FastDatapath +} + +func (fastdp *FastDatapath) Overlay() Overlay { + return fastDatapathOverlay{fastdp} +} + +func (fastdp fastDatapathOverlay) InvalidateRoutes() { + log.Debug("InvalidateRoutes") + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + checkWarn(fastdp.deleteFlows()) +} + +func (fastdp fastDatapathOverlay) InvalidateShortIDs() { + log.Debug("InvalidateShortIDs") + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + checkWarn(fastdp.deleteFlows()) +} + +func (fastDatapathOverlay) AddFeaturesTo(features map[string]string) { + // Nothing needed. Fast datapath support is indicated through + // OverlaySwitch. 
+} + +func (fastdp fastDatapathOverlay) StartConsumingPackets(localPeer *Peer, + peers *Peers, consumer OverlayConsumer) error { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + + if fastdp.overlayConsumer != nil { + return fmt.Errorf("FastDatapath already has an OverlayConsumer") + } + + fastdp.localPeer = localPeer + fastdp.peers = peers + fastdp.overlayConsumer = consumer + return nil +} + +func (fastdp *FastDatapath) getVxlanVportID(udpPort int) (odp.VportID, error) { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + + if vxlanVportID, present := fastdp.vxlanVportIDs[udpPort]; present { + return vxlanVportID, nil + } + + vxlanVportID, err := fastdp.dp.CreateVport( + odp.NewVxlanVportSpec(fmt.Sprintf("vxlan-%d", udpPort), + uint16(udpPort))) + if err != nil { + return 0, err + } + + fastdp.vxlanVportIDs[udpPort] = vxlanVportID + fastdp.missHandlers[vxlanVportID] = func(fks odp.FlowKeys, lock *fastDatapathLock) FlowOp { + tunnel := fks[odp.OVS_KEY_ATTR_TUNNEL].(odp.TunnelFlowKey) + tunKey := tunnel.Key() + + lock.relock() + consumer := fastdp.overlayConsumer + if consumer == nil { + return vetoFlowCreationFlowOp{} + } + + srcPeer, dstPeer := fastdp.extractPeers(tunKey.TunnelId) + if srcPeer == nil || dstPeer == nil { + return vetoFlowCreationFlowOp{} + } + + lock.unlock() + pk := flowKeysToPacketKey(fks) + var zeroMAC MAC + if pk.SrcMAC == zeroMAC && pk.DstMAC == zeroMAC { + return vxlanSpecialPacketFlowOp{fastdp, srcPeer, + &net.UDPAddr{ + IP: net.IP(tunKey.Ipv4Src[:]), + Port: udpPort, + }, + } + } + + key := ForwardPacketKey{ + SrcPeer: srcPeer, + DstPeer: dstPeer, + PacketKey: pk, + } + + var tunnelFlowKey odp.TunnelFlowKey + tunnelFlowKey.SetTunnelId(tunKey.TunnelId) + tunnelFlowKey.SetIpv4Src(tunKey.Ipv4Src) + tunnelFlowKey.SetIpv4Dst(tunKey.Ipv4Dst) + + return NewMultiFlowOp(false, odpFlowKey(tunnelFlowKey), + consumer(key)) + } + + return vxlanVportID, nil +} + +func (fastdp *FastDatapath) extractPeers(tunnelID [8]byte) (*Peer, *Peer) { + vni := 
binary.BigEndian.Uint64(tunnelID[:]) + srcPeer := fastdp.peers.FetchByShortID(PeerShortID(vni & 0xfff)) + dstPeer := fastdp.peers.FetchByShortID(PeerShortID((vni >> 12) & 0xfff)) + return srcPeer, dstPeer +} + +type vxlanSpecialPacketFlowOp struct { + fastdp *FastDatapath + srcPeer *Peer + sender *net.UDPAddr +} + +func (op vxlanSpecialPacketFlowOp) Process(frame []byte, dec *EthernetDecoder, + broadcast bool) { + op.fastdp.lock.Lock() + fwd := op.fastdp.forwarders[op.srcPeer.Name] + op.fastdp.lock.Unlock() + + if fwd != nil && dec.IsSpecial() { + fwd.handleVxlanSpecialPacket(frame, op.sender) + } +} + +type fastDatapathForwarder struct { + fastdp *FastDatapath + remotePeer *Peer + localIP [4]byte + sendControlMsg func(byte, []byte) error + connUID uint64 + vxlanVportID odp.VportID + + lock sync.RWMutex + confirmed bool + remoteAddr *net.UDPAddr + heartbeatInterval time.Duration + heartbeatTimer *time.Timer + heartbeatTimeout *time.Timer + ackedHeartbeat bool + stopChan chan struct{} + stopped bool + + establishedChan chan struct{} + errorChan chan error +} + +func (fastdp fastDatapathOverlay) MakeForwarder( + params ForwarderParams) (OverlayForwarder, error) { + if params.Crypto != nil { + // No encryption support in fastdp. The weaver main.go + // is responsible for ensuring this doesn't happen. + log.Fatal("Attempt to use FastDatapath with encryption") + } + + vxlanVportID := fastdp.mainVxlanVportID + remoteAddr := params.RemoteAddr + if remoteAddr != nil { + // The provided address contains the main weave port + // number to connect to. We need to derive the vxlan + // port number from that. 
+ vxlanRemoteAddr := *params.RemoteAddr + vxlanRemoteAddr.Port++ + remoteAddr = &vxlanRemoteAddr + + var err error + vxlanVportID, err = fastdp.getVxlanVportID(remoteAddr.Port) + if err != nil { + return nil, err + } + } + + localIP, err := ipv4Bytes(params.LocalIP) + if err != nil { + return nil, err + } + + fwd := &fastDatapathForwarder{ + fastdp: fastdp.FastDatapath, + remotePeer: params.RemotePeer, + localIP: localIP, + sendControlMsg: params.SendControlMessage, + connUID: params.ConnUID, + vxlanVportID: vxlanVportID, + + remoteAddr: remoteAddr, + heartbeatInterval: FastHeartbeat, + stopChan: make(chan struct{}), + + establishedChan: make(chan struct{}), + errorChan: make(chan error, 1), + } + + return fwd, err +} + +func ipv4Bytes(ip net.IP) (res [4]byte, err error) { + ipv4 := ip.To4() + if ipv4 != nil { + copy(res[:], ipv4) + } else { + err = fmt.Errorf("IP address %s is not IPv4", ip) + } + return +} + +func (fwd *fastDatapathForwarder) logPrefix() string { + return fmt.Sprintf("fastdp ->[%s|%s]: ", fwd.remoteAddr, fwd.remotePeer) +} + +func (fwd *fastDatapathForwarder) Confirm() { + fwd.lock.Lock() + defer fwd.lock.Unlock() + + if fwd.confirmed { + log.Fatal(fwd.logPrefix(), "already confirmed") + } + + log.Debug(fwd.logPrefix(), "confirmed") + fwd.fastdp.addForwarder(fwd.remotePeer.Name, fwd) + fwd.confirmed = true + + if fwd.remoteAddr != nil { + // have the goroutine send a heartbeat straight away + fwd.heartbeatTimer = time.NewTimer(0) + } else { + // we'll reset the timer when we learn the remote ip + fwd.heartbeatTimer = time.NewTimer(MaxDuration) + } + + fwd.heartbeatTimeout = time.NewTimer(HeartbeatTimeout) + go fwd.doHeartbeats() +} + +func (fwd *fastDatapathForwarder) EstablishedChannel() <-chan struct{} { + return fwd.establishedChan +} + +func (fwd *fastDatapathForwarder) ErrorChannel() <-chan error { + return fwd.errorChan +} + +func (fwd *fastDatapathForwarder) doHeartbeats() { + var err error + + for err == nil { + select { + case 
<-fwd.heartbeatTimer.C: + if fwd.confirmed { + fwd.sendHeartbeat() + } + fwd.heartbeatTimer.Reset(fwd.heartbeatInterval) + + case <-fwd.heartbeatTimeout.C: + err = fmt.Errorf("timed out waiting for vxlan heartbeat") + + case <-fwd.stopChan: + return + } + } + + fwd.lock.Lock() + defer fwd.lock.Unlock() + fwd.handleError(err) +} + +// Handle an error which leads to notifying the listener and +// termination of the forwarder +func (fwd *fastDatapathForwarder) handleError(err error) { + if err == nil { + return + } + + select { + case fwd.errorChan <- err: + default: + } + + // stop the heartbeat goroutine + if !fwd.stopped { + fwd.stopped = true + close(fwd.stopChan) + } +} + +func (fwd *fastDatapathForwarder) sendHeartbeat() { + fwd.lock.RLock() + log.Debug(fwd.logPrefix(), "sendHeartbeat") + + // the heartbeat payload consists of the 64-bit connection uid + // followed by the 16-bit packet size. + buf := make([]byte, EthernetOverhead+fwd.fastdp.mtu) + binary.BigEndian.PutUint64(buf[EthernetOverhead:], fwd.connUID) + binary.BigEndian.PutUint16(buf[EthernetOverhead+8:], uint16(len(buf))) + + dec := NewEthernetDecoder() + dec.DecodeLayers(buf) + pk := ForwardPacketKey{ + PacketKey: dec.PacketKey(), + SrcPeer: fwd.fastdp.localPeer, + DstPeer: fwd.remotePeer, + } + fwd.lock.RUnlock() + fwd.Forward(pk).Process(buf, dec, false) +} + +const ( + FastDatapathHeartbeatAck = iota +) + +func (fwd *fastDatapathForwarder) handleVxlanSpecialPacket(frame []byte, + sender *net.UDPAddr) { + fwd.lock.Lock() + defer fwd.lock.Unlock() + + log.Debug(fwd.logPrefix(), "handleVxlanSpecialPacket") + + // the only special packet type is a heartbeat + if len(frame) < EthernetOverhead+10 { + log.Warning(fwd.logPrefix(), "short vxlan special packet: ", len(frame), " bytes") + return + } + + if binary.BigEndian.Uint64(frame[EthernetOverhead:]) != fwd.connUID || + uint16(len(frame)) != binary.BigEndian.Uint16(frame[EthernetOverhead+8:]) { + return + } + + if fwd.remoteAddr == nil { + 
fwd.remoteAddr = sender + + if fwd.confirmed { + fwd.heartbeatTimer.Reset(0) + } + } else if !udpAddrsEqual(fwd.remoteAddr, sender) { + log.Info(fwd.logPrefix(), + "Peer IP address changed to ", sender) + fwd.remoteAddr = sender + } + + if !fwd.ackedHeartbeat { + fwd.ackedHeartbeat = true + fwd.handleError(fwd.sendControlMsg(FastDatapathHeartbeatAck, nil)) + } + + // we can receive a heartbeat before Confirm() has set up + // heartbeatTimeout + if fwd.heartbeatTimeout != nil { + fwd.heartbeatTimeout.Reset(HeartbeatTimeout) + } +} + +func (fwd *fastDatapathForwarder) ControlMessage(tag byte, msg []byte) { + fwd.lock.Lock() + defer fwd.lock.Unlock() + + switch tag { + case FastDatapathHeartbeatAck: + fwd.handleHeartbeatAck() + + default: + log.Info(fwd.logPrefix(), + "Ignoring unknown control message: ", tag) + } +} + +func (fwd *fastDatapathForwarder) handleHeartbeatAck() { + log.Debug(fwd.logPrefix(), "handleHeartbeatAck") + + if fwd.heartbeatInterval != SlowHeartbeat { + close(fwd.establishedChan) + fwd.heartbeatInterval = SlowHeartbeat + if fwd.heartbeatTimer != nil { + fwd.heartbeatTimer.Reset(fwd.heartbeatInterval) + } + } +} + +func (fwd *fastDatapathForwarder) Forward(key ForwardPacketKey) FlowOp { + fwd.lock.RLock() + defer fwd.lock.RUnlock() + + if fwd.remoteAddr == nil { + // Returning nil would discard the packet, but also + // result in a flow rule, which we would have to + // invalidate when we learn the remote IP. So for + // now, just prevent flows. 
+ return vetoFlowCreationFlowOp{} + } + + remoteIP, err := ipv4Bytes(fwd.remoteAddr.IP) + if err != nil { + log.Error(err) + return nil + } + + var sta odp.SetTunnelAction + sta.SetTunnelId(tunnelIDFor(key)) + sta.SetIpv4Src(fwd.localIP) + sta.SetIpv4Dst(remoteIP) + sta.SetTos(0) + sta.SetTtl(64) + sta.SetDf(true) + sta.SetCsum(false) + return fwd.fastdp.odpActions(sta, odp.NewOutputAction(fwd.vxlanVportID)) +} + +func tunnelIDFor(key ForwardPacketKey) (tunnelID [8]byte) { + src := uint64(key.SrcPeer.ShortID) + dst := uint64(key.DstPeer.ShortID) + binary.BigEndian.PutUint64(tunnelID[:], src|dst<<12) + return +} + +func (fwd *fastDatapathForwarder) Stop() { + // Might be nice to delete all the relevant flows here, but we + // can just let them expire. + fwd.fastdp.removeForwarder(fwd.remotePeer.Name, fwd) + + fwd.lock.Lock() + defer fwd.lock.Unlock() + fwd.sendControlMsg = func(byte, []byte) error { return nil } + + // stop the heartbeat goroutine + if !fwd.stopped { + fwd.stopped = true + close(fwd.stopChan) + } +} + +func (fastdp *FastDatapath) addForwarder(peer PeerName, + fwd *fastDatapathForwarder) { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + + // We shouldn't have two confirmed forwarders to the same + // remotePeer, due to the checks in LocalPeer AddConnection. 
+ fastdp.forwarders[peer] = fwd +} + +func (fastdp *FastDatapath) removeForwarder(peer PeerName, + fwd *fastDatapathForwarder) { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + if fastdp.forwarders[peer] == fwd { + delete(fastdp.forwarders, peer) + } +} + +func (fastdp *FastDatapath) deleteFlows() error { + fastdp.deleteFlowsCount++ + + flows, err := fastdp.dp.EnumerateFlows() + if err != nil { + return err + } + + for _, flow := range flows { + err = fastdp.dp.DeleteFlow(flow.FlowKeys) + if err != nil && !odp.IsNoSuchFlowError(err) { + return err + } + } + + return nil +} + +func (fastdp *FastDatapath) deleteVxlanVports() error { + vports, err := fastdp.dp.EnumerateVports() + if err != nil { + return err + } + + for _, vport := range vports { + if vport.Spec.TypeName() != "vxlan" { + continue + } + + err = fastdp.dp.DeleteVport(vport.ID) + if err != nil && !odp.IsNoSuchVportError(err) { + return err + } + } + + return nil +} + +func (fastdp *FastDatapath) run() { + expireMACsCh := time.Tick(10 * time.Minute) + expireFlowsCh := time.Tick(5 * time.Minute) + + for { + select { + case <-expireMACsCh: + fastdp.expireMACs() + + case <-expireFlowsCh: + fastdp.expireFlows() + } + } +} + +func (fastdp *FastDatapath) expireMACs() { + lock := fastdp.startLock() + defer lock.unlock() + + for mac := range fastdp.sendToMAC { + if _, present := fastdp.seenMACs[mac]; !present { + delete(fastdp.sendToMAC, mac) + } + } + + fastdp.seenMACs = make(map[MAC]struct{}) +} + +func (fastdp *FastDatapath) expireFlows() { + lock := fastdp.startLock() + defer lock.unlock() + + flows, err := fastdp.dp.EnumerateFlows() + if err != nil { + log.Warn(err) + } + + for _, flow := range flows { + if flow.Used == 0 { + log.Debug("Expiring flow ", flow.FlowSpec) + err = fastdp.dp.DeleteFlow(flow.FlowKeys) + } else { + fastdp.touchFlow(flow.FlowKeys, &lock) + err = fastdp.dp.ClearFlow(flow.FlowSpec) + } + + if err != nil && !odp.IsNoSuchFlowError(err) { + log.Warn(err) + } + } +} + +// The router 
needs to know which flows are active in order to +// maintain its MAC->peer table. We do this by querying the router +// without an actual packet being involved. Maybe it's +// worth devising a more unified approach in the future. +func (fastdp *FastDatapath) touchFlow(fks odp.FlowKeys, + lock *fastDatapathLock) { + // All the flows we create should have an ingress key, but we + // check here just in case we encounter one from somewhere + // else. + ingressKey, present := fks[odp.OVS_KEY_ATTR_IN_PORT] + if present { + ingress := ingressKey.(odp.InPortFlowKey).VportID() + handler := fastdp.getMissHandler(ingress) + if handler != nil { + handler(fks, lock) + lock.relock() + } + } +} + +func (fastdp *FastDatapath) Error(err error, stopped bool) { + if stopped { + log.Fatal("Error while listeniing on ODP datapath: ", err) + } + + log.Error("Error while listening on ODP datapath: ", err) +} + +func (fastdp *FastDatapath) Miss(packet []byte, fks odp.FlowKeys) error { + ingress := fks[odp.OVS_KEY_ATTR_IN_PORT].(odp.InPortFlowKey).VportID() + log.Debug("ODP miss ", fks, " on port ", ingress) + + lock := fastdp.startLock() + defer lock.unlock() + + handler := fastdp.getMissHandler(ingress) + if handler == nil { + return nil + } + + // Always include the ingress vport in the flow key. While + // this is not strictly necessary in some cases (e.g. for + // delivery to a local netdev based on the dest MAC), + // including the ingress in every flow makes things simpler + // in touchFlow. 
+ mfop := NewMultiFlowOp(false, handler(fks, &lock), + odpFlowKey(odp.NewInPortFlowKey(ingress))) + fastdp.send(mfop, packet, &lock) + return nil +} + +func (fastdp *FastDatapath) getMissHandler(ingress odp.VportID) missHandler { + handler := fastdp.missHandlers[ingress] + if handler == nil { + vport, err := fastdp.dp.LookupVport(ingress) + if err != nil { + log.Error(err) + return nil + } + + fastdp.makeBridgeVport(vport) + } + + return handler +} + +func (fastdp *FastDatapath) VportCreated(dpid odp.DatapathID, vport odp.Vport) error { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + + if _, present := fastdp.missHandlers[vport.ID]; !present { + fastdp.makeBridgeVport(vport) + } + + return nil +} + +func (fastdp *FastDatapath) VportDeleted(dpid odp.DatapathID, vport odp.Vport) error { + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + + // there might be flow rules that still refer to the id of + // this vport. But we just allow them to expire. Unless we + // want prompt migration of MAC addresses, that should be + // fine. + delete(fastdp.missHandlers, vport.ID) + fastdp.deleteSendToPort(bridgePortID{vport: vport.ID}) + return nil +} + +func (fastdp *FastDatapath) makeBridgeVport(vport odp.Vport) { + // Set up a bridge port for netdev and internal vports. vxlan + // vports are handled separately, as they do not correspond to + // bridge ports (we set up the miss handler for them in + // getVxlanVportID). + typ := vport.Spec.TypeName() + if typ != "netdev" && typ != "internal" { + return + } + + vportID := vport.ID + + // Sending to the bridge port outputs on the vport: + fastdp.addSendToPort(bridgePortID{vport: vportID}, + func(_ PacketKey, _ *fastDatapathLock) FlowOp { + return fastdp.odpActions(odp.NewOutputAction(vportID)) + }) + + // Delete flows, in order to recalculate flows for broadcasts + // on the bridge. 
+ checkWarn(fastdp.deleteFlows()) + + // Packets coming from the netdev are processed by the bridge + fastdp.missHandlers[vportID] = func(flowKeys odp.FlowKeys, lock *fastDatapathLock) FlowOp { + return fastdp.bridge(bridgePortID{vport: vportID}, + flowKeysToPacketKey(flowKeys), lock) + } +} + +func flowKeysToPacketKey(fks odp.FlowKeys) PacketKey { + eth := fks[odp.OVS_KEY_ATTR_ETHERNET].(odp.EthernetFlowKey).Key() + return PacketKey{SrcMAC: eth.EthSrc, DstMAC: eth.EthDst} +} + +// The sendToPort map is read-only, so this method does the copy in +// order to add an entry. +func (fastdp *FastDatapath) addSendToPort(portID bridgePortID, + sender bridgeSender) { + sendToPort := map[bridgePortID]bridgeSender{portID: sender} + for id, sender := range fastdp.sendToPort { + sendToPort[id] = sender + } + fastdp.sendToPort = sendToPort +} + +func (fastdp *FastDatapath) deleteSendToPort(portID bridgePortID) { + sendToPort := make(map[bridgePortID]bridgeSender) + for id, sender := range fastdp.sendToPort { + if id != portID { + sendToPort[id] = sender + } + } + fastdp.sendToPort = sendToPort +} + +// Send a packet, creating a corresponding ODP flow rule if possible +func (fastdp *FastDatapath) send(fops FlowOp, frame []byte, + lock *fastDatapathLock) { + // Gather the actions from actionFlowOps, execute any others + var dec *EthernetDecoder + flow := odp.NewFlowSpec() + createFlow := true + + for _, xfop := range FlattenFlowOp(fops) { + switch fop := xfop.(type) { + case interface { + updateFlowSpec(*odp.FlowSpec) + }: + fop.updateFlowSpec(&flow) + case vetoFlowCreationFlowOp: + createFlow = false + default: + // A foreign FlowOp (e.g. a sleeve forwarding + // FlowOp), so send the packet through the + // FlowOp interface, decoding the packet + // lazily. 
+ if dec == nil { + dec = fastdp.takeDecoder(lock) + dec.DecodeLayers(frame) + + // If we are sending the packet + // through the FlowOp interface, we + // mustn't create a flow, as that + // could prevent the proper handling + // of similar packets in the future. + createFlow = false + } + + if len(dec.decoded) != 0 { + lock.unlock() + fop.Process(frame, dec, false) + } + } + } + + if dec != nil { + // put the decoder back + lock.relock() + fastdp.dec = dec + } + + if len(flow.Actions) != 0 { + lock.relock() + checkWarn(fastdp.dp.Execute(frame, nil, flow.Actions)) + } + + if createFlow { + lock.relock() + // if the fastdp's deleteFlowsCount changed since we + // initially locked it, then we might have created a + // flow on the basis of stale information. It's fine + // to handle one packet like that, but it would be bad + // to introduce a stale flow. + if lock.deleteFlowsCount == fastdp.deleteFlowsCount { + log.Debug("Creating ODP flow ", flow) + checkWarn(fastdp.dp.CreateFlow(flow)) + } + } +} + +// Get the EthernetDecoder from the singleton pool +func (fastdp *FastDatapath) takeDecoder(lock *fastDatapathLock) *EthernetDecoder { + lock.relock() + dec := fastdp.dec + if dec == nil { + dec = NewEthernetDecoder() + } else { + fastdp.dec = nil + } + return dec +} + +type odpActionsFlowOp struct { + fastdp *FastDatapath + actions []odp.Action +} + +func (fastdp *FastDatapath) odpActions(actions ...odp.Action) FlowOp { + return odpActionsFlowOp{ + fastdp: fastdp, + actions: actions, + } +} + +func (fop odpActionsFlowOp) updateFlowSpec(flow *odp.FlowSpec) { + flow.AddActions(fop.actions) +} + +func (fop odpActionsFlowOp) Process(frame []byte, dec *EthernetDecoder, + broadcast bool) { + fastdp := fop.fastdp + fastdp.lock.Lock() + defer fastdp.lock.Unlock() + checkWarn(fastdp.dp.Execute(frame, nil, fop.actions)) +} + +type nopFlowOp struct{} + +func (nopFlowOp) Process([]byte, *EthernetDecoder, bool) { + // A nopFlowOp just provides a hint about flow creation, it + // 
doesn't send anything +} + +// A vetoFlowCreationFlowOp flags that no flow should be created +type vetoFlowCreationFlowOp struct { + nopFlowOp +} + +// A odpFlowKeyFlowOp adds a FlowKey to the resulting flow +type odpFlowKeyFlowOp struct { + key odp.FlowKey + nopFlowOp +} + +func odpFlowKey(key odp.FlowKey) FlowOp { + return odpFlowKeyFlowOp{key: key} +} + +func (fop odpFlowKeyFlowOp) updateFlowSpec(flow *odp.FlowSpec) { + flow.AddKey(fop.key) +} + +func odpEthernetFlowKey(key PacketKey) FlowOp { + fk := odp.NewEthernetFlowKey() + fk.SetEthSrc(key.SrcMAC) + fk.SetEthDst(key.DstMAC) + return odpFlowKeyFlowOp{key: fk} +} diff --git a/router/flow.go b/router/flow.go index 2a7ac03953..f2f5c7913c 100644 --- a/router/flow.go +++ b/router/flow.go @@ -34,12 +34,18 @@ type MultiFlowOp struct { ops []FlowOp } -func NewMultiFlowOp(broadcast bool) *MultiFlowOp { - return &MultiFlowOp{broadcast: broadcast} +func NewMultiFlowOp(broadcast bool, ops ...FlowOp) *MultiFlowOp { + mfop := &MultiFlowOp{broadcast: broadcast} + mfop.Add(ops...) + return mfop } func (mfop *MultiFlowOp) Add(ops ...FlowOp) { - mfop.ops = append(mfop.ops, ops...) 
+ for _, op := range ops { + if op != nil { + mfop.ops = append(mfop.ops, op) + } + } } func (mfop *MultiFlowOp) Process(frame []byte, dec *EthernetDecoder, @@ -48,3 +54,23 @@ func (mfop *MultiFlowOp) Process(frame []byte, dec *EthernetDecoder, op.Process(frame, dec, mfop.broadcast) } } + +func FlattenFlowOp(fop FlowOp) []FlowOp { + return collectFlowOps(nil, fop) +} + +func collectFlowOps(into []FlowOp, fop FlowOp) []FlowOp { + if fop == nil { + return into + } + + if mfop, ok := fop.(*MultiFlowOp); ok { + for _, op := range mfop.ops { + into = collectFlowOps(into, op) + } + + return into + } + + return append(into, fop) +} diff --git a/router/odp.go b/router/odp.go new file mode 100644 index 0000000000..cea42dd59f --- /dev/null +++ b/router/odp.go @@ -0,0 +1,60 @@ +package router + +import ( + "github.com/weaveworks/go-odp/odp" +) + +// ODP admin functionality + +func CreateDatapath(dpname string) error { + dpif, err := odp.NewDpif() + if err != nil { + return err + } + + defer dpif.Close() + + _, err = dpif.CreateDatapath(dpname) + if err != nil && !odp.IsDatapathNameAlreadyExistsError(err) { + return err + } + + return nil +} + +func DeleteDatapath(dpname string) error { + dpif, err := odp.NewDpif() + if err != nil { + return err + } + + defer dpif.Close() + + dp, err := dpif.LookupDatapath(dpname) + if err != nil { + if odp.IsNoSuchDatapathError(err) { + return nil + } + + return err + } + + return dp.Delete() +} + +func AddDatapathInterface(dpname string, ifname string) error { + dpif, err := odp.NewDpif() + if err != nil { + return err + } + + defer dpif.Close() + + dp, err := dpif.LookupDatapath(dpname) + if err != nil { + return err + } + + _, err = dp.CreateVport(odp.NewNetdevVportSpec(ifname)) + return err +} diff --git a/router/sleeve.go b/router/sleeve.go index c60e6702b2..a2e2fd2e1f 100644 --- a/router/sleeve.go +++ b/router/sleeve.go @@ -45,18 +45,14 @@ import ( // sleeveForwarder.mtu <--------------------------> const ( - EthernetOverhead = 14 - 
UDPOverhead = 28 // 20 bytes for IPv4, 8 bytes for UDP - DefaultMTU = 65535 - FragTestSize = 60001 - PMTUDiscoverySize = 60000 - FastHeartbeat = 500 * time.Millisecond - SlowHeartbeat = 10 * time.Second - FragTestInterval = 5 * time.Minute - MTUVerifyAttempts = 8 - MTUVerifyTimeout = 10 * time.Millisecond // doubled with each attempt - MaxMissedHeartbeats = 6 - HeartbeatTimeout = MaxMissedHeartbeats * SlowHeartbeat + EthernetOverhead = 14 + UDPOverhead = 28 // 20 bytes for IPv4, 8 bytes for UDP + DefaultMTU = 65535 + FragTestSize = 60001 + PMTUDiscoverySize = 60000 + FragTestInterval = 5 * time.Minute + MTUVerifyAttempts = 8 + MTUVerifyTimeout = 10 * time.Millisecond // doubled with each attempt ) type SleeveOverlay struct { From 431197042d14570cb2e1e8e1fb70418e2142b4e4 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Thu, 28 May 2015 10:37:46 +0100 Subject: [PATCH 13/15] Weave script changes to enable fast datapath This means: * The 'weave' netdev is an OVS datapath * The weave router container runs with --net=host in order to access the datapath. * The MTU is set to a smaller value, to allow vxlan encapsulation. Setting the WEAVE_NO_FASTDP environment variable disables these things, effectively running the router without fast datapath. It is an environment variable rather than an option to 'weave launch' because it takes effect in create_bridge, which is called from many subcommands, and it doesn't seem practical to add option processing to all of them. 
--- weave | 221 ++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 185 insertions(+), 36 deletions(-) diff --git a/weave b/weave index 6750df5f0b..554185b4e5 100755 --- a/weave +++ b/weave @@ -107,6 +107,8 @@ exec_remote() { -e WEAVE_PASSWORD \ -e WEAVE_PORT \ -e WEAVE_CONTAINER_NAME \ + -e WEAVE_MTU \ + -e WEAVE_NO_FASTDP \ -e DOCKER_BRIDGE \ -e DOCKER_CLIENT_HOST="$DOCKER_CLIENT_HOST" \ -e DOCKER_CLIENT_TLS_VERIFY="$DOCKER_CLIENT_TLS_VERIFY" \ @@ -263,7 +265,8 @@ DOCKER_BRIDGE=${DOCKER_BRIDGE:-docker0} CONTAINER_NAME=${WEAVE_CONTAINER_NAME:-weave} BRIDGE=weave CONTAINER_IFNAME=ethwe -MTU=65535 +# ROUTER_HOSTNETNS_IFNAME is only used for fastdp with encryption +ROUTER_HOSTNETNS_IFNAME=weave2 PORT=${WEAVE_PORT:-6783} HTTP_PORT=6784 PROXY_PORT=12375 @@ -366,33 +369,57 @@ random_mac() { ###################################################################### create_bridge() { + BRIDGE_TYPE= + if [ ! -d /sys/class/net/$BRIDGE ] ; then - ip link add name $BRIDGE type bridge - # Set a random MAC address on the bridge. Current Linux - # kernels already do this when creating a bridge, but there - # are rumours it was not always so. - ip link set dev $BRIDGE address $(random_mac) - # Attempting to set the bridge MTU to a high value directly - # fails. Bridges take the lowest MTU of their interfaces. So - # instead we create a temporary interface with the desired - # MTU, attach that to the bridge, and then remove it again. - ip link add name v${CONTAINER_IFNAME}du mtu $MTU type dummy - ip link set dev v${CONTAINER_IFNAME}du master $BRIDGE - ip link del dev v${CONTAINER_IFNAME}du - # Drop traffic from Docker bridge to Weave; it can break subnet isolation + if [ -n "$WEAVE_NO_FASTDP" ] ; then + BRIDGE_TYPE=bridge + elif docker run --rm --privileged --net=host $IMAGE $COVERAGE_ARGS --create-datapath --datapath=$BRIDGE ; then + BRIDGE_TYPE=fastdp + elif [ $? 
= 17 ] ; then + # Exit status of 17 means the kernel doesn't have ODP + BRIDGE_TYPE=bridge + else + return 1 + fi + + init_bridge_$BRIDGE_TYPE + + # Drop traffic from Docker bridge to Weave; it can break + # subnet isolation if [ "$DOCKER_BRIDGE" != "$BRIDGE" ] ; then # Note using -I to insert ahead of Docker's bridge rules run_iptables -t filter -I FORWARD -i $DOCKER_BRIDGE -o $BRIDGE -j DROP fi + # Work around the situation where there are no rules allowing traffic # across our bridge. E.g. ufw add_iptables_rule filter FORWARD -i $BRIDGE -o $BRIDGE -j ACCEPT + # create a chain for masquerading run_iptables -t nat -N WEAVE >/dev/null 2>&1 || true add_iptables_rule nat POSTROUTING -j WEAVE + else + # Detect whether fast datapath is in use on + # $BRIDGE. Unfortunately there's no simple way to positively + # check whether $BRIDGE is an ODP netdev, so we have to check + # whether it is a bridge instead. + if [ -d /sys/class/net/$BRIDGE/bridge ] ; then + BRIDGE_TYPE=bridge + else + BRIDGE_TYPE=fastdp + fi + + # WEAVE_MTU may have been specified when the bridge was + # created (perhaps implicitly with WEAVE_NO_FASTDP). So take + # the MTU from the bridge unless it is explicitly specified + # for this invocation. + MTU=${WEAVE_MTU:-$(cat /sys/class/net/$BRIDGE/mtu)} fi - [ "$1" != "--without-ethtool" ] && ethtool -K $BRIDGE tx off >/dev/null + if [ "$1" != "--without-ethtool" ] ; then + ethtool_tx_off_$BRIDGE_TYPE $BRIDGE + fi ip link set dev $BRIDGE up @@ -401,11 +428,58 @@ create_bridge() { configure_arp_cache $BRIDGE } +init_bridge_fastdp () { + # GCE has the lowest underlay network MTU we're likely to encounter on + # a local network, at 1460 bytes. To get the overlay MTU from that we + # subtract 20 bytes for the outer IPv4 header, 8 bytes for the outer + # UDP header, 8 bytes for the vxlan header, and 14 bytes for the inner + # ethernet header. 
+ MTU=${WEAVE_MTU:-1410} + + # create_bridge already created the datapath netdev + ip link set dev $BRIDGE mtu $MTU +} + +init_bridge_bridge () { + MTU=${WEAVE_MTU:-65535} + + ip link add name $BRIDGE type bridge + + # Set a random MAC address on the bridge. Current Linux + # kernels already do this when creating a bridge, but there + # are rumours it was not always so. + ip link set dev $BRIDGE address $(random_mac) + + # Attempting to set the bridge MTU to a high value directly + # fails. Bridges take the lowest MTU of their interfaces. So + # instead we create a temporary interface with the desired + # MTU, attach that to the bridge, and then remove it again. + ip link add name v${CONTAINER_IFNAME}du mtu $MTU type dummy + ip link set dev v${CONTAINER_IFNAME}du master $BRIDGE + ip link del dev v${CONTAINER_IFNAME}du +} + +ethtool_tx_off_fastdp () { + true +} + +ethtool_tx_off_bridge () { + ethtool -K $1 tx off >/dev/null +} + destroy_bridge() { - [ -d /sys/class/net/$BRIDGE ] && ip link del dev $BRIDGE + if [ -d /sys/class/net/$BRIDGE ] ; then + if [ -d /sys/class/net/$BRIDGE/bridge ] ; then + ip link del dev $BRIDGE + else + docker run --rm --privileged --net=host $IMAGE $COVERAGE_ARGS --delete-datapath --datapath=$BRIDGE + fi + fi + if [ "$DOCKER_BRIDGE" != "$BRIDGE" ] ; then run_iptables -t filter -D FORWARD -i $DOCKER_BRIDGE -o $BRIDGE -j DROP 2>/dev/null || true fi + run_iptables -t filter -D FORWARD -i $BRIDGE -o $BRIDGE -j ACCEPT 2>/dev/null || true run_iptables -t nat -F WEAVE >/dev/null 2>&1 || true run_iptables -t nat -D POSTROUTING -j WEAVE >/dev/null 2>&1 || true @@ -474,17 +548,11 @@ netnsenter() { nsenter --net=$PROCFS/$CONTAINER_PID/ns/net "$@" } +# connect_container_to_bridge connect_container_to_bridge() { - if [ -h "$PROCFS/$CONTAINER_PID/ns/net" -a -h "/proc/$$/ns/net" -a "$(readlink $PROCFS/$CONTAINER_PID/ns/net)" = "$(readlink /proc/$$/ns/net)" ] ; then - echo "Container is running in the host network namespace, and therefore cannot be" >&2 - 
echo "connected to weave. Perhaps the container was started with --net=host." >&2 - return 1 - fi ip link add name $LOCAL_IFNAME mtu $MTU type veth peer name $GUEST_IFNAME mtu $MTU || return 1 - if ! ethtool -K $GUEST_IFNAME tx off >/dev/null || - ! ip link set $LOCAL_IFNAME master $BRIDGE || - ! ip link set $LOCAL_IFNAME up || + if ! ethtool_tx_off_$BRIDGE_TYPE $GUEST_IFNAME || ! ip link set $GUEST_IFNAME netns $PROCFS/$CONTAINER_PID/ns/net ; then # failed before we assigned the veth to the container's # namespace @@ -492,12 +560,30 @@ connect_container_to_bridge() { return 1 fi - if ! netnsenter ip link set $GUEST_IFNAME name $CONTAINER_IFNAME || - ! configure_arp_cache $CONTAINER_IFNAME "netnsenter" ; then + # Versions of the 'ip' command differ in whether they create a + # veth pair in the 'up' or 'down' state. Furthermore, moving one + # end into a netns normally sets them to 'down'. But, moving an + # end into the default netns is a null operation, so they stay up, + # which leads to errors later on. So to get a consistent result, + # we have to be explicit in setting them 'up' or 'down' as + # necessary. + if ! netnsenter ip link set $GUEST_IFNAME down || + ! netnsenter ip link set $GUEST_IFNAME name $1 up || + ! ip link set $LOCAL_IFNAME up || + ! add_iface_$BRIDGE_TYPE $LOCAL_IFNAME || + ! configure_arp_cache $1 "netnsenter" ; then return 1 fi } +add_iface_fastdp () { + docker run --rm --privileged --net=host $IMAGE $COVERAGE_ARGS --datapath=$BRIDGE --add-datapath-iface=$1 +} + +add_iface_bridge () { + ip link set $1 master $BRIDGE +} + ask_version() { if ! DOCKERIMAGE=$(docker inspect --format='{{.Image}}' $1 2>/dev/null) ; then if ! 
DOCKERIMAGE=$(docker inspect --format='{{.Id}}' $2 2>/dev/null) ; then @@ -507,23 +593,65 @@ ask_version() { [ -n "$DOCKERIMAGE" ] && docker run --rm -e WEAVE_CIDR=none $3 $DOCKERIMAGE $COVERAGE_ARGS --version } +router_opts_fastdp () { + if [ -z "$WEAVE_PASSWORD" ] ; then + echo "--datapath $BRIDGE" + else + # When using encryption, we still do bridging on the ODP + # datapath, because you can 'weave launch' without encryption + # and then later restart the router with encryption, or vice + # versa. Encryption disables the use of the fastdp Overlay, + # but the router could still use the fastdp Bridge to receive + # packets. However, pcap has better performance when sniffing + # every packet. So we pass --iface to use the pcap Bridge. + # + # Why don't we simply pass "--iface $BRIDGE"? We could, + # except for the fact that NetworkManager likes to down the + # odp $BRIDGE netdev (at least under ubuntu), and you can only + # use pcap on an interface that is up. We avoid that by using + # pcap via a veth pair (NetworkManager leaves them alone). + # Having a netdev in the host netns called "ethwe" might + # surprise people, so it is called $ROUTER_HOSTNETNS_IFNAME + # instead. + echo "--datapath $BRIDGE --iface $ROUTER_HOSTNETNS_IFNAME" + fi +} + +router_opts_bridge () { + echo "--iface $CONTAINER_IFNAME" +} + ###################################################################### # functions invoked through with_container_netns ###################################################################### -launch() { +setup_router_iface_fastdp() { + if [ -n "$WEAVE_PASSWORD" ] ; then + # See router_opts_fastdp + connect_container_to_bridge $ROUTER_HOSTNETNS_IFNAME && + ip link set $ROUTER_HOSTNETNS_IFNAME up + fi +} + +setup_router_iface_bridge() { if ! netnsenter ip link show eth0 >/dev/null ; then echo "Perhaps you are running the docker daemon with container networking disabled (-b=none)." 
>&2 return 1 fi - connect_container_to_bridge && + connect_container_to_bridge $CONTAINER_IFNAME && netnsenter ethtool -K eth0 tx off >/dev/null && netnsenter ip link set $CONTAINER_IFNAME up } attach() { + if [ -h "$PROCFS/$CONTAINER_PID/ns/net" -a -h "/proc/$$/ns/net" -a "$(readlink $PROCFS/$CONTAINER_PID/ns/net)" = "$(readlink /proc/$$/ns/net)" ] ; then + echo "Container is running in the host network namespace, and therefore cannot be" >&2 + echo "connected to weave. Perhaps the container was started with --net=host." >&2 + return 1 + fi + if ! netnsenter ip link show $CONTAINER_IFNAME >/dev/null 2>&1 ; then - connect_container_to_bridge || return 1 + connect_container_to_bridge $CONTAINER_IFNAME || return 1 fi NEW_ADDRS= @@ -645,8 +773,8 @@ container_ip() { fi case "$status" in "true ") - echo "$1 container has no IP address; is Docker networking enabled?" >&2 - return 1 + #echo "$1 container has no IP address; is Docker networking enabled?" >&2 + CONTAINER_IP=127.0.0.1 ;; true*) CONTAINER_IP="${status#true }" @@ -1127,10 +1255,17 @@ launch_router() { shift 1 fi CONTAINER_PORT=$PORT - ARGS="" + ARGS= IPRANGE= IPRANGE_SPECIFIED= + + # DNS_PORT_MAPPING is only used if the router runs in a container + # netns. DNS_ROUTER_OPTS is only used if the router runs in the + # host netns. DNS_PORT_MAPPING="-p $DOCKER_BRIDGE_IP:53:53/udp -p $DOCKER_BRIDGE_IP:53:53/tcp" + DNS_ROUTER_OPTS="--dns-listen-address $DOCKER_BRIDGE_IP:53" + NO_DNS_OPT= + while [ $# -gt 0 ] ; do case "$1" in -password|--password) @@ -1163,6 +1298,8 @@ launch_router() { ;; --no-dns) DNS_PORT_MAPPING= + DNS_ROUTER_OPTS= + NO_DNS_OPT="--no-dns" ARGS="$ARGS $1" ;; *) @@ -1185,16 +1322,28 @@ launch_router() { echo "Unless this is deliberate, you must pick another range and set it on all hosts." 
>&2 fi fi + + if [ "$BRIDGE_TYPE" = fastdp ] ; then + NETHOST_OPT="--net=host" + fi + # Set WEAVE_DOCKER_ARGS in the environment in order to supply # additional parameters, such as resource limits, to docker # when launching the weave container. ROUTER_CONTAINER=$(docker run --privileged -d --name=$CONTAINER_NAME \ - -p $PORT:$CONTAINER_PORT/tcp -p $PORT:$CONTAINER_PORT/udp $DNS_PORT_MAPPING \ -v /var/run/docker.sock:/var/run/docker.sock \ + -p $PORT:$CONTAINER_PORT/tcp -p $PORT:$CONTAINER_PORT/udp \ + ${NETHOST_OPT:-$DNS_PORT_MAPPING} \ -e WEAVE_PASSWORD \ -e WEAVE_CIDR=none \ - $WEAVE_DOCKER_ARGS $IMAGE $COVERAGE_ARGS --iface $CONTAINER_IFNAME --port $CONTAINER_PORT --name "$PEERNAME" --nickname "$(hostname)" --ipalloc-range "$IPRANGE" --dns-effective-listen-address "$DOCKER_BRIDGE_IP" --docker-api "unix:///var/run/docker.sock" "$@") - with_container_netns_or_die $ROUTER_CONTAINER launch + $WEAVE_DOCKER_ARGS $IMAGE $COVERAGE_ARGS \ + --port $CONTAINER_PORT --name "$PEERNAME" --nickname "$(hostname)" \ + $(router_opts_$BRIDGE_TYPE) \ + --ipalloc-range "$IPRANGE" \ + --dns-effective-listen-address $DOCKER_BRIDGE_IP \ + ${NETHOST_OPT:+$DNS_ROUTER_OPTS} $NO_DNS_OPT \ + --docker-api "unix:///var/run/docker.sock" "$@") + with_container_netns_or_die $ROUTER_CONTAINER setup_router_iface_$BRIDGE_TYPE err_msg=$(death_msg $CONTAINER_NAME) container_ip $CONTAINER_NAME "$err_msg" "$err_msg" || return 1 wait_for_status $CONTAINER_NAME http_call $CONTAINER_IP:$HTTP_PORT @@ -1202,7 +1351,7 @@ launch_router() { # Tell the newly-started weave IP allocator about existing weave IPs with_container_addresses ipam_reclaim weave:expose $(docker ps -q --no-trunc) fi - if [ -n "$DNS_PORT_MAPPING" ] ; then + if [ -z "$NO_DNS_OPT" ] ; then # Tell the newly-started weaveDNS about existing weave IPs for CONTAINER in $(docker ps -q --no-trunc) ; do if CONTAINER_IPS=$(with_container_netns $CONTAINER container_weave_addrs 2>&1 | sed -n -e 's/inet \([^/]*\)\/\(.*\)/\1/p') ; then From 
b56ff7678cefdefe750a1b598f7454e16c98bf77 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Tue, 15 Sep 2015 17:06:51 +0100 Subject: [PATCH 14/15] Make code coverage work through the proxy Previously, the proxy would run the weave script in ways that did not run the weaver executable. Now they can. This is a problem because of the contortions involved in using golang's code coverage support in the integration tests, which require the weaver command line args to be slightly mangled. --- proxy/common.go | 19 ++++++++++++++++--- proxy/proxy.go | 12 +++++++----- weave | 2 ++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/proxy/common.go b/proxy/common.go index 068e171276..02de600918 100644 --- a/proxy/common.go +++ b/proxy/common.go @@ -24,15 +24,28 @@ var ( func callWeave(args ...string) ([]byte, []byte, error) { args = append([]string{"--local"}, args...) - Log.Debug("Calling weave", args) cmd := exec.Command("./weave", args...) cmd.Env = []string{ "PATH=/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", "PROCFS=/hostproc", } - if bridge := os.Getenv("DOCKER_BRIDGE"); bridge != "" { - cmd.Env = append(cmd.Env, fmt.Sprintf("DOCKER_BRIDGE=%s", bridge)) + + propagateEnv := func(key string) { + if val := os.Getenv(key); val != "" { + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", key, val)) + } } + + propagateEnv("DOCKER_BRIDGE") + + // Propagate WEAVE_DEBUG, to make debugging easier. + propagateEnv("WEAVE_DEBUG") + + // This prevents the code coverage contortions in our + // integration test suite breaking things. 
+ propagateEnv("COVERAGE") + + Log.Debug("Calling weave args: ", args, "env: ", cmd.Env) var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr diff --git a/proxy/proxy.go b/proxy/proxy.go index 8d09292e86..f24b322744 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -436,13 +436,15 @@ func (proxy *Proxy) getDNSDomain() (domain string) { } weaveContainer, err := proxy.client.InspectContainer("weave") - if err != nil || - weaveContainer.NetworkSettings == nil || - weaveContainer.NetworkSettings.IPAddress == "" { - return + var weaveIP string + if err == nil && weaveContainer.NetworkSettings != nil { + weaveIP = weaveContainer.NetworkSettings.IPAddress + } + if weaveIP == "" { + weaveIP = "127.0.0.1" } - url := fmt.Sprintf("http://%s:%d/domain", weaveContainer.NetworkSettings.IPAddress, router.HTTPPort) + url := fmt.Sprintf("http://%s:%d/domain", weaveIP, router.HTTPPort) resp, err := http.Get(url) if err != nil || resp.StatusCode != http.StatusOK { return diff --git a/weave b/weave index 554185b4e5..29c10d22cb 100755 --- a/weave +++ b/weave @@ -1381,6 +1381,8 @@ launch_proxy() { -e PROCFS=/hostproc \ -e WEAVE_CIDR=none \ -e DOCKER_BRIDGE \ + -e WEAVE_DEBUG \ + -e COVERAGE \ --entrypoint=/home/weave/weaveproxy \ $WEAVEPROXY_DOCKER_ARGS $EXEC_IMAGE $COVERAGE_ARGS $PROXY_ARGS) wait_for_status $PROXY_CONTAINER_NAME http_call_unix $PROXY_CONTAINER_NAME status.sock From 0093eb12200af0749eb950df456a0e941e684786 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Mon, 12 Oct 2015 21:24:57 +0100 Subject: [PATCH 15/15] A smoke test for WEAVE_NO_FASTDP Partly for the side effect of restoring coverage of router/ethernet_decoder.go --- test/190_no_fastdp_2_test.sh | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100755 test/190_no_fastdp_2_test.sh diff --git a/test/190_no_fastdp_2_test.sh b/test/190_no_fastdp_2_test.sh new file mode 100755 index 0000000000..fe793f2b8c --- /dev/null +++ b/test/190_no_fastdp_2_test.sh @@ -0,0 +1,27 @@ 
+#! /bin/bash + +. ./config.sh + +C1=10.2.1.46 +C2=10.2.1.47 + +start_suite "WEAVE_NO_FASTDP operation" + +WEAVE_NO_FASTDP=1 weave_on $HOST1 launch +WEAVE_NO_FASTDP=1 weave_on $HOST2 launch $HOST1 + +start_container $HOST1 $C1/24 --name=c1 +start_container $HOST2 $C2/24 --name=c2 + +# Without fastdp, ethwe should have an MTU of 65535 +assert_raises "exec_on $HOST1 c1 sh -c 'ip link show ethwe | grep \ mtu\ 65535\ >/dev/null'" + +# Pump some data over a TCP socket between the containers. This +# should cause PMTU discovery on the overlay network, and hence +# exercise the sleeve code to generate an ICMP frag-needed. +assert_raises "exec_on $HOST1 c1 sh -c 'nc -l -p 5555 >/tmp/foo'" & +sleep 5 +assert_raises "exec_on $HOST2 c2 sh -c 'dd if=/dev/zero bs=10k count=1 | nc $C1 5555'" +assert_raises "exec_on $HOST1 c1 sh -c 'test \$(stat -c %s /tmp/foo) -eq 10240'" + +end_suite