From 066895a44285885ea3a87aeb91044a7ff1a516de Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 8 Apr 2022 14:09:42 -0700 Subject: [PATCH 01/12] Add loadgen and loadgen-verify commands These are commands that can be used to generate write load on a storage provider, and verify the Ads have been correctly ingested. Useful for both stress testing an indexer as well as verifying consistency. --- command/loadgen.go | 199 +++++++++++++++++ command/loadgen/config.go | 142 ++++++++++++ command/loadgen/config_test.go | 40 ++++ command/loadgen/loadgen.go | 396 +++++++++++++++++++++++++++++++++ command/loadgen/pseudorand.go | 22 ++ command/loadgen_test.go | 110 +++++++++ main.go | 2 + 7 files changed, 911 insertions(+) create mode 100644 command/loadgen.go create mode 100644 command/loadgen/config.go create mode 100644 command/loadgen/config_test.go create mode 100644 command/loadgen/loadgen.go create mode 100644 command/loadgen/pseudorand.go create mode 100644 command/loadgen_test.go diff --git a/command/loadgen.go b/command/loadgen.go new file mode 100644 index 000000000..24f107c56 --- /dev/null +++ b/command/loadgen.go @@ -0,0 +1,199 @@ +package command + +import ( + "context" + "errors" + "fmt" + mathrand "math/rand" + "strings" + "time" + + "github.com/multiformats/go-multihash" + "github.com/urfave/cli/v2" + + httpfinderclient "github.com/filecoin-project/storetheindex/api/v0/finder/client/http" + "github.com/filecoin-project/storetheindex/command/loadgen" +) + +var LoadGenCmd = &cli.Command{ + Name: "loadgen", + Usage: "Generate fake provider load for the indexer", + Flags: loadGenFlags, + Action: loadGenCmd, +} + +var LoadGenVerifyCmd = &cli.Command{ + Name: "loadgen-verify", + Usage: "Generate fake provider load for the indexer", + Flags: loadGenVerifyFlags, + Action: loadGenVerifyCmd, +} + +var loadGenFlags = []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Usage: "Config file that defines the load generated", + Required: false, + }, + &cli.UintFlag{ + Name: "concurrentProviders", + Usage: "How many concurrent providers", + Required: false, + Value: 1, + }, + &cli.StringFlag{ + Name: "indexer", + Usage: "Indexer http address. Host or host:port", + EnvVars: []string{"STORETHEINDEX_LISTEN_INGEST"}, + Aliases: []string{"i"}, + Required: false, + Value: "http://localhost:3001", + }, + &cli.StringFlag{ + Name: "topic", + Usage: "Which topic to use for libp2p", + Value: loadgen.DefaultConfig().GossipSubTopic, + }, + &cli.StringFlag{ + Name: "external-address-mappping", + Usage: `localIP=externalIP,localIP2=externalIP2. + Map the local listening address to a known + external address. Useful when behind a NAT (like in an AWS ec2 instance). It + will use the external IP when communicating with other peers.`, + }, +} + +func loadGenCmd(cctx *cli.Context) error { + configFile := cctx.String("config") + config := loadgen.DefaultConfig() + if configFile != "" { + var err error + config, err = loadgen.LoadConfigFromFile(configFile) + if err != nil { + panic("Failed to load config file: " + err.Error()) + } + } + if cctx.IsSet("topic") { + config.GossipSubTopic = cctx.String("topic") + } + + loadgen.StartLoadGen(cctx.Context, config, loadgen.LoadGenOpts{ + IndexerAddr: cctx.String("indexer"), + ConcurrentProviders: cctx.Uint("concurrentProviders"), + ListenForInterrupt: true, + }) + return nil +} + +var loadGenVerifyFlags = []cli.Flag{ + &cli.Uint64Flag{ + Name: "concurrentProviders", + Usage: "How many concurrent providers generated the load", + Value: 1, + Required: false, + }, + &cli.Uint64Flag{ + Name: "maxEntryNumber", + Usage: "How many entries were generated by the load test (per provider)", + Value: 1000, + Required: false, + }, + &cli.Uint64Flag{ + Name: "numberOfRandomQueries", + Usage: "How many queries to make in the address space (per provider).", + Value: 1000, + Required: false, + }, + &cli.StringFlag{ + Name: "indexerFind", + Usage: "HTTP Address of the indexer find endpoint e.g. http://localhost:3000", + EnvVars: []string{"STORETHEINDEX_LISTEN_FINDER_HTTP"}, + Required: false, + Value: "http://localhost:3000", + }, +} + +func loadGenVerifyCmd(cctx *cli.Context) error { + client, err := httpfinderclient.New(cctx.String("indexerFind")) + if err != nil { + return err + } + var allMhs []multihash.Multihash + // Map from provider id to entry number id to if the indexer has it + allMhsProviderEntryNumber := map[uint64]map[uint64]bool{} + mhToProviderEntryNumber := map[string]struct { + providerNumber uint64 + entryNumber uint64 + }{} + + numberOfMhsToQuery := cctx.Uint64("numberOfRandomQueries") + for i := uint64(0); i < cctx.Uint64("concurrentProviders"); i++ { + for j := uint64(0); j < numberOfMhsToQuery; j++ { + multihashIndex := uint64(mathrand.Int63n(int64(cctx.Uint64("maxEntryNumber")))) + mh, err := loadgen.GenerateMH(i, multihashIndex) + if err != nil { + return err + } + allMhs = append(allMhs, mh) + if allMhsProviderEntryNumber[i] == nil { + allMhsProviderEntryNumber[i] = map[uint64]bool{} + } + + allMhsProviderEntryNumber[i][multihashIndex] = false + mhToProviderEntryNumber[mh.B58String()] = struct { + providerNumber uint64 + entryNumber uint64 + }{i, multihashIndex} + } + } + + start := time.Now() + + resp, err := client.FindBatch(context.Background(), allMhs) + if err != nil { + return err + } + + for _, result := range resp.MultihashResults { + providerAndEntry := mhToProviderEntryNumber[result.Multihash.B58String()] + allMhsProviderEntryNumber[providerAndEntry.providerNumber][providerAndEntry.entryNumber] = true + } + + if len(allMhs) != len(resp.MultihashResults) { + limitToShow := 10 + for provider, entries := range allMhsProviderEntryNumber { + for entry, found := range entries { + if !found { + fmt.Printf("Missing: providerID=%d entryNumber=%d\n", provider, entry) + limitToShow-- + if limitToShow <= 0 { + break + } + } + } + } + } + + fmt.Printf("Found %d out of %d (%02d%%)\n", len(resp.MultihashResults), len(allMhs), int(float64(len(resp.MultihashResults))/float64(len(allMhs))*100)) + fmt.Println("Find took", time.Since(start)) + if len(allMhs) != len(resp.MultihashResults) { + return errors.New("not all mhs were found") + } + return nil +} + +// parseKVs converts a string of the form key=value,key2=value2 into a map[string]string +func parseKVs(kvs string) map[string]string { + out := map[string]string{} + if kvs == "" { + return out + } + kvSlice := strings.Split(kvs, ",") + for _, kv := range kvSlice { + parts := strings.Split(kv, "=") + k := parts[0] + v := parts[1] + out[k] = v + } + return out +} diff --git a/command/loadgen/config.go b/command/loadgen/config.go new file mode 100644 index 000000000..f8b4309dd --- /dev/null +++ b/command/loadgen/config.go @@ -0,0 +1,142 @@ +package loadgen + +import ( + "encoding/json" + "fmt" + "go/ast" + "go/constant" + "go/parser" + mathrand "math/rand" + "os" + "strconv" + "strings" +) + +type Config struct { + AdsPerSec uint `json:"adsPerSec"` + // A generator to specify how many entries per ad. + // A function so the caller can define a distribution to follow. + EntriesPerAdGenerator func() uint `json:"-"` + // For json to be able to use a predefined distribution. + EntriesPerAdType string `json:"entriesPerAdType"` + EntriesPerChunk uint `json:"entriesPerChunk"` + // Should this provider be an http provider? + IsHttp bool `json:"isHttp"` + HttpListenAddr string `json:"httpListenAddr"` + // How many of the last N ads should be kept. 0 means every ad is kept. + KeepNAds uint `json:"keepNAds"` + Seed uint64 `json:"seed"` + + StopAfterNEntries uint64 `json:"stopAfterNEntries"` + + ListenMultiaddr string `json:"listenMultiaddr"` + GossipSubTopic string `json:"gossipSubTopic"` +} + +func evalBasicLit(expr *ast.BasicLit) constant.Value { + return constant.MakeFromLiteral(expr.Value, expr.Kind, 0) +} + +func (c *Config) ParseEntriesPerAdGenerator() bool { + astV, _ := parser.ParseExpr(c.EntriesPerAdType) + distributionType, ok := astV.(*ast.CallExpr) + if !ok { + return false + } + switch distributionType.Fun.(*ast.Ident).Name { + case "Normal": + // Normal(stdev, mean) + sigma, ok := constant.Float64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit))) + if !ok { + return false + } + μ, ok := constant.Float64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit))) + if !ok { + return false + } + c.EntriesPerAdGenerator = func() uint { + return uint(mathrand.NormFloat64()*sigma + μ) + } + case "Uniform": + // Uniform(start, end) + start, ok := constant.Int64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit))) + if !ok { + return false + } + end, ok := constant.Int64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit))) + if !ok { + return false + } + c.EntriesPerAdGenerator = func() uint { + return uint(mathrand.Intn(int(end-start)) + int(start)) + } + case "Always": + // Always(value) + v, ok := constant.Uint64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit))) + if !ok { + return false + } + c.EntriesPerAdGenerator = func() uint { + return uint(v) + } + } + return true +} + +func DefaultConfig() Config { + return Config{ + AdsPerSec: 4, + EntriesPerAdGenerator: func() uint { + return uint(mathrand.NormFloat64()*10 + 70) + }, + EntriesPerChunk: 10, + IsHttp: false, + KeepNAds: 0, + Seed: 0, + StopAfterNEntries: 1000, + // The actual listen address will be this plus the seed for the port + ListenMultiaddr: "/ip4/127.0.0.1/tcp/18001", + HttpListenAddr: "127.0.0.1:19001", + GossipSubTopic: "indexer/ingest/loadtest", + } +} + +func incrementListenMultiaddrPortBy(ma string, n uint) (string, error) { + parts := strings.Split(ma, "/") + port, err := strconv.Atoi(parts[len(parts)-1]) + if err != nil { + return "", err + } + parts[len(parts)-1] = strconv.Itoa(port + int(n)) + return strings.Join(parts, "/"), nil +} + +func incrementHttpListenPortBy(ma string, n uint) (string, error) { + parts := strings.Split(ma, ":") + port, err := strconv.Atoi(parts[len(parts)-1]) + if err != nil { + return "", err + } + parts[len(parts)-1] = strconv.Itoa(port + int(n)) + return strings.Join(parts, ":"), nil +} + +func LoadConfigFromFile(file string) (Config, error) { + defaultConf := DefaultConfig() + b, err := os.ReadFile(file) + if err != nil { + return defaultConf, err + } + + c := &defaultConf + err = json.Unmarshal(b, c) + + if err != nil { + return defaultConf, err + } + + if !c.ParseEntriesPerAdGenerator() { + return defaultConf, fmt.Errorf("could not parse entries per ad generator") + } + return *c, nil +} diff --git a/command/loadgen/config_test.go b/command/loadgen/config_test.go new file mode 100644 index 000000000..1798a7031 --- /dev/null +++ b/command/loadgen/config_test.go @@ -0,0 +1,40 @@ +package loadgen + +import ( + "fmt" + "testing" +) + +func TestParseEntriesPerAdGenerator(t *testing.T) { + type testCase struct { + desc string + typ string + } + + testCases := []testCase{ + { + "Normal distribution. stdev=2, mean=100", + "Normal(2, 100)", + }, + { + "Uniform distribution. start=20, end=100", + "Uniform(20, 100)", + }, + { + "Always. val=20", + "Always(20)", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + c := &Config{ + EntriesPerAdType: tc.typ, + } + + c.ParseEntriesPerAdGenerator() + fmt.Println(c.EntriesPerAdGenerator()) + }) + } + +} diff --git a/command/loadgen/loadgen.go b/command/loadgen/loadgen.go new file mode 100644 index 000000000..356ec14b0 --- /dev/null +++ b/command/loadgen/loadgen.go @@ -0,0 +1,396 @@ +package loadgen + +import ( + "context" + "fmt" + "os" + "os/signal" + "strings" + "sync" + "time" + + mathrand "math/rand" + + "github.com/filecoin-project/go-legs" + legsDT "github.com/filecoin-project/go-legs/dtsync" + legsHttp "github.com/filecoin-project/go-legs/httpsync" + ingesthttpclient "github.com/filecoin-project/storetheindex/api/v0/ingest/client/http" + "github.com/filecoin-project/storetheindex/api/v0/ingest/schema" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/dsadapter" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + libp2pconfig "github.com/libp2p/go-libp2p/config" + "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multihash" + "github.com/multiformats/go-varint" +) + +type LoadGenOpts struct { + IndexerAddr string + ConcurrentProviders uint + ExternalAddressMapping map[string]string + ListenForInterrupt bool +} + +func StartLoadGen(ctx context.Context, loadConfig Config, loadGenOpts LoadGenOpts) { + stopPublishingFns := make([]func(), 0, loadGenOpts.ConcurrentProviders) + closeFns := make([]func(), 0, loadGenOpts.ConcurrentProviders) + + fmt.Println("Starting load generator", loadGenOpts) + for i := uint(0); i < loadGenOpts.ConcurrentProviders; i++ { + configCopy := loadConfig + configCopy.Seed = loadConfig.Seed + uint64(i) + var err error + configCopy.ListenMultiaddr, err = incrementListenMultiaddrPortBy(loadConfig.ListenMultiaddr, i) + if err != nil { + panic("Failed to increment listen multiaddr: " + err.Error()) + } + configCopy.HttpListenAddr, err = incrementHttpListenPortBy(loadConfig.HttpListenAddr, i) + if err != nil { + panic("Failed to increment http listen multiaddr: " + err.Error()) + } + fmt.Println("Config is ", configCopy) + stopPublishing, close, err := startProviderLoadGen(configCopy, loadGenOpts.IndexerAddr, loadGenOpts.ExternalAddressMapping) + if err != nil { + panic("Failed to start provider: " + err.Error()) + } + stopPublishingFns = append(stopPublishingFns, stopPublishing) + closeFns = append(closeFns, close) + } + + ch := make(chan os.Signal, 1) + if loadGenOpts.ListenForInterrupt { + signal.Notify(ch, os.Interrupt) + <-ch + } else { + <-ctx.Done() + } + for _, fn := range stopPublishingFns { + fn() + } + + fmt.Println("New publishing stopped. Hit ctrl-c again to exit.") + if loadGenOpts.ListenForInterrupt { + <-ch + } else { + <-ctx.Done() + } + + for _, fn := range closeFns { + fn() + } +} + +func startProviderLoadGen(config Config, indexerHttpAddr string, addressMapping map[string]string) (stopGenAds func(), close func(), err error) { + p := newProviderLoadGen(config, indexerHttpAddr, addressMapping) + + fmt.Printf("Provider seed=%d ID=%v\n", p.config.Seed, p.h.ID()) + + stopGenAds = p.runUpdater(func() { + err := p.announce() + if err != nil { + panic("Failed to announce: " + err.Error()) + } + }) + + close = func() {} + if config.IsHttp { + close = p.announceInBackground() + } + fmt.Println("Started provider load generator") + fmt.Println("Peer ID:", p.h.ID().Pretty()) + fmt.Println("Addrs:", p.h.Addrs()) + fmt.Println() + + return stopGenAds, close, nil +} + +func newProviderLoadGen(c Config, indexerHttpAddr string, addressMapping map[string]string) *providerLoadGen { + lsys := cidlink.DefaultLinkSystem() + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + store := &dsadapter.Adapter{ + Wrapped: ds, + } + lsys.SetReadStorage(store) + lsys.SetWriteStorage(store) + + pseudoRandReader := newPseudoRandReaderFrom(mathrand.NewSource(int64(c.Seed))) + signingKey, _, err := crypto.GenerateEd25519Key(pseudoRandReader) + if err != nil { + panic("Failed to generate signing key") + } + + host, err := libp2p.New(libp2p.ListenAddrStrings(c.ListenMultiaddr), libp2p.Identity(signingKey), libp2p.AddrsFactory(newAddrsFactory(addressMapping)), libp2p.ResourceManager(network.NullResourceManager)) + if err != nil { + panic("Failed to start host" + err.Error()) + } + + var pub legs.Publisher + if c.IsHttp { + pub, err = legsHttp.NewPublisher(c.HttpListenAddr, lsys, host.ID(), signingKey) + } else { + pub, err = legsDT.NewPublisher(host, ds, lsys, c.GossipSubTopic) + + } + if err != nil { + panic("Failed to start legs publisher: " + err.Error()) + } + p := &providerLoadGen{ + indexerHttpAddr: indexerHttpAddr, + config: c, + signingKey: signingKey, + h: host, + lsys: lsys, + pub: pub, + } + + return p +} + +type providerLoadGen struct { + indexerHttpAddr string + config Config + signingKey crypto.PrivKey + h host.Host + lsys ipld.LinkSystem + pub legs.Publisher + // Keep track of the total number of entries we've created. There are a couple uses for this: + // * At the end we can tell the user how many entries we've created. + // * We can generate multihashes as a function of this number. E.g. the multihash is just an encoded version of the entry number. + entriesGenerated uint + adsGenerated uint + currentHead *ipld.Link + recordKeepingMu sync.Mutex +} + +func (p *providerLoadGen) announce() error { + client, err := ingesthttpclient.New(p.indexerHttpAddr) + if err != nil { + return err + } + + addrs := p.h.Addrs()[:1] + if p.config.IsHttp { + parts := strings.Split(p.config.HttpListenAddr, ":") + httpMultiaddr := `/ip4/` + parts[0] + `/tcp/` + parts[1] + `/http` + ma, err := multiaddr.NewMultiaddr(httpMultiaddr) + if err != nil { + return err + } + addrs = []multiaddr.Multiaddr{ma} + } + + if p.currentHead == nil { + return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, cid.Undef) + } + return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, (*p.currentHead).(cidlink.Link).Cid) +} + +func (p *providerLoadGen) announceInBackground() func() { + closer := make(chan struct{}) + t := time.NewTicker(2 * time.Second) + go func() { + for { + select { + case <-closer: + return + case <-t.C: + p.announce() + } + + } + }() + + return func() { + close(closer) + } +} + +func (p *providerLoadGen) runUpdater(afterFirstUpdate func()) func() { + closer := make(chan struct{}) + var callAfterFirstUpdate *sync.Once = &sync.Once{} + + go func() { + t := time.NewTicker(time.Second / time.Duration(p.config.AdsPerSec)) + for { + select { + case <-closer: + return + case <-t.C: + start := time.Now() + p.recordKeepingMu.Lock() + var addrs []string + for _, a := range p.h.Addrs() { + addrs = append(addrs, a.String()) + } + adBuilder := adBuilder{ + mhGenerator: p.mhGenerator, + entryCount: p.config.EntriesPerAdGenerator(), + entriesPerChunk: p.config.EntriesPerChunk, + isRm: false, + provider: p.h.ID().String(), + providerAddrs: addrs, + metadata: []byte(fmt.Sprintf("providerSeed=%d,entriesGenerated=%d", p.config.Seed, p.entriesGenerated)), + } + + nextAdHead, err := adBuilder.build(p.lsys, p.signingKey, p.currentHead) + if err != nil { + panic(fmt.Sprintf("Failed to build ad: %s", err)) + } + + p.currentHead = &nextAdHead + p.adsGenerated = p.adsGenerated + 1 + p.entriesGenerated = adBuilder.entryCount + p.entriesGenerated + fmt.Printf("ID=%d .Number of generated entries: %d\n", p.config.Seed, p.entriesGenerated) + + p.recordKeepingMu.Unlock() + + err = p.pub.UpdateRootWithAddrs(context.Background(), nextAdHead.(cidlink.Link).Cid, p.h.Addrs()) + if err != nil { + panic(fmt.Sprintf("Failed to publish ad: %s", err)) + } + + callAfterFirstUpdate.Do(afterFirstUpdate) + + fmt.Println("Published ad in", time.Since(start)) + + if p.config.StopAfterNEntries > 0 && p.entriesGenerated > uint(p.config.StopAfterNEntries) { + fmt.Printf("ID=%d finished\n", p.config.Seed) + return + } + } + } + }() + + return func() { + close(closer) + } +} + +func GenerateMH(nodeID uint64, entryNumber uint64) (multihash.Multihash, error) { + nodeIDVarInt := varint.ToUvarint(nodeID) + nVarInt := varint.ToUvarint(entryNumber) + b := append(nodeIDVarInt, nVarInt...) + // Identity hash for debugging + // return multihash.Sum(b, multihash.IDENTITY, -1) + return multihash.Sum(b, multihash.SHA2_256, -1) +} + +func (p *providerLoadGen) mhGenerator(entryNumberWithinAd uint) (multihash.Multihash, error) { + i := p.entriesGenerated + entryNumberWithinAd + return GenerateMH(uint64(p.config.Seed), uint64(i)) +} + +type adBuilder struct { + // mhGenerator defines how the multihash for this given entry + mhGenerator func(entryNumberWithinAd uint) (multihash.Multihash, error) + entryCount uint + entriesPerChunk uint + isRm bool + contextID uint + metadata []byte + provider string + providerAddrs []string +} + +func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd *ipld.Link) (ipld.Link, error) { + contextID := []byte(fmt.Sprintf("%d", b.contextID)) + metadata := b.metadata + var entriesLink *ipld.Link + if b.entryCount == 0 { + l := ipld.Link(schema.NoEntries) + entriesLink = &l + } else { + var allMhs []multihash.Multihash + for i := uint(0); i < b.entryCount; i++ { + mh, err := b.mhGenerator(i) + if err != nil { + return nil, err + } + allMhs = append(allMhs, mh) + } + for len(allMhs) > 0 { + splitIdx := len(allMhs) - int(b.entriesPerChunk) + if splitIdx < 0 { + splitIdx = 0 + } + mhChunk := allMhs[splitIdx:] + allMhs = allMhs[:splitIdx] + var err error + entriesNode, err := schema.EntryChunk{ + Entries: mhChunk, + Next: entriesLink, + }.ToNode() + if err != nil { + return nil, err + } + + entriesLinkV, err := lsys.Store(ipld.LinkContext{}, schema.Linkproto, entriesNode) + if err != nil { + return nil, err + } + if entriesLinkV != nil { + entriesLink = &entriesLinkV + } + } + } + ad := schema.Advertisement{ + PreviousID: prevAd, + Provider: b.provider, + Addresses: b.providerAddrs, + Entries: *entriesLink, + ContextID: contextID, + Metadata: metadata, + IsRm: b.isRm, + } + ad.Sign(signingKey) + + adNode, err := ad.ToNode() + if err != nil { + return nil, err + } + + adLink, err := lsys.Store(ipld.LinkContext{}, schema.Linkproto, adNode) + return adLink, err +} + +func newAddrsFactory(ipMapping map[string]string) libp2pconfig.AddrsFactory { + if ipMapping == nil { + ipMapping = map[string]string{} + } + return func(ms []multiaddr.Multiaddr) []multiaddr.Multiaddr { + var out []multiaddr.Multiaddr + for _, ma := range ms { + v, err := ma.ValueForProtocol(multiaddr.P_IP4) + if err != nil || ipMapping[v] == "" { + out = append(out, ma) + continue + } + + var mappedComponents []multiaddr.Multiaddr + multiaddr.ForEach(ma, func(c multiaddr.Component) bool { + if c.Protocol().Code == multiaddr.P_IP4 && ipMapping[c.Value()] != "" { + nextComponent, err := multiaddr.NewComponent(c.Protocol().Name, ipMapping[c.Value()]) + if err != nil { + panic("Failed to map multiaddr") + } + mappedComponents = append(mappedComponents, nextComponent) + + } else { + mappedComponents = append(mappedComponents, &c) + } + return true + }) + out = append(out, multiaddr.Join(mappedComponents...)) + } + return out + } +} diff --git a/command/loadgen/pseudorand.go b/command/loadgen/pseudorand.go new file mode 100644 index 000000000..bfd649c74 --- /dev/null +++ b/command/loadgen/pseudorand.go @@ -0,0 +1,22 @@ +package loadgen + +import ( + "io" + "math/rand" + mathrand "math/rand" +) + +type pr struct { + rand.Source +} + +func newPseudoRandReaderFrom(src mathrand.Source) io.Reader { + return &pr{src} +} + +func (r *pr) Read(p []byte) (n int, err error) { + for i := 0; i < len(p); i++ { + p[i] = byte(r.Int63()) + } + return len(p), nil +} diff --git a/command/loadgen_test.go b/command/loadgen_test.go new file mode 100644 index 000000000..55e1dd4b3 --- /dev/null +++ b/command/loadgen_test.go @@ -0,0 +1,110 @@ +package command + +import ( + "context" + "fmt" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/filecoin-project/storetheindex/command/loadgen" + "github.com/filecoin-project/storetheindex/config" + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" +) + +const FinderAddr = "/ip4/127.0.0.1/tcp/13000" +const IngestAddr = "/ip4/127.0.0.1/tcp/13001" +const AdminAddr = "/ip4/127.0.0.1/tcp/13002" + +func TestSmallLoad(t *testing.T) { + ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) + defer cncl() + testLoadHelper(ctx, t, 1, 1000, false) +} + +func TestSmallLoadOverHTTP(t *testing.T) { + ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) + defer cncl() + testLoadHelper(ctx, t, 1, 1000, true) +} + +func TestLargeLoad(t *testing.T) { + ctx, cncl := context.WithTimeout(context.Background(), 180*time.Second) + defer cncl() + testLoadHelper(ctx, t, 10, 1000, false) +} + +func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { + tempDir := t.TempDir() + os.Setenv(config.EnvDir, tempDir) + os.Setenv("STORETHEINDEX_LISTEN_FINDER", FinderAddr) + os.Setenv("STORETHEINDEX_LISTEN_ADMIN", AdminAddr) + os.Setenv("STORETHEINDEX_LISTEN_INGEST", IngestAddr) + os.Setenv("STORETHEINDE_PUBSUB_TOPIC", loadgen.DefaultConfig().GossipSubTopic) + + app := &cli.App{ + Name: "indexer", + Commands: []*cli.Command{ + InitCmd, + DaemonCmd, + LoadGenCmd, + LoadGenVerifyCmd, + }, + } + + err := app.RunContext(ctx, []string{"storetheindex", "init"}) + require.NoError(t, err) + + go func() { + app.RunContext(ctx, []string{"storetheindex", "daemon"}) + }() + + finderParts := strings.Split(FinderAddr, "/") + finderPort := finderParts[len(finderParts)-1] + + c := http.Client{} + for { + resp, err := c.Get("http://127.0.0.1:" + finderPort + "/health") + if err == nil && resp.StatusCode == 200 { + break + } + time.Sleep(100 * time.Millisecond) + } + + ingestParts := strings.Split(IngestAddr, "/") + ingestPort := ingestParts[len(ingestParts)-1] + go func() { + loadGenTest(ctx, "http://127.0.0.1:"+ingestPort, concurrentProviders, numberOfEntriesPerProvider, useHTTP) + }() + + foundAll := false +LOOP: + for { + err = app.RunContext(ctx, []string{"storetheindex", "loadgen-verify", "--indexerFind=http://127.0.0.1:" + finderPort, "--concurrentProviders=" + fmt.Sprint(concurrentProviders), "--maxEntryNumber=" + fmt.Sprint(numberOfEntriesPerProvider)}) + if err == nil { + foundAll = true + break + } + select { + case <-ctx.Done(): + break LOOP + case <-time.After(1 * time.Second): + } + } + + require.True(t, foundAll, "Did not find all entries") +} + +func loadGenTest(ctx context.Context, indexerAddr string, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { + loadConfig := loadgen.DefaultConfig() + loadConfig.StopAfterNEntries = uint64(numberOfEntriesPerProvider) + loadConfig.IsHttp = useHTTP + + loadgen.StartLoadGen(ctx, loadConfig, loadgen.LoadGenOpts{ + IndexerAddr: indexerAddr, + ConcurrentProviders: concurrentProviders, + }) +} diff --git a/main.go b/main.go index 260597a7f..9b7b8bf21 100644 --- a/main.go +++ b/main.go @@ -47,6 +47,8 @@ func main() { command.ProvidersCmd, command.RegisterCmd, command.SyntheticCmd, + command.LoadGenCmd, + command.LoadGenVerifyCmd, }, } From 06cad444379474f102d6bb035d9b87a27e9895c5 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 8 Apr 2022 14:39:59 -0700 Subject: [PATCH 02/12] Slow down for CI --- command/loadgen_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/command/loadgen_test.go b/command/loadgen_test.go index 55e1dd4b3..c84403a7f 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -74,10 +74,12 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, time.Sleep(100 * time.Millisecond) } + // Sleep a bit for the ingester to finish spinning up (should be right after finder but CI runs slow) + time.Sleep(1 * time.Second) ingestParts := strings.Split(IngestAddr, "/") ingestPort := ingestParts[len(ingestParts)-1] go func() { - loadGenTest(ctx, "http://127.0.0.1:"+ingestPort, concurrentProviders, numberOfEntriesPerProvider, useHTTP) + startLoadgen(ctx, "http://127.0.0.1:"+ingestPort, concurrentProviders, numberOfEntriesPerProvider, useHTTP) }() foundAll := false @@ -98,7 +100,7 @@ LOOP: require.True(t, foundAll, "Did not find all entries") } -func loadGenTest(ctx context.Context, indexerAddr string, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { +func startLoadgen(ctx context.Context, indexerAddr string, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { loadConfig := loadgen.DefaultConfig() loadConfig.StopAfterNEntries = uint64(numberOfEntriesPerProvider) loadConfig.IsHttp = useHTTP From 1a15e7a78a9f76e3fc9f91992a00895e8ff4ca5f Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 8 Apr 2022 14:47:24 -0700 Subject: [PATCH 03/12] Close context when we return from helper in test --- command/loadgen_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/command/loadgen_test.go b/command/loadgen_test.go index c84403a7f..4b42bdf23 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -38,6 +38,10 @@ func TestLargeLoad(t *testing.T) { } func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { + // Set up a context that is canceled when the command is interrupted + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tempDir := t.TempDir() os.Setenv(config.EnvDir, tempDir) os.Setenv("STORETHEINDEX_LISTEN_FINDER", FinderAddr) From c6f9e3ec2a2a3a21f50f0f55c7ea4a67a0a5a19a Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 8 Apr 2022 15:02:51 -0700 Subject: [PATCH 04/12] Skip e2e_test since it is unactionable --- e2e_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test.go b/e2e_test.go index b73fbf056..ab82fe85f 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -118,6 +118,7 @@ func (e *e2eTestRunner) stop(cmd *exec.Cmd, timeout time.Duration) { } func TestEndToEndWithReferenceProvider(t *testing.T) { + t.Skip("Skipping because it's failing on CI but works locally. So it is unactionable. Before re-enabling this test, please try to make it reproducible. Ideally in a clean environment that is completely independent of the host machine.") switch runtime.GOOS { case "windows": t.Skip("skipping test on", runtime.GOOS) From d353fb24295b9f2316addfa10385fa5a62ed4559 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 12 Apr 2022 11:34:59 -0700 Subject: [PATCH 05/12] Add healthcheck to ingest --- command/loadgen_test.go | 8 ++++++++ server/ingest/http/handler.go | 10 ++++++++++ server/ingest/http/server.go | 2 ++ 3 files changed, 20 insertions(+) diff --git a/command/loadgen_test.go b/command/loadgen_test.go index 4b42bdf23..8d166bf39 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -82,6 +82,14 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, time.Sleep(1 * time.Second) ingestParts := strings.Split(IngestAddr, "/") ingestPort := ingestParts[len(ingestParts)-1] + for { + resp, err := c.Get("http://127.0.0.1:" + ingestPort + "/health") + if err == nil && resp.StatusCode == 200 { + break + } + time.Sleep(100 * time.Millisecond) + } + go func() { startLoadgen(ctx, "http://127.0.0.1:"+ingestPort, concurrentProviders, numberOfEntriesPerProvider, useHTTP) }() diff --git a/server/ingest/http/handler.go b/server/ingest/http/handler.go index 0ae8dccee..d4d61f6a7 100644 --- a/server/ingest/http/handler.go +++ b/server/ingest/http/handler.go @@ -1,6 +1,7 @@ package httpingestserver import ( + "encoding/json" "io" "net/http" @@ -9,6 +10,7 @@ import ( "github.com/ipni/storetheindex/internal/ingest" "github.com/ipni/storetheindex/internal/registry" "github.com/ipni/storetheindex/server/ingest/handler" + "github.com/ipni/storetheindex/version" ) type httpHandler struct { @@ -109,3 +111,11 @@ func (h *httpHandler) announce(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusNoContent) } + +// GET /health +func (h *httpHandler) health(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Cache-Control", "no-cache") + v := version.String() + b, _ := json.Marshal(v) + httpserver.WriteJsonResponse(w, http.StatusOK, b) +} diff --git a/server/ingest/http/server.go b/server/ingest/http/server.go index 3a81f68e3..25bb27acf 100644 --- a/server/ingest/http/server.go +++ b/server/ingest/http/server.go @@ -54,6 +54,8 @@ func New(listen string, indexer indexer.Interface, ingester *ingest.Ingester, re // Registration routes r.HandleFunc("/register", h.registerProvider).Methods(http.MethodPost) r.HandleFunc("/register/{providerid}", h.removeProvider).Methods(http.MethodDelete) + + r.HandleFunc("/health", h.health).Methods(http.MethodGet) return s, nil } From ab7c34d67fd5af30f751b0dc34203df955df9922 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 12 Apr 2022 11:36:36 -0700 Subject: [PATCH 06/12] Disable race detector tests for load test --- command/loadgen_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/command/loadgen_test.go b/command/loadgen_test.go index 8d166bf39..50ff25e64 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -1,3 +1,5 @@ +//go:build !race + package command import ( From 3b41c7b0c9084cba1e31a330efcdc4dbe9ded6a6 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 12 Apr 2022 13:30:36 -0700 Subject: [PATCH 07/12] Skip large load test in linux. Skip load tests on windows --- command/loadgen_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/command/loadgen_test.go b/command/loadgen_test.go index 50ff25e64..b757370dc 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "os" + "runtime" "strings" "testing" "time" @@ -34,12 +35,22 @@ func TestSmallLoadOverHTTP(t *testing.T) { } func TestLargeLoad(t *testing.T) { + switch runtime.GOOS { + case "linux": + t.Skip("skipping Large load test on linux because it takes too long in Github Actions CI.") + } + ctx, cncl := context.WithTimeout(context.Background(), 180*time.Second) defer cncl() testLoadHelper(ctx, t, 10, 1000, false) } func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { + switch runtime.GOOS { + case "windows": + t.Skip("skipping test on", runtime.GOOS) + } + // Set up a context that is canceled when the command is interrupted ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 2e54cb6f85ddce8ab06f9d38f6b70bee0547bf04 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 13 Apr 2022 09:05:33 -0700 Subject: [PATCH 08/12] Fix dual import --- command/loadgen/pseudorand.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/command/loadgen/pseudorand.go b/command/loadgen/pseudorand.go index bfd649c74..2debd2b4f 100644 --- a/command/loadgen/pseudorand.go +++ b/command/loadgen/pseudorand.go @@ -3,14 +3,13 @@ package loadgen import ( "io" "math/rand" - mathrand "math/rand" ) type pr struct { rand.Source } -func newPseudoRandReaderFrom(src mathrand.Source) io.Reader { +func newPseudoRandReaderFrom(src rand.Source) io.Reader { return &pr{src} } From 5dac00d034e9e652952ed94956a24472c53b3141 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 13 Apr 2022 10:49:07 -0700 Subject: [PATCH 09/12] Plumb external-address-mapping --- command/loadgen.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/command/loadgen.go b/command/loadgen.go index 24f107c56..82dfb0455 100644 --- a/command/loadgen.go +++ b/command/loadgen.go @@ -78,9 +78,10 @@ func loadGenCmd(cctx *cli.Context) error { } loadgen.StartLoadGen(cctx.Context, config, loadgen.LoadGenOpts{ - IndexerAddr: cctx.String("indexer"), - ConcurrentProviders: cctx.Uint("concurrentProviders"), - ListenForInterrupt: true, + IndexerAddr: cctx.String("indexer"), + ConcurrentProviders: cctx.Uint("concurrentProviders"), + ListenForInterrupt: true, + ExternalAddressMapping: parseKVs(cctx.String("external-address-mappping")), }) return nil } From 61eabf504b32ef8927d6e051fd1db611c597886a Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 13 Jan 2023 11:10:12 -0800 Subject: [PATCH 10/12] resolve review issues --- command/loadgen.go | 4 +-- command/loadgen/loadgen.go | 71 ++++++++++++++++++++------------------ command/loadgen_test.go | 37 +++++++++++++++----- e2e_test.go | 1 - go.mod | 1 + go.sum | 2 ++ 6 files changed, 71 insertions(+), 45 deletions(-) diff --git a/command/loadgen.go b/command/loadgen.go index 82dfb0455..54da6b070 100644 --- a/command/loadgen.go +++ b/command/loadgen.go @@ -11,8 +11,8 @@ import ( "github.com/multiformats/go-multihash" "github.com/urfave/cli/v2" - httpfinderclient "github.com/filecoin-project/storetheindex/api/v0/finder/client/http" - "github.com/filecoin-project/storetheindex/command/loadgen" + httpfinderclient "github.com/ipni/storetheindex/api/v0/finder/client/http" + "github.com/ipni/storetheindex/command/loadgen" ) var LoadGenCmd = &cli.Command{ diff --git a/command/loadgen/loadgen.go b/command/loadgen/loadgen.go index 356ec14b0..d9ebfd35d 100644 --- a/command/loadgen/loadgen.go +++ b/command/loadgen/loadgen.go @@ -11,23 +11,22 @@ import ( mathrand "math/rand" - "github.com/filecoin-project/go-legs" - legsDT "github.com/filecoin-project/go-legs/dtsync" - legsHttp "github.com/filecoin-project/go-legs/httpsync" - ingesthttpclient "github.com/filecoin-project/storetheindex/api/v0/ingest/client/http" - "github.com/filecoin-project/storetheindex/api/v0/ingest/schema" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/storage/dsadapter" + ingesthttpclient "github.com/ipni/storetheindex/api/v0/ingest/client/http" + "github.com/ipni/storetheindex/api/v0/ingest/schema" + "github.com/ipni/storetheindex/dagsync" + "github.com/ipni/storetheindex/dagsync/dtsync" + "github.com/ipni/storetheindex/dagsync/httpsync" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" libp2pconfig "github.com/libp2p/go-libp2p/config" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" "github.com/multiformats/go-varint" @@ -94,12 +93,16 @@ func startProviderLoadGen(config Config, indexerHttpAddr string, addressMapping fmt.Printf("Provider seed=%d ID=%v\n", p.config.Seed, p.h.ID()) - stopGenAds = p.runUpdater(func() { - err := p.announce() - if err != nil { - panic("Failed to announce: " + err.Error()) + var afterEachUpdate func() + if !config.IsHttp { + afterEachUpdate = func() { + err := p.announce() + if err != nil { + panic("Failed to announce: " + err.Error()) + } } - }) + } + stopGenAds = p.runUpdater(afterEachUpdate) close = func() {} if config.IsHttp { @@ -133,15 +136,15 @@ func newProviderLoadGen(c Config, indexerHttpAddr string, addressMapping map[str panic("Failed to start host" + err.Error()) } - var pub legs.Publisher + var pub dagsync.Publisher if c.IsHttp { - pub, err = legsHttp.NewPublisher(c.HttpListenAddr, lsys, host.ID(), signingKey) + pub, err = httpsync.NewPublisher(c.HttpListenAddr, lsys, host.ID(), signingKey) } else { - pub, err = legsDT.NewPublisher(host, ds, lsys, c.GossipSubTopic) + pub, err = dtsync.NewPublisher(host, ds, lsys, c.GossipSubTopic) } if err != nil { - panic("Failed to start legs publisher: " + err.Error()) + panic("Failed to start publisher: " + err.Error()) } p := &providerLoadGen{ indexerHttpAddr: indexerHttpAddr, @@ -161,13 +164,13 @@ type providerLoadGen struct { signingKey crypto.PrivKey h host.Host lsys ipld.LinkSystem - pub legs.Publisher + pub dagsync.Publisher // Keep track of the total number of entries we've created. There are a couple uses for this: // * At the end we can tell the user how many entries we've created. // * We can generate multihashes as a function of this number. E.g. the multihash is just an encoded version of the entry number. entriesGenerated uint adsGenerated uint - currentHead *ipld.Link + currentHead ipld.Link recordKeepingMu sync.Mutex } @@ -189,9 +192,10 @@ func (p *providerLoadGen) announce() error { } if p.currentHead == nil { - return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, cid.Undef) + // Nothing to announce + return nil } - return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, (*p.currentHead).(cidlink.Link).Cid) + return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, (p.currentHead).(cidlink.Link).Cid) } func (p *providerLoadGen) announceInBackground() func() { @@ -214,9 +218,8 @@ func (p *providerLoadGen) announceInBackground() func() { } } -func (p *providerLoadGen) runUpdater(afterFirstUpdate func()) func() { +func (p *providerLoadGen) runUpdater(afterEachUpdate func()) func() { closer := make(chan struct{}) - var callAfterFirstUpdate *sync.Once = &sync.Once{} go func() { t := time.NewTicker(time.Second / time.Duration(p.config.AdsPerSec)) @@ -246,7 +249,7 @@ func (p *providerLoadGen) runUpdater(afterFirstUpdate func()) func() { panic(fmt.Sprintf("Failed to build ad: %s", err)) } - p.currentHead = &nextAdHead + p.currentHead = nextAdHead p.adsGenerated = p.adsGenerated + 1 p.entriesGenerated = adBuilder.entryCount + p.entriesGenerated fmt.Printf("ID=%d .Number of generated entries: %d\n", p.config.Seed, p.entriesGenerated) @@ -258,8 +261,9 @@ func (p *providerLoadGen) runUpdater(afterFirstUpdate func()) func() { panic(fmt.Sprintf("Failed to publish ad: %s", err)) } - callAfterFirstUpdate.Do(afterFirstUpdate) - + if afterEachUpdate != nil { + afterEachUpdate() + } fmt.Println("Published ad in", time.Since(start)) if p.config.StopAfterNEntries > 0 && p.entriesGenerated > uint(p.config.StopAfterNEntries) { @@ -301,13 +305,12 @@ type adBuilder struct { providerAddrs []string } -func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd *ipld.Link) (ipld.Link, error) { +func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd ipld.Link) (ipld.Link, error) { contextID := []byte(fmt.Sprintf("%d", b.contextID)) metadata := b.metadata - var entriesLink *ipld.Link + var entriesLink ipld.Link if b.entryCount == 0 { - l := ipld.Link(schema.NoEntries) - entriesLink = &l + entriesLink = ipld.Link(schema.NoEntries) } else { var allMhs []multihash.Multihash for i := uint(0); i < b.entryCount; i++ { @@ -338,7 +341,7 @@ func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd return nil, err } if entriesLinkV != nil { - entriesLink = &entriesLinkV + entriesLink = entriesLinkV } } } @@ -346,7 +349,7 @@ func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd PreviousID: prevAd, Provider: b.provider, Addresses: b.providerAddrs, - Entries: *entriesLink, + Entries: entriesLink, ContextID: contextID, Metadata: metadata, IsRm: b.isRm, diff --git a/command/loadgen_test.go b/command/loadgen_test.go index b757370dc..02ba991d8 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -12,8 +12,8 @@ import ( "testing" "time" - "github.com/filecoin-project/storetheindex/command/loadgen" - "github.com/filecoin-project/storetheindex/config" + "github.com/ipni/storetheindex/command/loadgen" + "github.com/ipni/storetheindex/config" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" ) @@ -22,22 +22,30 @@ const FinderAddr = "/ip4/127.0.0.1/tcp/13000" const IngestAddr = "/ip4/127.0.0.1/tcp/13001" const AdminAddr = "/ip4/127.0.0.1/tcp/13002" -func TestSmallLoad(t *testing.T) { +func TestSmallLoadNoHTTP(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) defer cncl() testLoadHelper(ctx, t, 1, 1000, false) } func TestSmallLoadOverHTTP(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) defer cncl() testLoadHelper(ctx, t, 1, 1000, true) } func TestLargeLoad(t *testing.T) { - switch runtime.GOOS { - case "linux": - t.Skip("skipping Large load test on linux because it takes too long in Github Actions CI.") + if testing.Short() { + t.Skip("skipping test in short mode.") + } + if os.Getenv("CI") != "" { + t.Skip("Skipping testing in CI environment") } ctx, cncl := context.WithTimeout(context.Background(), 180*time.Second) @@ -52,7 +60,7 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, } // Set up a context that is canceled when the command is interrupted - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() tempDir := t.TempDir() @@ -75,8 +83,10 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, err := app.RunContext(ctx, []string{"storetheindex", "init"}) require.NoError(t, err) + daemonDone := make(chan struct{}) go func() { app.RunContext(ctx, []string{"storetheindex", "daemon"}) + close(daemonDone) }() finderParts := strings.Split(FinderAddr, "/") @@ -103,10 +113,13 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, time.Sleep(100 * time.Millisecond) } + loadgenDone := make(chan struct{}) go func() { startLoadgen(ctx, "http://127.0.0.1:"+ingestPort, concurrentProviders, numberOfEntriesPerProvider, useHTTP) + close(loadgenDone) }() + timer := time.NewTimer(time.Second) foundAll := false LOOP: for { @@ -118,11 +131,19 @@ LOOP: select { case <-ctx.Done(): break LOOP - case <-time.After(1 * time.Second): + case <-timer.C: + timer.Reset(time.Second) } } + timer.Stop() require.True(t, foundAll, "Did not find all entries") + + // Wait until loadgen and daemon are stopped so that tempDir can be removed + // on Windows. + cancel() + <-loadgenDone + <-daemonDone } func startLoadgen(ctx context.Context, indexerAddr string, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { diff --git a/e2e_test.go b/e2e_test.go index ab82fe85f..b73fbf056 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -118,7 +118,6 @@ func (e *e2eTestRunner) stop(cmd *exec.Cmd, timeout time.Duration) { } func TestEndToEndWithReferenceProvider(t *testing.T) { - t.Skip("Skipping because it's failing on CI but works locally. So it is unactionable. Before re-enabling this test, please try to make it reproducible. Ideally in a clean environment that is completely independent of the host machine.") switch runtime.GOOS { case "windows": t.Skip("skipping test on", runtime.GOOS) diff --git a/go.mod b/go.mod index a7ca415a2..4c9717336 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/ipfs/kubo v0.17.0 github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 github.com/ipld/go-ipld-prime v0.19.0 + github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd github.com/ipld/go-storethehash v0.3.13 github.com/ipni/go-indexer-core v0.6.20-0.20230112154735-7b53be959fe6 github.com/libp2p/go-libp2p v0.23.4 diff --git a/go.sum b/go.sum index 57089fb41..47f48c21f 100644 --- a/go.sum +++ b/go.sum @@ -660,6 +660,8 @@ github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDa github.com/ipld/go-ipld-prime v0.19.0 h1:5axC7rJmPc17Emw6TelxGwnzALk0PdupZ2oj2roDj04= github.com/ipld/go-ipld-prime v0.19.0/go.mod h1:Q9j3BaVXwaA3o5JUDNvptDDr/x8+F7FG6XJ8WI3ILg4= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= +github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd h1:qdjo1CRvAQhOMoyYjPnbdZ5rYFFmqztweQ9KAsuWpO0= +github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:9DD/GM0JNPoisgR09F62kbBi7kHa4eDIea4XshXYOVc= github.com/ipld/go-storethehash v0.3.13 h1:1T6kX5K57lAgxbsGitZEaZMAR3RdWch2kO8okK/BgN8= github.com/ipld/go-storethehash v0.3.13/go.mod h1:KCYpzmamubnSwm7fvWcCkm0aIwQh4WRNtzrKK4pVhAQ= github.com/ipni/go-indexer-core v0.6.20-0.20230112154735-7b53be959fe6 h1:sn06bRrS1aAlH5WdeLsHxeHPArKlS46bvwjSIapBf1M= From 30a7809b30b60d66183436a052ee8ab2539f7c6b Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 13 Jan 2023 14:36:59 -0800 Subject: [PATCH 11/12] Do not export consts --- command/loadgen_test.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/command/loadgen_test.go b/command/loadgen_test.go index 02ba991d8..71f175c5d 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -1,6 +1,6 @@ //go:build !race -package command +package command_test import ( "context" @@ -12,15 +12,18 @@ import ( "testing" "time" + "github.com/ipni/storetheindex/command" "github.com/ipni/storetheindex/command/loadgen" "github.com/ipni/storetheindex/config" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" ) -const FinderAddr = "/ip4/127.0.0.1/tcp/13000" -const IngestAddr = "/ip4/127.0.0.1/tcp/13001" -const AdminAddr = "/ip4/127.0.0.1/tcp/13002" +const ( + finderAddr = "/ip4/127.0.0.1/tcp/13000" + ingestAddr = "/ip4/127.0.0.1/tcp/13001" + adminAddr = "/ip4/127.0.0.1/tcp/13002" +) func TestSmallLoadNoHTTP(t *testing.T) { if testing.Short() { @@ -65,18 +68,18 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, tempDir := t.TempDir() os.Setenv(config.EnvDir, tempDir) - os.Setenv("STORETHEINDEX_LISTEN_FINDER", FinderAddr) - os.Setenv("STORETHEINDEX_LISTEN_ADMIN", AdminAddr) - os.Setenv("STORETHEINDEX_LISTEN_INGEST", IngestAddr) + os.Setenv("STORETHEINDEX_LISTEN_FINDER", finderAddr) + os.Setenv("STORETHEINDEX_LISTEN_ADMIN", adminAddr) + os.Setenv("STORETHEINDEX_LISTEN_INGEST", ingestAddr) os.Setenv("STORETHEINDE_PUBSUB_TOPIC", loadgen.DefaultConfig().GossipSubTopic) app := &cli.App{ Name: "indexer", Commands: []*cli.Command{ - InitCmd, - DaemonCmd, - LoadGenCmd, - LoadGenVerifyCmd, + command.InitCmd, + command.DaemonCmd, + command.LoadGenCmd, + command.LoadGenVerifyCmd, }, } @@ -89,7 +92,7 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, close(daemonDone) }() - finderParts := strings.Split(FinderAddr, "/") + finderParts := strings.Split(finderAddr, "/") finderPort := finderParts[len(finderParts)-1] c := http.Client{} @@ -103,7 +106,7 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, // Sleep a bit for the ingester to finish spinning up (should be right after finder but CI runs slow) time.Sleep(1 * time.Second) - ingestParts := strings.Split(IngestAddr, "/") + ingestParts := strings.Split(ingestAddr, "/") ingestPort := ingestParts[len(ingestParts)-1] for { resp, err := c.Get("http://127.0.0.1:" + ingestPort + "/health") From 52b84af1d7e2f63b948d1e8104ea914ca519349c Mon Sep 17 00:00:00 2001 From: gammazero Date: Sat, 14 Jan 2023 00:09:45 -0800 Subject: [PATCH 12/12] Only run load tests if environ var set --- command/init_test.go | 3 +-- command/loadgen_test.go | 37 +++++++++++++++++++------------------ 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/command/init_test.go b/command/init_test.go index 2422d4c98..bc8f75042 100644 --- a/command/init_test.go +++ b/command/init_test.go @@ -3,7 +3,6 @@ package command import ( "context" "fmt" - "os" "testing" "github.com/ipni/storetheindex/config" @@ -16,7 +15,7 @@ func TestInit(t *testing.T) { defer cancel() tempDir := t.TempDir() - os.Setenv(config.EnvDir, tempDir) + t.Setenv(config.EnvDir, tempDir) app := &cli.App{ Name: "indexer", diff --git a/command/loadgen_test.go b/command/loadgen_test.go index 71f175c5d..c6754d941 100644 --- a/command/loadgen_test.go +++ b/command/loadgen_test.go @@ -26,36 +26,35 @@ const ( ) func TestSmallLoadNoHTTP(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } + skipUnlessLoadTest(t) + ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) defer cncl() testLoadHelper(ctx, t, 1, 1000, false) } func TestSmallLoadOverHTTP(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } + skipUnlessLoadTest(t) + ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) defer cncl() testLoadHelper(ctx, t, 1, 1000, true) } func TestLargeLoad(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - if os.Getenv("CI") != "" { - t.Skip("Skipping testing in CI environment") - } + skipUnlessLoadTest(t) ctx, cncl := context.WithTimeout(context.Background(), 180*time.Second) defer cncl() testLoadHelper(ctx, t, 10, 1000, false) } +func skipUnlessLoadTest(t *testing.T) { + if os.Getenv("STI_LOAD_TEST") == "" { + t.SkipNow() + } +} + func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { switch runtime.GOOS { case "windows": @@ -67,11 +66,7 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, defer cancel() tempDir := t.TempDir() - os.Setenv(config.EnvDir, tempDir) - os.Setenv("STORETHEINDEX_LISTEN_FINDER", finderAddr) - os.Setenv("STORETHEINDEX_LISTEN_ADMIN", adminAddr) - os.Setenv("STORETHEINDEX_LISTEN_INGEST", ingestAddr) - os.Setenv("STORETHEINDE_PUBSUB_TOPIC", loadgen.DefaultConfig().GossipSubTopic) + t.Setenv(config.EnvDir, tempDir) app := &cli.App{ Name: "indexer", @@ -83,7 +78,13 @@ func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, }, } - err := app.RunContext(ctx, []string{"storetheindex", "init"}) + t.Log("---> tempDir:", tempDir) + err := app.RunContext(ctx, []string{"storetheindex", "init", "--no-bootstrap", + "--listen-admin", adminAddr, + "--listen-finder", finderAddr, + "--listen-ingest", ingestAddr, + "--pubsub-topic", loadgen.DefaultConfig().GossipSubTopic, + }) require.NoError(t, err) daemonDone := make(chan struct{})