Skip to content

Commit

Permalink
Merge pull request #49 from anyproto/GO-2176-account-space-limit
Browse files Browse the repository at this point in the history
GO-2176 account space limit
  • Loading branch information
cheggaaa committed Oct 25, 2023
2 parents 455c7f8 + a2ace89 commit 8487d16
Show file tree
Hide file tree
Showing 46 changed files with 3,877 additions and 1,920 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
# redis for tests
services:
redis:
image: redis
image: redis/redis-stack-server
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
# redis for tests
services:
redis:
image: redis
image: redis/redis-stack-server
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ deps:
go build -o deps github.com/gogo/protobuf/protoc-gen-gogofaster

proto:
protoc --gogofaster_out=:. index/redisindex/indexproto/protos/*.proto
protoc --gogofaster_out=:. index/indexproto/protos/*.proto
28 changes: 15 additions & 13 deletions cmd/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
"github.com/anyproto/any-sync-filenode/account"
"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/filenode"
"github.com/anyproto/any-sync-filenode/index/redisindex"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/redisprovider"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
Expand All @@ -24,12 +25,13 @@ import (
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/nodeconfstore"
"go.uber.org/zap"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"

"github.com/anyproto/any-sync-filenode/account"
"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/filenode"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/redisprovider"

// import this to keep govvv in go.mod on mod tidy
_ "github.com/ahmetb/govvv/integration-test/app-different-package/mypkg"
Expand Down Expand Up @@ -114,7 +116,7 @@ func Bootstrap(a *app.App) {
Register(limit.New()).
Register(store()).
Register(redisprovider.New()).
Register(redisindex.New()).
Register(index.New()).
Register(metric.New()).
Register(server.New()).
Register(filenode.New())
Expand Down
13 changes: 8 additions & 5 deletions cmd/filenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package main

import (
"context"
"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/store/s3store"
"os"
"testing"

commonaccount "github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/rpc"
"github.com/stretchr/testify/require"
"os"
"testing"

"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/store/s3store"
)

var ctx = context.Background()
Expand All @@ -27,7 +29,8 @@ func TestBootstrap(t *testing.T) {
Drpc: rpc.Config{},
Metric: metric.Config{},
S3Store: s3store.Config{
Bucket: "test",
Bucket: "test",
IndexBucket: "testIndex",
},
FileDevStore: config.FileDevStore{},
NetworkStorePath: tmpDir,
Expand Down
1 change: 1 addition & 0 deletions etc/any-sync-filenode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ s3Store:
region: eu-central-1
profile: default
bucket: anytype-test
indexBucket: anytype-test
maxThreads: 16

redis:
Expand Down
147 changes: 98 additions & 49 deletions filenode/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package filenode
import (
"context"
"errors"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/index/redisindex"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/store"
"github.com/anyproto/any-sync-filenode/testutil"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonfile/fileblockstore"
Expand All @@ -17,6 +13,11 @@ import (
"github.com/anyproto/any-sync/net/rpc/server"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"go.uber.org/zap"

"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/store"
)

const CName = "filenode.filenode"
Expand Down Expand Up @@ -45,7 +46,7 @@ type fileNode struct {

func (fn *fileNode) Init(a *app.App) (err error) {
fn.store = a.MustComponent(fileblockstore.CName).(store.Store)
fn.index = a.MustComponent(redisindex.CName).(index.Index)
fn.index = a.MustComponent(index.CName).(index.Index)
fn.limit = a.MustComponent(limit.CName).(limit.Limit)
fn.handler = &rpcHandler{f: fn}
fn.metric = a.MustComponent(metric.CName).(metric.Metric)
Expand All @@ -57,7 +58,7 @@ func (fn *fileNode) Name() (name string) {
}

func (fn *fileNode) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
exists, err := fn.index.Exists(ctx, k)
exists, err := fn.index.CidExists(ctx, k)
if err != nil {
return nil, err
}
Expand All @@ -72,25 +73,33 @@ func (fn *fileNode) Add(ctx context.Context, spaceId string, fileId string, bs [
if err != nil {
return err
}
unlock, err := fn.index.Lock(ctx, testutil.BlocksToKeys(bs))
unlock, err := fn.index.BlocksLock(ctx, bs)
if err != nil {
return err
}
defer unlock()
toUpload, err := fn.index.GetNonExistentBlocks(ctx, bs)
toUpload, err := fn.index.BlocksGetNonExistent(ctx, bs)
if err != nil {
return err
}
if len(toUpload) > 0 {
if err = fn.store.Add(ctx, toUpload); err != nil {
return err
}
if err = fn.index.BlocksAdd(ctx, bs); err != nil {
return err
}
}
return fn.index.Bind(ctx, storeKey, fileId, bs)
cidEntries, err := fn.index.CidEntriesByBlocks(ctx, bs)
if err != nil {
return err
}
defer cidEntries.Release()
return fn.index.FileBind(ctx, storeKey, fileId, cidEntries)
}

func (fn *fileNode) Check(ctx context.Context, spaceId string, cids ...cid.Cid) (result []*fileproto.BlockAvailability, err error) {
var storeKey string
var storeKey index.Key
if spaceId != "" {
if storeKey, err = fn.StoreKey(ctx, spaceId, false); err != nil {
return
Expand All @@ -99,7 +108,7 @@ func (fn *fileNode) Check(ctx context.Context, spaceId string, cids ...cid.Cid)
result = make([]*fileproto.BlockAvailability, 0, len(cids))
var inSpaceM = make(map[string]struct{})
if spaceId != "" {
inSpace, err := fn.index.ExistsInStorage(ctx, storeKey, cids)
inSpace, err := fn.index.CidExistsInSpace(ctx, storeKey, cids)
if err != nil {
return nil, err
}
Expand All @@ -116,7 +125,7 @@ func (fn *fileNode) Check(ctx context.Context, spaceId string, cids ...cid.Cid)
res.Status = fileproto.AvailabilityStatus_ExistsInSpace
} else {
var ex bool
if ex, err = fn.index.Exists(ctx, k); err != nil {
if ex, err = fn.index.CidExists(ctx, k); err != nil {
return nil, err
} else if ex {
res.Status = fileproto.AvailabilityStatus_Exists
Expand All @@ -132,60 +141,101 @@ func (fn *fileNode) BlocksBind(ctx context.Context, spaceId, fileId string, cids
if err != nil {
return err
}
unlock, err := fn.index.Lock(ctx, cids)
cidEntries, err := fn.index.CidEntries(ctx, cids)
if err != nil {
return err
}
defer unlock()
return fn.index.BindCids(ctx, storeKey, fileId, cids)
defer cidEntries.Release()
return fn.index.FileBind(ctx, storeKey, fileId, cidEntries)
}

func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit bool) (storageKey string, err error) {
func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit bool) (storageKey index.Key, err error) {
if spaceId == "" {
return "", fileprotoerr.ErrForbidden
return storageKey, fileprotoerr.ErrForbidden
}
// this call also confirms that space exists and valid
limitBytes, storageKey, err := fn.limit.Check(ctx, spaceId)
limitBytes, groupId, err := fn.limit.Check(ctx, spaceId)
if err != nil {
return
}

if storageKey != spaceId {
// try to move store to the new key
mErr := fn.index.MoveStorage(ctx, spaceId, storageKey)
if mErr != nil && mErr != index.ErrStorageNotFound && mErr != index.ErrTargetStorageExists {
return "", mErr
}
storageKey = index.Key{
GroupId: groupId,
SpaceId: spaceId,
}

if e := fn.index.Migrate(ctx, storageKey); e != nil {
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
}

if checkLimit {
currentSize, e := fn.index.StorageSize(ctx, storageKey)
info, e := fn.index.GroupInfo(ctx, groupId)
if e != nil {
return "", e
return storageKey, e
}
if currentSize >= limitBytes {
return "", fileprotoerr.ErrSpaceLimitExceeded
if info.BytesUsage >= limitBytes {
return storageKey, fileprotoerr.ErrSpaceLimitExceeded
}
}
return
}

func (fn *fileNode) SpaceInfo(ctx context.Context, spaceId string) (info *fileproto.SpaceInfoResponse, err error) {
info = &fileproto.SpaceInfoResponse{}
var (
storageKey = index.Key{SpaceId: spaceId}
limitBytes uint64
)
if limitBytes, storageKey.GroupId, err = fn.limit.Check(ctx, spaceId); err != nil {
return nil, err
}
groupInfo, err := fn.index.GroupInfo(ctx, storageKey.GroupId)
if err != nil {
return nil, err
}
if info, err = fn.spaceInfo(ctx, storageKey, groupInfo); err != nil {
return nil, err
}
info.LimitBytes = limitBytes
return
}

func (fn *fileNode) AccountInfo(ctx context.Context) (info *fileproto.AccountInfoResponse, err error) {
info = &fileproto.AccountInfoResponse{}
// we have space/identity validation in limit.Check
var storageKey string
if info.LimitBytes, storageKey, err = fn.limit.Check(ctx, spaceId); err != nil {
var groupId string

if info.LimitBytes, groupId, err = fn.limit.Check(ctx, ""); err != nil {
return nil, err
}
if info.UsageBytes, err = fn.index.StorageSize(ctx, storageKey); err != nil {

groupInfo, err := fn.index.GroupInfo(ctx, groupId)
if err != nil {
return nil, err
}
si, err := fn.index.StorageInfo(ctx, storageKey)
info.TotalCidsCount = groupInfo.CidsCount
info.TotalUsageBytes = groupInfo.BytesUsage
for _, spaceId := range groupInfo.SpaceIds {
spaceInfo, err := fn.spaceInfo(ctx, index.Key{GroupId: groupId, SpaceId: spaceId}, groupInfo)
if err != nil {
return nil, err
}
spaceInfo.LimitBytes = info.LimitBytes
info.Spaces = append(info.Spaces, spaceInfo)
}
return
}

func (fn *fileNode) spaceInfo(ctx context.Context, key index.Key, groupInfo index.GroupInfo) (info *fileproto.SpaceInfoResponse, err error) {
info = &fileproto.SpaceInfoResponse{}
info.SpaceId = key.SpaceId
spaceInfo, err := fn.index.SpaceInfo(ctx, key)
if err != nil {
return nil, err
}
info.CidsCount = uint64(si.CidCount)
info.FilesCount = uint64(si.FileCount)
info.TotalUsageBytes = groupInfo.BytesUsage
info.FilesCount = uint64(spaceInfo.FileCount)
info.CidsCount = spaceInfo.CidsCount
info.SpaceUsageBytes = spaceInfo.BytesUsage
return
}

Expand All @@ -194,26 +244,25 @@ func (fn *fileNode) FilesDelete(ctx context.Context, spaceId string, fileIds []s
if err != nil {
return
}
for _, fileId := range fileIds {
if err = fn.index.UnBind(ctx, storeKey, fileId); err != nil {
return
}
}
return
return fn.index.FileUnbind(ctx, storeKey, fileIds...)
}

func (fn *fileNode) FileInfo(ctx context.Context, spaceId, fileId string) (info *fileproto.FileInfo, err error) {
func (fn *fileNode) FileInfo(ctx context.Context, spaceId string, fileIds ...string) (info []*fileproto.FileInfo, err error) {
storeKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return
}
fi, err := fn.index.FileInfo(ctx, storeKey, fileId)
fis, err := fn.index.FileInfo(ctx, storeKey, fileIds...)
if err != nil {
return nil, err
}
return &fileproto.FileInfo{
FileId: fileId,
UsageBytes: fi.BytesUsage,
CidsCount: fi.CidCount,
}, nil
info = make([]*fileproto.FileInfo, len(fis))
for i, fi := range fis {
info[i] = &fileproto.FileInfo{
FileId: fileIds[i],
UsageBytes: fi.BytesUsage,
CidsCount: uint32(fi.CidsCount),
}
}
return
}
Loading

0 comments on commit 8487d16

Please sign in to comment.