Skip to content

Commit

Permalink
feat:(cmd/cel-shed): add few features to store stessser (#2519)
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss authored and Wondertan committed Aug 3, 2023
1 parent c70f78a commit 46bf0a0
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 21 deletions.
40 changes: 39 additions & 1 deletion cmd/cel-shed/eds_store_stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"math"
"net/http"
"os"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"github.com/pyroscope-io/client/pyroscope"
"github.com/spf13/cobra"

"github.com/celestiaorg/celestia-node/libs/edssser"
Expand All @@ -25,6 +28,10 @@ const (
edsLogStatFreqFlag = "log-stat-freq"
edsCleanupFlag = "cleanup"
edsFreshStartFlag = "fresh"

pyroscopeEndpointFlag = "pyroscope"
putTimeoutFlag = "timeout"
badgerLogLevelFlag = "badger-log-level"
)

func init() {
Expand All @@ -38,12 +45,15 @@ func init() {

pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath)
edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage)
edsStoreStress.Flags().String(pyroscopeEndpointFlag, "", "Pyroscope address. If no address provided, pyroscope will be disabled")
edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.")
edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.")
edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.")
edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.")
edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.")
edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.")
edsStoreStress.Flags().Int(putTimeoutFlag, 30, "Sets put timeout in seconds. 30 sec by default.")
edsStoreStress.Flags().String(badgerLogLevelFlag, "INFO", "Badger log level, Defaults to INFO")

// kill redundant print
nodebuilder.PrintKeyringInfo = false
Expand All @@ -60,7 +70,27 @@ var edsStoreStress = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) (err error) {
// expose expvar vars over http
go http.ListenAndServe(":9999", http.DefaultServeMux)
go http.ListenAndServe(":9999", http.DefaultServeMux) //nolint:errcheck,gosec

endpoint, _ := cmd.Flags().GetString(pyroscopeEndpointFlag)
if endpoint != "" {
_, err = pyroscope.Start(pyroscope.Config{
ApplicationName: "cel-shred.stresser",
ServerAddress: endpoint,
ProfileTypes: []pyroscope.ProfileType{
pyroscope.ProfileCPU,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileAllocSpace,
pyroscope.ProfileInuseObjects,
pyroscope.ProfileInuseSpace,
},
})
if err != nil {
fmt.Printf("failed to launch pyroscope with addr: %s err: %s\n", endpoint, err.Error())
} else {
fmt.Println("connected pyroscope to:", endpoint)
}
}

path, _ := cmd.Flags().GetString(edsStorePathFlag)
fmt.Printf("using %s\n", path)
Expand All @@ -80,16 +110,24 @@ var edsStoreStress = &cobra.Command{
}()
}

loglevel, _ := cmd.Flags().GetString(badgerLogLevelFlag)
if err = logging.SetLogLevel("badger", loglevel); err != nil {
return err
}

disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag)
logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag)
edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag)
edsSize, _ := cmd.Flags().GetInt(edsSizeFlag)
putTimeout, _ := cmd.Flags().GetInt(putTimeoutFlag)

cfg := edssser.Config{
EDSSize: edsSize,
EDSWrites: edsWrites,
EnableLog: !disableLog,
LogFilePath: path,
StatLogFreq: logFreq,
OpTimeout: time.Duration(putTimeout) * time.Second,
}

err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full)
Expand Down
51 changes: 31 additions & 20 deletions libs/edssser/edssser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
EnableLog bool
LogFilePath string
StatLogFreq int
OpTimeout time.Duration
}

// EDSsser stand for EDS Store Stresser.
Expand Down Expand Up @@ -67,25 +68,9 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
}
fmt.Printf("recovered %d EDSes\n\n", len(edsHashes))

defer func() {
err = errors.Join(err, ss.dumpStat(stats.Finalize()))
}()

t := &testing.T{}
for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- {
// divide by 2 to get ODS size as expected by RandEDS
square := edstest.RandEDS(t, ss.config.EDSSize/2)
dah, err := da.NewDataAvailabilityHeader(square)
if err != nil {
return stats, err
}

now := time.Now()
err = ss.edsstore.Put(ctx, dah.Hash(), square)
if err != nil {
return stats, err
}
took := time.Since(now)
took, err := ss.put(ctx, t)

stats.TotalWritten++
stats.TotalTime += took
Expand All @@ -96,8 +81,6 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
}

if ss.config.EnableLog {
fmt.Println("square written", "size", ss.config.EDSSize, "took", took)

if stats.TotalWritten%ss.config.StatLogFreq == 0 {
stats := stats.Finalize()
fmt.Println(stats)
Expand All @@ -108,9 +91,18 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
}
}()
}
if err != nil {
fmt.Printf("ERROR put: %s, took: %v, at: %v\n", err.Error(), took, time.Now())
continue
}
if took > ss.config.OpTimeout/2 {
fmt.Println("long put", "size", ss.config.EDSSize, "took", took, "at", time.Now())
continue
}

fmt.Println("square written", "size", ss.config.EDSSize, "took", took, "at", time.Now())
}
}

return stats, nil
}

Expand Down Expand Up @@ -159,3 +151,22 @@ AvgTime %s
stats.AvgTime,
)
}

func (ss *EDSsser) put(ctx context.Context, t *testing.T) (time.Duration, error) {
ctx, cancel := context.WithTimeout(ctx, ss.config.OpTimeout)
if ss.config.OpTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

// divide by 2 to get ODS size as expected by RandEDS
square := edstest.RandEDS(t, ss.config.EDSSize/2)
dah, err := da.NewDataAvailabilityHeader(square)
if err != nil {
return 0, err
}

now := time.Now()
err = ss.edsstore.Put(ctx, dah.Hash(), square)
return time.Since(now), err
}

0 comments on commit 46bf0a0

Please sign in to comment.