Skip to content

Commit

Permalink
Merge pull request #227 from libp2p/feat/tracing
Browse files Browse the repository at this point in the history
tracing support
  • Loading branch information
vyzo authored Nov 19, 2019
2 parents 28a87b3 + 7065297 commit 01b9825
Show file tree
Hide file tree
Showing 9 changed files with 7,104 additions and 8 deletions.
20 changes: 16 additions & 4 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err
type FloodSubRouter struct {
p *PubSub
protocols []protocol.ID
tracer *pubsubTracer
}

func (fs *FloodSubRouter) Protocols() []protocol.ID {
Expand All @@ -39,11 +40,16 @@ func (fs *FloodSubRouter) Protocols() []protocol.ID {

func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p
fs.tracer = p.tracer
}

func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {}
func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
fs.tracer.AddPeer(p, proto)
}

func (fs *FloodSubRouter) RemovePeer(peer.ID) {}
func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
}

func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
Expand Down Expand Up @@ -91,13 +97,19 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {

select {
case mch <- out:
fs.tracer.SendRPC(out, pid)
default:
log.Infof("dropping message to peer %s: queue full", pid)
fs.tracer.DropRPC(out, pid)
// Drop it. The peer is too slow.
}
}
}

func (fs *FloodSubRouter) Join(topic string) {}
func (fs *FloodSubRouter) Join(topic string) {
fs.tracer.Join(topic)
}

func (fs *FloodSubRouter) Leave(topic string) {}
func (fs *FloodSubRouter) Leave(topic string) {
fs.tracer.Join(topic)
}
14 changes: 14 additions & 0 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type GossipSubRouter struct {
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
mcache *MessageCache
tracer *pubsubTracer
}

func (gs *GossipSubRouter) Protocols() []protocol.ID {
Expand All @@ -73,16 +74,19 @@ func (gs *GossipSubRouter) Protocols() []protocol.ID {

func (gs *GossipSubRouter) Attach(p *PubSub) {
gs.p = p
gs.tracer = p.tracer
go gs.heartbeatTimer()
}

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
}

func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
gs.tracer.RemovePeer(p)
delete(gs.peers, p)
for _, peers := range gs.mesh {
delete(peers, p)
Expand Down Expand Up @@ -208,6 +212,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
prune = append(prune, topic)
} else {
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
}
Expand All @@ -231,6 +236,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
peers, ok := gs.mesh[topic]
if ok {
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
}
Expand Down Expand Up @@ -294,6 +300,7 @@ func (gs *GossipSubRouter) Join(topic string) {
}

log.Debugf("JOIN %s", topic)
gs.tracer.Join(topic)

gmap, ok = gs.fanout[topic]
if ok {
Expand All @@ -319,6 +326,7 @@ func (gs *GossipSubRouter) Join(topic string) {

for p := range gmap {
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
gs.sendGraft(p, topic)
gs.tagPeer(p, topic)
}
Expand All @@ -331,11 +339,13 @@ func (gs *GossipSubRouter) Leave(topic string) {
}

log.Debugf("LEAVE %s", topic)
gs.tracer.Leave(topic)

delete(gs.mesh, topic)

for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.untagPeer(p, topic)
}
Expand Down Expand Up @@ -384,8 +394,10 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {

select {
case mch <- out:
gs.tracer.SendRPC(out, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
gs.tracer.DropRPC(out, p)
// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
Expand Down Expand Up @@ -443,6 +455,7 @@ func (gs *GossipSubRouter) heartbeat() {

for _, p := range plst {
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
topics := tograft[p]
Expand All @@ -458,6 +471,7 @@ func (gs *GossipSubRouter) heartbeat() {

for _, p := range plst[:idontneed] {
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
topics := toprune[p]
Expand Down
Loading

0 comments on commit 01b9825

Please sign in to comment.