Skip to content

Commit

Permalink
Merge pull request #53 from anyproto/GO-2061-space-deletion
Browse files Browse the repository at this point in the history
GO-2061 deletion log, space delete
  • Loading branch information
cheggaaa authored Nov 1, 2023
2 parents 6ab8532 + 6f58eec commit f59f5b2
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 51 deletions.
9 changes: 5 additions & 4 deletions cmd/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/anyproto/any-sync-filenode/account"
"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/deletelog"
"github.com/anyproto/any-sync-filenode/filenode"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
Expand Down Expand Up @@ -109,8 +110,6 @@ func Bootstrap(a *app.App) {
Register(nodeconf.New()).
Register(peerservice.New()).
Register(secureservice.New()).
Register(yamux.New()).
Register(quic.New()).
Register(pool.New()).
Register(coordinatorclient.New()).
Register(limit.New()).
Expand All @@ -119,6 +118,8 @@ func Bootstrap(a *app.App) {
Register(index.New()).
Register(metric.New()).
Register(server.New()).
Register(filenode.New())

Register(filenode.New()).
Register(deletelog.New()).
Register(yamux.New()).
Register(quic.New())
}
115 changes: 115 additions & 0 deletions deletelog/deletelog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package deletelog

import (
"context"
"errors"
"time"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
"github.com/anyproto/any-sync/coordinator/coordinatorproto"
"github.com/anyproto/any-sync/util/periodicsync"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"

"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/redisprovider"
)

const CName = "filenode.deletionLog"

const lastKey = "deletionLastId.{system}"

const recordsLimit = 1000

var log = logger.NewNamed(CName)

func New() app.ComponentRunnable {
return new(deleteLog)
}

type deleteLog struct {
redis redis.UniversalClient
coordinatorClient coordinatorclient.CoordinatorClient
redsync *redsync.Redsync
ticker periodicsync.PeriodicSync
index index.Index
disableTicker bool
}

func (d *deleteLog) Init(a *app.App) (err error) {
d.redis = a.MustComponent(redisprovider.CName).(redisprovider.RedisProvider).Redis()
d.coordinatorClient = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient)
d.redsync = redsync.New(goredis.NewPool(d.redis))
d.index = a.MustComponent(index.CName).(index.Index)
return
}

func (d *deleteLog) Name() (name string) {
return CName
}

func (d *deleteLog) Run(ctx context.Context) (err error) {
if !d.disableTicker {
d.ticker = periodicsync.NewPeriodicSync(60, time.Hour, d.checkLog, log)
d.ticker.Run()
}
return
}

func (d *deleteLog) checkLog(ctx context.Context) (err error) {
mu := d.redsync.NewMutex("_lock:deletion", redsync.WithExpiry(time.Minute*10))
if err = mu.LockContext(ctx); err != nil {
return
}
defer func() {
_, _ = mu.Unlock()
}()
st := time.Now()
lastId, err := d.redis.Get(ctx, lastKey).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return
}

recs, err := d.coordinatorClient.DeletionLog(ctx, lastId, recordsLimit)
if err != nil {
return
}
var handledCount, deletedCount int
var ok bool
for _, rec := range recs {
if rec.Status == coordinatorproto.DeletionLogRecordStatus_Remove && rec.FileGroup != "" {
ok, err = d.index.SpaceDelete(ctx, index.Key{
GroupId: rec.FileGroup,
SpaceId: rec.SpaceId,
})
if err != nil {
return
}
handledCount++
if ok {
deletedCount++
}
}
if err = d.redis.Set(ctx, lastKey, rec.Id, 0).Err(); err != nil {
return
}
}
log.Info("processing deletion log",
zap.Int("records", len(recs)),
zap.Int("handled", handledCount),
zap.Int("deleted", deletedCount),
zap.Duration("dur", time.Since(st)),
)
return
}

func (d *deleteLog) Close(ctx context.Context) (err error) {
if d.ticker != nil {
d.ticker.Close()
}
return
}
96 changes: 96 additions & 0 deletions deletelog/deletelog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package deletelog

