diff --git a/index/check.go b/index/check.go new file mode 100644 index 00000000..bce35993 --- /dev/null +++ b/index/check.go @@ -0,0 +1,364 @@ +package index + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/anyproto/any-sync-filenode/index/indexproto" +) + +func (ri *redisIndex) Check(ctx context.Context, key Key, doFix bool) (checkResults []CheckResult, err error) { + var toRelease []func() + defer func() { + for _, r := range toRelease { + r() + } + }() + + // acquire locks and fetch entries + gExists, gRelease, err := ri.AcquireKey(ctx, GroupKey(key)) + if err != nil { + return + } + toRelease = append(toRelease, gRelease) + if !gExists { + return + } + gEntry, err := ri.getGroupEntry(ctx, key) + if err != nil { + return + } + + var spaceChecks []*spaceContent + + // load spaces + for _, spaceId := range gEntry.GetSpaceIds() { + spaceKey := Key{GroupId: key.GroupId, SpaceId: spaceId} + sExists, sRelease, aErr := ri.AcquireKey(ctx, SpaceKey(spaceKey)) + if aErr != nil { + return nil, aErr + } + toRelease = append(toRelease, sRelease) + if sExists { + var ( + sEntry *spaceEntry + check *spaceContent + ) + sEntry, err = ri.getSpaceEntry(ctx, spaceKey) + if err != nil { + return nil, err + } + check, err = ri.loadSpaceContent(ctx, spaceKey, sEntry) + if err != nil { + return nil, err + } + spaceChecks = append(spaceChecks, check) + } + } + + var sumRefs = make(map[string]uint64) + var sumSize uint64 + for _, check := range spaceChecks { + var results []CheckResult + results, err = check.Check(ctx, ri) + if err != nil { + return nil, err + } + checkResults = append(checkResults, results...) + for k, ref := range check.actualRefs { + sumRefs[k] += ref + sumSize += check.cidEntries[k].Size_ + } + } + + groupCheck, err := ri.loadGroupContent(ctx, key, gEntry) + if err != nil { + return + } + + checkResults = append(checkResults, groupCheck.Check(sumRefs, sumSize)...) + if doFix { + err = ri.fix(ctx, key, checkResults) + } + return +} + +func (ri *redisIndex) fix(ctx context.Context, key Key, checkResults []CheckResult) (err error) { + for _, check := range checkResults { + switch { + case check.SpaceId != "": + if check.Key == infoKey { + if err = ri.fixSpaceEntry(ctx, key, check); err != nil { + return + } + } else { + if err = ri.fixSpaceCid(ctx, key, check); err != nil { + return + } + } + case check.CidEntry != nil: + if err = ri.fixCid(ctx, check); err != nil { + return + } + case check.GroupEntry != nil: + if check.Key == infoKey { + if err = ri.fixGroupEntry(ctx, key, check); err != nil { + return + } + } else { + if err = ri.fixGroupCid(ctx, key, check); err != nil { + return + } + } + } + } + return +} + +func (ri *redisIndex) fixSpaceEntry(ctx context.Context, key Key, check CheckResult) (err error) { + se := check.SpaceEntry + se.UpdateTime = time.Now().Unix() + data, err := se.Marshal() + if err != nil { + return + } + return ri.cl.HSet(ctx, SpaceKey(Key{GroupId: key.GroupId, SpaceId: check.SpaceId}), infoKey, data).Err() +} + +func (ri *redisIndex) fixSpaceCid(ctx context.Context, key Key, check CheckResult) (err error) { + if check.CidRef > 0 { + return ri.cl.HSet(ctx, SpaceKey(Key{GroupId: key.GroupId, SpaceId: check.SpaceId}), check.Key, check.CidRef).Err() + } else { + return ri.cl.HDel(ctx, SpaceKey(Key{GroupId: key.GroupId, SpaceId: check.SpaceId}), check.Key).Err() + } +} + +func (ri *redisIndex) fixGroupEntry(ctx context.Context, key Key, check CheckResult) (err error) { + ge := &groupEntry{ + GroupEntry: check.GroupEntry, + } + ge.Save(ctx, ri.cl) + return +} + +func (ri *redisIndex) fixGroupCid(ctx context.Context, key Key, check CheckResult) (err error) { + if check.CidRef > 0 { + return ri.cl.HSet(ctx, GroupKey(key), check.Key, check.CidRef).Err() + } else { + return ri.cl.HDel(ctx, GroupKey(key), check.Key).Err() + } +} + +func (ri *redisIndex) fixCid(ctx context.Context, v CheckResult) (err error) { + ce := v.CidEntry + ce.UpdateTime = time.Now().Unix() + data, err := ce.Marshal() + if err != nil { + return err + } + return ri.cl.Set(ctx, v.Key, data, 0).Err() +} + +func (ri *redisIndex) loadSpaceContent(ctx context.Context, key Key, se *spaceEntry) (sc *spaceContent, err error) { + sc = &spaceContent{ + entry: se, + files: make(map[string]*indexproto.FileEntry), + cids: make(map[string]uint64), + cidEntries: make(map[string]*cidEntry), + } + results, err := ri.cl.HGetAll(ctx, SpaceKey(key)).Result() + if err != nil { + return + } + for k, v := range results { + if strings.HasPrefix(k, "c:") { + sc.cids[k[2:]], _ = strconv.ParseUint(v, 10, 64) + } else if strings.HasPrefix(k, "f:") { + fileEntryProto := &indexproto.FileEntry{} + if err = fileEntryProto.Unmarshal([]byte(v)); err != nil { + return + } + sc.files[k[2:]] = fileEntryProto + } + } + return +} + +type spaceContent struct { + entry *spaceEntry + files map[string]*indexproto.FileEntry + cids map[string]uint64 + cidEntries map[string]*cidEntry + actualRefs map[string]uint64 +} + +type CheckResult struct { + Key string `json:"key"` + CidEntry *indexproto.CidEntry `json:"cid,omitempty"` + FileEntry *indexproto.FileEntry `json:"file,omitempty"` + SpaceEntry *indexproto.SpaceEntry `json:"space,omitempty"` + GroupEntry *indexproto.GroupEntry `json:"group,omitempty"` + CidRef uint64 `json:"cidRef,omitempty"` + Description string `json:"description"` + SpaceId string `json:"spaceId,omitempty"` +} + +func (sc *spaceContent) Check(ctx context.Context, ri *redisIndex) (checkResults []CheckResult, err error) { + sc.actualRefs = make(map[string]uint64) + // calc file refs + for _, file := range sc.files { + for _, fCid := range file.Cids { + sc.actualRefs[fCid] += 1 + } + } + + // load cid entries + cidStrings := make([]string, 0, len(sc.actualRefs)) + for cidString := range sc.actualRefs { + cidStrings = append(cidStrings, cidString) + } + entries, err := ri.CidEntriesByString(ctx, cidStrings) + if err != nil { + return + } + defer entries.Release() + for _, cEntry := range entries.entries { + sc.cidEntries[cEntry.Cid.String()] = cEntry + } + + // calc files sizes + for fileId, file := range sc.files { + var fileSize uint64 + for _, fCid := range file.Cids { + fileSize += sc.cidEntries[fCid].Size_ + } + if file.Size_ != fileSize { + fix := CheckResult{ + Key: "f:" + fileId, + Description: fmt.Sprintf("file size mismatch: %d -> %d", file.Size_, fileSize), + SpaceId: sc.entry.Id, + } + file.Size_ = fileSize + fix.FileEntry = file + checkResults = append(checkResults, fix) + } + } + + // check cid refs + var sumSize uint64 + for c, want := range sc.actualRefs { + if actual := sc.cids[c]; actual != want { + fix := CheckResult{ + Key: "c:" + c, + CidRef: want, + Description: fmt.Sprintf("space cid refs mismatch: stored: %d -> %d", actual, want), + SpaceId: sc.entry.Id, + } + checkResults = append(checkResults, fix) + } + cEntry := sc.cidEntries[c] + if cEntry.Refs < 1 { + cEntry.CidEntry.Refs = 1 + fix := CheckResult{ + Key: "c:" + c, + CidEntry: cEntry.CidEntry, + Description: "cid 0-ref", + } + checkResults = append(checkResults, fix) + } + sumSize += cEntry.Size_ + } + + // check space entry + if sc.entry.Size_ != sumSize || sc.entry.FileCount != uint32(len(sc.files)) || sc.entry.CidCount != uint64(len(sc.actualRefs)) { + fix := CheckResult{ + Key: "info", + Description: fmt.Sprintf("space entry; size: %d -> %d; cidsCount: %d -> %d; filesCount: %d -> %d", + sc.entry.Size_, sumSize, + sc.entry.CidCount, len(sc.actualRefs), + sc.entry.FileCount, len(sc.files), + ), + SpaceId: sc.entry.Id, + } + sc.entry.Size_ = sumSize + sc.entry.FileCount = uint32(len(sc.files)) + sc.entry.CidCount = uint64(len(sc.actualRefs)) + fix.SpaceEntry = sc.entry.SpaceEntry + checkResults = append(checkResults, fix) + } + + // check for extra cids + for c := range sc.cids { + if _, ok := sc.actualRefs[c]; !ok { + fix := CheckResult{ + Key: "c:" + c, + CidRef: 0, + Description: "extra cid", + SpaceId: sc.entry.Id, + } + checkResults = append(checkResults, fix) + } + } + return +} + +type groupContent struct { + entry *groupEntry + cids map[string]uint64 +} + +func (ri *redisIndex) loadGroupContent(ctx context.Context, key Key, ge *groupEntry) (gc *groupContent, err error) { + gc = &groupContent{ + entry: ge, + cids: make(map[string]uint64), + } + results, err := ri.cl.HGetAll(ctx, GroupKey(key)).Result() + if err != nil { + return + } + for k, v := range results { + if strings.HasPrefix(k, "c:") { + gc.cids[k[2:]], _ = strconv.ParseUint(v, 10, 64) + } + } + return +} + +func (gc *groupContent) Check(cidRefs map[string]uint64, sumSize uint64) (checkResults []CheckResult) { + for k, ref := range cidRefs { + if gRef := gc.cids[k]; gRef != ref { + fix := CheckResult{ + Key: "c:" + k, + GroupEntry: gc.entry.GroupEntry, + CidRef: ref, + Description: fmt.Sprintf("group ref mismatch: %d -> %d", gRef, ref), + } + checkResults = append(checkResults, fix) + } + } + for k, ref := range gc.cids { + if _, ok := cidRefs[k]; !ok { + fix := CheckResult{ + Key: "c:" + k, + GroupEntry: gc.entry.GroupEntry, + CidRef: 0, + Description: fmt.Sprintf("group ref extra: %d -> %d", ref, 0), + } + checkResults = append(checkResults, fix) + } + } + if gc.entry.Size_ != sumSize { + fix := CheckResult{ + Key: "info", + GroupEntry: gc.entry.GroupEntry, + Description: fmt.Sprintf("group size mismatch: %d -> %d", gc.entry.Size_, sumSize), + } + fix.GroupEntry.Size_ = sumSize + fix.GroupEntry.CidCount = uint64(len(cidRefs)) + checkResults = append(checkResults, fix) + } + return +} diff --git a/index/check_test.go b/index/check_test.go new file mode 100644 index 00000000..709b15df --- /dev/null +++ b/index/check_test.go @@ -0,0 +1,91 @@ +package index + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/anyproto/any-sync-filenode/testutil" +) + +func TestRedisIndex_Check(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + + // add some data + bs := testutil.NewRandBlocks(3) + var sumSize uint64 + for _, b := range bs { + sumSize += uint64(len(b.RawData())) + } + key := newRandKey() + fileId1 := testutil.NewRandCid().String() + fileId2 := testutil.NewRandCid().String() + require.NoError(t, fx.BlocksAdd(ctx, bs)) + + cidsA, err := fx.CidEntriesByBlocks(ctx, bs[:2]) + require.NoError(t, err) + require.NoError(t, fx.FileBind(ctx, key, fileId1, cidsA)) + cidsA.Release() + + cidsB, err := fx.CidEntriesByBlocks(ctx, bs) + require.NoError(t, err) + require.NoError(t, fx.FileBind(ctx, key, fileId2, cidsB)) + cidsB.Release() + + t.Run("fix space+group size", func(t *testing.T) { + se, err := fx.getSpaceEntry(ctx, key) + require.NoError(t, err) + se.Size_ += 100 + seData, _ := se.Marshal() + require.NoError(t, fx.cl.HSet(ctx, SpaceKey(key), infoKey, seData).Err()) + + ge, err := fx.getGroupEntry(ctx, key) + require.NoError(t, err) + ge.Size_ += 100 + geData, _ := ge.Marshal() + require.NoError(t, fx.cl.HSet(ctx, GroupKey(key), infoKey, geData).Err()) + + fixRes, err := fx.Check(ctx, key, true) + require.NoError(t, err) + assert.Len(t, fixRes, 2) + fixRes, err = fx.Check(ctx, key, false) + require.NoError(t, err) + assert.Len(t, fixRes, 0) + }) + t.Run("fix space+group cids", func(t *testing.T) { + require.NoError(t, fx.cl.HSet(ctx, SpaceKey(key), "c:"+bs[1].Cid().String(), 33).Err()) + require.NoError(t, fx.cl.HSet(ctx, SpaceKey(key), "c:"+testutil.NewRandCid().String(), 33).Err()) + require.NoError(t, fx.cl.HSet(ctx, GroupKey(key), "c:"+bs[1].Cid().String(), 43).Err()) + require.NoError(t, fx.cl.HSet(ctx, GroupKey(key), "c:"+testutil.NewRandCid().String(), 43).Err()) + + fixRes, err := fx.Check(ctx, key, true) + require.NoError(t, err) + assert.Len(t, fixRes, 4) + fixRes, err = fx.Check(ctx, key, false) + require.NoError(t, err) + assert.Len(t, fixRes, 0) + }) + + t.Run("fix cid", func(t *testing.T) { + ce, err := fx.getCidEntry(ctx, bs[0].Cid()) + require.NoError(t, err) + ce.Refs = 0 + require.NoError(t, ce.Save(ctx, fx.cl)) + + fixRes, err := fx.Check(ctx, key, true) + require.NoError(t, err) + assert.Len(t, fixRes, 1) + for _, f := range fixRes { + t.Log(f.Description) + } + fixRes, err = fx.Check(ctx, key, false) + require.NoError(t, err) + for _, f := range fixRes { + t.Log(f.Description) + } + assert.Len(t, fixRes, 0) + }) + +} diff --git a/index/index.go b/index/index.go index 22007f44..d5433640 100644 --- a/index/index.go +++ b/index/index.go @@ -60,6 +60,8 @@ type Index interface { Migrate(ctx context.Context, key Key) error + Check(ctx context.Context, key Key, doFix bool) (checkResults []CheckResult, err error) + SpaceDelete(ctx context.Context, key Key) (ok bool, err error) app.ComponentRunnable } diff --git a/index/mock_index/mock_index.go b/index/mock_index/mock_index.go index 7fe2f3d8..26891dd7 100644 --- a/index/mock_index/mock_index.go +++ b/index/mock_index/mock_index.go @@ -87,6 +87,21 @@ func (mr *MockIndexMockRecorder) BlocksLock(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlocksLock", reflect.TypeOf((*MockIndex)(nil).BlocksLock), arg0, arg1) } +// Check mocks base method. +func (m *MockIndex) Check(arg0 context.Context, arg1 index.Key, arg2 bool) ([]index.CheckResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Check", arg0, arg1, arg2) + ret0, _ := ret[0].([]index.CheckResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Check indicates an expected call of Check. +func (mr *MockIndexMockRecorder) Check(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Check", reflect.TypeOf((*MockIndex)(nil).Check), arg0, arg1, arg2) +} + // CheckKey mocks base method. func (m *MockIndex) CheckKey(arg0 context.Context, arg1 string) (bool, error) { m.ctrl.T.Helper() diff --git a/index/unbind.go b/index/unbind.go index a5a4493d..b73e7661 100644 --- a/index/unbind.go +++ b/index/unbind.go @@ -71,8 +71,11 @@ func (ri *redisIndex) fileUnbind(ctx context.Context, key Key, entry groupSpaceE spaceDecrKeys = make([]string, 0, len(cids.entries)) affectedCidIdx = make([]int, 0, len(cids.entries)) ) - - entry.space.FileCount-- + if entry.space.FileCount != 0 { + entry.space.FileCount-- + } else { + log.WarnCtx(ctx, "file: unable to decrement 0-ref", zap.String("spaceId", key.SpaceId)) + } for i, c := range cids.entries { ck := CidKey(c.Cid) if !isolatedSpace { @@ -82,8 +85,16 @@ func (ri *redisIndex) fileUnbind(ctx context.Context, key Key, entry groupSpaceE } if res == "1" { groupRemoveKeys = append(groupRemoveKeys, ck) - entry.group.Size_ -= c.Size_ - entry.group.CidCount-- + if entry.group.Size_-c.Size_ > entry.group.Size_ { + log.WarnCtx(ctx, "group: unable to decrement size", zap.Uint64("before", entry.group.Size_), zap.Uint64("size", c.Size_), zap.String("spaceId", key.SpaceId)) + } else { + entry.group.Size_ -= c.Size_ + } + if entry.group.CidCount != 0 { + entry.group.CidCount-- + } else { + log.WarnCtx(ctx, "group: unable to decrement 0-ref", zap.String("spaceId", key.SpaceId)) + } } else { groupDecrKeys = append(groupDecrKeys, ck) } @@ -94,8 +105,16 @@ func (ri *redisIndex) fileUnbind(ctx context.Context, key Key, entry groupSpaceE } if res == "1" { spaceRemoveKeys = append(spaceRemoveKeys, ck) - entry.space.Size_ -= c.Size_ - entry.space.CidCount-- + if entry.space.Size_-c.Size_ > entry.space.Size_ { + log.WarnCtx(ctx, "space: unable to decrement size", zap.Uint64("before", entry.space.Size_), zap.Uint64("size", c.Size_), zap.String("spaceId", key.SpaceId)) + } else { + entry.space.Size_ -= c.Size_ + } + if entry.space.CidCount != 0 { + entry.space.CidCount-- + } else { + log.WarnCtx(ctx, "space: unable to decrement 0-ref", zap.String("spaceId", key.SpaceId)) + } affectedCidIdx = append(affectedCidIdx, i) } else { spaceDecrKeys = append(spaceDecrKeys, ck) @@ -128,9 +147,14 @@ func (ri *redisIndex) fileUnbind(ctx context.Context, key Key, entry groupSpaceE // update cids for _, idx := range affectedCidIdx { - cids.entries[idx].Refs-- + if cids.entries[idx].Refs != 0 { + cids.entries[idx].Refs-- + } else { + log.WarnCtx(ctx, "cid: unable to decrement 0-ref", zap.String("cid", cids.entries[idx].Cid.String()), zap.String("spaceId", key.SpaceId)) + continue + } if saveErr := cids.entries[idx].Save(ctx, ri.cl); saveErr != nil { - log.WarnCtx(ctx, "unable to save cid info", zap.Error(saveErr), zap.String("cid", cids.entries[idx].Cid.String())) + log.WarnCtx(ctx, "unable to save cid info", zap.Error(saveErr), zap.String("cid", cids.entries[idx].Cid.String()), zap.String("spaceId", key.SpaceId)) } } return diff --git a/stat/identity.go b/stat/stat.go similarity index 61% rename from stat/identity.go rename to stat/stat.go index 2d701f16..368e5436 100644 --- a/stat/identity.go +++ b/stat/stat.go @@ -4,12 +4,15 @@ import ( "context" "encoding/json" "net/http" + "time" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonfile/fileproto" + + "github.com/anyproto/any-sync-filenode/index" ) -const CName = "stat.identity" +const CName = "filenode.stat" type accountInfoProvider interface { AccountInfo(ctx context.Context, identity string) (*fileproto.AccountInfoResponse, error) @@ -21,23 +24,25 @@ type Stat interface { } func New() Stat { - return &identityStat{} + return &statService{} } -type identityStat struct { +type statService struct { accountInfoProvider accountInfoProvider + index index.Index } -func (i *identityStat) Init(a *app.App) (err error) { +func (i *statService) Init(a *app.App) (err error) { i.accountInfoProvider = app.MustComponent[accountInfoProvider](a) + i.index = app.MustComponent[index.Index](a) return } -func (i *identityStat) Name() (name string) { +func (i *statService) Name() (name string) { return CName } -func (i *identityStat) Run(ctx context.Context) (err error) { +func (i *statService) Run(ctx context.Context) (err error) { http.HandleFunc("/stat/identity/{identity}", func(writer http.ResponseWriter, request *http.Request) { identity := request.PathValue("identity") if identity == "" { @@ -82,9 +87,40 @@ func (i *identityStat) Run(ctx context.Context) (err error) { return } }) + http.HandleFunc("/stat/check/{identity}", func(writer http.ResponseWriter, request *http.Request) { + identity := request.PathValue("identity") + if identity == "" { + http.Error(writer, "identity is empty", http.StatusBadRequest) + return + } + isDoFix := request.URL.Query().Get("fix") != "" + + st := time.Now() + res, err := i.index.Check(request.Context(), index.Key{GroupId: identity}, isDoFix) + if err != nil { + http.Error(writer, err.Error(), http.StatusInternalServerError) + return + } + + resp := struct { + Results []index.CheckResult `json:"results"` + Duration string `json:"duration"` + }{ + Results: res, + Duration: time.Since(st).String(), + } + + writer.Header().Set("Content-Type", "application/json") + writer.WriteHeader(http.StatusOK) + err = json.NewEncoder(writer).Encode(resp) + if err != nil { + http.Error(writer, err.Error(), http.StatusInternalServerError) + return + } + }) return nil } -func (i *identityStat) Close(ctx context.Context) (err error) { +func (i *statService) Close(ctx context.Context) (err error) { return } diff --git a/testutil/block.go b/testutil/block.go index e49a7aae..12cffc98 100644 --- a/testutil/block.go +++ b/testutil/block.go @@ -1,12 +1,13 @@ package testutil import ( - "github.com/anyproto/any-sync/util/cidutil" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" "io" "math/rand" "time" + + "github.com/anyproto/any-sync/util/cidutil" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" ) func NewRandSpaceId() string {