Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add loadgen and loadgen-verify commands #343

Merged
merged 12 commits into from
Jan 16, 2023
3 changes: 1 addition & 2 deletions command/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package command
import (
"context"
"fmt"
"os"
"testing"

"github.com/ipni/storetheindex/config"
Expand All @@ -16,7 +15,7 @@ func TestInit(t *testing.T) {
defer cancel()

tempDir := t.TempDir()
os.Setenv(config.EnvDir, tempDir)
t.Setenv(config.EnvDir, tempDir)

app := &cli.App{
Name: "indexer",
Expand Down
200 changes: 200 additions & 0 deletions command/loadgen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package command

import (
"context"
"errors"
"fmt"
mathrand "math/rand"
"strings"
"time"

"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"

httpfinderclient "github.com/ipni/storetheindex/api/v0/finder/client/http"
"github.com/ipni/storetheindex/command/loadgen"
)

var LoadGenCmd = &cli.Command{
Name: "loadgen",
Usage: "Generate fake provider load for the indexer",
Flags: loadGenFlags,
Action: loadGenCmd,
}

var LoadGenVerifyCmd = &cli.Command{
Name: "loadgen-verify",
Usage: "Generate fake provider load for the indexer",
Flags: loadGenVerifyFlags,
Action: loadGenVerifyCmd,
}

var loadGenFlags = []cli.Flag{
&cli.StringFlag{
Name: "config",
Usage: "Config file that defines the load generated",
Required: false,
},
&cli.UintFlag{
Name: "concurrentProviders",
Usage: "How many concurrent providers",
Required: false,
Value: 1,
},
&cli.StringFlag{
Name: "indexer",
Usage: "Indexer http address. Host or host:port",
EnvVars: []string{"STORETHEINDEX_LISTEN_INGEST"},
Aliases: []string{"i"},
Required: false,
Value: "http://localhost:3001",
},
&cli.StringFlag{
Name: "topic",
Usage: "Which topic to use for libp2p",
Value: loadgen.DefaultConfig().GossipSubTopic,
},
&cli.StringFlag{
Name: "external-address-mappping",
Usage: `localIP=externalIP,localIP2=externalIP2.
Map the local listening address to a known
external address. Useful when behind a NAT (like in an AWS ec2 instance). It
will use the external IP when communicating with other peers.`,
},
}

func loadGenCmd(cctx *cli.Context) error {
configFile := cctx.String("config")
config := loadgen.DefaultConfig()
if configFile != "" {
var err error
config, err = loadgen.LoadConfigFromFile(configFile)
if err != nil {
panic("Failed to load config file: " + err.Error())
}
}
if cctx.IsSet("topic") {
config.GossipSubTopic = cctx.String("topic")
}

loadgen.StartLoadGen(cctx.Context, config, loadgen.LoadGenOpts{
IndexerAddr: cctx.String("indexer"),
ConcurrentProviders: cctx.Uint("concurrentProviders"),
ListenForInterrupt: true,
ExternalAddressMapping: parseKVs(cctx.String("external-address-mappping")),
})
return nil
}

var loadGenVerifyFlags = []cli.Flag{
&cli.Uint64Flag{
Name: "concurrentProviders",
Usage: "How many concurrent providers generated the load",
Value: 1,
Required: false,
},
&cli.Uint64Flag{
Name: "maxEntryNumber",
Usage: "How many entries were generated by the load test (per provider)",
Value: 1000,
Required: false,
},
&cli.Uint64Flag{
Name: "numberOfRandomQueries",
Usage: "How many queries to make in the address space (per provider).",
Value: 1000,
Required: false,
},
&cli.StringFlag{
Name: "indexerFind",
Usage: "HTTP Address of the indexer find endpoint e.g. http://localhost:3000",
EnvVars: []string{"STORETHEINDEX_LISTEN_FINDER_HTTP"},
Required: false,
Value: "http://localhost:3000",
},
}

func loadGenVerifyCmd(cctx *cli.Context) error {
client, err := httpfinderclient.New(cctx.String("indexerFind"))
if err != nil {
return err
}
var allMhs []multihash.Multihash
// Map from provider id to entry number id to if the indexer has it
allMhsProviderEntryNumber := map[uint64]map[uint64]bool{}
mhToProviderEntryNumber := map[string]struct {
providerNumber uint64
entryNumber uint64
}{}

numberOfMhsToQuery := cctx.Uint64("numberOfRandomQueries")
for i := uint64(0); i < cctx.Uint64("concurrentProviders"); i++ {
for j := uint64(0); j < numberOfMhsToQuery; j++ {
multihashIndex := uint64(mathrand.Int63n(int64(cctx.Uint64("maxEntryNumber"))))
mh, err := loadgen.GenerateMH(i, multihashIndex)
if err != nil {
return err
}
allMhs = append(allMhs, mh)
if allMhsProviderEntryNumber[i] == nil {
allMhsProviderEntryNumber[i] = map[uint64]bool{}
}

allMhsProviderEntryNumber[i][multihashIndex] = false
mhToProviderEntryNumber[mh.B58String()] = struct {
providerNumber uint64
entryNumber uint64
}{i, multihashIndex}
}
}

start := time.Now()

resp, err := client.FindBatch(context.Background(), allMhs)
if err != nil {
return err
}

for _, result := range resp.MultihashResults {
providerAndEntry := mhToProviderEntryNumber[result.Multihash.B58String()]
allMhsProviderEntryNumber[providerAndEntry.providerNumber][providerAndEntry.entryNumber] = true
}

if len(allMhs) != len(resp.MultihashResults) {
limitToShow := 10
for provider, entries := range allMhsProviderEntryNumber {
for entry, found := range entries {
if !found {
fmt.Printf("Missing: providerID=%d entryNumber=%d\n", provider, entry)
limitToShow--
if limitToShow <= 0 {
break
}
}
}
}
}

fmt.Printf("Found %d out of %d (%02d%%)\n", len(resp.MultihashResults), len(allMhs), int(float64(len(resp.MultihashResults))/float64(len(allMhs))*100))
fmt.Println("Find took", time.Since(start))
if len(allMhs) != len(resp.MultihashResults) {
return errors.New("not all mhs were found")
}
return nil
}

// parseKVs converts a string of the form key=value,key2=value2 into a map[string]string
func parseKVs(kvs string) map[string]string {
out := map[string]string{}
if kvs == "" {
return out
}
kvSlice := strings.Split(kvs, ",")
for _, kv := range kvSlice {
parts := strings.Split(kv, "=")
k := parts[0]
v := parts[1]
out[k] = v
}
return out
}
142 changes: 142 additions & 0 deletions command/loadgen/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package loadgen

import (
"encoding/json"
"fmt"
"go/ast"
"go/constant"
"go/parser"
mathrand "math/rand"
"os"
"strconv"
"strings"
)

type Config struct {
AdsPerSec uint `json:"adsPerSec"`
// A generator to specify how many entries per ad.
// A function so the caller can define a distribution to follow.
EntriesPerAdGenerator func() uint `json:"-"`
// For json to be able to use a predefined distribution.
EntriesPerAdType string `json:"entriesPerAdType"`
EntriesPerChunk uint `json:"entriesPerChunk"`
// Should this provider be an http provider?
IsHttp bool `json:"isHttp"`
HttpListenAddr string `json:"httpListenAddr"`
// How many of the last N ads should be kept. 0 means every ad is kept.
KeepNAds uint `json:"keepNAds"`
Seed uint64 `json:"seed"`

StopAfterNEntries uint64 `json:"stopAfterNEntries"`

ListenMultiaddr string `json:"listenMultiaddr"`
GossipSubTopic string `json:"gossipSubTopic"`
}

func evalBasicLit(expr *ast.BasicLit) constant.Value {
return constant.MakeFromLiteral(expr.Value, expr.Kind, 0)
}

func (c *Config) ParseEntriesPerAdGenerator() bool {
astV, _ := parser.ParseExpr(c.EntriesPerAdType)
distributionType, ok := astV.(*ast.CallExpr)
if !ok {
return false
}
switch distributionType.Fun.(*ast.Ident).Name {
case "Normal":
// Normal(stdev, mean)
sigma, ok := constant.Float64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit)))
if !ok {
return false
}
μ, ok := constant.Float64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit)))
if !ok {
return false
}
c.EntriesPerAdGenerator = func() uint {
return uint(mathrand.NormFloat64()*sigma + μ)
}
case "Uniform":
// Uniform(start, end)
start, ok := constant.Int64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit)))
if !ok {
return false
}
end, ok := constant.Int64Val(evalBasicLit(distributionType.Args[1].(*ast.BasicLit)))
if !ok {
return false
}
c.EntriesPerAdGenerator = func() uint {
return uint(mathrand.Intn(int(end-start)) + int(start))
}
case "Always":
// Always(value)
v, ok := constant.Uint64Val(evalBasicLit(distributionType.Args[0].(*ast.BasicLit)))
if !ok {
return false
}
c.EntriesPerAdGenerator = func() uint {
return uint(v)
}
}
return true
}

