-
Notifications
You must be signed in to change notification settings - Fork 919
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(edssser): introduce EDS Store Stresser and cel-shed utility
- Loading branch information
Showing
4 changed files
with
301 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
_ "expvar" | ||
"fmt" | ||
"math" | ||
"net/http" | ||
"os" | ||
|
||
"github.com/mitchellh/go-homedir" | ||
"github.com/spf13/cobra" | ||
|
||
"github.com/celestiaorg/celestia-node/libs/edssser" | ||
"github.com/celestiaorg/celestia-node/nodebuilder" | ||
"github.com/celestiaorg/celestia-node/nodebuilder/node" | ||
) | ||
|
||
const ( | ||
edsStorePathFlag = "path" | ||
edsWritesFlag = "writes" | ||
edsSizeFlag = "size" | ||
edsDisableLogFlag = "disable-log" | ||
edsLogStatFreqFlag = "log-stat-freq" | ||
edsCleanupFlag = "cleanup" | ||
edsFreshStartFlag = "fresh" | ||
) | ||
|
||
func init() { | ||
edsStoreCmd.AddCommand(edsStoreStress) | ||
|
||
defaultPath := "~/.edssser" | ||
path, err := homedir.Expand(defaultPath) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath) | ||
edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage) | ||
edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.") | ||
edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.") | ||
edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.") | ||
edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.") | ||
edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.") | ||
edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.") | ||
|
||
// kill redundant print | ||
nodebuilder.PrintKeyringInfo = false | ||
} | ||
|
||
var edsStoreCmd = &cobra.Command{ | ||
Use: "eds-store [subcommand]", | ||
Short: "Collection of eds-store related utilities", | ||
} | ||
|
||
var edsStoreStress = &cobra.Command{ | ||
Use: "stress", | ||
Short: `Runs eds.Store stress test over default node.Store Datastore backend (e.g. Badger).`, | ||
SilenceUsage: true, | ||
RunE: func(cmd *cobra.Command, args []string) (err error) { | ||
// expose expvar vars over http | ||
go http.ListenAndServe(":9999", http.DefaultServeMux) | ||
|
||
path, _ := cmd.Flags().GetString(edsStorePathFlag) | ||
fmt.Printf("using %s\n", path) | ||
|
||
freshStart, _ := cmd.Flags().GetBool(edsFreshStartFlag) | ||
if freshStart { | ||
err = os.RemoveAll(path) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
cleanup, _ := cmd.Flags().GetBool(edsCleanupFlag) | ||
if cleanup { | ||
defer func() { | ||
err = errors.Join(err, os.RemoveAll(path)) | ||
}() | ||
} | ||
|
||
disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag) | ||
logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag) | ||
edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag) | ||
edsSize, _ := cmd.Flags().GetInt(edsSizeFlag) | ||
cfg := edssser.Config{ | ||
EDSSize: edsSize, | ||
EDSWrites: edsWrites, | ||
EnableLog: !disableLog, | ||
LogFilePath: path, | ||
StatLogFreq: logFreq, | ||
} | ||
|
||
err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
nodestore, err := nodebuilder.OpenStore(path, nil) | ||
if err != nil { | ||
return err | ||
} | ||
defer func() { | ||
err = errors.Join(err, nodestore.Close()) | ||
}() | ||
|
||
datastore, err := nodestore.Datastore() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
stresser, err := edssser.NewEDSsser(path, datastore, cfg) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
stats, err := stresser.Run(cmd.Context()) | ||
if !errors.Is(err, context.Canceled) { | ||
return err | ||
} | ||
|
||
fmt.Printf("%s", stats.Finalize()) | ||
return nil | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
package edssser | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"os" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ipfs/go-datastore" | ||
|
||
"github.com/celestiaorg/celestia-app/pkg/da" | ||
|
||
"github.com/celestiaorg/celestia-node/share/eds" | ||
"github.com/celestiaorg/celestia-node/share/eds/edstest" | ||
) | ||
|
||
type Config struct { | ||
EDSSize int | ||
EDSWrites int | ||
EnableLog bool | ||
LogFilePath string | ||
StatLogFreq int | ||
} | ||
|
||
// EDSsser stand for EDS Store Stresser. | ||
type EDSsser struct { | ||
config Config | ||
datastore datastore.Batching | ||
edsstoreMu sync.Mutex | ||
edsstore *eds.Store | ||
|
||
statsFileMu sync.Mutex | ||
statsFile *os.File | ||
} | ||
|
||
func NewEDSsser(path string, datastore datastore.Batching, cfg Config) (*EDSsser, error) { | ||
edsstore, err := eds.NewStore(path, datastore) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &EDSsser{ | ||
config: cfg, | ||
datastore: datastore, | ||
edsstore: edsstore, | ||
}, nil | ||
} | ||
|
||
func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) { | ||
ss.edsstoreMu.Lock() | ||
defer ss.edsstoreMu.Unlock() | ||
|
||
err = ss.edsstore.Start(ctx) | ||
if err != nil { | ||
return stats, err | ||
} | ||
defer func() { | ||
err = errors.Join(err, ss.edsstore.Stop(ctx)) | ||
}() | ||
|
||
edsHashes, err := ss.edsstore.List() | ||
if err != nil { | ||
return stats, err | ||
} | ||
fmt.Printf("recovered %d EDSes\n\n", len(edsHashes)) | ||
|
||
defer func() { | ||
err = errors.Join(err, ss.dumpStat(stats.Finalize())) | ||
}() | ||
|
||
t := &testing.T{} | ||
for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- { | ||
// divide by 2 to get ODS size as expected by RandEDS | ||
square := edstest.RandEDS(t, ss.config.EDSSize/2) | ||
dah, err := da.NewDataAvailabilityHeader(square) | ||
if err != nil { | ||
return stats, err | ||
} | ||
|
||
now := time.Now() | ||
err = ss.edsstore.Put(ctx, dah.Hash(), square) | ||
if err != nil { | ||
return stats, err | ||
} | ||
took := time.Since(now) | ||
|
||
stats.TotalWritten++ | ||
stats.TotalTime += took | ||
if took < stats.MinTime || stats.MinTime == 0 { | ||
stats.MinTime = took | ||
} else if took > stats.MaxTime { | ||
stats.MaxTime = took | ||
} | ||
|
||
if ss.config.EnableLog { | ||
fmt.Println("square written", "size", ss.config.EDSSize, "took", took) | ||
|
||
if stats.TotalWritten%ss.config.StatLogFreq == 0 { | ||
stats := stats.Finalize() | ||
fmt.Println(stats) | ||
go func() { | ||
err := ss.dumpStat(stats) | ||
if err != nil { | ||
fmt.Printf("error dumping stats: %s\n", err.Error()) | ||
} | ||
}() | ||
} | ||
} | ||
} | ||
|
||
return stats, nil | ||
} | ||
|
||
func (ss *EDSsser) dumpStat(stats Stats) (err error) { | ||
ss.statsFileMu.Lock() | ||
defer ss.statsFileMu.Unlock() | ||
|
||
ss.statsFile, err = os.Create(ss.config.LogFilePath + "/edssser_stats.txt") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
_, err = ss.statsFile.Write([]byte(stats.String())) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return ss.statsFile.Close() | ||
} | ||
|
||
type Stats struct { | ||
TotalWritten int | ||
TotalTime, MinTime, MaxTime, AvgTime time.Duration | ||
// Deviation ? | ||
} | ||
|
||
func (stats Stats) Finalize() Stats { | ||
if stats.TotalTime != 0 { | ||
stats.AvgTime = stats.TotalTime / time.Duration(stats.TotalWritten) | ||
} | ||
return stats | ||
} | ||
|
||
func (stats Stats) String() string { | ||
return fmt.Sprintf(` | ||
TotalWritten %d | ||
TotalWritingTime %v | ||
MaxTime %s | ||
MinTime %s | ||
AvgTime %s | ||
`, | ||
stats.TotalWritten, | ||
stats.TotalTime, | ||
stats.MaxTime, | ||
stats.MinTime, | ||
stats.AvgTime, | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters