Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(edssser): introduce EDS Store Stresser and cel-shed utility #2482

Merged
merged 2 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions cmd/cel-shed/eds_store_stress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"context"
"errors"
_ "expvar"
"fmt"
"math"
"net/http"
"os"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"github.com/pyroscope-io/client/pyroscope"
"github.com/spf13/cobra"

"github.com/celestiaorg/celestia-node/libs/edssser"
"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

const (
edsStorePathFlag = "path"
edsWritesFlag = "writes"
edsSizeFlag = "size"
edsDisableLogFlag = "disable-log"
edsLogStatFreqFlag = "log-stat-freq"
edsCleanupFlag = "cleanup"
edsFreshStartFlag = "fresh"

pyroscopeEndpointFlag = "pyroscope"
putTimeoutFlag = "timeout"
badgerLogLevelFlag = "badger-log-level"
)

func init() {
edsStoreCmd.AddCommand(edsStoreStress)

defaultPath := "~/.edssser"
path, err := homedir.Expand(defaultPath)
if err != nil {
panic(err)
}

pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath)
edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage)
edsStoreStress.Flags().String(pyroscopeEndpointFlag, "",
"Pyroscope address. If no address provided, pyroscope will be disabled")
edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.")
walldiss marked this conversation as resolved.
Show resolved Hide resolved
edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.")
musalbas marked this conversation as resolved.
Show resolved Hide resolved
edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.")
edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.")
edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.")
edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.")
edsStoreStress.Flags().Int(putTimeoutFlag, 30, "Sets put timeout in seconds. 30 sec by default.")
edsStoreStress.Flags().String(badgerLogLevelFlag, "INFO", "Badger log level, Defaults to INFO")

// kill redundant print
nodebuilder.PrintKeyringInfo = false
}

var edsStoreCmd = &cobra.Command{
Use: "eds-store [subcommand]",
Short: "Collection of eds-store related utilities",
}

var edsStoreStress = &cobra.Command{
Use: "stress",
Short: `Runs eds.Store stress test over default node.Store Datastore backend (e.g. Badger).`,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) (err error) {
// expose expvar vars over http
go http.ListenAndServe(":9999", http.DefaultServeMux) //nolint:errcheck,gosec

endpoint, _ := cmd.Flags().GetString(pyroscopeEndpointFlag)
if endpoint != "" {
_, err = pyroscope.Start(pyroscope.Config{
ApplicationName: "cel-shred.stresser",
ServerAddress: endpoint,
ProfileTypes: []pyroscope.ProfileType{
pyroscope.ProfileCPU,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileAllocSpace,
pyroscope.ProfileInuseObjects,
pyroscope.ProfileInuseSpace,
},
})
if err != nil {
fmt.Printf("failed to launch pyroscope with addr: %s err: %s\n", endpoint, err.Error())
} else {
fmt.Println("connected pyroscope to:", endpoint)
}
}

path, _ := cmd.Flags().GetString(edsStorePathFlag)
fmt.Printf("using %s\n", path)

freshStart, _ := cmd.Flags().GetBool(edsFreshStartFlag)
if freshStart {
err = os.RemoveAll(path)
if err != nil {
return err
}
}

cleanup, _ := cmd.Flags().GetBool(edsCleanupFlag)
if cleanup {
defer func() {
err = errors.Join(err, os.RemoveAll(path))
}()
}

loglevel, _ := cmd.Flags().GetString(badgerLogLevelFlag)
if err = logging.SetLogLevel("badger", loglevel); err != nil {
return err
}

disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag)
logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag)
edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag)
edsSize, _ := cmd.Flags().GetInt(edsSizeFlag)
putTimeout, _ := cmd.Flags().GetInt(putTimeoutFlag)

cfg := edssser.Config{
EDSSize: edsSize,
EDSWrites: edsWrites,
EnableLog: !disableLog,
LogFilePath: path,
StatLogFreq: logFreq,
OpTimeout: time.Duration(putTimeout) * time.Second,
}

err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full)
if err != nil {
return err
}

nodestore, err := nodebuilder.OpenStore(path, nil)
if err != nil {
return err
}
defer func() {
err = errors.Join(err, nodestore.Close())
}()

datastore, err := nodestore.Datastore()
if err != nil {
return err
}

stresser, err := edssser.NewEDSsser(path, datastore, cfg)
if err != nil {
return err
}

stats, err := stresser.Run(cmd.Context())
if !errors.Is(err, context.Canceled) {
return err
}

fmt.Printf("%s", stats.Finalize())
walldiss marked this conversation as resolved.
Show resolved Hide resolved
return nil
},
}
9 changes: 7 additions & 2 deletions cmd/cel-shed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package main
import (
"context"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
)

func init() {
rootCmd.AddCommand(p2pCmd, headerCmd)
rootCmd.AddCommand(p2pCmd, headerCmd, edsStoreCmd)
}

