Skip to content

Commit

Permalink
Add loadgen and loadgen-verify commands (#343)
Browse files Browse the repository at this point in the history
* Add loadgen and loadgen-verify commands

These are commands that can be used to generate write load on a storage
provider, and verify the Ads have been correctly ingested.

Useful for both stress testing an indexer as well as verifying
consistency.

* Slow down for CI
* Close context when we return from helper in test
* Skip e2e_test since it is unactionable
* Add healthcheck to ingest
* Disable race detector tests for load test
* Skip large load test in linux. Skip load tests on windows
* Fix dual import
* Plumb external-address-mapping
* resolve review issues
* Do not export consts
* Only run load tests if environ var set

Co-authored-by: gammazero <gammazero@users.noreply.github.com>
  • Loading branch information
MarcoPolo and gammazero committed Jan 16, 2023
1 parent ff6fb7e commit 77fb07b
Show file tree
Hide file tree
Showing 12 changed files with 982 additions and 2 deletions.
3 changes: 1 addition & 2 deletions command/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package command
import (
"context"
"fmt"
"os"
"testing"

"github.com/ipni/storetheindex/config"
Expand All @@ -16,7 +15,7 @@ func TestInit(t *testing.T) {
defer cancel()

tempDir := t.TempDir()
os.Setenv(config.EnvDir, tempDir)
t.Setenv(config.EnvDir, tempDir)

app := &cli.App{
Name: "indexer",
Expand Down
200 changes: 200 additions & 0 deletions command/loadgen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package command

import (
"context"
"errors"
"fmt"
mathrand "math/rand"
"strings"
"time"

"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"

httpfinderclient "github.com/ipni/storetheindex/api/v0/finder/client/http"
"github.com/ipni/storetheindex/command/loadgen"
)

var LoadGenCmd = &cli.Command{
Name: "loadgen",
Usage: "Generate fake provider load for the indexer",
Flags: loadGenFlags,
Action: loadGenCmd,
}

var LoadGenVerifyCmd = &cli.Command{
Name: "loadgen-verify",
Usage: "Generate fake provider load for the indexer",
Flags: loadGenVerifyFlags,
Action: loadGenVerifyCmd,
}

var loadGenFlags = []cli.Flag{
&cli.StringFlag{
Name: "config",
Usage: "Config file that defines the load generated",
Required: false,
},
&cli.UintFlag{
Name: "concurrentProviders",
Usage: "How many concurrent providers",
Required: false,
Value: 1,
},
&cli.StringFlag{
Name: "indexer",
Usage: "Indexer http address. Host or host:port",
EnvVars: []string{"STORETHEINDEX_LISTEN_INGEST"},
Aliases: []string{"i"},
Required: false,
Value: "http://localhost:3001",
},
&cli.StringFlag{
Name: "topic",
Usage: "Which topic to use for libp2p",
Value: loadgen.DefaultConfig().GossipSubTopic,
},
&cli.StringFlag{
Name: "external-address-mappping",
Usage: `localIP=externalIP,localIP2=externalIP2.
Map the local listening address to a known
external address. Useful when behind a NAT (like in an AWS ec2 instance). It
will use the external IP when communicating with other peers.`,
},
}

func loadGenCmd(cctx *cli.Context) error {
configFile := cctx.String("config")
config := loadgen.DefaultConfig()
if configFile != "" {
var err error
config, err = loadgen.LoadConfigFromFile(configFile)
if err != nil {
panic("Failed to load config file: " + err.Error())
}
}
if cctx.IsSet("topic") {
config.GossipSubTopic = cctx.String("topic")
}

loadgen.StartLoadGen(cctx.Context, config, loadgen.LoadGenOpts{
IndexerAddr: cctx.String("indexer"),
ConcurrentProviders: cctx.Uint("concurrentProviders"),
ListenForInterrupt: true,
ExternalAddressMapping: parseKVs(cctx.String("external-address-mappping")),
})
return nil
}

var loadGenVerifyFlags = []cli.Flag{
&cli.Uint64Flag{
Name: "concurrentProviders",
Usage: "How many concurrent providers generated the load",
Value: 1,
Required: false,
},
&cli.Uint64Flag{
Name: "maxEntryNumber",
Usage: "How many entries were generated by the load test (per provider)",
Value: 1000,
Required: false,
},
&cli.Uint64Flag{
Name: "numberOfRandomQueries",
Usage: "How many queries to make in the address space (per provider).",
Value: 1000,
Required: false,
},
&cli.StringFlag{
Name: "indexerFind",
Usage: "HTTP Address of the indexer find endpoint e.g. http://localhost:3000",
EnvVars: []string{"STORETHEINDEX_LISTEN_FINDER_HTTP"},
Required: false,
Value: "http://localhost:3000",
},
}

func loadGenVerifyCmd(cctx *cli.Context) error {
client, err := httpfinderclient.New(cctx.String("indexerFind"))
if err != nil {
return err
}
var allMhs []multihash.Multihash
// Map from provider id to entry number id to if the indexer has it
allMhsProviderEntryNumber := map[uint64]map[uint64]bool{}
mhToProviderEntryNumber := map[string]struct {
providerNumber uint64
entryNumber uint64
}{}

numberOfMhsToQuery := cctx.Uint64("numberOfRandomQueries")
for i := uint64(0); i < cctx.Uint64("concurrentProviders"); i++ {
for j := uint64(0); j < numberOfMhsToQuery; j++ {
multihashIndex := uint64(mathrand.Int63n(int64(cctx.Uint64("maxEntryNumber"))))
mh, err := loadgen.GenerateMH(i, multihashIndex)
if err != nil {
return err
}
allMhs = append(allMhs, mh)
if allMhsProviderEntryNumber[i] == nil {
allMhsProviderEntryNumber[i] = map[uint64]bool{}
}

allMhsProviderEntryNumber[i][multihashIndex] = false
mhToProviderEntryNumber[mh.B58String()] = struct {
providerNumber uint64
entryNumber uint64
}{i, multihashIndex}
}
}

start := time.Now()

resp, err := client.FindBatch(context.Background(), allMhs)
if err != nil {
return err
}

for _, result := range resp.MultihashResults {
providerAndEntry := mhToProviderEntryNumber[result.Multihash.B58String()]
allMhsProviderEntryNumber[providerAndEntry.providerNumber][providerAndEntry.entryNumber] = true
}

if len(allMhs) != len(resp.MultihashResults) {
limitToShow := 10
for provider, entries := range allMhsProviderEntryNumber {
for entry, found := range entries {
if !found {
fmt.Printf("Missing: providerID=%d entryNumber=%d\n", provider, entry)
limitToShow--
if limitToShow <= 0 {
break
}
}
}
}
}

fmt.Printf("Found %d out of %d (%02d%%)\n", len(resp.MultihashResults), len(allMhs), int(float64(len(resp.MultihashResults))/float64(len(allMhs))*100))
fmt.Println("Find took", time.Since(start))
if len(allMhs) != len(resp.MultihashResults) {
return errors.New("not all mhs were found")
}
return nil
}

// parseKVs converts a string of the form key=value,key2=value2 into a map[string]string
func parseKVs(kvs string) map[string]string {
out := map[string]string{}
if kvs == "" {
return out
}
kvSlice := strings.Split(kvs, ",")
for _, kv := range kvSlice {
parts := strings.Split(kv, "=")
k := parts[0]
v := parts[1]
out[k] = v
}
return out
}
142 changes: 142 additions & 0 deletions command/loadgen/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package loadgen

import (
"encoding/json"
"fmt"
"go/ast"
"go/constant"
"go/parser"
mathrand "math/rand"
"os"
"strconv"
"strings"
)

type Config struct {
AdsPerSec uint `json:"adsPerSec"`
// A generator to specify how many entries per ad.
// A function so the caller can define a distribution to follow.
EntriesPerAdGenerator func() uint `json:"-"`
// For json to be able to use a predefined distribution.
EntriesPerAdType string `json:"entriesPerAdType"`
EntriesPerChunk uint `json:"entriesPerChunk"`
// Should this provider be an http provider?
IsHttp bool `json:"isHttp"`
HttpListenAddr string `json:"httpListenAddr"`
// How many of the last N ads should be kept. 0 means every ad is kept.
KeepNAds uint `json:"keepNAds"`
Seed uint64 `json:"seed"`

StopAfterNEntries uint64 `json:"stopAfterNEntries"`

ListenMultiaddr string `json:"listenMultiaddr"`
GossipSubTopic string `json:"gossipSubTopic"`
}

func evalBasicLit(expr *ast.BasicLit) constant.Value {
return constant.MakeFromLiteral(expr.Value, expr.Kind, 0)
}

func (c *Config) ParseEntriesPerAdGenerator() bool {
astV, _ := parser.ParseExpr(c.EntriesPerAdType)
distributionType, ok := astV.(*ast.CallExpr)
if !ok {
return false
}
switch distributionType.Fun.(*ast.Ident).Name {
case "Normal":
// Normal(stdev, mean)
sigma, ok := constant.Float64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit)))
if !ok {
return false
}
μ, ok := constant.Float64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit)))
if !ok {
return false
}
c.EntriesPerAdGenerator = func() uint {
return uint(mathrand.NormFloat64()*sigma + μ)
}
case "Uniform":
// Uniform(start, end)
start, ok := constant.Int64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit)))
if !ok {
return false
}
end, ok := constant.Int64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit)))
if !ok {
return false
}
c.EntriesPerAdGenerator = func() uint {
return uint(mathrand.Intn(int(end-start)) + int(start))
}
case "Always":
// Always(value)
v, ok := constant.Uint64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit)))
if !ok {
return false
}
c.EntriesPerAdGenerator = func() uint {
return uint(v)
}
}
return true
}

