Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-2176 account space limit #49

Merged
merged 12 commits into from
Oct 25, 2023
Merged
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ deps:
go build -o deps github.com/gogo/protobuf/protoc-gen-gogofaster

proto:
protoc --gogofaster_out=:. index/redisindex/indexproto/protos/*.proto
protoc --gogofaster_out=:. index/indexproto/protos/*.proto
28 changes: 15 additions & 13 deletions cmd/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
"github.com/anyproto/any-sync-filenode/account"
"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/filenode"
"github.com/anyproto/any-sync-filenode/index/redisindex"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/redisprovider"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
Expand All @@ -24,12 +25,13 @@ import (
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/nodeconfstore"
"go.uber.org/zap"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"

"github.com/anyproto/any-sync-filenode/account"
"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/filenode"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/redisprovider"

// import this to keep govvv in go.mod on mod tidy
_ "github.com/ahmetb/govvv/integration-test/app-different-package/mypkg"
Expand Down Expand Up @@ -114,7 +116,7 @@ func Bootstrap(a *app.App) {
Register(limit.New()).
Register(store()).
Register(redisprovider.New()).
Register(redisindex.New()).
Register(index.New()).
Register(metric.New()).
Register(server.New()).
Register(filenode.New())
Expand Down
13 changes: 8 additions & 5 deletions cmd/filenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package main

import (
"context"
"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/store/s3store"
"os"
"testing"

commonaccount "github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/rpc"
"github.com/stretchr/testify/require"
"os"
"testing"

"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/store/s3store"
)

var ctx = context.Background()
Expand All @@ -27,7 +29,8 @@ func TestBootstrap(t *testing.T) {
Drpc: rpc.Config{},
Metric: metric.Config{},
S3Store: s3store.Config{
Bucket: "test",
Bucket: "test",
IndexBucket: "testIndex",
},
FileDevStore: config.FileDevStore{},
NetworkStorePath: tmpDir,
Expand Down
1 change: 1 addition & 0 deletions etc/any-sync-filenode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ s3Store:
region: eu-central-1
profile: default
bucket: anytype-test
indexBucket: anytype-test
maxThreads: 16

redis:
Expand Down
147 changes: 98 additions & 49 deletions filenode/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package filenode
import (
"context"
"errors"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/index/redisindex"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/store"
"github.com/anyproto/any-sync-filenode/testutil"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonfile/fileblockstore"
Expand All @@ -17,6 +13,11 @@ import (
"github.com/anyproto/any-sync/net/rpc/server"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"go.uber.org/zap"

"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/store"
)

const CName = "filenode.filenode"
Expand Down Expand Up @@ -45,7 +46,7 @@ type fileNode struct {

func (fn *fileNode) Init(a *app.App) (err error) {
fn.store = a.MustComponent(fileblockstore.CName).(store.Store)
fn.index = a.MustComponent(redisindex.CName).(index.Index)
fn.index = a.MustComponent(index.CName).(index.Index)
fn.limit = a.MustComponent(limit.CName).(limit.Limit)
fn.handler = &rpcHandler{f: fn}
fn.metric = a.MustComponent(metric.CName).(metric.Metric)
Expand All @@ -57,7 +58,7 @@ func (fn *fileNode) Name() (name string) {
}

func (fn *fileNode) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
exists, err := fn.index.Exists(ctx, k)
exists, err := fn.index.CidExists(ctx, k)
if err != nil {
return nil, err
}
Expand All @@ -72,25 +73,33 @@ func (fn *fileNode) Add(ctx context.Context, spaceId string, fileId string, bs [
if err != nil {
return err
}
unlock, err := fn.index.Lock(ctx, testutil.BlocksToKeys(bs))
unlock, err := fn.index.BlocksLock(ctx, bs)
if err != nil {
return err
}
defer unlock()
toUpload, err := fn.index.GetNonExistentBlocks(ctx, bs)
toUpload, err := fn.index.BlocksGetNonExistent(ctx, bs)
if err != nil {
return err
}
if len(toUpload) > 0 {
if err = fn.store.Add(ctx, toUpload); err != nil {
return err
}
if err = fn.index.BlocksAdd(ctx, bs); err != nil {
return err
}
}
return fn.index.Bind(ctx, storeKey, fileId, bs)
cidEntries, err := fn.index.CidEntriesByBlocks(ctx, bs)
if err != nil {
return err
}
defer cidEntries.Release()
return fn.index.FileBind(ctx, storeKey, fileId, cidEntries)
}

func (fn *fileNode) Check(ctx context.Context, spaceId string, cids ...cid.Cid) (result []*fileproto.BlockAvailability, err error) {
var storeKey string
var storeKey index.Key
if spaceId != "" {
if storeKey, err = fn.StoreKey(ctx, spaceId, false); err != nil {
return
Expand All @@ -99,7 +108,7 @@ func (fn *fileNode) Check(ctx context.Context, spaceId string, cids ...cid.Cid)
result = make([]*fileproto.BlockAvailability, 0, len(cids))
var inSpaceM = make(map[string]struct{})
if spaceId != "" {
inSpace, err := fn.index.ExistsInStorage(ctx, storeKey, cids)
inSpace, err := fn.index.CidExistsInSpace(ctx, storeKey, cids)
if err != nil {
return nil, err
}
Expand All @@ -116,7 +125,7 @@ func (fn *fileNode) Check(ctx context.Context, spaceId string, cids ...cid.Cid)
res.Status = fileproto.AvailabilityStatus_ExistsInSpace
} else {
var ex bool
if ex, err = fn.index.Exists(ctx, k); err != nil {
if ex, err = fn.index.CidExists(ctx, k); err != nil {
return nil, err
} else if ex {
res.Status = fileproto.AvailabilityStatus_Exists
Expand All @@ -132,60 +141,101 @@ func (fn *fileNode) BlocksBind(ctx context.Context, spaceId, fileId string, cids
if err != nil {
return err
}
unlock, err := fn.index.Lock(ctx, cids)
cidEntries, err := fn.index.CidEntries(ctx, cids)
if err != nil {
return err
}
defer unlock()
return fn.index.BindCids(ctx, storeKey, fileId, cids)
defer cidEntries.Release()
return fn.index.FileBind(ctx, storeKey, fileId, cidEntries)
}

func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit bool) (storageKey string, err error) {
func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit bool) (storageKey index.Key, err error) {
if spaceId == "" {
return "", fileprotoerr.ErrForbidden
return storageKey, fileprotoerr.ErrForbidden
}
// this call also confirms that space exists and valid
limitBytes, storageKey, err := fn.limit.Check(ctx, spaceId)
limitBytes, groupId, err := fn.limit.Check(ctx, spaceId)
if err != nil {
return
}

if storageKey != spaceId {
// try to move store to the new key
mErr := fn.index.MoveStorage(ctx, spaceId, storageKey)
if mErr != nil && mErr != index.ErrStorageNotFound && mErr != index.ErrTargetStorageExists {
return "", mErr
}
storageKey = index.Key{
GroupId: groupId,
SpaceId: spaceId,
}

if e := fn.index.Migrate(ctx, storageKey); e != nil {
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
}

if checkLimit {
currentSize, e := fn.index.StorageSize(ctx, storageKey)
info, e := fn.index.GroupInfo(ctx, groupId)
if e != nil {
return "", e
return storageKey, e
}
if currentSize >= limitBytes {
return "", fileprotoerr.ErrSpaceLimitExceeded
if info.BytesUsage >= limitBytes {
return storageKey, fileprotoerr.ErrSpaceLimitExceeded
}
}
return
}

func (fn *fileNode) SpaceInfo(ctx context.Context, spaceId string) (info *fileproto.SpaceInfoResponse, err error) {
info = &fileproto.SpaceInfoResponse{}
var (
storageKey = index.Key{SpaceId: spaceId}
limitBytes uint64
)
if limitBytes, storageKey.GroupId, err = fn.limit.Check(ctx, spaceId); err != nil {
return nil, err
}
groupInfo, err := fn.index.GroupInfo(ctx, storageKey.GroupId)
if err != nil {
return nil, err
}
if info, err = fn.spaceInfo(ctx, storageKey, groupInfo); err != nil {
return nil, err
}
info.LimitBytes = limitBytes
return
}

func (fn *fileNode) AccountInfo(ctx context.Context) (info *fileproto.AccountInfoResponse, err error) {
info = &fileproto.AccountInfoResponse{}
// we have space/identity validation in limit.Check
var storageKey string
if info.LimitBytes, storageKey, err = fn.limit.Check(ctx, spaceId); err != nil {
var groupId string

if info.LimitBytes, groupId, err = fn.limit.Check(ctx, ""); err != nil {
return nil, err
}
if info.UsageBytes, err = fn.index.StorageSize(ctx, storageKey); err != nil {

groupInfo, err := fn.index.GroupInfo(ctx, groupId)
if err != nil {
return nil, err
}
si, err := fn.index.StorageInfo(ctx, storageKey)
info.TotalCidsCount = groupInfo.CidsCount
info.TotalUsageBytes = groupInfo.BytesUsage
for _, spaceId := range groupInfo.SpaceIds {
spaceInfo, err := fn.spaceInfo(ctx, index.Key{GroupId: groupId, SpaceId: spaceId}, groupInfo)
if err != nil {
return nil, err
}
spaceInfo.LimitBytes = info.LimitBytes
info.Spaces = append(info.Spaces, spaceInfo)
}
return
}

func (fn *fileNode) spaceInfo(ctx context.Context, key index.Key, groupInfo index.GroupInfo) (info *fileproto.SpaceInfoResponse, err error) {
info = &fileproto.SpaceInfoResponse{}
info.SpaceId = key.SpaceId
spaceInfo, err := fn.index.SpaceInfo(ctx, key)
if err != nil {
return nil, err
}
info.CidsCount = uint64(si.CidCount)
info.FilesCount = uint64(si.FileCount)
info.TotalUsageBytes = groupInfo.BytesUsage
info.FilesCount = uint64(spaceInfo.FileCount)
info.CidsCount = spaceInfo.CidsCount
info.SpaceUsageBytes = spaceInfo.BytesUsage
return
}

Expand All @@ -194,26 +244,25 @@ func (fn *fileNode) FilesDelete(ctx context.Context, spaceId string, fileIds []s
if err != nil {
return
}
for _, fileId := range fileIds {
if err = fn.index.UnBind(ctx, storeKey, fileId); err != nil {
return
}
}
return
return fn.index.FileUnbind(ctx, storeKey, fileIds...)
}

func (fn *fileNode) FileInfo(ctx context.Context, spaceId, fileId string) (info *fileproto.FileInfo, err error) {
func (fn *fileNode) FileInfo(ctx context.Context, spaceId string, fileIds ...string) (info []*fileproto.FileInfo, err error) {
storeKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return
}
fi, err := fn.index.FileInfo(ctx, storeKey, fileId)
fis, err := fn.index.FileInfo(ctx, storeKey, fileIds...)
if err != nil {
return nil, err
}
return &fileproto.FileInfo{
FileId: fileId,
UsageBytes: fi.BytesUsage,
CidsCount: fi.CidCount,
}, nil
info = make([]*fileproto.FileInfo, len(fis))
for i, fi := range fis {
info[i] = &fileproto.FileInfo{
FileId: fileIds[i],
UsageBytes: fi.BytesUsage,
CidsCount: uint32(fi.CidsCount),
}
}
return
}
Loading
Loading