From c70f78aed556f2ad7478f882994ea33c24678320 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sat, 15 Jul 2023 23:18:16 +0200 Subject: [PATCH 1/2] feat(edssser): introduce EDS Store Stresser and cel-shed utility --- cmd/cel-shed/eds_store_stress.go | 126 ++++++++++++++++++++++++ cmd/cel-shed/main.go | 9 +- libs/edssser/edssser.go | 161 +++++++++++++++++++++++++++++++ nodebuilder/init.go | 9 +- 4 files changed, 301 insertions(+), 4 deletions(-) create mode 100644 cmd/cel-shed/eds_store_stress.go create mode 100644 libs/edssser/edssser.go diff --git a/cmd/cel-shed/eds_store_stress.go b/cmd/cel-shed/eds_store_stress.go new file mode 100644 index 0000000000..90b6f0800b --- /dev/null +++ b/cmd/cel-shed/eds_store_stress.go @@ -0,0 +1,126 @@ +package main + +import ( + "context" + "errors" + _ "expvar" + "fmt" + "math" + "net/http" + "os" + + "github.com/mitchellh/go-homedir" + "github.com/spf13/cobra" + + "github.com/celestiaorg/celestia-node/libs/edssser" + "github.com/celestiaorg/celestia-node/nodebuilder" + "github.com/celestiaorg/celestia-node/nodebuilder/node" +) + +const ( + edsStorePathFlag = "path" + edsWritesFlag = "writes" + edsSizeFlag = "size" + edsDisableLogFlag = "disable-log" + edsLogStatFreqFlag = "log-stat-freq" + edsCleanupFlag = "cleanup" + edsFreshStartFlag = "fresh" +) + +func init() { + edsStoreCmd.AddCommand(edsStoreStress) + + defaultPath := "~/.edssser" + path, err := homedir.Expand(defaultPath) + if err != nil { + panic(err) + } + + pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath) + edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage) + edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.") + edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.") + edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.") + edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.") + edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.") + edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.") + + // kill redundant print + nodebuilder.PrintKeyringInfo = false +} + +var edsStoreCmd = &cobra.Command{ + Use: "eds-store [subcommand]", + Short: "Collection of eds-store related utilities", +} + +var edsStoreStress = &cobra.Command{ + Use: "stress", + Short: `Runs eds.Store stress test over default node.Store Datastore backend (e.g. Badger).`, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) (err error) { + // expose expvar vars over http + go http.ListenAndServe(":9999", http.DefaultServeMux) + + path, _ := cmd.Flags().GetString(edsStorePathFlag) + fmt.Printf("using %s\n", path) + + freshStart, _ := cmd.Flags().GetBool(edsFreshStartFlag) + if freshStart { + err = os.RemoveAll(path) + if err != nil { + return err + } + } + + cleanup, _ := cmd.Flags().GetBool(edsCleanupFlag) + if cleanup { + defer func() { + err = errors.Join(err, os.RemoveAll(path)) + }() + } + + disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag) + logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag) + edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag) + edsSize, _ := cmd.Flags().GetInt(edsSizeFlag) + cfg := edssser.Config{ + EDSSize: edsSize, + EDSWrites: edsWrites, + EnableLog: !disableLog, + LogFilePath: path, + StatLogFreq: logFreq, + } + + err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full) + if err != nil { + return err + } + + nodestore, err := nodebuilder.OpenStore(path, nil) + if err != nil { + return err + } + defer func() { + err = errors.Join(err, nodestore.Close()) + }() + + datastore, err := nodestore.Datastore() + if err != nil { + return err + } + + stresser, err := edssser.NewEDSsser(path, datastore, cfg) + if err != nil { + return err + } + + stats, err := stresser.Run(cmd.Context()) + if !errors.Is(err, context.Canceled) { + return err + } + + fmt.Printf("%s", stats.Finalize()) + return nil + }, +} diff --git a/cmd/cel-shed/main.go b/cmd/cel-shed/main.go index 7982cfc1be..872bbb48a9 100644 --- a/cmd/cel-shed/main.go +++ b/cmd/cel-shed/main.go @@ -3,12 +3,14 @@ package main import ( "context" "os" + "os/signal" + "syscall" "github.com/spf13/cobra" ) func init() { - rootCmd.AddCommand(p2pCmd, headerCmd) + rootCmd.AddCommand(p2pCmd, headerCmd, edsStoreCmd) } var rootCmd = &cobra.Command{ @@ -26,5 +28,8 @@ func main() { } func run() error { - return rootCmd.ExecuteContext(context.Background()) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + return rootCmd.ExecuteContext(ctx) } diff --git a/libs/edssser/edssser.go b/libs/edssser/edssser.go new file mode 100644 index 0000000000..f4a21d1724 --- /dev/null +++ b/libs/edssser/edssser.go @@ -0,0 +1,161 @@ +package edssser + +import ( + "context" + "errors" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/ipfs/go-datastore" + + "github.com/celestiaorg/celestia-app/pkg/da" + + "github.com/celestiaorg/celestia-node/share/eds" + "github.com/celestiaorg/celestia-node/share/eds/edstest" +) + +type Config struct { + EDSSize int + EDSWrites int + EnableLog bool + LogFilePath string + StatLogFreq int +} + +// EDSsser stand for EDS Store Stresser. +type EDSsser struct { + config Config + datastore datastore.Batching + edsstoreMu sync.Mutex + edsstore *eds.Store + + statsFileMu sync.Mutex + statsFile *os.File +} + +func NewEDSsser(path string, datastore datastore.Batching, cfg Config) (*EDSsser, error) { + edsstore, err := eds.NewStore(path, datastore) + if err != nil { + return nil, err + } + + return &EDSsser{ + config: cfg, + datastore: datastore, + edsstore: edsstore, + }, nil +} + +func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) { + ss.edsstoreMu.Lock() + defer ss.edsstoreMu.Unlock() + + err = ss.edsstore.Start(ctx) + if err != nil { + return stats, err + } + defer func() { + err = errors.Join(err, ss.edsstore.Stop(ctx)) + }() + + edsHashes, err := ss.edsstore.List() + if err != nil { + return stats, err + } + fmt.Printf("recovered %d EDSes\n\n", len(edsHashes)) + + defer func() { + err = errors.Join(err, ss.dumpStat(stats.Finalize())) + }() + + t := &testing.T{} + for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- { + // divide by 2 to get ODS size as expected by RandEDS + square := edstest.RandEDS(t, ss.config.EDSSize/2) + dah, err := da.NewDataAvailabilityHeader(square) + if err != nil { + return stats, err + } + + now := time.Now() + err = ss.edsstore.Put(ctx, dah.Hash(), square) + if err != nil { + return stats, err + } + took := time.Since(now) + + stats.TotalWritten++ + stats.TotalTime += took + if took < stats.MinTime || stats.MinTime == 0 { + stats.MinTime = took + } else if took > stats.MaxTime { + stats.MaxTime = took + } + + if ss.config.EnableLog { + fmt.Println("square written", "size", ss.config.EDSSize, "took", took) + + if stats.TotalWritten%ss.config.StatLogFreq == 0 { + stats := stats.Finalize() + fmt.Println(stats) + go func() { + err := ss.dumpStat(stats) + if err != nil { + fmt.Printf("error dumping stats: %s\n", err.Error()) + } + }() + } + } + } + + return stats, nil +} + +func (ss *EDSsser) dumpStat(stats Stats) (err error) { + ss.statsFileMu.Lock() + defer ss.statsFileMu.Unlock() + + ss.statsFile, err = os.Create(ss.config.LogFilePath + "/edssser_stats.txt") + if err != nil { + return err + } + + _, err = ss.statsFile.Write([]byte(stats.String())) + if err != nil { + return err + } + + return ss.statsFile.Close() +} + +type Stats struct { + TotalWritten int + TotalTime, MinTime, MaxTime, AvgTime time.Duration + // Deviation ? +} + +func (stats Stats) Finalize() Stats { + if stats.TotalTime != 0 { + stats.AvgTime = stats.TotalTime / time.Duration(stats.TotalWritten) + } + return stats +} + +func (stats Stats) String() string { + return fmt.Sprintf(` +TotalWritten %d +TotalWritingTime %v +MaxTime %s +MinTime %s +AvgTime %s +`, + stats.TotalWritten, + stats.TotalTime, + stats.MaxTime, + stats.MinTime, + stats.AvgTime, + ) +} diff --git a/nodebuilder/init.go b/nodebuilder/init.go index 2cabfc8abd..0593d88560 100644 --- a/nodebuilder/init.go +++ b/nodebuilder/init.go @@ -18,6 +18,9 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/state" ) +// PrintKeyringInfo whether to print keyring information during init. +var PrintKeyringInfo = true + // Init initializes the Node FileSystem Store for the given Node Type 'tp' in the directory under // 'path'. func Init(cfg Config, path string, tp node.Type) error { @@ -213,8 +216,10 @@ func generateKeys(cfg Config, ksPath string) error { if err != nil { return err } - fmt.Printf("\nNAME: %s\nADDRESS: %s\nMNEMONIC (save this somewhere safe!!!): \n%s\n\n", - keyInfo.Name, addr.String(), mn) + if PrintKeyringInfo { + fmt.Printf("\nNAME: %s\nADDRESS: %s\nMNEMONIC (save this somewhere safe!!!): \n%s\n\n", + keyInfo.Name, addr.String(), mn) + } return nil } From 953b5b4c47d875a40292aa7f6971e4cd23d5a1c8 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Thu, 3 Aug 2023 01:17:37 +0800 Subject: [PATCH 2/2] feat:(cmd/cel-shed): add few features to store stessser (#2519) --- cmd/cel-shed/eds_store_stress.go | 41 ++++++++++++++++++++++++- libs/edssser/edssser.go | 51 +++++++++++++++++++------------- 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/cmd/cel-shed/eds_store_stress.go b/cmd/cel-shed/eds_store_stress.go index 90b6f0800b..62ea5cb772 100644 --- a/cmd/cel-shed/eds_store_stress.go +++ b/cmd/cel-shed/eds_store_stress.go @@ -8,8 +8,11 @@ import ( "math" "net/http" "os" + "time" + logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" + "github.com/pyroscope-io/client/pyroscope" "github.com/spf13/cobra" "github.com/celestiaorg/celestia-node/libs/edssser" @@ -25,6 +28,10 @@ const ( edsLogStatFreqFlag = "log-stat-freq" edsCleanupFlag = "cleanup" edsFreshStartFlag = "fresh" + + pyroscopeEndpointFlag = "pyroscope" + putTimeoutFlag = "timeout" + badgerLogLevelFlag = "badger-log-level" ) func init() { @@ -38,12 +45,16 @@ func init() { pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath) edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage) + edsStoreStress.Flags().String(pyroscopeEndpointFlag, "", + "Pyroscope address. If no address provided, pyroscope will be disabled") edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.") edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.") edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.") edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.") edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.") edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.") + edsStoreStress.Flags().Int(putTimeoutFlag, 30, "Sets put timeout in seconds. 30 sec by default.") + edsStoreStress.Flags().String(badgerLogLevelFlag, "INFO", "Badger log level, Defaults to INFO") // kill redundant print nodebuilder.PrintKeyringInfo = false @@ -60,7 +71,27 @@ var edsStoreStress = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) (err error) { // expose expvar vars over http - go http.ListenAndServe(":9999", http.DefaultServeMux) + go http.ListenAndServe(":9999", http.DefaultServeMux) //nolint:errcheck,gosec + + endpoint, _ := cmd.Flags().GetString(pyroscopeEndpointFlag) + if endpoint != "" { + _, err = pyroscope.Start(pyroscope.Config{ + ApplicationName: "cel-shred.stresser", + ServerAddress: endpoint, + ProfileTypes: []pyroscope.ProfileType{ + pyroscope.ProfileCPU, + pyroscope.ProfileAllocObjects, + pyroscope.ProfileAllocSpace, + pyroscope.ProfileInuseObjects, + pyroscope.ProfileInuseSpace, + }, + }) + if err != nil { + fmt.Printf("failed to launch pyroscope with addr: %s err: %s\n", endpoint, err.Error()) + } else { + fmt.Println("connected pyroscope to:", endpoint) + } + } path, _ := cmd.Flags().GetString(edsStorePathFlag) fmt.Printf("using %s\n", path) @@ -80,16 +111,24 @@ var edsStoreStress = &cobra.Command{ }() } + loglevel, _ := cmd.Flags().GetString(badgerLogLevelFlag) + if err = logging.SetLogLevel("badger", loglevel); err != nil { + return err + } + disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag) logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag) edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag) edsSize, _ := cmd.Flags().GetInt(edsSizeFlag) + putTimeout, _ := cmd.Flags().GetInt(putTimeoutFlag) + cfg := edssser.Config{ EDSSize: edsSize, EDSWrites: edsWrites, EnableLog: !disableLog, LogFilePath: path, StatLogFreq: logFreq, + OpTimeout: time.Duration(putTimeout) * time.Second, } err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full) diff --git a/libs/edssser/edssser.go b/libs/edssser/edssser.go index f4a21d1724..fd11b47fcf 100644 --- a/libs/edssser/edssser.go +++ b/libs/edssser/edssser.go @@ -23,6 +23,7 @@ type Config struct { EnableLog bool LogFilePath string StatLogFreq int + OpTimeout time.Duration } // EDSsser stand for EDS Store Stresser. @@ -67,25 +68,9 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) { } fmt.Printf("recovered %d EDSes\n\n", len(edsHashes)) - defer func() { - err = errors.Join(err, ss.dumpStat(stats.Finalize())) - }() - t := &testing.T{} for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- { - // divide by 2 to get ODS size as expected by RandEDS - square := edstest.RandEDS(t, ss.config.EDSSize/2) - dah, err := da.NewDataAvailabilityHeader(square) - if err != nil { - return stats, err - } - - now := time.Now() - err = ss.edsstore.Put(ctx, dah.Hash(), square) - if err != nil { - return stats, err - } - took := time.Since(now) + took, err := ss.put(ctx, t) stats.TotalWritten++ stats.TotalTime += took @@ -96,8 +81,6 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) { } if ss.config.EnableLog { - fmt.Println("square written", "size", ss.config.EDSSize, "took", took) - if stats.TotalWritten%ss.config.StatLogFreq == 0 { stats := stats.Finalize() fmt.Println(stats) @@ -108,9 +91,18 @@ func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) { } }() } + if err != nil { + fmt.Printf("ERROR put: %s, took: %v, at: %v\n", err.Error(), took, time.Now()) + continue + } + if took > ss.config.OpTimeout/2 { + fmt.Println("long put", "size", ss.config.EDSSize, "took", took, "at", time.Now()) + continue + } + + fmt.Println("square written", "size", ss.config.EDSSize, "took", took, "at", time.Now()) } } - return stats, nil } @@ -159,3 +151,22 @@ AvgTime %s stats.AvgTime, ) } + +func (ss *EDSsser) put(ctx context.Context, t *testing.T) (time.Duration, error) { + ctx, cancel := context.WithTimeout(ctx, ss.config.OpTimeout) + if ss.config.OpTimeout == 0 { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + + // divide by 2 to get ODS size as expected by RandEDS + square := edstest.RandEDS(t, ss.config.EDSSize/2) + dah, err := da.NewDataAvailabilityHeader(square) + if err != nil { + return 0, err + } + + now := time.Now() + err = ss.edsstore.Put(ctx, dah.Hash(), square) + return time.Since(now), err +}