From 77fb07b0cd8eee6eda400a90de2af549cb2b937f Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 16 Jan 2023 11:37:33 -0800 Subject: [PATCH] Add loadgen and loadgen-verify commands (#343) * Add loadgen and loadgen-verify commands These are commands that can be used to generate write load on a storage provider, and verify the Ads have been correctly ingested. Useful for both stress testing an indexer as well as verifying consistency. * Slow down for CI * Close context when we return from helper in test * Skip e2e_test since it is unactionable * Add healthcheck to ingest * Disable race detector tests for load test * Skip large load test in linux. Skip load tests on windows * Fix dual import * Plumb external-address-mapping * resolve review issues * Do not export consts * Only run load tests if environ var set Co-authored-by: gammazero --- command/init_test.go | 3 +- command/loadgen.go | 200 +++++++++++++++++ command/loadgen/config.go | 142 ++++++++++++ command/loadgen/config_test.go | 40 ++++ command/loadgen/loadgen.go | 399 +++++++++++++++++++++++++++++++++ command/loadgen/pseudorand.go | 21 ++ command/loadgen_test.go | 162 +++++++++++++ go.mod | 1 + go.sum | 2 + main.go | 2 + server/ingest/http/handler.go | 10 + server/ingest/http/server.go | 2 + 12 files changed, 982 insertions(+), 2 deletions(-) create mode 100644 command/loadgen.go create mode 100644 command/loadgen/config.go create mode 100644 command/loadgen/config_test.go create mode 100644 command/loadgen/loadgen.go create mode 100644 command/loadgen/pseudorand.go create mode 100644 command/loadgen_test.go diff --git a/command/init_test.go b/command/init_test.go index 2422d4c98..bc8f75042 100644 --- a/command/init_test.go +++ b/command/init_test.go @@ -3,7 +3,6 @@ package command import ( "context" "fmt" - "os" "testing" "github.com/ipni/storetheindex/config" @@ -16,7 +15,7 @@ func TestInit(t *testing.T) { defer cancel() tempDir := t.TempDir() - os.Setenv(config.EnvDir, tempDir) + t.Setenv(config.EnvDir, tempDir) app := &cli.App{ Name: "indexer", diff --git a/command/loadgen.go b/command/loadgen.go new file mode 100644 index 000000000..54da6b070 --- /dev/null +++ b/command/loadgen.go @@ -0,0 +1,200 @@ +package command + +import ( + "context" + "errors" + "fmt" + mathrand "math/rand" + "strings" + "time" + + "github.com/multiformats/go-multihash" + "github.com/urfave/cli/v2" + + httpfinderclient "github.com/ipni/storetheindex/api/v0/finder/client/http" + "github.com/ipni/storetheindex/command/loadgen" +) + +var LoadGenCmd = &cli.Command{ + Name: "loadgen", + Usage: "Generate fake provider load for the indexer", + Flags: loadGenFlags, + Action: loadGenCmd, +} + +var LoadGenVerifyCmd = &cli.Command{ + Name: "loadgen-verify", + Usage: "Generate fake provider load for the indexer", + Flags: loadGenVerifyFlags, + Action: loadGenVerifyCmd, +} + +var loadGenFlags = []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Usage: "Config file that defines the load generated", + Required: false, + }, + &cli.UintFlag{ + Name: "concurrentProviders", + Usage: "How many concurrent providers", + Required: false, + Value: 1, + }, + &cli.StringFlag{ + Name: "indexer", + Usage: "Indexer http address. Host or host:port", + EnvVars: []string{"STORETHEINDEX_LISTEN_INGEST"}, + Aliases: []string{"i"}, + Required: false, + Value: "http://localhost:3001", + }, + &cli.StringFlag{ + Name: "topic", + Usage: "Which topic to use for libp2p", + Value: loadgen.DefaultConfig().GossipSubTopic, + }, + &cli.StringFlag{ + Name: "external-address-mappping", + Usage: `localIP=externalIP,localIP2=externalIP2. + Map the local listening address to a known + external address. Useful when behind a NAT (like in an AWS ec2 instance). It + will use the external IP when communicating with other peers.`, + }, +} + +func loadGenCmd(cctx *cli.Context) error { + configFile := cctx.String("config") + config := loadgen.DefaultConfig() + if configFile != "" { + var err error + config, err = loadgen.LoadConfigFromFile(configFile) + if err != nil { + panic("Failed to load config file: " + err.Error()) + } + } + if cctx.IsSet("topic") { + config.GossipSubTopic = cctx.String("topic") + } + + loadgen.StartLoadGen(cctx.Context, config, loadgen.LoadGenOpts{ + IndexerAddr: cctx.String("indexer"), + ConcurrentProviders: cctx.Uint("concurrentProviders"), + ListenForInterrupt: true, + ExternalAddressMapping: parseKVs(cctx.String("external-address-mappping")), + }) + return nil +} + +var loadGenVerifyFlags = []cli.Flag{ + &cli.Uint64Flag{ + Name: "concurrentProviders", + Usage: "How many concurrent providers generated the load", + Value: 1, + Required: false, + }, + &cli.Uint64Flag{ + Name: "maxEntryNumber", + Usage: "How many entries were generated by the load test (per provider)", + Value: 1000, + Required: false, + }, + &cli.Uint64Flag{ + Name: "numberOfRandomQueries", + Usage: "How many queries to make in the address space (per provider).", + Value: 1000, + Required: false, + }, + &cli.StringFlag{ + Name: "indexerFind", + Usage: "HTTP Address of the indexer find endpoint e.g. http://localhost:3000", + EnvVars: []string{"STORETHEINDEX_LISTEN_FINDER_HTTP"}, + Required: false, + Value: "http://localhost:3000", + }, +} + +func loadGenVerifyCmd(cctx *cli.Context) error { + client, err := httpfinderclient.New(cctx.String("indexerFind")) + if err != nil { + return err + } + var allMhs []multihash.Multihash + // Map from provider id to entry number id to if the indexer has it + allMhsProviderEntryNumber := map[uint64]map[uint64]bool{} + mhToProviderEntryNumber := map[string]struct { + providerNumber uint64 + entryNumber uint64 + }{} + + numberOfMhsToQuery := cctx.Uint64("numberOfRandomQueries") + for i := uint64(0); i < cctx.Uint64("concurrentProviders"); i++ { + for j := uint64(0); j < numberOfMhsToQuery; j++ { + multihashIndex := uint64(mathrand.Int63n(int64(cctx.Uint64("maxEntryNumber")))) + mh, err := loadgen.GenerateMH(i, multihashIndex) + if err != nil { + return err + } + allMhs = append(allMhs, mh) + if allMhsProviderEntryNumber[i] == nil { + allMhsProviderEntryNumber[i] = map[uint64]bool{} + } + + allMhsProviderEntryNumber[i][multihashIndex] = false + mhToProviderEntryNumber[mh.B58String()] = struct { + providerNumber uint64 + entryNumber uint64 + }{i, multihashIndex} + } + } + + start := time.Now() + + resp, err := client.FindBatch(context.Background(), allMhs) + if err != nil { + return err + } + + for _, result := range resp.MultihashResults { + providerAndEntry := mhToProviderEntryNumber[result.Multihash.B58String()] + allMhsProviderEntryNumber[providerAndEntry.providerNumber][providerAndEntry.entryNumber] = true + } + + if len(allMhs) != len(resp.MultihashResults) { + limitToShow := 10 + for provider, entries := range allMhsProviderEntryNumber { + for entry, found := range entries { + if !found { + fmt.Printf("Missing: providerID=%d entryNumber=%d\n", provider, entry) + limitToShow-- + if limitToShow <= 0 { + break + } + } + } + } + } + + fmt.Printf("Found %d out of %d (%02d%%)\n", len(resp.MultihashResults), len(allMhs), int(float64(len(resp.MultihashResults))/float64(len(allMhs))*100)) + fmt.Println("Find took", time.Since(start)) + if len(allMhs) != len(resp.MultihashResults) { + return errors.New("not all mhs were found") + } + return nil +} + +// parseKVs converts a string of the form key=value,key2=value2 into a map[string]string +func parseKVs(kvs string) map[string]string { + out := map[string]string{} + if kvs == "" { + return out + } + kvSlice := strings.Split(kvs, ",") + for _, kv := range kvSlice { + parts := strings.Split(kv, "=") + k := parts[0] + v := parts[1] + out[k] = v + } + return out +} diff --git a/command/loadgen/config.go b/command/loadgen/config.go new file mode 100644 index 000000000..f8b4309dd --- /dev/null +++ b/command/loadgen/config.go @@ -0,0 +1,142 @@ +package loadgen + +import ( + "encoding/json" + "fmt" + "go/ast" + "go/constant" + "go/parser" + mathrand "math/rand" + "os" + "strconv" + "strings" +) + +type Config struct { + AdsPerSec uint `json:"adsPerSec"` + // A generator to specify how many entries per ad. + // A function so the caller can define a distribution to follow. + EntriesPerAdGenerator func() uint `json:"-"` + // For json to be able to use a predefined distribution. + EntriesPerAdType string `json:"entriesPerAdType"` + EntriesPerChunk uint `json:"entriesPerChunk"` + // Should this provider be an http provider? + IsHttp bool `json:"isHttp"` + HttpListenAddr string `json:"httpListenAddr"` + // How many of the last N ads should be kept. 0 means every ad is kept. + KeepNAds uint `json:"keepNAds"` + Seed uint64 `json:"seed"` + + StopAfterNEntries uint64 `json:"stopAfterNEntries"` + + ListenMultiaddr string `json:"listenMultiaddr"` + GossipSubTopic string `json:"gossipSubTopic"` +} + +func evalBasicLit(expr *ast.BasicLit) constant.Value { + return constant.MakeFromLiteral(expr.Value, expr.Kind, 0) +} + +func (c *Config) ParseEntriesPerAdGenerator() bool { + astV, _ := parser.ParseExpr(c.EntriesPerAdType) + distributionType, ok := astV.(*ast.CallExpr) + if !ok { + return false + } + switch distributionType.Fun.(*ast.Ident).Name { + case "Normal": + // Normal(stdev, mean) + sigma, ok := constant.Float64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit))) + if !ok { + return false + } + μ, ok := constant.Float64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit))) + if !ok { + return false + } + c.EntriesPerAdGenerator = func() uint { + return uint(mathrand.NormFloat64()*sigma + μ) + } + case "Uniform": + // Uniform(start, end) + start, ok := constant.Int64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit))) + if !ok { + return false + } + end, ok := constant.Int64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit))) + if !ok { + return false + } + c.EntriesPerAdGenerator = func() uint { + return uint(mathrand.Intn(int(end-start)) + int(start)) + } + case "Always": + // Always(value) + v, ok := constant.Uint64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit))) + if !ok { + return false + } + c.EntriesPerAdGenerator = func() uint { + return uint(v) + } + } + return true +} + +func DefaultConfig() Config { + return Config{ + AdsPerSec: 4, + EntriesPerAdGenerator: func() uint { + return uint(mathrand.NormFloat64()*10 + 70) + }, + EntriesPerChunk: 10, + IsHttp: false, + KeepNAds: 0, + Seed: 0, + StopAfterNEntries: 1000, + // The actual listen address will be this plus the seed for the port + ListenMultiaddr: "/ip4/127.0.0.1/tcp/18001", + HttpListenAddr: "127.0.0.1:19001", + GossipSubTopic: "indexer/ingest/loadtest", + } +} + +func incrementListenMultiaddrPortBy(ma string, n uint) (string, error) { + parts := strings.Split(ma, "/") + port, err := strconv.Atoi(parts[len(parts)-1]) + if err != nil { + return "", err + } + parts[len(parts)-1] = strconv.Itoa(port + int(n)) + return strings.Join(parts, "/"), nil +} + +func incrementHttpListenPortBy(ma string, n uint) (string, error) { + parts := strings.Split(ma, ":") + port, err := strconv.Atoi(parts[len(parts)-1]) + if err != nil { + return "", err + } + parts[len(parts)-1] = strconv.Itoa(port + int(n)) + return strings.Join(parts, ":"), nil +} + +func LoadConfigFromFile(file string) (Config, error) { + defaultConf := DefaultConfig() + b, err := os.ReadFile(file) + if err != nil { + return defaultConf, err + } + + c := &defaultConf + err = json.Unmarshal(b, c) + + if err != nil { + return defaultConf, err + } + + if !c.ParseEntriesPerAdGenerator() { + return defaultConf, fmt.Errorf("could not parse entries per ad generator") + } + return *c, nil +} diff --git a/command/loadgen/config_test.go b/command/loadgen/config_test.go new file mode 100644 index 000000000..1798a7031 --- /dev/null +++ b/command/loadgen/config_test.go @@ -0,0 +1,40 @@ +package loadgen + +import ( + "fmt" + "testing" +) + +func TestParseEntriesPerAdGenerator(t *testing.T) { + type testCase struct { + desc string + typ string + } + + testCases := []testCase{ + { + "Normal distribution. stdev=2, mean=100", + "Normal(2, 100)", + }, + { + "Uniform distribution. start=20, end=100", + "Uniform(20, 100)", + }, + { + "Always. val=20", + "Always(20)", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + c := &Config{ + EntriesPerAdType: tc.typ, + } + + c.ParseEntriesPerAdGenerator() + fmt.Println(c.EntriesPerAdGenerator()) + }) + } + +} diff --git a/command/loadgen/loadgen.go b/command/loadgen/loadgen.go new file mode 100644 index 000000000..d9ebfd35d --- /dev/null +++ b/command/loadgen/loadgen.go @@ -0,0 +1,399 @@ +package loadgen + +import ( + "context" + "fmt" + "os" + "os/signal" + "strings" + "sync" + "time" + + mathrand "math/rand" + + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/dsadapter" + ingesthttpclient "github.com/ipni/storetheindex/api/v0/ingest/client/http" + "github.com/ipni/storetheindex/api/v0/ingest/schema" + "github.com/ipni/storetheindex/dagsync" + "github.com/ipni/storetheindex/dagsync/dtsync" + "github.com/ipni/storetheindex/dagsync/httpsync" + "github.com/libp2p/go-libp2p" + libp2pconfig "github.com/libp2p/go-libp2p/config" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multihash" + "github.com/multiformats/go-varint" +) + +type LoadGenOpts struct { + IndexerAddr string + ConcurrentProviders uint + ExternalAddressMapping map[string]string + ListenForInterrupt bool +} + +func StartLoadGen(ctx context.Context, loadConfig Config, loadGenOpts LoadGenOpts) { + stopPublishingFns := make([]func(), 0, loadGenOpts.ConcurrentProviders) + closeFns := make([]func(), 0, loadGenOpts.ConcurrentProviders) + + fmt.Println("Starting load generator", loadGenOpts) + for i := uint(0); i < loadGenOpts.ConcurrentProviders; i++ { + configCopy := loadConfig + configCopy.Seed = loadConfig.Seed + uint64(i) + var err error + configCopy.ListenMultiaddr, err = incrementListenMultiaddrPortBy(loadConfig.ListenMultiaddr, i) + if err != nil { + panic("Failed to increment listen multiaddr: " + err.Error()) + } + configCopy.HttpListenAddr, err = incrementHttpListenPortBy(loadConfig.HttpListenAddr, i) + if err != nil { + panic("Failed to increment http listen multiaddr: " + err.Error()) + } + fmt.Println("Config is ", configCopy) + stopPublishing, close, err := startProviderLoadGen(configCopy, loadGenOpts.IndexerAddr, loadGenOpts.ExternalAddressMapping) + if err != nil { + panic("Failed to start provider: " + err.Error()) + } + stopPublishingFns = append(stopPublishingFns, stopPublishing) + closeFns = append(closeFns, close) + } + + ch := make(chan os.Signal, 1) + if loadGenOpts.ListenForInterrupt { + signal.Notify(ch, os.Interrupt) + <-ch + } else { + <-ctx.Done() + } + for _, fn := range stopPublishingFns { + fn() + } + + fmt.Println("New publishing stopped. Hit ctrl-c again to exit.") + if loadGenOpts.ListenForInterrupt { + <-ch + } else { + <-ctx.Done() + } + + for _, fn := range closeFns { + fn() + } +} + +func startProviderLoadGen(config Config, indexerHttpAddr string, addressMapping map[string]string) (stopGenAds func(), close func(), err error) { + p := newProviderLoadGen(config, indexerHttpAddr, addressMapping) + + fmt.Printf("Provider seed=%d ID=%v\n", p.config.Seed, p.h.ID()) + + var afterEachUpdate func() + if !config.IsHttp { + afterEachUpdate = func() { + err := p.announce() + if err != nil { + panic("Failed to announce: " + err.Error()) + } + } + } + stopGenAds = p.runUpdater(afterEachUpdate) + + close = func() {} + if config.IsHttp { + close = p.announceInBackground() + } + fmt.Println("Started provider load generator") + fmt.Println("Peer ID:", p.h.ID().Pretty()) + fmt.Println("Addrs:", p.h.Addrs()) + fmt.Println() + + return stopGenAds, close, nil +} + +func newProviderLoadGen(c Config, indexerHttpAddr string, addressMapping map[string]string) *providerLoadGen { + lsys := cidlink.DefaultLinkSystem() + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + store := &dsadapter.Adapter{ + Wrapped: ds, + } + lsys.SetReadStorage(store) + lsys.SetWriteStorage(store) + + pseudoRandReader := newPseudoRandReaderFrom(mathrand.NewSource(int64(c.Seed))) + signingKey, _, err := crypto.GenerateEd25519Key(pseudoRandReader) + if err != nil { + panic("Failed to generate signing key") + } + + host, err := libp2p.New(libp2p.ListenAddrStrings(c.ListenMultiaddr), libp2p.Identity(signingKey), libp2p.AddrsFactory(newAddrsFactory(addressMapping)), libp2p.ResourceManager(network.NullResourceManager)) + if err != nil { + panic("Failed to start host" + err.Error()) + } + + var pub dagsync.Publisher + if c.IsHttp { + pub, err = httpsync.NewPublisher(c.HttpListenAddr, lsys, host.ID(), signingKey) + } else { + pub, err = dtsync.NewPublisher(host, ds, lsys, c.GossipSubTopic) + + } + if err != nil { + panic("Failed to start publisher: " + err.Error()) + } + p := &providerLoadGen{ + indexerHttpAddr: indexerHttpAddr, + config: c, + signingKey: signingKey, + h: host, + lsys: lsys, + pub: pub, + } + + return p +} + +type providerLoadGen struct { + indexerHttpAddr string + config Config + signingKey crypto.PrivKey + h host.Host + lsys ipld.LinkSystem + pub dagsync.Publisher + // Keep track of the total number of entries we've created. There are a couple uses for this: + // * At the end we can tell the user how many entries we've created. + // * We can generate multihashes as a function of this number. E.g. the multihash is just an encoded version of the entry number. + entriesGenerated uint + adsGenerated uint + currentHead ipld.Link + recordKeepingMu sync.Mutex +} + +func (p *providerLoadGen) announce() error { + client, err := ingesthttpclient.New(p.indexerHttpAddr) + if err != nil { + return err + } + + addrs := p.h.Addrs()[:1] + if p.config.IsHttp { + parts := strings.Split(p.config.HttpListenAddr, ":") + httpMultiaddr := `/ip4/` + parts[0] + `/tcp/` + parts[1] + `/http` + ma, err := multiaddr.NewMultiaddr(httpMultiaddr) + if err != nil { + return err + } + addrs = []multiaddr.Multiaddr{ma} + } + + if p.currentHead == nil { + // Nothing to announce + return nil + } + return client.Announce(context.Background(), &peer.AddrInfo{ID: p.h.ID(), Addrs: addrs}, (p.currentHead).(cidlink.Link).Cid) +} + +func (p *providerLoadGen) announceInBackground() func() { + closer := make(chan struct{}) + t := time.NewTicker(2 * time.Second) + go func() { + for { + select { + case <-closer: + return + case <-t.C: + p.announce() + } + + } + }() + + return func() { + close(closer) + } +} + +func (p *providerLoadGen) runUpdater(afterEachUpdate func()) func() { + closer := make(chan struct{}) + + go func() { + t := time.NewTicker(time.Second / time.Duration(p.config.AdsPerSec)) + for { + select { + case <-closer: + return + case <-t.C: + start := time.Now() + p.recordKeepingMu.Lock() + var addrs []string + for _, a := range p.h.Addrs() { + addrs = append(addrs, a.String()) + } + adBuilder := adBuilder{ + mhGenerator: p.mhGenerator, + entryCount: p.config.EntriesPerAdGenerator(), + entriesPerChunk: p.config.EntriesPerChunk, + isRm: false, + provider: p.h.ID().String(), + providerAddrs: addrs, + metadata: []byte(fmt.Sprintf("providerSeed=%d,entriesGenerated=%d", p.config.Seed, p.entriesGenerated)), + } + + nextAdHead, err := adBuilder.build(p.lsys, p.signingKey, p.currentHead) + if err != nil { + panic(fmt.Sprintf("Failed to build ad: %s", err)) + } + + p.currentHead = nextAdHead + p.adsGenerated = p.adsGenerated + 1 + p.entriesGenerated = adBuilder.entryCount + p.entriesGenerated + fmt.Printf("ID=%d .Number of generated entries: %d\n", p.config.Seed, p.entriesGenerated) + + p.recordKeepingMu.Unlock() + + err = p.pub.UpdateRootWithAddrs(context.Background(), nextAdHead.(cidlink.Link).Cid, p.h.Addrs()) + if err != nil { + panic(fmt.Sprintf("Failed to publish ad: %s", err)) + } + + if afterEachUpdate != nil { + afterEachUpdate() + } + fmt.Println("Published ad in", time.Since(start)) + + if p.config.StopAfterNEntries > 0 && p.entriesGenerated > uint(p.config.StopAfterNEntries) { + fmt.Printf("ID=%d finished\n", p.config.Seed) + return + } + } + } + }() + + return func() { + close(closer) + } +} + +func GenerateMH(nodeID uint64, entryNumber uint64) (multihash.Multihash, error) { + nodeIDVarInt := varint.ToUvarint(nodeID) + nVarInt := varint.ToUvarint(entryNumber) + b := append(nodeIDVarInt, nVarInt...) + // Identity hash for debugging + // return multihash.Sum(b, multihash.IDENTITY, -1) + return multihash.Sum(b, multihash.SHA2_256, -1) +} + +func (p *providerLoadGen) mhGenerator(entryNumberWithinAd uint) (multihash.Multihash, error) { + i := p.entriesGenerated + entryNumberWithinAd + return GenerateMH(uint64(p.config.Seed), uint64(i)) +} + +type adBuilder struct { + // mhGenerator defines how the multihash for this given entry + mhGenerator func(entryNumberWithinAd uint) (multihash.Multihash, error) + entryCount uint + entriesPerChunk uint + isRm bool + contextID uint + metadata []byte + provider string + providerAddrs []string +} + +func (b adBuilder) build(lsys ipld.LinkSystem, signingKey crypto.PrivKey, prevAd ipld.Link) (ipld.Link, error) { + contextID := []byte(fmt.Sprintf("%d", b.contextID)) + metadata := b.metadata + var entriesLink ipld.Link + if b.entryCount == 0 { + entriesLink = ipld.Link(schema.NoEntries) + } else { + var allMhs []multihash.Multihash + for i := uint(0); i < b.entryCount; i++ { + mh, err := b.mhGenerator(i) + if err != nil { + return nil, err + } + allMhs = append(allMhs, mh) + } + for len(allMhs) > 0 { + splitIdx := len(allMhs) - int(b.entriesPerChunk) + if splitIdx < 0 { + splitIdx = 0 + } + mhChunk := allMhs[splitIdx:] + allMhs = allMhs[:splitIdx] + var err error + entriesNode, err := schema.EntryChunk{ + Entries: mhChunk, + Next: entriesLink, + }.ToNode() + if err != nil { + return nil, err + } + + entriesLinkV, err := lsys.Store(ipld.LinkContext{}, schema.Linkproto, entriesNode) + if err != nil { + return nil, err + } + if entriesLinkV != nil { + entriesLink = entriesLinkV + } + } + } + ad := schema.Advertisement{ + PreviousID: prevAd, + Provider: b.provider, + Addresses: b.providerAddrs, + Entries: entriesLink, + ContextID: contextID, + Metadata: metadata, + IsRm: b.isRm, + } + ad.Sign(signingKey) + + adNode, err := ad.ToNode() + if err != nil { + return nil, err + } + + adLink, err := lsys.Store(ipld.LinkContext{}, schema.Linkproto, adNode) + return adLink, err +} + +func newAddrsFactory(ipMapping map[string]string) libp2pconfig.AddrsFactory { + if ipMapping == nil { + ipMapping = map[string]string{} + } + return func(ms []multiaddr.Multiaddr) []multiaddr.Multiaddr { + var out []multiaddr.Multiaddr + for _, ma := range ms { + v, err := ma.ValueForProtocol(multiaddr.P_IP4) + if err != nil || ipMapping[v] == "" { + out = append(out, ma) + continue + } + + var mappedComponents []multiaddr.Multiaddr + multiaddr.ForEach(ma, func(c multiaddr.Component) bool { + if c.Protocol().Code == multiaddr.P_IP4 && ipMapping[c.Value()] != "" { + nextComponent, err := multiaddr.NewComponent(c.Protocol().Name, ipMapping[c.Value()]) + if err != nil { + panic("Failed to map multiaddr") + } + mappedComponents = append(mappedComponents, nextComponent) + + } else { + mappedComponents = append(mappedComponents, &c) + } + return true + }) + out = append(out, multiaddr.Join(mappedComponents...)) + } + return out + } +} diff --git a/command/loadgen/pseudorand.go b/command/loadgen/pseudorand.go new file mode 100644 index 000000000..2debd2b4f --- /dev/null +++ b/command/loadgen/pseudorand.go @@ -0,0 +1,21 @@ +package loadgen + +import ( + "io" + "math/rand" +) + +type pr struct { + rand.Source +} + +func newPseudoRandReaderFrom(src rand.Source) io.Reader { + return &pr{src} +} + +func (r *pr) Read(p []byte) (n int, err error) { + for i := 0; i < len(p); i++ { + p[i] = byte(r.Int63()) + } + return len(p), nil +} diff --git a/command/loadgen_test.go b/command/loadgen_test.go new file mode 100644 index 000000000..c6754d941 --- /dev/null +++ b/command/loadgen_test.go @@ -0,0 +1,162 @@ +//go:build !race + +package command_test + +import ( + "context" + "fmt" + "net/http" + "os" + "runtime" + "strings" + "testing" + "time" + + "github.com/ipni/storetheindex/command" + "github.com/ipni/storetheindex/command/loadgen" + "github.com/ipni/storetheindex/config" + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" +) + +const ( + finderAddr = "/ip4/127.0.0.1/tcp/13000" + ingestAddr = "/ip4/127.0.0.1/tcp/13001" + adminAddr = "/ip4/127.0.0.1/tcp/13002" +) + +func TestSmallLoadNoHTTP(t *testing.T) { + skipUnlessLoadTest(t) + + ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) + defer cncl() + testLoadHelper(ctx, t, 1, 1000, false) +} + +func TestSmallLoadOverHTTP(t *testing.T) { + skipUnlessLoadTest(t) + + ctx, cncl := context.WithTimeout(context.Background(), 60*time.Second) + defer cncl() + testLoadHelper(ctx, t, 1, 1000, true) +} + +func TestLargeLoad(t *testing.T) { + skipUnlessLoadTest(t) + + ctx, cncl := context.WithTimeout(context.Background(), 180*time.Second) + defer cncl() + testLoadHelper(ctx, t, 10, 1000, false) +} + +func skipUnlessLoadTest(t *testing.T) { + if os.Getenv("STI_LOAD_TEST") == "" { + t.SkipNow() + } +} + +func testLoadHelper(ctx context.Context, t *testing.T, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { + switch runtime.GOOS { + case "windows": + t.Skip("skipping test on", runtime.GOOS) + } + + // Set up a context that is canceled when the command is interrupted + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + tempDir := t.TempDir() + t.Setenv(config.EnvDir, tempDir) + + app := &cli.App{ + Name: "indexer", + Commands: []*cli.Command{ + command.InitCmd, + command.DaemonCmd, + command.LoadGenCmd, + command.LoadGenVerifyCmd, + }, + } + + t.Log("---> tempDir:", tempDir) + err := app.RunContext(ctx, []string{"storetheindex", "init", "--no-bootstrap", + "--listen-admin", adminAddr, + "--listen-finder", finderAddr, + "--listen-ingest", ingestAddr, + "--pubsub-topic", loadgen.DefaultConfig().GossipSubTopic, + }) + require.NoError(t, err) + + daemonDone := make(chan struct{}) + go func() { + app.RunContext(ctx, []string{"storetheindex", "daemon"}) + close(daemonDone) + }() + + finderParts := strings.Split(finderAddr, "/") + finderPort := finderParts[len(finderParts)-1] + + c := http.Client{} + for { + resp, err := c.Get("http://127.0.0.1:" + finderPort + "/health") + if err == nil && resp.StatusCode == 200 { + break + } + time.Sleep(100 * time.Millisecond) + } + + // Sleep a bit for the ingester to finish spinning up (should be right after finder but CI runs slow) + time.Sleep(1 * time.Second) + ingestParts := strings.Split(ingestAddr, "/") + ingestPort := ingestParts[len(ingestParts)-1] + for { + resp, err := c.Get("http://127.0.0.1:" + ingestPort + "/health") + if err == nil && resp.StatusCode == 200 { + break + } + time.Sleep(100 * time.Millisecond) + } + + loadgenDone := make(chan struct{}) + go func() { + startLoadgen(ctx, "http://127.0.0.1:"+ingestPort, concurrentProviders, numberOfEntriesPerProvider, useHTTP) + close(loadgenDone) + }() + + timer := time.NewTimer(time.Second) + foundAll := false +LOOP: + for { + err = app.RunContext(ctx, []string{"storetheindex", "loadgen-verify", "--indexerFind=http://127.0.0.1:" + finderPort, "--concurrentProviders=" + fmt.Sprint(concurrentProviders), "--maxEntryNumber=" + fmt.Sprint(numberOfEntriesPerProvider)}) + if err == nil { + foundAll = true + break + } + select { + case <-ctx.Done(): + break LOOP + case <-timer.C: + timer.Reset(time.Second) + } + } + timer.Stop() + + require.True(t, foundAll, "Did not find all entries") + + // Wait until loadgen and daemon are stopped so that tempDir can be removed + // on Windows. + cancel() + <-loadgenDone + <-daemonDone +} + +func startLoadgen(ctx context.Context, indexerAddr string, concurrentProviders uint, numberOfEntriesPerProvider uint, useHTTP bool) { + loadConfig := loadgen.DefaultConfig() + loadConfig.StopAfterNEntries = uint64(numberOfEntriesPerProvider) + loadConfig.IsHttp = useHTTP + + loadgen.StartLoadGen(ctx, loadConfig, loadgen.LoadGenOpts{ + IndexerAddr: indexerAddr, + ConcurrentProviders: concurrentProviders, + }) +} diff --git a/go.mod b/go.mod index a7ca415a2..4c9717336 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/ipfs/kubo v0.17.0 github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 github.com/ipld/go-ipld-prime v0.19.0 + github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd github.com/ipld/go-storethehash v0.3.13 github.com/ipni/go-indexer-core v0.6.20-0.20230112154735-7b53be959fe6 github.com/libp2p/go-libp2p v0.23.4 diff --git a/go.sum b/go.sum index 57089fb41..47f48c21f 100644 --- a/go.sum +++ b/go.sum @@ -660,6 +660,8 @@ github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDa github.com/ipld/go-ipld-prime v0.19.0 h1:5axC7rJmPc17Emw6TelxGwnzALk0PdupZ2oj2roDj04= github.com/ipld/go-ipld-prime v0.19.0/go.mod h1:Q9j3BaVXwaA3o5JUDNvptDDr/x8+F7FG6XJ8WI3ILg4= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= +github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd h1:qdjo1CRvAQhOMoyYjPnbdZ5rYFFmqztweQ9KAsuWpO0= +github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:9DD/GM0JNPoisgR09F62kbBi7kHa4eDIea4XshXYOVc= github.com/ipld/go-storethehash v0.3.13 h1:1T6kX5K57lAgxbsGitZEaZMAR3RdWch2kO8okK/BgN8= github.com/ipld/go-storethehash v0.3.13/go.mod h1:KCYpzmamubnSwm7fvWcCkm0aIwQh4WRNtzrKK4pVhAQ= github.com/ipni/go-indexer-core v0.6.20-0.20230112154735-7b53be959fe6 h1:sn06bRrS1aAlH5WdeLsHxeHPArKlS46bvwjSIapBf1M= diff --git a/main.go b/main.go index 260597a7f..9b7b8bf21 100644 --- a/main.go +++ b/main.go @@ -47,6 +47,8 @@ func main() { command.ProvidersCmd, command.RegisterCmd, command.SyntheticCmd, + command.LoadGenCmd, + command.LoadGenVerifyCmd, }, } diff --git a/server/ingest/http/handler.go b/server/ingest/http/handler.go index 0ae8dccee..d4d61f6a7 100644 --- a/server/ingest/http/handler.go +++ b/server/ingest/http/handler.go @@ -1,6 +1,7 @@ package httpingestserver import ( + "encoding/json" "io" "net/http" @@ -9,6 +10,7 @@ import ( "github.com/ipni/storetheindex/internal/ingest" "github.com/ipni/storetheindex/internal/registry" "github.com/ipni/storetheindex/server/ingest/handler" + "github.com/ipni/storetheindex/version" ) type httpHandler struct { @@ -109,3 +111,11 @@ func (h *httpHandler) announce(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusNoContent) } + +// GET /health +func (h *httpHandler) health(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Cache-Control", "no-cache") + v := version.String() + b, _ := json.Marshal(v) + httpserver.WriteJsonResponse(w, http.StatusOK, b) +} diff --git a/server/ingest/http/server.go b/server/ingest/http/server.go index 3a81f68e3..25bb27acf 100644 --- a/server/ingest/http/server.go +++ b/server/ingest/http/server.go @@ -54,6 +54,8 @@ func New(listen string, indexer indexer.Interface, ingester *ingest.Ingester, re // Registration routes r.HandleFunc("/register", h.registerProvider).Methods(http.MethodPost) r.HandleFunc("/register/{providerid}", h.removeProvider).Methods(http.MethodDelete) + + r.HandleFunc("/health", h.health).Methods(http.MethodGet) return s, nil }