diff --git a/cmd/cel-shed/eds_store_stress.go b/cmd/cel-shed/eds_store_stress.go index 90b6f0800b..88a538d07d 100644 --- a/cmd/cel-shed/eds_store_stress.go +++ b/cmd/cel-shed/eds_store_stress.go @@ -8,8 +8,11 @@ import ( "math" "net/http" "os" + "time" + logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" + "github.com/pyroscope-io/client/pyroscope" "github.com/spf13/cobra" "github.com/celestiaorg/celestia-node/libs/edssser" @@ -25,6 +28,10 @@ const ( edsLogStatFreqFlag = "log-stat-freq" edsCleanupFlag = "cleanup" edsFreshStartFlag = "fresh" + + pyroscopeEndpointFlag = "pyroscope" + putTimeoutFlag = "timeout" + badgerLogLevelFlag = "badger-log-level" ) func init() { @@ -38,12 +45,15 @@ func init() { pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath) edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage) + edsStoreStress.Flags().String(pyroscopeEndpointFlag, "", "Pyroscope address. If no address provided, pyroscope will be disabled") edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.") edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.") edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.") edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.") edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.") edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.") + edsStoreStress.Flags().Int(putTimeoutFlag, 30, "Sets put timeout in seconds. 
30 sec by default.") + edsStoreStress.Flags().String(badgerLogLevelFlag, "INFO", "Badger log level, Defaults to INFO") // kill redundant print nodebuilder.PrintKeyringInfo = false @@ -60,7 +70,27 @@ var edsStoreStress = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) (err error) { // expose expvar vars over http - go http.ListenAndServe(":9999", http.DefaultServeMux) + go http.ListenAndServe(":9999", http.DefaultServeMux) //nolint:errcheck,gosec + + endpoint, _ := cmd.Flags().GetString(pyroscopeEndpointFlag) + if endpoint != "" { + _, err = pyroscope.Start(pyroscope.Config{ + ApplicationName: "cel-shred.stresser", + ServerAddress: endpoint, + ProfileTypes: []pyroscope.ProfileType{ + pyroscope.ProfileCPU, + pyroscope.ProfileAllocObjects, + pyroscope.ProfileAllocSpace, + pyroscope.ProfileInuseObjects, + pyroscope.ProfileInuseSpace, + }, + }) + if err != nil { + fmt.Printf("failed to launch pyroscope with addr: %s err: %s\n", endpoint, err.Error()) + } else { + fmt.Println("connected pyroscope to:", endpoint) + } + } path, _ := cmd.Flags().GetString(edsStorePathFlag) fmt.Printf("using %s\n", path) @@ -80,16 +110,24 @@ var edsStoreStress = &cobra.Command{ }() } + loglevel, _ := cmd.Flags().GetString(badgerLogLevelFlag) + if err = logging.SetLogLevel("badger", loglevel); err != nil { + return err + } + disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag) logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag) edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag) edsSize, _ := cmd.Flags().GetInt(edsSizeFlag) + putTimeout, _ := cmd.Flags().GetInt(putTimeoutFlag) + cfg := edssser.Config{ EDSSize: edsSize, EDSWrites: edsWrites, EnableLog: !disableLog, LogFilePath: path, StatLogFreq: logFreq, + OpTimeout: time.Duration(putTimeout) * time.Second, } err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full) diff --git a/libs/edssser/edssser.go b/libs/edssser/edssser.go index f4a21d1724..fd11b47fcf 100644 --- 
a/libs/edssser/edssser.go
+++ b/libs/edssser/edssser.go
@@ -23,6 +23,7 @@ type Config struct {
 	EnableLog   bool
 	LogFilePath string
 	StatLogFreq int
+	OpTimeout   time.Duration
 }
 
 // EDSsser stand for EDS Store Stresser.
@@ -67,25 +68,9 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
 	}
 	fmt.Printf("recovered %d EDSes\n\n", len(edsHashes))
 
-	defer func() {
-		err = errors.Join(err, ss.dumpStat(stats.Finalize()))
-	}()
-
 	t := &testing.T{}
 	for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- {
-		// divide by 2 to get ODS size as expected by RandEDS
-		square := edstest.RandEDS(t, ss.config.EDSSize/2)
-		dah, err := da.NewDataAvailabilityHeader(square)
-		if err != nil {
-			return stats, err
-		}
-
-		now := time.Now()
-		err = ss.edsstore.Put(ctx, dah.Hash(), square)
-		if err != nil {
-			return stats, err
-		}
-		took := time.Since(now)
+		took, err := ss.put(ctx, t)
 
 		stats.TotalWritten++
 		stats.TotalTime += took
@@ -96,8 +81,6 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
 		}
 
 		if ss.config.EnableLog {
-			fmt.Println("square written", "size", ss.config.EDSSize, "took", took)
-
 			if stats.TotalWritten%ss.config.StatLogFreq == 0 {
 				stats := stats.Finalize()
 				fmt.Println(stats)
@@ -108,9 +91,18 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
 					}
 				}()
 			}
+			if err != nil {
+				fmt.Printf("ERROR put: %s, took: %v, at: %v\n", err.Error(), took, time.Now())
+				continue
+			}
+			if took > ss.config.OpTimeout/2 {
+				fmt.Println("long put", "size", ss.config.EDSSize, "took", took, "at", time.Now())
+				continue
+			}
+
+			fmt.Println("square written", "size", ss.config.EDSSize, "took", took, "at", time.Now())
 		}
 	}
-
 	return stats, nil
 }
 
@@ -159,3 +151,29 @@ AvgTime %s
 		stats.AvgTime,
 	)
 }
+
+// put builds a square with edstest.RandEDS, writes it to the EDS store and
+// returns how long the store's Put call took. The write is bounded by
+// OpTimeout when it is positive; a non-positive OpTimeout applies no deadline.
+func (ss *EDSsser) put(ctx context.Context, t *testing.T) (time.Duration, error) {
+	// Only derive a deadline for a positive timeout: context.WithTimeout with
+	// a zero duration returns an already-expired context, and any context
+	// derived from it would be cancelled before Put even starts.
+	if ss.config.OpTimeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, ss.config.OpTimeout)
+		defer cancel()
+	}
+
+	// divide by 2 to get ODS size as expected by RandEDS
+	square := edstest.RandEDS(t, ss.config.EDSSize/2)
+	dah, err := da.NewDataAvailabilityHeader(square)
+	if err != nil {
+		return 0, err
+	}
+
+	// Time only the store write, not the generation of the square.
+	now := time.Now()
+	err = ss.edsstore.Put(ctx, dah.Hash(), square)
+	return time.Since(now), err
+}