Skip to content

Commit

Permalink
stat: check key for existing
Browse files Browse the repository at this point in the history
  • Loading branch information
cheggaaa committed Aug 19, 2024
1 parent 3e1723a commit 75854c0
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 52 deletions.
12 changes: 11 additions & 1 deletion filenode/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,26 @@ func (fn *fileNode) SpaceInfo(ctx context.Context, spaceId string) (info *filepr
func (fn *fileNode) BatchAccountInfo(ctx context.Context, identities []string) ([]*fileproto.AccountInfoResponse, error) {
accountInfos := make([]*fileproto.AccountInfoResponse, 0, len(identities))
for _, identity := range identities {
accountInfo, err := fn.accountInfo(ctx, identity)
accountInfo, err := fn.AccountInfo(ctx, identity)
if err != nil {
return nil, err
}
if accountInfo == nil {
continue
}
accountInfos = append(accountInfos, accountInfo)
}
return accountInfos, nil
}

func (fn *fileNode) AccountInfo(ctx context.Context, identity string) (*fileproto.AccountInfoResponse, error) {
exists, err := fn.index.CheckKey(ctx, index.GroupKey(index.Key{GroupId: identity}))
if err != nil {
return nil, err
}
if !exists {
return nil, nil
}
return fn.accountInfo(ctx, identity)
}

Expand Down
8 changes: 4 additions & 4 deletions index/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func (ri *redisIndex) FileBind(ctx context.Context, key Key, fileId string, cids
}

func (ri *redisIndex) fileBind(ctx context.Context, key Key, fileId string, cids *CidEntries, entry groupSpaceEntry) (err error) {
var gk = groupKey(key)
var sk = spaceKey(key)
var gk = GroupKey(key)
var sk = SpaceKey(key)

// get file entry
fileInfo, isNewFile, err := ri.getFileEntry(ctx, key, fileId)
Expand Down Expand Up @@ -53,7 +53,7 @@ func (ri *redisIndex) fileBind(ctx context.Context, key Key, fileId string, cids
)
_, err = ri.cl.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i, idx := range newFileCidIdx {
ck := cidKey(cids.entries[idx].Cid)
ck := CidKey(cids.entries[idx].Cid)
cidExistSpaceCmds[i] = pipe.HExists(ctx, sk, ck)
if !isolatedSpace {
cidExistGroupCmds[i] = pipe.HExists(ctx, gk, ck)
Expand Down Expand Up @@ -96,7 +96,7 @@ func (ri *redisIndex) fileBind(ctx context.Context, key Key, fileId string, cids
_, err = ri.cl.TxPipelined(ctx, func(tx redis.Pipeliner) error {
// increment cid refs
for _, idx := range newFileCidIdx {
ck := cidKey(cids.entries[idx].Cid)
ck := CidKey(cids.entries[idx].Cid)
if !isolatedSpace {
tx.HIncrBy(ctx, gk, ck, 1)
}
Expand Down
2 changes: 1 addition & 1 deletion index/cidentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ func (ce *cidEntry) Save(ctx context.Context, cl redis.Cmdable) error {
if err != nil {
return err
}
return cl.Set(ctx, cidKey(ce.Cid), data, 0).Err()
return cl.Set(ctx, CidKey(ce.Cid), data, 0).Err()
}
12 changes: 6 additions & 6 deletions index/cids.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
)

func (ri *redisIndex) CidExists(ctx context.Context, c cid.Cid) (ok bool, err error) {
return ri.CheckKey(ctx, cidKey(c))
return ri.CheckKey(ctx, CidKey(c))
}

func (ri *redisIndex) CidEntries(ctx context.Context, cids []cid.Cid) (entries *CidEntries, err error) {
Expand Down Expand Up @@ -66,7 +66,7 @@ func (ri *redisIndex) CidEntriesByBlocks(ctx context.Context, bs []blocks.Block)
}

func (ri *redisIndex) getAndAddToEntries(ctx context.Context, entries *CidEntries, c cid.Cid) (err error) {
_, release, err := ri.AcquireKey(ctx, cidKey(c))
_, release, err := ri.AcquireKey(ctx, CidKey(c))
if err != nil {
return
}
Expand All @@ -87,7 +87,7 @@ func (ri *redisIndex) getAndAddToEntries(ctx context.Context, entries *CidEntrie

func (ri *redisIndex) BlocksAdd(ctx context.Context, bs []blocks.Block) (err error) {
for _, b := range bs {
exists, release, err := ri.AcquireKey(ctx, cidKey(b.Cid()))
exists, release, err := ri.AcquireKey(ctx, CidKey(b.Cid()))
if err != nil {
return err
}
Expand All @@ -105,15 +105,15 @@ func (ri *redisIndex) BlocksAdd(ctx context.Context, bs []blocks.Block) (err err
}

func (ri *redisIndex) CidExistsInSpace(ctx context.Context, k Key, cids []cid.Cid) (exists []cid.Cid, err error) {
_, release, err := ri.AcquireKey(ctx, spaceKey(k))
_, release, err := ri.AcquireKey(ctx, SpaceKey(k))
if err != nil {
return
}
defer release()
var existsRes = make([]*redis.BoolCmd, len(cids))
_, err = ri.cl.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i, c := range cids {
existsRes[i] = pipe.HExists(ctx, spaceKey(k), cidKey(c))
existsRes[i] = pipe.HExists(ctx, SpaceKey(k), CidKey(c))
}
return nil
})
Expand All @@ -130,7 +130,7 @@ func (ri *redisIndex) CidExistsInSpace(ctx context.Context, k Key, cids []cid.Ci
}

func (ri *redisIndex) getCidEntry(ctx context.Context, c cid.Cid) (entry *cidEntry, err error) {
ck := cidKey(c)
ck := CidKey(c)
cidData, err := ri.cl.Get(ctx, ck).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
Expand Down
2 changes: 1 addition & 1 deletion index/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (ri *redisIndex) spaceDelete(ctx context.Context, key Key, entry groupSpace
if !entry.spaceExists {
return false, nil
}
sk := spaceKey(key)
sk := SpaceKey(key)

keys, err := ri.cl.HKeys(ctx, sk).Result()
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions index/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func (f *fileEntry) Save(ctx context.Context, k Key, fileId string, cl redis.Pip
if err != nil {
return
}
cl.HSet(ctx, spaceKey(k), fileKey(fileId), data)
cl.HSet(ctx, SpaceKey(k), FileKey(fileId), data)
}

func (ri *redisIndex) getFileEntry(ctx context.Context, k Key, fileId string) (entry *fileEntry, isCreated bool, err error) {
result, err := ri.cl.HGet(ctx, spaceKey(k), fileKey(fileId)).Result()
result, err := ri.cl.HGet(ctx, SpaceKey(k), FileKey(fileId)).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return
}
Expand Down Expand Up @@ -58,11 +58,11 @@ func (f *spaceEntry) Save(ctx context.Context, k Key, cl redis.Pipeliner) {
if err != nil {
return
}
cl.HSet(ctx, spaceKey(k), infoKey, data)
cl.HSet(ctx, SpaceKey(k), infoKey, data)
}

func (ri *redisIndex) getSpaceEntry(ctx context.Context, key Key) (entry *spaceEntry, err error) {
result, err := ri.cl.HGet(ctx, spaceKey(key), infoKey).Result()
result, err := ri.cl.HGet(ctx, SpaceKey(key), infoKey).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func (f *groupEntry) Save(ctx context.Context, cl redis.Cmdable) {
if err != nil {
return
}
cl.HSet(ctx, groupKey(Key{GroupId: f.GroupId}), infoKey, data)
cl.HSet(ctx, GroupKey(Key{GroupId: f.GroupId}), infoKey, data)
}

func (f *groupEntry) AddSpaceId(spaceId string) {
Expand All @@ -104,7 +104,7 @@ func (f *groupEntry) AddSpaceId(spaceId string) {
}

func (ri *redisIndex) getGroupEntry(ctx context.Context, key Key) (entry *groupEntry, err error) {
result, err := ri.cl.HGet(ctx, groupKey(key), infoKey).Result()
result, err := ri.cl.HGet(ctx, GroupKey(key), infoKey).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return
}
Expand Down
20 changes: 11 additions & 9 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Index interface {
FileInfo(ctx context.Context, key Key, fileIds ...string) (fileInfo []FileInfo, err error)
FilesList(ctx context.Context, key Key) (fileIds []string, err error)

CheckKey(ctx context.Context, key string) (exists bool, err error)

GroupInfo(ctx context.Context, groupId string) (info GroupInfo, err error)
SpaceInfo(ctx context.Context, key Key) (info SpaceInfo, err error)

Expand Down Expand Up @@ -157,7 +159,7 @@ func (ri *redisIndex) Run(ctx context.Context) (err error) {
}

func (ri *redisIndex) FileInfo(ctx context.Context, key Key, fileIds ...string) (fileInfos []FileInfo, err error) {
_, release, err := ri.AcquireKey(ctx, spaceKey(key))
_, release, err := ri.AcquireKey(ctx, SpaceKey(key))
if err != nil {
return
}
Expand All @@ -177,7 +179,7 @@ func (ri *redisIndex) FileInfo(ctx context.Context, key Key, fileIds ...string)
}

func (ri *redisIndex) FilesList(ctx context.Context, key Key) (fileIds []string, err error) {
sk := spaceKey(key)
sk := SpaceKey(key)
_, release, err := ri.AcquireKey(ctx, sk)
if err != nil {
return
Expand All @@ -197,7 +199,7 @@ func (ri *redisIndex) FilesList(ctx context.Context, key Key) (fileIds []string,

func (ri *redisIndex) BlocksGetNonExistent(ctx context.Context, bs []blocks.Block) (nonExistent []blocks.Block, err error) {
for _, b := range bs {
ex, err := ri.CheckKey(ctx, cidKey(b.Cid()))
ex, err := ri.CheckKey(ctx, CidKey(b.Cid()))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -229,7 +231,7 @@ func (ri *redisIndex) BlocksLock(ctx context.Context, bs []blocks.Block) (unlock
}

func (ri *redisIndex) GroupInfo(ctx context.Context, groupId string) (info GroupInfo, err error) {
_, release, err := ri.AcquireKey(ctx, groupKey(Key{GroupId: groupId}))
_, release, err := ri.AcquireKey(ctx, GroupKey(Key{GroupId: groupId}))
if err != nil {
return
}
Expand All @@ -248,7 +250,7 @@ func (ri *redisIndex) GroupInfo(ctx context.Context, groupId string) (info Group
}

func (ri *redisIndex) SpaceInfo(ctx context.Context, key Key) (info SpaceInfo, err error) {
_, release, err := ri.AcquireKey(ctx, spaceKey(key))
_, release, err := ri.AcquireKey(ctx, SpaceKey(key))
if err != nil {
return
}
Expand All @@ -275,21 +277,21 @@ func (ri *redisIndex) Close(ctx context.Context) error {
return nil
}

func cidKey(c cid.Cid) string {
func CidKey(c cid.Cid) string {
return "c:" + c.String()
}

func spaceKey(k Key) string {
func SpaceKey(k Key) string {
hash := strconv.FormatUint(uint64(xxhash.ChecksumString32(k.GroupId)), 36)
return "s:" + k.SpaceId + ".{" + hash + "}"
}

func groupKey(k Key) string {
func GroupKey(k Key) string {
hash := strconv.FormatUint(uint64(xxhash.ChecksumString32(k.GroupId)), 36)
return "g:" + k.GroupId + ".{" + hash + "}"
}

func fileKey(fileId string) string {
func FileKey(fileId string) string {
return "f:" + fileId
}

Expand Down
18 changes: 9 additions & 9 deletions index/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type spaceLimitOp struct {
}

func (op *spaceLimitOp) SetGroupLimit(ctx context.Context, groupId string, limit uint64) (err error) {
_, release, err := op.AcquireKey(ctx, groupKey(Key{GroupId: groupId}))
_, release, err := op.AcquireKey(ctx, GroupKey(Key{GroupId: groupId}))
if err != nil {
return
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (op *spaceLimitOp) decreaseIsolatedLimit(ctx context.Context, k float64) (n

func (op *spaceLimitOp) decreaseIsolatedLimitForSpace(ctx context.Context, spaceId string, k float64) (newIsolatedLimit uint64, err error) {
key := Key{GroupId: op.groupEntry.GroupId, SpaceId: spaceId}
_, release, err := op.AcquireKey(ctx, spaceKey(key))
_, release, err := op.AcquireKey(ctx, SpaceKey(key))
if err != nil {
return
}
Expand Down Expand Up @@ -176,8 +176,8 @@ func (op *spaceLimitOp) SetSpaceLimit(ctx context.Context, key Key, limit uint64

func (op *spaceLimitOp) isolateSpace(ctx context.Context, entry groupSpaceEntry) (err error) {
key := Key{GroupId: entry.group.GroupId, SpaceId: entry.space.Id}
sk := spaceKey(key)
gk := groupKey(key)
sk := SpaceKey(key)
gk := GroupKey(key)

keys, err := op.cl.HGetAll(ctx, sk).Result()
if err != nil {
Expand Down Expand Up @@ -212,7 +212,7 @@ func (op *spaceLimitOp) isolateSpace(ctx context.Context, entry groupSpaceEntry)
var groupDecrResults = make([]*redis.IntCmd, len(cids))
if _, err = op.cl.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
for i, c := range cids {
groupDecrResults[i] = pipe.HIncrBy(ctx, gk, cidKey(c), -cidRefs[i])
groupDecrResults[i] = pipe.HIncrBy(ctx, gk, CidKey(c), -cidRefs[i])
}
return nil
}); err != nil {
Expand All @@ -231,7 +231,7 @@ func (op *spaceLimitOp) isolateSpace(ctx context.Context, entry groupSpaceEntry)
if len(toDeleteIdx) > 0 {
if _, err = op.cl.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
for _, idx := range toDeleteIdx {
pipe.HDel(ctx, gk, cidKey(cids[idx]))
pipe.HDel(ctx, gk, CidKey(cids[idx]))
op.groupEntry.Size_ -= cidEntries.entries[idx].Size_
op.groupEntry.CidCount--
}
Expand All @@ -245,8 +245,8 @@ func (op *spaceLimitOp) isolateSpace(ctx context.Context, entry groupSpaceEntry)

func (op *spaceLimitOp) uniteSpace(ctx context.Context, entry groupSpaceEntry) (err error) {
key := Key{GroupId: entry.group.GroupId, SpaceId: entry.space.Id}
sk := spaceKey(key)
gk := groupKey(key)
sk := SpaceKey(key)
gk := GroupKey(key)

keys, err := op.cl.HKeys(ctx, sk).Result()
if err != nil {
Expand Down Expand Up @@ -278,7 +278,7 @@ func (op *spaceLimitOp) uniteSpace(ctx context.Context, entry groupSpaceEntry) (
var groupDecrResults = make([]*redis.IntCmd, len(cids))
if _, err = op.cl.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
for i, c := range cids {
groupDecrResults[i] = pipe.HIncrBy(ctx, gk, cidKey(c), 1)
groupDecrResults[i] = pipe.HIncrBy(ctx, gk, CidKey(c), 1)
}
return nil
}); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions index/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func (ri *redisIndex) AcquireKey(ctx context.Context, key string) (exists bool,
}

func (ri *redisIndex) AcquireSpace(ctx context.Context, key Key) (entry groupSpaceEntry, release func(), err error) {
gExists, gRelease, err := ri.AcquireKey(ctx, groupKey(key))
gExists, gRelease, err := ri.AcquireKey(ctx, GroupKey(key))
if err != nil {
return
}
sExists, sRelease, err := ri.AcquireKey(ctx, spaceKey(key))
sExists, sRelease, err := ri.AcquireKey(ctx, SpaceKey(key))
if err != nil {
gRelease()
return
Expand Down
10 changes: 5 additions & 5 deletions index/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestRedisIndex_PersistKeys(t *testing.T) {
bs := testutil.NewRandBlocks(5)
require.NoError(t, fx.BlocksAdd(ctx, bs))
for _, b := range bs {
fx.persistStore.EXPECT().IndexPut(ctx, cidKey(b.Cid()), gomock.Any())
fx.persistStore.EXPECT().IndexPut(ctx, CidKey(b.Cid()), gomock.Any())
}

time.Sleep(time.Second * 3)
Expand All @@ -37,7 +37,7 @@ func TestRedisIndex_PersistKeys(t *testing.T) {
fx.PersistKeys(ctx)

for _, b := range bs {
res, err := fx.cl.BFExists(ctx, bloomFilterKey(cidKey(b.Cid())), cidKey(b.Cid())).Result()
res, err := fx.cl.BFExists(ctx, bloomFilterKey(CidKey(b.Cid())), CidKey(b.Cid())).Result()
require.NoError(t, err)
assert.True(t, res)
}
Expand All @@ -51,8 +51,8 @@ func TestRedisIndex_AcquireKey(t *testing.T) {
bs := testutil.NewRandBlocks(5)
require.NoError(t, fx.BlocksAdd(ctx, bs))
for _, b := range bs {
fx.persistStore.EXPECT().IndexPut(ctx, cidKey(b.Cid()), gomock.Any()).Do(func(_ context.Context, key string, value []byte) {
if key == cidKey(bs[0].Cid()) {
fx.persistStore.EXPECT().IndexPut(ctx, CidKey(b.Cid()), gomock.Any()).Do(func(_ context.Context, key string, value []byte) {
if key == CidKey(bs[0].Cid()) {
fx.persistStore.EXPECT().IndexGet(ctx, key).Return(nil, nil)
} else {
fx.persistStore.EXPECT().IndexGet(ctx, key).Return(value, nil)
Expand All @@ -63,7 +63,7 @@ func TestRedisIndex_AcquireKey(t *testing.T) {
fx.PersistKeys(ctx)

for i, b := range bs {
ex, release, err := fx.AcquireKey(ctx, cidKey(b.Cid()))
ex, release, err := fx.AcquireKey(ctx, CidKey(b.Cid()))
require.NoError(t, err)
if i == 0 {
require.False(t, ex)
Expand Down
15 changes: 15 additions & 0 deletions index/mock_index/mock_index.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 75854c0

Please sign in to comment.