Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-2477: Change diff #54

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/ahmetb/govvv v0.3.0
github.com/akrylysov/pogreb v0.10.1
github.com/anyproto/any-sync v0.3.10
github.com/anyproto/any-sync v0.3.11
github.com/anyproto/go-chash v0.1.0
github.com/gogo/protobuf v1.3.2
github.com/prometheus/client_golang v1.17.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/ahmetb/govvv v0.3.0 h1:YGLGwEyiUwHFy5eh/RUhdupbuaCGBYn5T5GWXp+WJB0=
github.com/ahmetb/govvv v0.3.0/go.mod h1:4WRFpdWtc/YtKgPFwa1dr5+9hiRY5uKAL08bOlxOR6s=
github.com/akrylysov/pogreb v0.10.1 h1:FqlR8VR7uCbJdfUob916tPM+idpKgeESDXOA1K0DK4w=
github.com/akrylysov/pogreb v0.10.1/go.mod h1:pNs6QmpQ1UlTJKDezuRWmaqkgUE2TuU0YTWyqJZ7+lI=
github.com/anyproto/any-sync v0.3.10 h1:OaumDhf136XN5gHAsRm5kQm3z0Rvk+VdR3jMwW1LrnA=
github.com/anyproto/any-sync v0.3.10/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo=
github.com/anyproto/any-sync v0.3.11 h1:Y++bHerO8nAh1b1z/xUAoFFXXJ5UI7duSa0LRvmmFXc=
github.com/anyproto/any-sync v0.3.11/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo=
github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY=
github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY=
github.com/anyproto/go-slip10 v1.0.0 h1:uAEtSuudR3jJBOfkOXf3bErxVoxbuKwdoJN55M1i6IA=
Expand Down
66 changes: 57 additions & 9 deletions nodehead/mock_nodehead/mock_nodehead.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 61 additions & 3 deletions nodehead/nodehead.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package nodehead
import (
"context"
"errors"
"github.com/anyproto/any-sync-node/nodestorage"
"sync"
"time"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/app/logger"
Expand All @@ -13,8 +15,8 @@ import (
"github.com/anyproto/any-sync/nodeconf"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"sync"
"time"

"github.com/anyproto/any-sync-node/nodestorage"
)

const CName = "node.nodespace.nodehead"
Expand All @@ -35,6 +37,9 @@ func New() NodeHead {
type NodeHead interface {
SetHead(spaceId, head string) (part int, err error)
GetHead(spaceId string) (head string, err error)
SetOldHead(spaceId, head string) (part int, err error)
GetOldHead(spaceId string) (head string, err error)
DeleteHeads(spaceId string) error
ReloadHeadFromStore(spaceId string) error
LDiff(partId int) ldiff.Diff
Ranges(ctx context.Context, part int, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error)
Expand All @@ -44,19 +49,31 @@ type NodeHead interface {
type nodeHead struct {
mu sync.Mutex
partitions map[int]ldiff.Diff
oldHashes map[string]string
nodeconf nodeconf.NodeConf
spaceStore nodestorage.NodeStorage
}

func (n *nodeHead) Init(a *app.App) (err error) {
n.partitions = map[int]ldiff.Diff{}
n.oldHashes = map[string]string{}
n.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
n.spaceStore = a.MustComponent(spacestorage.CName).(nodestorage.NodeStorage)
n.spaceStore.OnWriteHash(func(_ context.Context, spaceId, hash string) {
if _, e := n.SetHead(spaceId, hash); e != nil {
log.Error("can't set head", zap.Error(e))
}
})
n.spaceStore.OnWriteOldHash(func(_ context.Context, spaceId, hash string) {
if _, e := n.SetOldHead(spaceId, hash); e != nil {
log.Error("can't set old head", zap.Error(e))
}
})
n.spaceStore.OnDeleteStorage(func(_ context.Context, spaceId string) {
if e := n.DeleteHeads(spaceId); e != nil {
log.Error("can't delete space from nodehead", zap.Error(e))
}
})
if m := a.Component(metric.CName); m != nil {
n.registerMetrics(m.(metric.Metric))
}
Expand Down Expand Up @@ -103,9 +120,31 @@ func (n *nodeHead) loadHeadFromStore(spaceId string) (err error) {
if _, err = n.SetHead(spaceId, hash); err != nil {
return
}
oldHash, err := ss.ReadOldSpaceHash()
if err != nil {
return
}
// that means that the hash was not set before
if oldHash == "" {
oldHash = hash
}
if _, err = n.SetOldHead(spaceId, oldHash); err != nil {
return
}
return
}

func (n *nodeHead) DeleteHeads(spaceId string) error {
n.mu.Lock()
defer n.mu.Unlock()
delete(n.oldHashes, spaceId)
part := n.nodeconf.Partition(spaceId)
if ld, ok := n.partitions[part]; ok {
return ld.RemoveId(spaceId)
}
return nil
}

func (n *nodeHead) SetHead(spaceId, head string) (part int, err error) {
part = n.nodeconf.Partition(spaceId)
n.mu.Lock()
Expand All @@ -119,6 +158,13 @@ func (n *nodeHead) SetHead(spaceId, head string) (part int, err error) {
return
}

func (n *nodeHead) SetOldHead(spaceId, head string) (part int, err error) {
n.mu.Lock()
defer n.mu.Unlock()
n.oldHashes[spaceId] = head
return
}

func (n *nodeHead) Ranges(ctx context.Context, part int, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) {
return n.LDiff(part).Ranges(ctx, ranges, resBuf)
}
Expand All @@ -145,11 +191,23 @@ func (n *nodeHead) GetHead(spaceId string) (hash string, err error) {
n.mu.Unlock()
el, err := ld.Element(spaceId)
if err != nil {
if errors.Is(err, ldiff.ErrElementNotFound) {
return "", ErrSpaceNotFound
}
return
}
return el.Head, nil
}

func (n *nodeHead) GetOldHead(spaceId string) (hash string, err error) {
n.mu.Lock()
defer n.mu.Unlock()
if hash, ok := n.oldHashes[spaceId]; ok {
return hash, nil
}
return "", ErrSpaceNotFound
}

func (n *nodeHead) ReloadHeadFromStore(spaceId string) error {
return n.loadHeadFromStore(spaceId)
}
Expand Down
43 changes: 39 additions & 4 deletions nodehead/nodehead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package nodehead
import (
"context"
"encoding/hex"
"github.com/anyproto/any-sync-node/nodestorage"
"math"
"os"
"testing"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
Expand All @@ -17,9 +20,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"math"
"os"
"testing"

"github.com/anyproto/any-sync-node/nodestorage"
)

var ctx = context.Background()
Expand Down Expand Up @@ -100,6 +102,39 @@ func TestNodeHead_GetSpaceHash(t *testing.T) {
assert.Equal(t, ErrSpaceNotFound, err)
}

func TestNodeHead_GetOldSpaceHash(t *testing.T) {
fx := newFixture(t, "")
defer fx.Finish(t)
hash := "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262"
_, err := fx.SetOldHead("space1", hash)
require.NoError(t, err)

head, err := fx.GetOldHead("space1")
require.NoError(t, err)
assert.Equal(t, hash, head)

_, err = fx.GetOldHead("not found")
assert.Equal(t, ErrSpaceNotFound, err)
}

func TestNodeHead_DeleteHeads(t *testing.T) {
fx := newFixture(t, "")
defer fx.Finish(t)
hash := "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262"
_, err := fx.SetHead("space1", hash)
require.NoError(t, err)
_, err = fx.SetOldHead("space1", hash)
require.NoError(t, err)

err = fx.NodeHead.(*nodeHead).DeleteHeads("space1")
require.NoError(t, err)

_, err = fx.GetHead("space1")
assert.Equal(t, ErrSpaceNotFound, err)
_, err = fx.GetOldHead("space1")
assert.Equal(t, ErrSpaceNotFound, err)
}

func newFixture(t *testing.T, dataPath string) *fixture {
var tmpDir string
if dataPath != "" {
Expand Down
Loading