func DefaultConfig() Config {
return Config{
AdsPerSec: 4,
EntriesPerAdGenerator: func() uint {
return uint(mathrand.NormFloat64()*10 + 70)
},
EntriesPerChunk: 10,
IsHttp: false,
KeepNAds: 0,
Seed: 0,
StopAfterNEntries: 1000,
// The actual listen address will be this plus the seed for the port
ListenMultiaddr: "/ip4/127.0.0.1/tcp/18001",
HttpListenAddr: "127.0.0.1:19001",
GossipSubTopic: "indexer/ingest/loadtest",
}
}

func incrementListenMultiaddrPortBy(ma string, n uint) (string, error) {
parts := strings.Split(ma, "/")
port, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
return "", err
}
parts[len(parts)-1] = strconv.Itoa(port + int(n))
return strings.Join(parts, "/"), nil
}

func incrementHttpListenPortBy(ma string, n uint) (string, error) {
parts := strings.Split(ma, ":")
port, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
return "", err
}
parts[len(parts)-1] = strconv.Itoa(port + int(n))
return strings.Join(parts, ":"), nil
}

func LoadConfigFromFile(file string) (Config, error) {
defaultConf := DefaultConfig()
b, err := os.ReadFile(file)
if err != nil {
return defaultConf, err
}

c := &defaultConf
err = json.Unmarshal(b, c)

if err != nil {
return defaultConf, err
}

if !c.ParseEntriesPerAdGenerator() {
return defaultConf, fmt.Errorf("could not parse entries per ad generator")
}
return *c, nil
}
Loading