Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update visualized snapshot test #18286

Merged
merged 5 commits into from
Dec 18, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 214 additions & 53 deletions swarm/network/stream/visualized_snapshot_sync_sim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,27 @@
package stream

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)

Expand Down Expand Up @@ -68,12 +79,12 @@ func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc)
disconnections := sim.PeerEvents(
context.Background(),
sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
simulation.NewPeerEventsFilter().Drop(),
)

go func() {
for d := range disconnections {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
panic("unexpected disconnect")
cancelSimRun()
}
Expand Down Expand Up @@ -144,21 +155,75 @@ func sendSimTerminatedEvent(sim *simulation.Simulation) {
//It also sends some custom events so that the frontend
//can visualize messages like SendOfferedMsg, WantedHashesMsg, DeliveryMsg
func TestSnapshotSyncWithServer(t *testing.T) {
//t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted")

//define a wrapper object to be able to pass around data
wrapper := &netWrapper{}

nodeCount := *nodes
chunkCount := *chunks

if nodeCount == 0 || chunkCount == 0 {
nodeCount = 32
chunkCount = 1
}

log.Info(fmt.Sprintf("Running the simulation with %d nodes and %d chunks", nodeCount, chunkCount))

sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

tr := &testRegistry{
Registry: r,
w: wrapper,
}

bucket.Store(bucketKeyRegistry, tr)

cleanup = func() {
netStore.Close()
tr.Close()
os.RemoveAll(datadir)
}

return tr, cleanup, nil
},
}).WithServer(":8888") //start with the HTTP server

t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted")
nodeCount, chunkCount, sim := setupSim(simServiceMap)
defer sim.Close()

log.Info("Initializing test config")

conf := &synctestConfig{}
//map of discover ID to indexes of chunks expected at that ID
conf.idToChunksMap = make(map[discover.NodeID][]int)
conf.idToChunksMap = make(map[enode.ID][]int)
//map of overlay address to discover ID
conf.addrToIDMap = make(map[string]discover.NodeID)
conf.addrToIDMap = make(map[string]enode.ID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)

//pass the network to the wrapper object
wrapper.setNetwork(sim.Net)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
panic(err)
Expand All @@ -167,49 +232,6 @@ func TestSnapshotSyncWithServer(t *testing.T) {
ctx, cancelSimRun := watchSim(sim)
defer cancelSimRun()

//setup filters in the event feed
offeredHashesFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(1)
wantedFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(2)
deliveryFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(6)
eventC := sim.PeerEvents(ctx, sim.UpNodeIDs(), offeredHashesFilter, wantedFilter, deliveryFilter)

quit := make(chan struct{})

go func() {
for e := range eventC {
select {
case <-quit:
fmt.Println("quitting event loop")
return
default:
}
if e.Error != nil {
t.Fatal(e.Error)
}
if *e.Event.MsgCode == uint64(1) {
evt := &simulations.Event{
Type: EventTypeChunkOffered,
Node: sim.Net.GetNode(e.NodeID),
Control: false,
}
sim.Net.Events().Send(evt)
} else if *e.Event.MsgCode == uint64(2) {
evt := &simulations.Event{
Type: EventTypeChunkWanted,
Node: sim.Net.GetNode(e.NodeID),
Control: false,
}
sim.Net.Events().Send(evt)
} else if *e.Event.MsgCode == uint64(6) {
evt := &simulations.Event{
Type: EventTypeChunkDelivered,
Node: sim.Net.GetNode(e.NodeID),
Control: false,
}
sim.Net.Events().Send(evt)
}
}
}()
//run the sim
result := runSim(conf, ctx, sim, chunkCount)

Expand All @@ -218,11 +240,150 @@ func TestSnapshotSyncWithServer(t *testing.T) {
Type: EventTypeSimTerminated,
Control: false,
}
sim.Net.Events().Send(evt)
go sim.Net.Events().Send(evt)

if result.Error != nil {
panic(result.Error)
}
close(quit)
log.Info("Simulation ended")
}

//testRegistry embeds registry
//it allows to replace the protocol run function
type testRegistry struct {
*Registry
w *netWrapper
}