func DefaultConfig() Config {
return Config{
AdsPerSec: 4,
EntriesPerAdGenerator: func() uint {
return uint(mathrand.NormFloat64()*10 + 70)
},
EntriesPerChunk: 10,
IsHttp: false,
KeepNAds: 0,
Seed: 0,
StopAfterNEntries: 1000,
// The actual listen address will be this plus the seed for the port
ListenMultiaddr: "/ip4/127.0.0.1/tcp/18001",
HttpListenAddr: "127.0.0.1:19001",
GossipSubTopic: "indexer/ingest/loadtest",
}
}

func incrementListenMultiaddrPortBy(ma string, n uint) (string, error) {
parts := strings.Split(ma, "/")
port, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
return "", err
}
parts[len(parts)-1] = strconv.Itoa(port + int(n))
return strings.Join(parts, "/"), nil
}

func incrementHttpListenPortBy(ma string, n uint) (string, error) {
parts := strings.Split(ma, ":")
port, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
return "", err
}
parts[len(parts)-1] = strconv.Itoa(port + int(n))
return strings.Join(parts, ":"), nil
}

func LoadConfigFromFile(file string) (Config, error) {
defaultConf := DefaultConfig()
b, err := os.ReadFile(file)
if err != nil {
return defaultConf, err
}

c := &defaultConf
err = json.Unmarshal(b, c)

if err != nil {
return defaultConf, err
}

if !c.ParseEntriesPerAdGenerator() {
return defaultConf, fmt.Errorf("could not parse entries per ad generator")
}
return *c, nil
}
Loading

0 comments on commit 77fb07b

Please sign in to comment.