Skip to content

Commit

Permalink
resolve review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jan 13, 2023
1 parent 5dac00d commit 61eabf5
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 45 deletions.
4 changes: 2 additions & 2 deletions command/loadgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"

httpfinderclient "github.com/filecoin-project/storetheindex/api/v0/finder/client/http"
"github.com/filecoin-project/storetheindex/command/loadgen"
httpfinderclient "github.com/ipni/storetheindex/api/v0/finder/client/http"
"github.com/ipni/storetheindex/command/loadgen"
)

var LoadGenCmd = &cli.Command{
Expand Down
71 changes: 37 additions & 34 deletions command/loadgen/loadgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,22 @@ import (

mathrand "math/rand"

"github.com/filecoin-project/go-legs"
legsDT "github.com/filecoin-project/go-legs/dtsync"
legsHttp "github.com/filecoin-project/go-legs/httpsync"
ingesthttpclient "github.com/filecoin-project/storetheindex/api/v0/ingest/client/http"
"github.com/filecoin-project/storetheindex/api/v0/ingest/schema"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/dsadapter"
ingesthttpclient "github.com/ipni/storetheindex/api/v0/ingest/client/http"
"github.com/ipni/storetheindex/api/v0/ingest/schema"
"github.com/ipni/storetheindex/dagsync"
"github.com/ipni/storetheindex/dagsync/dtsync"
"github.com/ipni/storetheindex/dagsync/httpsync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
libp2pconfig "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
Expand Down Expand Up @@ -94,12 +93,16 @@ func startProviderLoadGen(config Config, indexerHttpAddr string, addressMapping

fmt.Printf("Provider seed=%d ID=%v\n", p.config.Seed, p.h.ID())

stopGenAds = p.runUpdater(func() {
err := p.announce()
if err != nil {
panic("Failed to announce: " + err.Error())
var afterEachUpdate func()
if !config.IsHttp {
afterEachUpdate = func() {
err := p.announce()
if err != nil {
panic("Failed to announce: " + err.Error())
}
}
})
}
stopGenAds = p.runUpdater(afterEachUpdate)

close = func() {}
if config.IsHttp {
Expand Down Expand Up @@ -133,15 +136,15 @@ func newProviderLoadGen(c Config, indexerHttpAddr string, addressMapping map[str
panic("Failed to start host" + err.Error())
}

var pub legs.Publisher
var pub dagsync.Publisher
if c.IsHttp {
pub, err = legsHttp.NewPublisher(c.HttpListenAddr, lsys, host.ID(), signingKey)
pub, err = httpsync.NewPublisher(c.HttpListenAddr, lsys, host.ID(), signingKey)
} else {
pub, err = legsDT.NewPublisher(host, ds, lsys, c.GossipSubTopic)
pub, err = dtsync.NewPublisher(host, ds, lsys, c.GossipSubTopic)

}
if err != nil {
panic("Failed to start legs publisher: " + err.Error())
panic("Failed to start publisher: " + err.Error())
}
p := &providerLoadGen{
indexerHttpAddr: indexerHttpAddr,
Expand All @@ -161,13 +164,13 @@ type providerLoadGen struct {
signingKey crypto.PrivKey
h host.Host
lsys ipld.LinkSystem
pub legs.Publisher
pub dagsync.Publisher
// Keep track of the total number of entries we've created. There are a couple uses for this:
// * At the end we can tell the user how many entries we've created.
// * We can generate multihashes as a function of this number. E.g. the multihash is just an encoded version of the entry number.
entriesGenerated uint
adsGenerated uint
currentHead *ipld.Link
currentHead ipld.Link
recordKeepingMu sync.Mutex
}

Expand All @@ -189,9 +192,10 @@ func (p *providerLoadGen) announce() error {
}

if p.currentHead == nil {
return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, cid.Undef)
// Nothing to announce
return nil
}
return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, (*p.currentHead).(cidlink.Link).Cid)
return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, (p.currentHead).(cidlink.Link).Cid)
}

func (p *providerLoadGen) announceInBackground() func() {
Expand All @@ -214,9 +218,8 @@ func (p *providerLoadGen) announceInBackground() func() {
}
}

func (p *providerLoadGen) runUpdater(afterFirstUpdate func()) func() {
func (p *providerLoadGen) runUpdater(afterEachUpdate func()) func() {
closer := make(chan struct{})
var callAfterFirstUpdate *sync.Once = &sync.Once{}

go func() {
t := time.NewTicker(time.Second / time.Duration(p.config.AdsPerSec))
Expand Down Expand Up @@ -246,7 +249,7 @@ func (p *providerLoadGen) runUpdater(afterFirstUpdate func()) func() {
panic(fmt.Sprintf("Failed to build ad: %s", err))
}

p.currentHead = &nextAdHead
p.currentHead = nextAdHead
p.adsGenerated = p.adsGenerated + 1
p.entriesGenerated = adBuilder.entryCount + p.entriesGenerated
fmt.Printf("ID=%d .Number of generated entries: %d\n", p.config.Seed, p.entriesGenerated)
Expand All @@ -258,8 +261,9 @@ func (p *providerLoadGen) runUpdater(afterFirstUpdate func()) func() {
panic(fmt.Sprintf("Failed to publish ad: %s", err))
}

callAfterFirstUpdate.Do(afterFirstUpdate)

if afterEachUpdate != nil {
afterEachUpdate()
}
fmt.Println("Published ad in", time.Since(start))

if p.config.StopAfterNEntries > 0 && p.entriesGenerated > uint(p.config.StopAfterNEntries) {
Expand Down Expand Up @@ -301,13 +305,12 @@ type adBuilder struct {
providerAddrs []string
}

func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd *ipld.Link) (ipld.Link, error) {
func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd ipld.Link) (ipld.Link, error) {
contextID := []byte(fmt.Sprintf("%d", b.contextID))
metadata := b.metadata
var entriesLink *ipld.Link
var entriesLink ipld.Link
if b.entryCount == 0 {
l := ipld.Link(schema.NoEntries)
entriesLink = &l
entriesLink = ipld.Link(schema.NoEntries)
} else {
var allMhs []multihash.Multihash
for i := uint(0); i < b.entryCount; i++ {
Expand Down Expand Up @@ -338,15 +341,15 @@ func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd
return nil, err
}
if entriesLinkV != nil {
entriesLink = &entriesLinkV
entriesLink = entriesLinkV
}
}
}
ad := schema.Advertisement{
PreviousID: prevAd,
Provider: b.provider,
Addresses: b.providerAddrs,
Entries: *entriesLink,
Entries: entriesLink,
ContextID: contextID,
Metadata: metadata,
IsRm: b.isRm,
Expand Down
37 changes: 29 additions & 8 deletions command/loadgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"testing"
"time"

"github.com/filecoin-project/storetheindex/command/loadgen"
"github.com/filecoin-project/storetheindex/config"
"github.com/ipni/storetheindex/command/loadgen"
"github.com/ipni/storetheindex/config"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
)
Expand All @@ -22,22 +22,30 @@ const FinderAddr = "/ip4/127.0.0.1/tcp/13000"
const IngestAddr = "/ip4/127.0.0.1/tcp/13001"
const AdminAddr = "/ip4/127.0.0.1/tcp/13002"

func TestSmallLoad(t *testing.T) {
func TestSmallLoadNoHTTP(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second)
defer cncl()
testLoadHelper(ctx, t, 1, 1000, false)
}

func TestSmallLoadOverHTTP(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second)
defer cncl()
testLoadHelper(ctx, t, 1, 1000, true)
}

func TestLargeLoad(t *testing.T) {
switch runtime.GOOS {
case "linux":
t.Skip("skipping Large load test on linux because it takes too long in Github Actions CI.")
if testing.Short() {
t.Skip("skipping test in short mode.")
}
if os.Getenv("CI") != "" {
t.Skip("Skipping testing in CI environment")
}

ctx, cncl := context.WithTimeout(context.Background(), 180*time.Second)
Expand All @@ -52,7 +60,7 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint,
}

// Set up a context that is canceled when the command is interrupted
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()

tempDir := t.TempDir()
Expand All @@ -75,8 +83,10 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint,
err := app.RunContext(ctx, []string{"storetheindex", "init"})
require.NoError(t, err)

daemonDone := make(chan struct{})
go func() {
app.RunContext(ctx, []string{"storetheindex", "daemon"})
close(daemonDone)
}()

finderParts := strings.Split(FinderAddr, "/")
Expand All @@ -103,10 +113,13 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint,
time.Sleep(100 * time.Millisecond)
}

loadgenDone := make(chan struct{})
go func() {
startLoadgen(ctx, "http://127.0.0.1:"+ingestPort, concurrentProviders, numberOfEntriesPerProvider, useHTTP)
close(loadgenDone)
}()

timer := time.NewTimer(time.Second)
foundAll := false
LOOP:
for {
Expand All @@ -118,11 +131,19 @@ LOOP:
select {
case <-ctx.Done():
break LOOP
case <-time.After(1 * time.Second):
case <-timer.C:
timer.Reset(time.Second)
}
}
timer.Stop()

require.True(t, foundAll, "Did not find all entries")

// Wait until loadgen and daemon are stopped so that tempDir can be removed
// on Windows.
cancel()
<-loadgenDone
<-daemonDone
}

func startLoadgen(ctx context.Context, indexerAddr string, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) {
Expand Down
1 change: 0 additions & 1 deletion e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func (e *e2eTestRunner) stop(cmd *exec.Cmd, timeout time.Duration) {
}

func TestEndToEndWithReferenceProvider(t *testing.T) {
t.Skip("Skipping because it's failing on CI but works locally. So it is unactionable. Before re-enabling this test, please try to make it reproducible. Ideally in a clean environment that is completely independent of the host machine.")
switch runtime.GOOS {
case "windows":
t.Skip("skipping test on", runtime.GOOS)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/ipfs/kubo v0.17.0
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0
github.com/ipld/go-ipld-prime v0.19.0
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/ipld/go-storethehash v0.3.13
github.com/ipni/go-indexer-core v0.6.20-0.20230112154735-7b53be959fe6
github.com/libp2p/go-libp2p v0.23.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,8 @@ github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDa
github.com/ipld/go-ipld-prime v0.19.0 h1:5axC7rJmPc17Emw6TelxGwnzALk0PdupZ2oj2roDj04=
github.com/ipld/go-ipld-prime v0.19.0/go.mod h1:Q9j3BaVXwaA3o5JUDNvptDDr/x8+F7FG6XJ8WI3ILg4=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY=
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd h1:qdjo1CRvAQhOMoyYjPnbdZ5rYFFmqztweQ9KAsuWpO0=
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:9DD/GM0JNPoisgR09F62kbBi7kHa4eDIea4XshXYOVc=
github.com/ipld/go-storethehash v0.3.13 h1:1T6kX5K57lAgxbsGitZEaZMAR3RdWch2kO8okK/BgN8=
github.com/ipld/go-storethehash v0.3.13/go.mod h1:KCYpzmamubnSwm7fvWcCkm0aIwQh4WRNtzrKK4pVhAQ=
github.com/ipni/go-indexer-core v0.6.20-0.20230112154735-7b53be959fe6 h1:sn06bRrS1aAlH5WdeLsHxeHPArKlS46bvwjSIapBf1M=
Expand Down

0 comments on commit 61eabf5

Please sign in to comment.