diff --git a/cmd/varlogsn/cli.go b/cmd/varlogsn/cli.go index 49c15c406..693bb2103 100644 --- a/cmd/varlogsn/cli.go +++ b/cmd/varlogsn/cli.go @@ -74,6 +74,7 @@ func newStartCommand() *cli.Command { flagStorageMetricsLogInterval, flagStorageVerbose.BoolFlag(), flagStorageTrimDelay, + flagStorageTrimRate, // logger options flags.LogDir, diff --git a/cmd/varlogsn/flags.go b/cmd/varlogsn/flags.go index 18359b191..73b3f859e 100644 --- a/cmd/varlogsn/flags.go +++ b/cmd/varlogsn/flags.go @@ -200,6 +200,11 @@ var ( EnvVars: []string{"STORAGE_TRIM_DELAY"}, Usage: "Delay before deletion of log entries caused by Trim operation. If zero, lazy deletion waits for other log entries to be appended.", } + flagStorageTrimRate = &cli.StringFlag{ + Name: "storage-trim-rate", + EnvVars: []string{"STORAGE_TRIM_RATE"}, + Usage: "Trim deletion throttling rate in bytes per second. If zero, no throttling is applied.", + } // flags for telemetry. flagExporterType = flags.FlagDesc{ diff --git a/cmd/varlogsn/varlogsn.go b/cmd/varlogsn/varlogsn.go index bbab5640f..f6ef5e92a 100644 --- a/cmd/varlogsn/varlogsn.go +++ b/cmd/varlogsn/varlogsn.go @@ -280,6 +280,13 @@ func parseStorageOptions(c *cli.Context) (opts []storage.Option, err error) { if name := flagStorageTrimDelay.Name; c.IsSet(name) { opts = append(opts, storage.WithTrimDelay(c.Duration(name))) } + if name := flagStorageTrimRate.Name; c.IsSet(name) { + rate, err := units.FromByteSizeString(c.String(name)) + if err != nil { + return nil, err + } + opts = append(opts, storage.WithTrimRateByte(int(rate))) + } return opts, nil } diff --git a/internal/storage/config.go b/internal/storage/config.go index d5cf05440..895d74ee5 100644 --- a/internal/storage/config.go +++ b/internal/storage/config.go @@ -144,6 +144,7 @@ type config struct { verbose bool metricsLogInterval time.Duration trimDelay time.Duration + trimRateByte int logger *zap.Logger readOnly bool } @@ -256,6 +257,14 @@ func WithTrimDelay(trimDelay time.Duration) Option { }) } +// WithTrimRateByte is the Trim deletion speed in bytes per second. If the +// value is zero, Trim removes the log entries without throttling. +func WithTrimRateByte(trimRateByte int) Option { + return newFuncOption(func(cfg *config) { + cfg.trimRateByte = trimRateByte + }) +} + // ReadOnly makes storage read-only. It is helpful only for testing. Usually, // users do not have to call it. func ReadOnly() Option { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 8ffda3ba0..626ac625c 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -113,6 +113,7 @@ func (s *Storage) newDB(path string, cfg *dbConfig) (*pebble.DB, error) { Levels: make([]pebble.LevelOptions, 7), ErrorIfExists: false, FlushDelayDeleteRange: s.trimDelay, + TargetByteDeletionRate: s.trimRateByte, } pebbleOpts.Levels[0].TargetFileSize = cfg.l0TargetFileSize for i := 0; i < len(pebbleOpts.Levels); i++ {