Skip to content

Commit

Permalink
swarm/network: refactor simulation tests bootstrap (ethereum#18975)
Browse files Browse the repository at this point in the history
(cherry picked from commit 597597e)
  • Loading branch information
nonsense authored and dshulyak committed Mar 14, 2019
1 parent 7ef934e commit 1bc4bfe
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 247 deletions.
77 changes: 77 additions & 0 deletions swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import (
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
Expand Down Expand Up @@ -66,6 +69,80 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}

// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())

netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

return addr, netStore, delivery, cleanup, nil
}

// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

return netStore, delivery, cleanup, nil
}

// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())

netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New

return addr, netStore, delivery, cleanup, nil
}

func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
n := ctx.Config.Node()

store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if *useMockStore {
store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
}
if err != nil {
return nil, nil, nil, err
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, nil, err
}

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())

kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)

bucket.Store(bucketKeyStore, store)
bucket.Store(bucketKeyDB, netStore)
bucket.Store(bucketKeyDelivery, delivery)
bucket.Store(bucketKeyFileStore, fileStore)

cleanup := func() {
netStore.Close()
os.RemoveAll(datadir)
}

return netStore, delivery, cleanup, nil
}

func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
Expand Down
51 changes: 11 additions & 40 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -457,39 +456,24 @@ func TestDeliveryFromNodes(t *testing.T) {
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down Expand Up @@ -644,38 +628,25 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down
28 changes: 6 additions & 22 deletions swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand All @@ -31,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -62,26 +60,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
externalStreamMaxKeys := uint64(100)

sim := simulation.New(map[string]simulation.ServiceFunc{
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Expand All @@ -97,11 +80,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})

fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup := func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down
52 changes: 16 additions & 36 deletions swarm/network/stream/snapshot_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package stream
import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -105,43 +103,25 @@ func TestRetrieval(t *testing.T) {
}

var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
"streamer": retrievalStreamerFunc,
}

func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)

localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil
return r, cleanup, nil
},
}

/*
Expand Down
51 changes: 18 additions & 33 deletions swarm/network/stream/snapshot_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,42 +107,27 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}

var simServiceMap = map[string]simulation.ServiceFunc{
"streamer": streamerFunc,
}
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}

func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

bucket.Store(bucketKeyRegistry, r)

cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

return r, cleanup, nil
bucket.Store(bucketKeyRegistry, r)

cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil
},
}

func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
Expand Down
Loading

0 comments on commit 1bc4bfe

Please sign in to comment.