var rootCmd = &cobra.Command{
Expand All @@ -26,5 +28,8 @@ func main() {
}

func run() error {
return rootCmd.ExecuteContext(context.Background())
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

return rootCmd.ExecuteContext(ctx)
}
172 changes: 172 additions & 0 deletions libs/edssser/edssser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package edssser

import (
"context"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-app/pkg/da"

"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)

type Config struct {
EDSSize int
EDSWrites int
EnableLog bool
LogFilePath string
StatLogFreq int
OpTimeout time.Duration
}

// EDSsser stand for EDS Store Stresser.
type EDSsser struct {
config Config
datastore datastore.Batching
edsstoreMu sync.Mutex
edsstore *eds.Store

statsFileMu sync.Mutex
statsFile *os.File
}

func NewEDSsser(path string, datastore datastore.Batching, cfg Config) (*EDSsser, error) {
edsstore, err := eds.NewStore(path, datastore)
if err != nil {
return nil, err
}

return &EDSsser{
config: cfg,
datastore: datastore,
edsstore: edsstore,
}, nil
}

func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) {
ss.edsstoreMu.Lock()
defer ss.edsstoreMu.Unlock()

err = ss.edsstore.Start(ctx)
if err != nil {
return stats, err
}
defer func() {
err = errors.Join(err, ss.edsstore.Stop(ctx))
}()

edsHashes, err := ss.edsstore.List()
if err != nil {
return stats, err
}
fmt.Printf("recovered %d EDSes\n\n", len(edsHashes))

t := &testing.T{}
for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- {
took, err := ss.put(ctx, t)

stats.TotalWritten++
stats.TotalTime += took
if took < stats.MinTime || stats.MinTime == 0 {
stats.MinTime = took
} else if took > stats.MaxTime {
stats.MaxTime = took
}

if ss.config.EnableLog {
if stats.TotalWritten%ss.config.StatLogFreq == 0 {
stats := stats.Finalize()
fmt.Println(stats)
go func() {
err := ss.dumpStat(stats)
if err != nil {
fmt.Printf("error dumping stats: %s\n", err.Error())
}
}()
}
if err != nil {
fmt.Printf("ERROR put: %s, took: %v, at: %v\n", err.Error(), took, time.Now())
continue
}
if took > ss.config.OpTimeout/2 {
fmt.Println("long put", "size", ss.config.EDSSize, "took", took, "at", time.Now())
continue
}

fmt.Println("square written", "size", ss.config.EDSSize, "took", took, "at", time.Now())
}
}
return stats, nil
}

func (ss *EDSsser) dumpStat(stats Stats) (err error) {
ss.statsFileMu.Lock()
defer ss.statsFileMu.Unlock()

ss.statsFile, err = os.Create(ss.config.LogFilePath + "/edssser_stats.txt")
if err != nil {
return err
}

_, err = ss.statsFile.Write([]byte(stats.String()))
if err != nil {
return err
}

return ss.statsFile.Close()
}

type Stats struct {
TotalWritten int
TotalTime, MinTime, MaxTime, AvgTime time.Duration
// Deviation ?
}

func (stats Stats) Finalize() Stats {
if stats.TotalTime != 0 {
stats.AvgTime = stats.TotalTime / time.Duration(stats.TotalWritten)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
}
return stats
}

func (stats Stats) String() string {
return fmt.Sprintf(`
TotalWritten %d
TotalWritingTime %v
MaxTime %s
MinTime %s
AvgTime %s
`,
stats.TotalWritten,
stats.TotalTime,
stats.MaxTime,
stats.MinTime,
stats.AvgTime,
)
}

func (ss *EDSsser) put(ctx context.Context, t *testing.T) (time.Duration, error) {
ctx, cancel := context.WithTimeout(ctx, ss.config.OpTimeout)
if ss.config.OpTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

// divide by 2 to get ODS size as expected by RandEDS
square := edstest.RandEDS(t, ss.config.EDSSize/2)
dah, err := da.NewDataAvailabilityHeader(square)
if err != nil {
return 0, err
}

now := time.Now()
err = ss.edsstore.Put(ctx, dah.Hash(), square)
return time.Since(now), err
}
9 changes: 7 additions & 2 deletions nodebuilder/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)

// PrintKeyringInfo whether to print keyring information during init.
var PrintKeyringInfo = true

// Init initializes the Node FileSystem Store for the given Node Type 'tp' in the directory under
// 'path'.
func Init(cfg Config, path string, tp node.Type) error {
Expand Down Expand Up @@ -213,8 +216,10 @@ func generateKeys(cfg Config, ksPath string) error {
if err != nil {
return err
}
fmt.Printf("\nNAME: %s\nADDRESS: %s\nMNEMONIC (save this somewhere safe!!!): \n%s\n\n",
keyInfo.Name, addr.String(), mn)
if PrintKeyringInfo {
fmt.Printf("\nNAME: %s\nADDRESS: %s\nMNEMONIC (save this somewhere safe!!!): \n%s\n\n",
keyInfo.Name, addr.String(), mn)
}
return nil
}

Expand Down
Loading