Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis cleanup #239

Merged
merged 6 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ jobs:
minio:
image: bitnami/minio:latest
env:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports:
- 9000:9000
options: --name minio-server
Expand Down
160 changes: 160 additions & 0 deletions otredis/cleanup_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package otredis

import (
"context"
"fmt"
"io"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync/atomic"
"syscall"
"time"

"github.com/DoNewsCode/core/logging"
"github.com/go-kit/log"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

// NewCleanupCommand creates a new command to clean up unused redis keys.
func NewCleanupCommand(maker Maker, baseLogger log.Logger) *cobra.Command {
const cursorFileName = "redis-scan-cursor"
var (
logger = logging.WithLevel(baseLogger)
cursorPath string
batchSize int64
prefix string
instance string
)

type stats struct {
scanned uint64
removed uint64
}

removeKeys := func(ctx context.Context, cursor *uint64, stats *stats, threshold time.Duration) error {
var (
keys []string
err error
)

logger.Info(fmt.Sprintf("scanning redis keys from cursor %d", *cursor))
if err := os.WriteFile(cursorPath, []byte(fmt.Sprintf("%d", *cursor)), os.ModePerm); err != nil {
logger.Err("cannot store cursor to cursor location", err, nil)
}

redisClient, err := maker.Make(instance)
if err != nil {
return fmt.Errorf("cannot find redis instance under the name of %s: %w", instance, err)
}
keys, *cursor, err = redisClient.Scan(ctx, *cursor, prefix+"*", batchSize).Result()
if err != nil {
return err
}
stats.scanned += uint64(len(keys))

group, ctx := errgroup.WithContext(ctx)
for _, key := range keys {
key := key
group.Go(func() error {
idleTime, _ := redisClient.ObjectIdleTime(ctx, key).Result()
if idleTime > threshold {
logger.Info(fmt.Sprintf("removing %s from redis as it is %s old", key, idleTime))
_, err := redisClient.Del(ctx, key).Result()
if err != nil {
return err
}
atomic.AddUint64(&stats.removed, 1)
}
return nil
})
}
if err := group.Wait(); err != nil {
return err
}
return nil
}

initCursor := func() (uint64, error) {
cursorPath = filepath.Join(os.TempDir(), fmt.Sprintf("%s.%s.txt", cursorFileName, instance))
if _, err := os.Stat(cursorPath); os.IsNotExist(err) {
return 0, nil
}
f, err := os.Open(cursorPath)
if err != nil {
return 0, err
}
defer f.Close()
lastCursor, err := io.ReadAll(f)
if err != nil {
return 0, err
}
cursor, err := strconv.ParseUint(string(lastCursor), 10, 64)
if err != nil {
return 0, err
}
return cursor, nil
}

cmd := &cobra.Command{
Use: "cleanup",
Short: "clean up idle keys in redis",
Args: cobra.ExactArgs(1),
Long: `clean up idle keys in redis based on the last access time.`,
RunE: func(cmd *cobra.Command, args []string) error {
var (
cursor uint64
stats stats
duration time.Duration
)

defer func() {
if stats.scanned == 0 {
return
}
logger.Info(fmt.Sprintf("%d keys scanned, %d keys removed(%.2f%%)", stats.scanned, stats.removed, float64(stats.removed)/float64(stats.scanned)), nil)
}()

duration, err := time.ParseDuration(args[0])
if err != nil {
return fmt.Errorf("first argument must be valid duration string, got %w", err)
}

cursor, err = initCursor()
if err != nil {
return fmt.Errorf("error restoring cursor from file: %w", err)
}

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

for {
select {
case <-cmd.Context().Done():
return cmd.Context().Err()
case <-shutdown:
return nil
default:
if err := removeKeys(cmd.Context(), &cursor, &stats, duration); err != nil {
return fmt.Errorf("error while removing keys: %w", err)
}
}
if cursor == 0 {
break
}
}
logger.Info("redis key clean-up completed", nil)
os.Remove(cursorPath)
return nil
},
}

cmd.Flags().Int64VarP(&batchSize, "batchSize", "b", 100, "specify the redis scan batch size")
cmd.Flags().StringVarP(&cursorPath, "cursorPath", "c", cursorPath, "specify the location to store the cursor, so that the next execution can continue from where it's left off.")
cmd.Flags().StringVarP(&prefix, "prefix", "p", "", "specify the prefix of redis keys to be scanned")
cmd.Flags().StringVarP(&instance, "instance", "i", "default", "specify the redis instance to be scanned")

return cmd
}
41 changes: 41 additions & 0 deletions otredis/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package otredis

import (
"github.com/DoNewsCode/core/di"
"github.com/go-kit/log"
"github.com/spf13/cobra"
)

// Module is the registration unit for package core. It provides redis command.
type Module struct {
maker Maker
logger log.Logger
}

// ModuleIn contains the input parameters needed for creating the new module.
type ModuleIn struct {
di.In

Maker Maker
Logger log.Logger
}

// New creates a Module.
func New(in ModuleIn) Module {
return Module{
maker: in.Maker,
logger: in.Logger,
}
}

// ProvideCommand provides redis commands.
func (m Module) ProvideCommand(command *cobra.Command) {
cleanupCmd := NewCleanupCommand(m.maker, m.logger)
redisCmd := &cobra.Command{
Use: "redis",
Short: "manage redis",
Long: "manage redis, such as cleaning up redis cache",
}
redisCmd.AddCommand(cleanupCmd)
command.AddCommand(redisCmd)
}
83 changes: 83 additions & 0 deletions otredis/module_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package otredis

import (
"context"
"os"
"strings"
"testing"
"time"

"github.com/DoNewsCode/core"
"github.com/DoNewsCode/core/di"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
)

func TestModule_ProvideCommand(t *testing.T) {
if os.Getenv("REDIS_ADDR") == "" {
t.Skip("set REDIS_ADDR to run TestModule_ProvideRunGroup")
return
}
addrs := strings.Split(os.Getenv("REDIS_ADDR"), ",")

c := core.New(core.WithInline("redis.default.addrs", addrs))
c.ProvideEssentials()
c.Provide(di.Deps{
provideRedisFactory(&providersOption{}),
di.Bind(new(Factory), new(Maker)),
})
c.AddModuleFunc(New)
rootCmd := cobra.Command{}
c.ApplyRootCommand(&rootCmd)
assert.True(t, rootCmd.HasSubCommands())

cases := []struct {
name string
args []string
before func(t *testing.T, maker Maker)
after func(t *testing.T, maker Maker)
}{
{
"cleanup",
[]string{"redis", "cleanup", "1ms"},
func(t *testing.T, maker Maker) {
client, _ := maker.Make("default")
client.Set(context.Background(), "foo", "bar", 0)
time.Sleep(time.Second)
},
func(t *testing.T, maker Maker) {
client, _ := maker.Make("default")
results, _ := client.Keys(context.Background(), "*").Result()
assert.Empty(t, results)
},
},
{
"cleanup",
[]string{"redis", "cleanup", "1ms", "-p", "bar"},
func(t *testing.T, maker Maker) {
client, _ := maker.Make("default")
client.Set(context.Background(), "foo", "bar", 0)
client.Set(context.Background(), "bar", "bar", 0)
time.Sleep(time.Second)
},
func(t *testing.T, maker Maker) {
client, _ := maker.Make("default")
results, _ := client.Keys(context.Background(), "*").Result()
assert.Len(t, results, 1)
},
},
}

for _, cc := range cases {
t.Run(cc.name, func(t *testing.T) {
c.Invoke(func(maker Maker) {
cc.before(t, maker)
})
rootCmd.SetArgs(cc.args)
rootCmd.Execute()
c.Invoke(func(maker Maker) {
cc.after(t, maker)
})
})
}
}