Skip to content

Commit

Permalink
Merge pull request #62 from anyproto/GO-2474-cafe-migrate
Browse files Browse the repository at this point in the history
GO-2474 cafe migrate
  • Loading branch information
fb929 authored Nov 28, 2023
2 parents 0721492 + c40d403 commit fa946d5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package config

import (
"github.com/anyproto/any-sync-filenode/redisprovider"
"github.com/anyproto/any-sync-filenode/store/s3store"
"os"

commonaccount "github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/metric"
Expand All @@ -11,7 +11,9 @@ import (
"github.com/anyproto/any-sync/net/transport/yamux"
"github.com/anyproto/any-sync/nodeconf"
"gopkg.in/yaml.v3"
"os"

"github.com/anyproto/any-sync-filenode/redisprovider"
"github.com/anyproto/any-sync-filenode/store/s3store"
)

const CName = "config"
Expand Down Expand Up @@ -40,6 +42,7 @@ type Config struct {
Network nodeconf.Configuration `yaml:"network"`
NetworkStorePath string `yaml:"networkStorePath"`
NetworkUpdateIntervalSec int `yaml:"networkUpdateIntervalSec"`
CafeMigrateKey string `yaml:"cafeMigrateKey"`
}

func (c *Config) Init(a *app.App) (err error) {
Expand Down
32 changes: 27 additions & 5 deletions filenode/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ipfs/go-cid"
"go.uber.org/zap"

"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/store"
Expand All @@ -37,11 +38,12 @@ type Service interface {
}

type fileNode struct {
index index.Index
store store.Store
limit limit.Limit
metric metric.Metric
handler *rpcHandler
index index.Index
store store.Store
limit limit.Limit
metric metric.Metric
migrateKey string
handler *rpcHandler
}

func (fn *fileNode) Init(a *app.App) (err error) {
Expand All @@ -50,6 +52,7 @@ func (fn *fileNode) Init(a *app.App) (err error) {
fn.limit = a.MustComponent(limit.CName).(limit.Limit)
fn.handler = &rpcHandler{f: fn}
fn.metric = a.MustComponent(metric.CName).(metric.Metric)
fn.migrateKey = a.MustComponent(config.CName).(*config.Config).CafeMigrateKey
return fileproto.DRPCRegisterFile(a.MustComponent(server.CName).(server.DRPCServer), fn.handler)
}

Expand All @@ -69,6 +72,9 @@ func (fn *fileNode) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
}

func (fn *fileNode) Add(ctx context.Context, spaceId string, fileId string, bs []blocks.Block) error {
if fileId != "" && fileId == fn.migrateKey {
return fn.MigrateCafe(ctx, bs)
}
storeKey, err := fn.StoreKey(ctx, spaceId, true)
if err != nil {
return err
Expand Down Expand Up @@ -271,3 +277,19 @@ func (fn *fileNode) FileInfo(ctx context.Context, spaceId string, fileIds ...str
}
return
}

func (fn *fileNode) MigrateCafe(ctx context.Context, bs []blocks.Block) error {
unlock, err := fn.index.BlocksLock(ctx, bs)
if err != nil {
return err
}
defer unlock()

if err = fn.store.Add(ctx, bs); err != nil {
return err
}
if err = fn.index.BlocksAdd(ctx, bs); err != nil {
return err
}
return nil
}

0 comments on commit fa946d5

Please sign in to comment.