//Protocols replaces the protocol's run function
func (tr *testRegistry) Protocols() []p2p.Protocol {
regProto := tr.Registry.Protocols()
//set the `stream` protocol's run function with the testRegistry's one
regProto[0].Run = tr.runProto
return regProto
}

//runProto is the new overwritten protocol's run function for this test
func (tr *testRegistry) runProto(p *p2p.Peer, rw p2p.MsgReadWriter) error {
//create a custom rw message ReadWriter
testRw := &testMsgReadWriter{
MsgReadWriter: rw,
Peer: p,
w: tr.w,
Registry: tr.Registry,
}
//now run the actual upper layer `Registry`'s protocol function
return tr.runProtocol(p, testRw)
}

//testMsgReadWriter is a custom rw
//it will allow us to re-use the message twice
type testMsgReadWriter struct {
*Registry
p2p.MsgReadWriter
*p2p.Peer
w *netWrapper
}

//netWrapper wrapper object so we can pass data around
type netWrapper struct {
net *simulations.Network
}

//set the network to the wrapper for later use (used inside the custom rw)
func (w *netWrapper) setNetwork(n *simulations.Network) {
w.net = n
}

//get he network from the wrapper (used inside the custom rw)
func (w *netWrapper) getNetwork() *simulations.Network {
return w.net
}

// ReadMsg reads a message from the underlying MsgReadWriter and emits a
// "message received" event
//we do this because we are interested in the Payload of the message for custom use
//in this test, but messages can only be consumed once (stream io.Reader)
func (ev *testMsgReadWriter) ReadMsg() (p2p.Msg, error) {
//read the message from the underlying rw
msg, err := ev.MsgReadWriter.ReadMsg()
if err != nil {
return msg, err
}

//don't do anything with message codes we actually are not needing/reading
subCodes := []uint64{1, 2, 10}
found := false
for _, c := range subCodes {
if c == msg.Code {
found = true
}
}
//just return if not a msg code we are interested in
if !found {
return msg, nil
}

//we use a io.TeeReader so that we can read the message twice
//the Payload is a io.Reader, so if we read from it, the actual protocol handler
//cannot access it anymore.
//But we need that handler to be able to consume the message as normal,
//as if we would not do anything here with that message
var buf bytes.Buffer
tee := io.TeeReader(msg.Payload, &buf)

mcp := &p2p.Msg{
Code: msg.Code,
Size: msg.Size,
ReceivedAt: msg.ReceivedAt,
Payload: tee,
}
//assign the copy for later use
msg.Payload = &buf

//now let's look into the message
var wmsg protocols.WrappedMsg
err = mcp.Decode(&wmsg)
if err != nil {
log.Error(err.Error())
return msg, err
}
//create a new message from the code
val, ok := ev.Registry.GetSpec().NewMsg(mcp.Code)
if !ok {
return msg, errors.New(fmt.Sprintf("Invalid message code: %v", msg.Code))
}
//decode it
if err := rlp.DecodeBytes(wmsg.Payload, val); err != nil {
return msg, errors.New(fmt.Sprintf("Decoding error <= %v: %v", msg, err))
}
//now for every message type we are interested in, create a custom event and send it
var evt *simulations.Event
switch val := val.(type) {
case *OfferedHashesMsg:
evt = &simulations.Event{
Type: EventTypeChunkOffered,
Node: ev.w.getNetwork().GetNode(ev.ID()),
Control: false,
Data: val.Hashes,
}
case *WantedHashesMsg:
evt = &simulations.Event{
Type: EventTypeChunkWanted,
Node: ev.w.getNetwork().GetNode(ev.ID()),
Control: false,
}
case *ChunkDeliveryMsgSyncing:
evt = &simulations.Event{
Type: EventTypeChunkDelivered,
Node: ev.w.getNetwork().GetNode(ev.ID()),
Control: false,
Data: val.Addr.String(),
}
}
if evt != nil {
//send custom event to feed; frontend will listen to it and display
ev.w.getNetwork().Events().Send(evt)
}
return msg, nil
}