Skip to content

Commit

Permalink
Merge pull request ethereum#78 from ethersphere/network-testing-frame…
Browse files Browse the repository at this point in the history
…work-msgfeed

Message reporting hook for event.Feed
  • Loading branch information
zelig authored May 1, 2017
2 parents ba3d785 + 8c82519 commit 4850da9
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 17 deletions.
2 changes: 1 addition & 1 deletion p2p/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (p *PeerAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {

go func() {
events := make(chan *p2p.PeerEvent)
sub := p.server().SubscribePeers(events)
sub := p.server().SubscribeEvents(events)
defer sub.Unsubscribe()

for {
Expand Down
31 changes: 20 additions & 11 deletions p2p/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

func newPeer(rw *p2p.MsgPipeRW) *Peer {
func newPeer(rw MsgReadWriteCloser) *Peer {
return &Peer{
MsgPipeRW: rw,
MsgReadWriteCloser: rw,
Errc: make(chan error, 1),
Connc: make(chan bool),
Readyc: make(chan bool),
}
}

type Peer struct {
*p2p.MsgPipeRW
MsgReadWriteCloser
Connc chan bool
Readyc chan bool
Errc chan error
Expand All @@ -52,6 +52,12 @@ type Network interface {
Reporter
}

// Adds close pipe to the MsgReadWriter
type MsgReadWriteCloser interface {
p2p.MsgReadWriter
Close() error
}

// SimNode is the network adapter that
type SimNode struct {
lock sync.RWMutex
Expand Down Expand Up @@ -169,7 +175,7 @@ func (self *SimNode) getPeer(id *NodeId) *Peer {
return self.peers[i]
}

func (self *SimNode) setPeer(id *NodeId, rw *p2p.MsgPipeRW) *Peer {
func (self *SimNode) setPeer(id *NodeId, rw MsgReadWriteCloser) *Peer {
i, found := self.peerMap[id.NodeID]
if !found {
i = len(self.peers)
Expand All @@ -183,7 +189,8 @@ func (self *SimNode) setPeer(id *NodeId, rw *p2p.MsgPipeRW) *Peer {
// }
// legit reconnect reset disconnection error,
p := self.peers[i]
p.MsgPipeRW = rw
//p.MsgPipeRW = rw
p.MsgReadWriteCloser = rw
p.Connc = make(chan bool)
p.Readyc = make(chan bool)
return p
Expand All @@ -194,11 +201,12 @@ func (self *SimNode) RemovePeer(node *discover.Node) {
defer self.lock.Unlock()
id := &NodeId{node.ID}
peer := self.getPeer(id)
if peer == nil || peer.MsgPipeRW == nil {
//if peer == nil || peer.MsgPipeRW == nil {
if peer == nil || peer.MsgReadWriteCloser == nil {
return
}
peer.MsgPipeRW.Close()
peer.MsgPipeRW = nil
peer.MsgReadWriteCloser.Close()
peer.MsgReadWriteCloser = nil
// na := self.network.GetNodeAdapter(id)
// peer = na.(*SimNode).GetPeer(self.Id)
// peer.RW = nil
Expand All @@ -216,7 +224,8 @@ func (self *SimNode) AddPeer(node *discover.Node) {
rw, rrw := p2p.MsgPipe()
// // run protocol on remote node with self as peer
peer := self.getPeer(id)
if peer != nil && peer.MsgPipeRW != nil {
//if peer != nil && peer.MsgPipeRW != nil {
if peer != nil && peer.MsgReadWriteCloser != nil {
return
}
peer = self.setPeer(id, rrw)
Expand All @@ -228,7 +237,7 @@ func (self *SimNode) AddPeer(node *discover.Node) {
self.RunProtocol(na.(*SimNode), rw, rrw, peer)
}

func (self *SimNode) SubscribePeers(ch chan *p2p.PeerEvent) event.Subscription {
func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription {
return self.peerFeed.Subscribe(ch)
}

Expand All @@ -246,7 +255,7 @@ func (self *SimNode) PeersInfo() (info []*p2p.PeerInfo) {
return nil
}

func (self *SimNode) RunProtocol(node *SimNode, rw, rrw p2p.MsgReadWriter, peer *Peer) {
func (self *SimNode) RunProtocol(node *SimNode, rw, rrw MsgReadWriteCloser, peer *Peer) {
id := node.Id
protocol := self.service.Protocols()[0]
if protocol.Run == nil {
Expand Down
58 changes: 58 additions & 0 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"


)

// Msg defines the structure of a p2p message.
Expand Down Expand Up @@ -271,3 +275,57 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error {
}
return nil
}


// wraps a msgreadwriter to allow for emitting message events upon
// send or receive
type MsgReporterRW struct {
MsgReadWriter
feed *event.Feed
peerid discover.NodeID
closefunc func() error
}

func NewMsgReporterRW(feed *event.Feed, rw MsgReadWriter, id discover.NodeID, closefunc func() error) *MsgReporterRW {
return &MsgReporterRW{
MsgReadWriter: rw,
feed: feed,
peerid: id,
closefunc: closefunc,
}
}

func (self *MsgReporterRW) ReadMsg() (Msg, error) {
msg, err := self.MsgReadWriter.ReadMsg()
if err != nil {
return msg, err
}
event := PeerEvent{
Type: PeerEventTypeMsgRecv,
Peer: self.peerid,
Label: fmt.Sprintf("%d,%d", msg.Code, msg.Size),
}
self.feed.Send(event)
return msg, nil
}

func (self *MsgReporterRW) WriteMsg(msg Msg) error {
err := self.MsgReadWriter.WriteMsg(msg)
if err != nil {
return err
}
event := PeerEvent{
Type: PeerEventTypeMsgSend,
Peer: self.peerid,
Label: fmt.Sprintf("%d,%d", msg.Code, msg.Size),
}
self.feed.Send(event)
return nil
}

func (self *MsgReporterRW) Close() error {
if self.closefunc != nil {
return self.closefunc()
}
return nil
}
13 changes: 13 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,22 @@ const (
// PeerEventTypeDrop is the type of event emitted when a peer is
// dropped from a p2p.Server
PeerEventTypeDrop PeerEventType = "drop"

// PeerEventTypeMsgSend is the type of event emitted when a
// message is successfully sent to a peer
PeerEventTypeMsgSend PeerEventType = "msgsend"

// PeerEventTypeMsgSend is the type of event emitted when a
// message is successfully sent to a peer
PeerEventTypeMsgRecv PeerEventType = "msgrecv"
)

// PeerEvent is an event emitted when peers are either added or dropped from
// a p2p.Server
type PeerEvent struct {
Type PeerEventType
Peer discover.NodeID
Label string
}

// Peer represents a connected remote node.
Expand Down Expand Up @@ -381,6 +390,10 @@ func (rw *protoRW) ReadMsg() (Msg, error) {
}
}

func (rw *protoRW) Close() error {
return nil
}

// PeerInfo represents a short summary of the information known about a connected
// peer. Sub-protocol independent fields are contained and initialized here, with
// protocol specifics delegated to all connected sub-protocols.
Expand Down
4 changes: 2 additions & 2 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type Server interface {
Stop() error
AddPeer(node *discover.Node)
RemovePeer(node *discover.Node)
SubscribePeers(ch chan *PeerEvent) event.Subscription
SubscribeEvents(ch chan *PeerEvent) event.Subscription
PeerCount() int
NodeInfo() *NodeInfo
PeersInfo() []*PeerInfo
Expand Down Expand Up @@ -309,7 +309,7 @@ func (srv *server) RemovePeer(node *discover.Node) {
}

// SubscribePeers subscribes the given channel to peer events
func (srv *server) SubscribePeers(ch chan *PeerEvent) event.Subscription {
func (srv *server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
return srv.peerFeed.Subscribe(ch)
}

Expand Down
6 changes: 3 additions & 3 deletions p2p/testing/protocolsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (self *ProtocolSession) trigger(trig Trigger) error {
if peer == nil {
panic(fmt.Sprintf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.Ids)))
}
if peer.MsgPipeRW == nil {
if peer.MsgReadWriteCloser == nil {
return fmt.Errorf("trigger: peer %v unreachable", trig.Peer)
}
errc := make(chan error)
Expand Down Expand Up @@ -112,7 +112,7 @@ func (self *ProtocolSession) expect(exp Expect) error {
if peer == nil {
panic(fmt.Sprintf("expect: peer %v does not exist (1- %v)", exp.Peer, len(self.Ids)))
}
if peer.MsgPipeRW == nil {
if peer.MsgReadWriteCloser== nil {
return fmt.Errorf("trigger: peer %v unreachable", exp.Peer)
}

Expand Down Expand Up @@ -247,7 +247,7 @@ func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error
func (self *ProtocolSession) Stop() {
for _, id := range self.Ids {
p := self.GetPeer(id)
if p != nil && p.MsgPipeRW != nil {
if p != nil && p.MsgReadWriteCloser != nil {
p.Close()
}
}
Expand Down

0 comments on commit 4850da9

Please sign in to comment.