import (
"context"
"testing"
"time"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
"github.com/anyproto/any-sync/coordinator/coordinatorclient/mock_coordinatorclient"
"github.com/anyproto/any-sync/coordinator/coordinatorproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/index/mock_index"
"github.com/anyproto/any-sync-filenode/redisprovider/testredisprovider"
)

var ctx = context.Background()

func TestDeleteLog_checkLog(t *testing.T) {
t.Run("no records", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.coord.EXPECT().DeletionLog(ctx, "", recordsLimit).Return(nil, nil)
require.NoError(t, fx.checkLog(ctx))
})
t.Run("success", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
now := time.Now().Unix()
fx.coord.EXPECT().DeletionLog(ctx, "", recordsLimit).Return([]*coordinatorproto.DeletionLogRecord{
{
Id: "1",
SpaceId: "s1",
Status: coordinatorproto.DeletionLogRecordStatus_Ok,
Timestamp: now,
FileGroup: "f1",
},
{
Id: "2",
SpaceId: "s2",
Status: coordinatorproto.DeletionLogRecordStatus_Remove,
Timestamp: now,
FileGroup: "f2",
},
}, nil)
fx.index.EXPECT().SpaceDelete(ctx, index.Key{
GroupId: "f2",
SpaceId: "s2",
})
require.NoError(t, fx.checkLog(ctx))
lastId, err := fx.redis.Get(ctx, lastKey).Result()
require.NoError(t, err)
assert.Equal(t, "2", lastId)
})

}

func newFixture(t *testing.T) *fixture {
ctrl := gomock.NewController(t)
fx := &fixture{
ctrl: ctrl,
a: new(app.App),
coord: mock_coordinatorclient.NewMockCoordinatorClient(ctrl),
index: mock_index.NewMockIndex(ctrl),
deleteLog: New().(*deleteLog),
}
fx.disableTicker = true
fx.coord.EXPECT().Name().Return(coordinatorclient.CName).AnyTimes()
fx.coord.EXPECT().Init(gomock.Any()).AnyTimes()
fx.index.EXPECT().Name().Return(index.CName).AnyTimes()
fx.index.EXPECT().Init(gomock.Any()).AnyTimes()
fx.index.EXPECT().Run(gomock.Any()).AnyTimes()
fx.index.EXPECT().Close(gomock.Any()).AnyTimes()

fx.a.Register(testredisprovider.NewTestRedisProviderNum(7)).Register(fx.coord).Register(fx.index).Register(fx.deleteLog)
require.NoError(t, fx.a.Start(ctx))

return fx
}

type fixture struct {
ctrl *gomock.Controller
a *app.App
coord *mock_coordinatorclient.MockCoordinatorClient
index *mock_index.MockIndex
*deleteLog
}

func (fx *fixture) finish(t *testing.T) {
require.NoError(t, fx.a.Close(ctx))
fx.ctrl.Finish()
}
5 changes: 5 additions & 0 deletions filenode/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func (fn *fileNode) SpaceInfo(ctx context.Context, spaceId string) (info *filepr
if limitBytes, storageKey.GroupId, err = fn.limit.Check(ctx, spaceId); err != nil {
return nil, err
}

if e := fn.index.Migrate(ctx, storageKey); e != nil {
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
}

groupInfo, err := fn.index.GroupInfo(ctx, storageKey.GroupId)
if err != nil {
return nil, err
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/OneOfOne/xxhash v1.2.8
github.com/ahmetb/govvv v0.3.0
github.com/anyproto/any-sync v0.3.5
github.com/anyproto/any-sync v0.3.7
github.com/aws/aws-sdk-go v1.46.6
github.com/cespare/xxhash/v2 v2.2.0
github.com/go-redsync/redsync/v4 v4.10.0
Expand Down Expand Up @@ -35,7 +35,7 @@ require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
Expand All @@ -45,37 +45,37 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.31.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/libp2p/go-libp2p v0.32.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.11.0 // indirect
github.com/multiformats/go-multiaddr v0.12.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
github.com/quic-go/quic-go v0.39.1 // indirect
github.com/quic-go/quic-go v0.39.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/zeebo/errs v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
storj.io/drpc v0.0.33 // indirect
Expand Down
Loading

0 comments on commit f59f5b2

Please sign in to comment.