From 0c1b09bcdaba3e4645ce74908e68cea71c82c5b7 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 02:21:06 +0100 Subject: [PATCH 1/6] Change diff --- go.mod | 2 +- go.sum | 2 + nodehead/mock_nodehead/mock_nodehead.go | 52 ++++++-- nodehead/nodehead.go | 33 +++++- nodespace/mock_nodespace/mock_nodespace.go | 71 +++++------ nodespace/rpchandler.go | 30 ++++- nodestorage/keys.go | 4 +- .../mock_nodestorage/mock_nodestorage.go | 36 ++++-- nodestorage/spacestorage.go | 15 +++ nodestorage/storageservice.go | 16 ++- .../coldsync/mock_coldsync/mock_coldsync.go | 12 +- nodesync/hotsync/mock_hotsync/mock_hotsync.go | 16 ++- nodesync/mock_nodesync/mock_nodesync.go | 12 +- nodesync/nodediff.go | 18 +-- nodesync/nodesyncproto/nodesync.pb.go | 112 ++++++++++++------ nodesync/nodesyncproto/protos/nodesync.proto | 1 + 16 files changed, 303 insertions(+), 129 deletions(-) diff --git a/go.mod b/go.mod index 83bb26b0..e7ffa600 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/ahmetb/govvv v0.3.0 github.com/akrylysov/pogreb v0.10.1 - github.com/anyproto/any-sync v0.3.10 + github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308 github.com/anyproto/go-chash v0.1.0 github.com/gogo/protobuf v1.3.2 github.com/prometheus/client_golang v1.17.0 diff --git a/go.sum b/go.sum index 84a8a03c..0257c2ca 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/akrylysov/pogreb v0.10.1 h1:FqlR8VR7uCbJdfUob916tPM+idpKgeESDXOA1K0DK github.com/akrylysov/pogreb v0.10.1/go.mod h1:pNs6QmpQ1UlTJKDezuRWmaqkgUE2TuU0YTWyqJZ7+lI= github.com/anyproto/any-sync v0.3.10 h1:OaumDhf136XN5gHAsRm5kQm3z0Rvk+VdR3jMwW1LrnA= github.com/anyproto/any-sync v0.3.10/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= +github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308 h1:lKAbSBPiB8G0HweCMm02obccnIps6AIF/RL3elMhCQo= +github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY= github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY= github.com/anyproto/go-slip10 v1.0.0 h1:uAEtSuudR3jJBOfkOXf3bErxVoxbuKwdoJN55M1i6IA= diff --git a/nodehead/mock_nodehead/mock_nodehead.go b/nodehead/mock_nodehead/mock_nodehead.go index f98eed64..15842c70 100644 --- a/nodehead/mock_nodehead/mock_nodehead.go +++ b/nodehead/mock_nodehead/mock_nodehead.go @@ -1,6 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/anyproto/any-sync-node/nodehead (interfaces: NodeHead) - +// +// Generated by this command: +// +// mockgen -destination mock_nodehead/mock_nodehead.go github.com/anyproto/any-sync-node/nodehead NodeHead +// // Package mock_nodehead is a generated GoMock package. package mock_nodehead @@ -45,7 +49,7 @@ func (m *MockNodeHead) Close(arg0 context.Context) error { } // Close indicates an expected call of Close. -func (mr *MockNodeHeadMockRecorder) Close(arg0 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) Close(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockNodeHead)(nil).Close), arg0) } @@ -60,11 +64,26 @@ func (m *MockNodeHead) GetHead(arg0 string) (string, error) { } // GetHead indicates an expected call of GetHead. -func (mr *MockNodeHeadMockRecorder) GetHead(arg0 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) GetHead(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHead", reflect.TypeOf((*MockNodeHead)(nil).GetHead), arg0) } +// GetOldHead mocks base method. +func (m *MockNodeHead) GetOldHead(arg0 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOldHead", arg0) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOldHead indicates an expected call of GetOldHead. +func (mr *MockNodeHeadMockRecorder) GetOldHead(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOldHead", reflect.TypeOf((*MockNodeHead)(nil).GetOldHead), arg0) +} + // Init mocks base method. func (m *MockNodeHead) Init(arg0 *app.App) error { m.ctrl.T.Helper() @@ -74,7 +93,7 @@ func (m *MockNodeHead) Init(arg0 *app.App) error { } // Init indicates an expected call of Init. -func (mr *MockNodeHeadMockRecorder) Init(arg0 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) Init(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockNodeHead)(nil).Init), arg0) } @@ -88,7 +107,7 @@ func (m *MockNodeHead) LDiff(arg0 int) ldiff.Diff { } // LDiff indicates an expected call of LDiff. -func (mr *MockNodeHeadMockRecorder) LDiff(arg0 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) LDiff(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LDiff", reflect.TypeOf((*MockNodeHead)(nil).LDiff), arg0) } @@ -117,7 +136,7 @@ func (m *MockNodeHead) Ranges(arg0 context.Context, arg1 int, arg2 []ldiff.Range } // Ranges indicates an expected call of Ranges. -func (mr *MockNodeHeadMockRecorder) Ranges(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) Ranges(arg0, arg1, arg2, arg3 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ranges", reflect.TypeOf((*MockNodeHead)(nil).Ranges), arg0, arg1, arg2, arg3) } @@ -131,7 +150,7 @@ func (m *MockNodeHead) ReloadHeadFromStore(arg0 string) error { } // ReloadHeadFromStore indicates an expected call of ReloadHeadFromStore. -func (mr *MockNodeHeadMockRecorder) ReloadHeadFromStore(arg0 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) ReloadHeadFromStore(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReloadHeadFromStore", reflect.TypeOf((*MockNodeHead)(nil).ReloadHeadFromStore), arg0) } @@ -145,7 +164,7 @@ func (m *MockNodeHead) Run(arg0 context.Context) error { } // Run indicates an expected call of Run. -func (mr *MockNodeHeadMockRecorder) Run(arg0 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) Run(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockNodeHead)(nil).Run), arg0) } @@ -160,7 +179,22 @@ func (m *MockNodeHead) SetHead(arg0, arg1 string) (int, error) { } // SetHead indicates an expected call of SetHead. -func (mr *MockNodeHeadMockRecorder) SetHead(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeHeadMockRecorder) SetHead(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHead", reflect.TypeOf((*MockNodeHead)(nil).SetHead), arg0, arg1) } + +// SetOldHead mocks base method. +func (m *MockNodeHead) SetOldHead(arg0, arg1 string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetOldHead", arg0, arg1) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SetOldHead indicates an expected call of SetOldHead. +func (mr *MockNodeHeadMockRecorder) SetOldHead(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetOldHead", reflect.TypeOf((*MockNodeHead)(nil).SetOldHead), arg0, arg1) +} diff --git a/nodehead/nodehead.go b/nodehead/nodehead.go index caf34904..2839f5a5 100644 --- a/nodehead/nodehead.go +++ b/nodehead/nodehead.go @@ -4,7 +4,9 @@ package nodehead import ( "context" "errors" - "github.com/anyproto/any-sync-node/nodestorage" + "sync" + "time" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" @@ -13,8 +15,8 @@ import ( "github.com/anyproto/any-sync/nodeconf" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "sync" - "time" + + "github.com/anyproto/any-sync-node/nodestorage" ) const CName = "node.nodespace.nodehead" @@ -35,6 +37,8 @@ func New() NodeHead { type NodeHead interface { SetHead(spaceId, head string) (part int, err error) GetHead(spaceId string) (head string, err error) + SetOldHead(spaceId, head string) (part int, err error) + GetOldHead(spaceId string) (head string, err error) ReloadHeadFromStore(spaceId string) error LDiff(partId int) ldiff.Diff Ranges(ctx context.Context, part int, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) @@ -44,12 +48,14 @@ type NodeHead interface { type nodeHead struct { mu sync.Mutex partitions map[int]ldiff.Diff + oldHashes map[string]string nodeconf nodeconf.NodeConf spaceStore nodestorage.NodeStorage } func (n *nodeHead) Init(a *app.App) (err error) { n.partitions = map[int]ldiff.Diff{} + n.oldHashes = map[string]string{} n.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) n.spaceStore = a.MustComponent(spacestorage.CName).(nodestorage.NodeStorage) n.spaceStore.OnWriteHash(func(_ context.Context, spaceId, hash string) { @@ -57,6 +63,11 @@ func (n *nodeHead) Init(a *app.App) (err error) { log.Error("can't set head", zap.Error(e)) } }) + n.spaceStore.OnWriteOldHash(func(_ context.Context, spaceId, hash string) { + if _, e := n.SetOldHead(spaceId, hash); e != nil { + log.Error("can't set old head", zap.Error(e)) + } + }) if m := a.Component(metric.CName); m != nil { n.registerMetrics(m.(metric.Metric)) } @@ -119,6 +130,13 @@ func (n *nodeHead) SetHead(spaceId, head string) (part int, err error) { return } +func (n *nodeHead) SetOldHead(spaceId, head string) (part int, err error) { + n.mu.Lock() + defer n.mu.Unlock() + n.oldHashes[spaceId] = head + return +} + func (n *nodeHead) Ranges(ctx context.Context, part int, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) { return n.LDiff(part).Ranges(ctx, ranges, resBuf) } @@ -150,6 +168,15 @@ func (n *nodeHead) GetHead(spaceId string) (hash string, err error) { return el.Head, nil } +func (n *nodeHead) GetOldHead(spaceId string) (hash string, err error) { + n.mu.Lock() + defer n.mu.Unlock() + if hash, ok := n.oldHashes[spaceId]; ok { + return hash, nil + } + return "", ErrSpaceNotFound +} + func (n *nodeHead) ReloadHeadFromStore(spaceId string) error { return n.loadHeadFromStore(spaceId) } diff --git a/nodespace/mock_nodespace/mock_nodespace.go b/nodespace/mock_nodespace/mock_nodespace.go index d8d0a9fe..ca37384b 100644 --- a/nodespace/mock_nodespace/mock_nodespace.go +++ b/nodespace/mock_nodespace/mock_nodespace.go @@ -1,6 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/anyproto/any-sync-node/nodespace (interfaces: Service,NodeSpace) - +// +// Generated by this command: +// +// mockgen -destination mock_nodespace/mock_nodespace.go github.com/anyproto/any-sync-node/nodespace Service,NodeSpace +// // Package mock_nodespace is a generated GoMock package. package mock_nodespace @@ -15,6 +19,7 @@ import ( commonspace "github.com/anyproto/any-sync/commonspace" headsync "github.com/anyproto/any-sync/commonspace/headsync" syncacl "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" + treesyncer "github.com/anyproto/any-sync/commonspace/object/treesyncer" objectsync "github.com/anyproto/any-sync/commonspace/objectsync" objecttreebuilder "github.com/anyproto/any-sync/commonspace/objecttreebuilder" spacestorage "github.com/anyproto/any-sync/commonspace/spacestorage" @@ -71,7 +76,7 @@ func (m *MockService) Close(arg0 context.Context) error { } // Close indicates an expected call of Close. -func (mr *MockServiceMockRecorder) Close(arg0 interface{}) *gomock.Call { +func (mr *MockServiceMockRecorder) Close(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockService)(nil).Close), arg0) } @@ -86,7 +91,7 @@ func (m *MockService) GetSpace(arg0 context.Context, arg1 string) (nodespace.Nod } // GetSpace indicates an expected call of GetSpace. -func (mr *MockServiceMockRecorder) GetSpace(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockServiceMockRecorder) GetSpace(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSpace", reflect.TypeOf((*MockService)(nil).GetSpace), arg0, arg1) } @@ -100,7 +105,7 @@ func (m *MockService) Init(arg0 *app.App) error { } // Init indicates an expected call of Init. -func (mr *MockServiceMockRecorder) Init(arg0 interface{}) *gomock.Call { +func (mr *MockServiceMockRecorder) Init(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockService)(nil).Init), arg0) } @@ -129,7 +134,7 @@ func (m *MockService) PickSpace(arg0 context.Context, arg1 string) (nodespace.No } // PickSpace indicates an expected call of PickSpace. -func (mr *MockServiceMockRecorder) PickSpace(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockServiceMockRecorder) PickSpace(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PickSpace", reflect.TypeOf((*MockService)(nil).PickSpace), arg0, arg1) } @@ -143,7 +148,7 @@ func (m *MockService) Run(arg0 context.Context) error { } // Run indicates an expected call of Run. -func (mr *MockServiceMockRecorder) Run(arg0 interface{}) *gomock.Call { +func (mr *MockServiceMockRecorder) Run(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockService)(nil).Run), arg0) } @@ -236,7 +241,7 @@ func (m *MockNodeSpace) DeleteTree(arg0 context.Context, arg1 string) error { } // DeleteTree indicates an expected call of DeleteTree. -func (mr *MockNodeSpaceMockRecorder) DeleteTree(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeSpaceMockRecorder) DeleteTree(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTree", reflect.TypeOf((*MockNodeSpace)(nil).DeleteTree), arg0, arg1) } @@ -266,7 +271,7 @@ func (m *MockNodeSpace) GetNodePeers(arg0 context.Context) ([]peer.Peer, error) } // GetNodePeers indicates an expected call of GetNodePeers. -func (mr *MockNodeSpaceMockRecorder) GetNodePeers(arg0 interface{}) *gomock.Call { +func (mr *MockNodeSpaceMockRecorder) GetNodePeers(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNodePeers", reflect.TypeOf((*MockNodeSpace)(nil).GetNodePeers), arg0) } @@ -280,7 +285,7 @@ func (m *MockNodeSpace) HandleMessage(arg0 context.Context, arg1 objectsync.Hand } // HandleMessage indicates an expected call of HandleMessage. -func (mr *MockNodeSpaceMockRecorder) HandleMessage(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeSpaceMockRecorder) HandleMessage(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockNodeSpace)(nil).HandleMessage), arg0, arg1) } @@ -295,7 +300,7 @@ func (m *MockNodeSpace) HandleRangeRequest(arg0 context.Context, arg1 *spacesync } // HandleRangeRequest indicates an expected call of HandleRangeRequest. -func (mr *MockNodeSpaceMockRecorder) HandleRangeRequest(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeSpaceMockRecorder) HandleRangeRequest(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleRangeRequest", reflect.TypeOf((*MockNodeSpace)(nil).HandleRangeRequest), arg0, arg1) } @@ -310,7 +315,7 @@ func (m *MockNodeSpace) HandleSyncRequest(arg0 context.Context, arg1 *spacesyncp } // HandleSyncRequest indicates an expected call of HandleSyncRequest. -func (mr *MockNodeSpaceMockRecorder) HandleSyncRequest(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeSpaceMockRecorder) HandleSyncRequest(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleSyncRequest", reflect.TypeOf((*MockNodeSpace)(nil).HandleSyncRequest), arg0, arg1) } @@ -338,37 +343,11 @@ func (m *MockNodeSpace) Init(arg0 context.Context) error { } // Init indicates an expected call of Init. -func (mr *MockNodeSpaceMockRecorder) Init(arg0 interface{}) *gomock.Call { +func (mr *MockNodeSpaceMockRecorder) Init(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockNodeSpace)(nil).Init), arg0) } -// IsDeleted mocks base method. -func (m *MockNodeSpace) IsDeleted() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsDeleted") - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsDeleted indicates an expected call of IsDeleted. -func (mr *MockNodeSpaceMockRecorder) IsDeleted() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsDeleted", reflect.TypeOf((*MockNodeSpace)(nil).IsDeleted)) -} - -// SetIsDeleted mocks base method. -func (m *MockNodeSpace) SetIsDeleted(arg0 bool) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetIsDeleted", arg0) -} - -// SetIsDeleted indicates an expected call of SetIsDeleted. -func (mr *MockNodeSpaceMockRecorder) SetIsDeleted(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIsDeleted", reflect.TypeOf((*MockNodeSpace)(nil).SetIsDeleted), arg0) -} - // Storage mocks base method. func (m *MockNodeSpace) Storage() spacestorage.SpaceStorage { m.ctrl.T.Helper() @@ -425,6 +404,20 @@ func (mr *MockNodeSpaceMockRecorder) TreeBuilder() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TreeBuilder", reflect.TypeOf((*MockNodeSpace)(nil).TreeBuilder)) } +// TreeSyncer mocks base method. +func (m *MockNodeSpace) TreeSyncer() treesyncer.TreeSyncer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TreeSyncer") + ret0, _ := ret[0].(treesyncer.TreeSyncer) + return ret0 +} + +// TreeSyncer indicates an expected call of TreeSyncer. +func (mr *MockNodeSpaceMockRecorder) TreeSyncer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TreeSyncer", reflect.TypeOf((*MockNodeSpace)(nil).TreeSyncer)) +} + // TryClose mocks base method. func (m *MockNodeSpace) TryClose(arg0 time.Duration) (bool, error) { m.ctrl.T.Helper() @@ -435,7 +428,7 @@ func (m *MockNodeSpace) TryClose(arg0 time.Duration) (bool, error) { } // TryClose indicates an expected call of TryClose. -func (mr *MockNodeSpaceMockRecorder) TryClose(arg0 interface{}) *gomock.Call { +func (mr *MockNodeSpaceMockRecorder) TryClose(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryClose", reflect.TypeOf((*MockNodeSpace)(nil).TryClose), arg0) } diff --git a/nodespace/rpchandler.go b/nodespace/rpchandler.go index 17ae4009..3a93f1bc 100644 --- a/nodespace/rpchandler.go +++ b/nodespace/rpchandler.go @@ -3,6 +3,9 @@ package nodespace import ( "context" "encoding/hex" + "math" + "time" + "github.com/anyproto/any-sync/commonspace" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/consensus/consensusproto" @@ -12,8 +15,6 @@ import ( "github.com/gogo/protobuf/proto" "go.uber.org/zap" "golang.org/x/exp/slices" - "math" - "time" ) type rpcHandler struct { @@ -248,8 +249,9 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR } func (r *rpcHandler) tryNodeHeadSync(req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse) { - if len(req.Ranges) == 1 { - if req.Ranges[0].From == 0 && req.Ranges[0].To == math.MaxUint64 { + if len(req.Ranges) == 1 && (req.Ranges[0].From == 0 && req.Ranges[0].To == math.MaxUint64) { + switch req.DiffType { + case spacesyncproto.DiffType_Precalculated: hash, err := r.s.nodeHead.GetHead(req.SpaceId) if err != nil { return @@ -259,6 +261,26 @@ func (r *rpcHandler) tryNodeHeadSync(req *spacesyncproto.HeadSyncRequest) (resp return } log.Debug("got head sync with nodehead", zap.String("spaceId", req.SpaceId)) + return &spacesyncproto.HeadSyncResponse{ + DiffType: spacesyncproto.DiffType_Precalculated, + Results: []*spacesyncproto.HeadSyncResult{ + { + Hash: hashB, + // this makes diff not compareResults and create new batch directly (see (d *diff) Diff) + Count: 1, + }, + }, + } + case spacesyncproto.DiffType_Initial: + hash, err := r.s.nodeHead.GetOldHead(req.SpaceId) + if err != nil { + return + } + hashB, err := hex.DecodeString(hash) + if err != nil { + return + } + log.Debug("got head sync with old nodehead", zap.String("spaceId", req.SpaceId)) return &spacesyncproto.HeadSyncResponse{ Results: []*spacesyncproto.HeadSyncResult{ { diff --git a/nodestorage/keys.go b/nodestorage/keys.go index f98c882e..f343b4bc 100644 --- a/nodestorage/keys.go +++ b/nodestorage/keys.go @@ -2,8 +2,9 @@ package nodestorage import ( "fmt" - "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "strings" + + "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" ) type aclKeys struct { @@ -63,6 +64,7 @@ var ( spaceSettingsIdKey = []byte("spaceSettingsId") deletedKey = []byte("spaceDeleted") spaceHashKey = []byte("spaceHash") + oldSpaceHashKey = []byte("oldSpaceHash") ) func (s spaceKeys) SpaceIdKey() []byte { diff --git a/nodestorage/mock_nodestorage/mock_nodestorage.go b/nodestorage/mock_nodestorage/mock_nodestorage.go index 324dd6ba..5df9a568 100644 --- a/nodestorage/mock_nodestorage/mock_nodestorage.go +++ b/nodestorage/mock_nodestorage/mock_nodestorage.go @@ -1,6 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/anyproto/any-sync-node/nodestorage (interfaces: NodeStorage) - +// +// Generated by this command: +// +// mockgen -destination mock_nodestorage/mock_nodestorage.go github.com/anyproto/any-sync-node/nodestorage NodeStorage +// // Package mock_nodestorage is a generated GoMock package. package mock_nodestorage @@ -62,7 +66,7 @@ func (m *MockNodeStorage) CreateSpaceStorage(arg0 spacestorage.SpaceStorageCreat } // CreateSpaceStorage indicates an expected call of CreateSpaceStorage. -func (mr *MockNodeStorageMockRecorder) CreateSpaceStorage(arg0 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) CreateSpaceStorage(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSpaceStorage", reflect.TypeOf((*MockNodeStorage)(nil).CreateSpaceStorage), arg0) } @@ -76,7 +80,7 @@ func (m *MockNodeStorage) DeleteSpaceStorage(arg0 context.Context, arg1 string) } // DeleteSpaceStorage indicates an expected call of DeleteSpaceStorage. -func (mr *MockNodeStorageMockRecorder) DeleteSpaceStorage(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) DeleteSpaceStorage(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSpaceStorage", reflect.TypeOf((*MockNodeStorage)(nil).DeleteSpaceStorage), arg0, arg1) } @@ -104,7 +108,7 @@ func (m *MockNodeStorage) Init(arg0 *app.App) error { } // Init indicates an expected call of Init. -func (mr *MockNodeStorageMockRecorder) Init(arg0 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) Init(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockNodeStorage)(nil).Init), arg0) } @@ -130,11 +134,23 @@ func (m *MockNodeStorage) OnWriteHash(arg0 func(context.Context, string, string) } // OnWriteHash indicates an expected call of OnWriteHash. -func (mr *MockNodeStorageMockRecorder) OnWriteHash(arg0 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) OnWriteHash(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnWriteHash", reflect.TypeOf((*MockNodeStorage)(nil).OnWriteHash), arg0) } +// OnWriteOldHash mocks base method. +func (m *MockNodeStorage) OnWriteOldHash(arg0 func(context.Context, string, string)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnWriteOldHash", arg0) +} + +// OnWriteOldHash indicates an expected call of OnWriteOldHash. +func (mr *MockNodeStorageMockRecorder) OnWriteOldHash(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnWriteOldHash", reflect.TypeOf((*MockNodeStorage)(nil).OnWriteOldHash), arg0) +} + // SpaceExists mocks base method. func (m *MockNodeStorage) SpaceExists(arg0 string) bool { m.ctrl.T.Helper() @@ -144,7 +160,7 @@ func (m *MockNodeStorage) SpaceExists(arg0 string) bool { } // SpaceExists indicates an expected call of SpaceExists. -func (mr *MockNodeStorageMockRecorder) SpaceExists(arg0 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) SpaceExists(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SpaceExists", reflect.TypeOf((*MockNodeStorage)(nil).SpaceExists), arg0) } @@ -159,7 +175,7 @@ func (m *MockNodeStorage) SpaceStorage(arg0 string) (spacestorage.SpaceStorage, } // SpaceStorage indicates an expected call of SpaceStorage. -func (mr *MockNodeStorageMockRecorder) SpaceStorage(arg0 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) SpaceStorage(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SpaceStorage", reflect.TypeOf((*MockNodeStorage)(nil).SpaceStorage), arg0) } @@ -173,7 +189,7 @@ func (m *MockNodeStorage) StoreDir(arg0 string) string { } // StoreDir indicates an expected call of StoreDir. -func (mr *MockNodeStorageMockRecorder) StoreDir(arg0 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) StoreDir(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreDir", reflect.TypeOf((*MockNodeStorage)(nil).StoreDir), arg0) } @@ -187,7 +203,7 @@ func (m *MockNodeStorage) TryLockAndDo(arg0 string, arg1 func() error) error { } // TryLockAndDo indicates an expected call of TryLockAndDo. -func (mr *MockNodeStorageMockRecorder) TryLockAndDo(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) TryLockAndDo(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryLockAndDo", reflect.TypeOf((*MockNodeStorage)(nil).TryLockAndDo), arg0, arg1) } @@ -202,7 +218,7 @@ func (m *MockNodeStorage) WaitSpaceStorage(arg0 context.Context, arg1 string) (s } // WaitSpaceStorage indicates an expected call of WaitSpaceStorage. -func (mr *MockNodeStorageMockRecorder) WaitSpaceStorage(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockNodeStorageMockRecorder) WaitSpaceStorage(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitSpaceStorage", reflect.TypeOf((*MockNodeStorage)(nil).WaitSpaceStorage), arg0, arg1) } diff --git a/nodestorage/spacestorage.go b/nodestorage/spacestorage.go index c8cddefe..a17a5c37 100644 --- a/nodestorage/spacestorage.go +++ b/nodestorage/spacestorage.go @@ -266,6 +266,13 @@ func (s *spaceStorage) WriteSpaceHash(hash string) error { return s.objDb.Put(spaceHashKey, []byte(hash)) } +func (s *spaceStorage) WriteOldSpaceHash(hash string) error { + if s.service.onWriteHash != nil { + defer s.service.onWriteOldHash(context.Background(), s.spaceId, hash) + } + return s.objDb.Put(oldSpaceHashKey, []byte(hash)) +} + func (s *spaceStorage) ReadSpaceHash() (hash string, err error) { v, err := s.objDb.Get(spaceHashKey) if err != nil { @@ -274,6 +281,14 @@ func (s *spaceStorage) ReadSpaceHash() (hash string, err error) { return string(v), nil } +func (s *spaceStorage) ReadOldSpaceHash() (hash string, err error) { + v, err := s.objDb.Get(oldSpaceHashKey) + if err != nil { + return "", err + } + return string(v), nil +} + func (s *spaceStorage) Close(ctx context.Context) (err error) { defer s.service.unlockSpaceStorage(s.spaceId) return s.objDb.Close() diff --git a/nodestorage/storageservice.go b/nodestorage/storageservice.go index df08c805..1f648786 100644 --- a/nodestorage/storageservice.go +++ b/nodestorage/storageservice.go @@ -33,6 +33,7 @@ type NodeStorage interface { TryLockAndDo(spaceId string, do func() error) (err error) AllSpaceIds() (ids []string, err error) OnWriteHash(onWrite func(ctx context.Context, spaceId, hash string)) + OnWriteOldHash(onWrite func(ctx context.Context, spaceId, hash string)) StoreDir(spaceId string) (path string) DeleteSpaceStorage(ctx context.Context, spaceId string) error } @@ -43,11 +44,12 @@ type lockSpace struct { } type storageService struct { - rootPath string - delStorage DeletionStorage - onWriteHash func(ctx context.Context, spaceId, hash string) - lockedSpaces map[string]*lockSpace - mu sync.Mutex + rootPath string + delStorage DeletionStorage + onWriteHash func(ctx context.Context, spaceId, hash string) + onWriteOldHash func(ctx context.Context, spaceId, hash string) + lockedSpaces map[string]*lockSpace + mu sync.Mutex } func (s *storageService) Run(ctx context.Context) (err error) { @@ -218,3 +220,7 @@ func (s *storageService) StoreDir(spaceId string) (path string) { func (s *storageService) OnWriteHash(onWrite func(ctx context.Context, spaceId string, hash string)) { s.onWriteHash = onWrite } + +func (s *storageService) OnWriteOldHash(onWrite func(ctx context.Context, spaceId string, hash string)) { + s.onWriteOldHash = onWrite +} diff --git a/nodesync/coldsync/mock_coldsync/mock_coldsync.go b/nodesync/coldsync/mock_coldsync/mock_coldsync.go index 5f83c0be..5d57c00f 100644 --- a/nodesync/coldsync/mock_coldsync/mock_coldsync.go +++ b/nodesync/coldsync/mock_coldsync/mock_coldsync.go @@ -1,6 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/anyproto/any-sync-node/nodesync/coldsync (interfaces: ColdSync) - +// +// Generated by this command: +// +// mockgen -destination mock_coldsync/mock_coldsync.go github.com/anyproto/any-sync-node/nodesync/coldsync ColdSync +// // Package mock_coldsync is a generated GoMock package. package mock_coldsync @@ -45,7 +49,7 @@ func (m *MockColdSync) ColdSyncHandle(arg0 *nodesyncproto.ColdSyncRequest, arg1 } // ColdSyncHandle indicates an expected call of ColdSyncHandle. -func (mr *MockColdSyncMockRecorder) ColdSyncHandle(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockColdSyncMockRecorder) ColdSyncHandle(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdSyncHandle", reflect.TypeOf((*MockColdSync)(nil).ColdSyncHandle), arg0, arg1) } @@ -59,7 +63,7 @@ func (m *MockColdSync) Init(arg0 *app.App) error { } // Init indicates an expected call of Init. -func (mr *MockColdSyncMockRecorder) Init(arg0 interface{}) *gomock.Call { +func (mr *MockColdSyncMockRecorder) Init(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockColdSync)(nil).Init), arg0) } @@ -87,7 +91,7 @@ func (m *MockColdSync) Sync(arg0 context.Context, arg1, arg2 string) error { } // Sync indicates an expected call of Sync. -func (mr *MockColdSyncMockRecorder) Sync(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockColdSyncMockRecorder) Sync(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Sync", reflect.TypeOf((*MockColdSync)(nil).Sync), arg0, arg1, arg2) } diff --git a/nodesync/hotsync/mock_hotsync/mock_hotsync.go b/nodesync/hotsync/mock_hotsync/mock_hotsync.go index 873ae079..99abef33 100644 --- a/nodesync/hotsync/mock_hotsync/mock_hotsync.go +++ b/nodesync/hotsync/mock_hotsync/mock_hotsync.go @@ -1,6 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/anyproto/any-sync-node/nodesync/hotsync (interfaces: HotSync) - +// +// Generated by this command: +// +// mockgen -destination mock_hotsync/mock_hotsync.go github.com/anyproto/any-sync-node/nodesync/hotsync HotSync +// // Package mock_hotsync is a generated GoMock package. package mock_hotsync @@ -45,7 +49,7 @@ func (m *MockHotSync) Close(arg0 context.Context) error { } // Close indicates an expected call of Close. -func (mr *MockHotSyncMockRecorder) Close(arg0 interface{}) *gomock.Call { +func (mr *MockHotSyncMockRecorder) Close(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockHotSync)(nil).Close), arg0) } @@ -59,7 +63,7 @@ func (m *MockHotSync) Init(arg0 *app.App) error { } // Init indicates an expected call of Init. -func (mr *MockHotSyncMockRecorder) Init(arg0 interface{}) *gomock.Call { +func (mr *MockHotSyncMockRecorder) Init(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockHotSync)(nil).Init), arg0) } @@ -87,7 +91,7 @@ func (m *MockHotSync) Run(arg0 context.Context) error { } // Run indicates an expected call of Run. -func (mr *MockHotSyncMockRecorder) Run(arg0 interface{}) *gomock.Call { +func (mr *MockHotSyncMockRecorder) Run(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockHotSync)(nil).Run), arg0) } @@ -99,7 +103,7 @@ func (m *MockHotSync) SetMetric(arg0, arg1 *atomic.Uint32) { } // SetMetric indicates an expected call of SetMetric. -func (mr *MockHotSyncMockRecorder) SetMetric(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockHotSyncMockRecorder) SetMetric(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMetric", reflect.TypeOf((*MockHotSync)(nil).SetMetric), arg0, arg1) } @@ -111,7 +115,7 @@ func (m *MockHotSync) UpdateQueue(arg0 []string) { } // UpdateQueue indicates an expected call of UpdateQueue. -func (mr *MockHotSyncMockRecorder) UpdateQueue(arg0 interface{}) *gomock.Call { +func (mr *MockHotSyncMockRecorder) UpdateQueue(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateQueue", reflect.TypeOf((*MockHotSync)(nil).UpdateQueue), arg0) } diff --git a/nodesync/mock_nodesync/mock_nodesync.go b/nodesync/mock_nodesync/mock_nodesync.go index d24b87d5..0b75c1b0 100644 --- a/nodesync/mock_nodesync/mock_nodesync.go +++ b/nodesync/mock_nodesync/mock_nodesync.go @@ -1,6 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/anyproto/any-sync-node/nodesync (interfaces: NodeSync) - +// +// Generated by this command: +// +// mockgen -destination mock_nodesync/mock_nodesync.go github.com/anyproto/any-sync-node/nodesync NodeSync +// // Package mock_nodesync is a generated GoMock package. package mock_nodesync @@ -44,7 +48,7 @@ func (m *MockNodeSync) Close(arg0 context.Context) error { } // Close indicates an expected call of Close. -func (mr *MockNodeSyncMockRecorder) Close(arg0 interface{}) *gomock.Call { +func (mr *MockNodeSyncMockRecorder) Close(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockNodeSync)(nil).Close), arg0) } @@ -58,7 +62,7 @@ func (m *MockNodeSync) Init(arg0 *app.App) error { } // Init indicates an expected call of Init. -func (mr *MockNodeSyncMockRecorder) Init(arg0 interface{}) *gomock.Call { +func (mr *MockNodeSyncMockRecorder) Init(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockNodeSync)(nil).Init), arg0) } @@ -86,7 +90,7 @@ func (m *MockNodeSync) Run(arg0 context.Context) error { } // Run indicates an expected call of Run. -func (mr *MockNodeSyncMockRecorder) Run(arg0 interface{}) *gomock.Call { +func (mr *MockNodeSyncMockRecorder) Run(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockNodeSync)(nil).Run), arg0) } diff --git a/nodesync/nodediff.go b/nodesync/nodediff.go index 9df15774..1e4b3a90 100644 --- a/nodesync/nodediff.go +++ b/nodesync/nodediff.go @@ -2,10 +2,12 @@ package nodesync import ( "context" - "github.com/anyproto/any-sync-node/nodehead" - "github.com/anyproto/any-sync-node/nodesync/nodesyncproto" + "github.com/anyproto/any-sync/app/ldiff" "golang.org/x/exp/slices" + + "github.com/anyproto/any-sync-node/nodehead" + "github.com/anyproto/any-sync-node/nodesync/nodesyncproto" ) type nodeRemoteDiff struct { @@ -17,9 +19,9 @@ func (n nodeRemoteDiff) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf protoRanges := make([]*nodesyncproto.PartitionSyncRange, len(ranges)) for i, r := range ranges { protoRanges[i] = &nodesyncproto.PartitionSyncRange{ - From: r.From, - To: r.To, - Limit: uint32(r.Limit), + From: r.From, + To: r.To, + Elements: r.Elements, } } req := &nodesyncproto.PartitionSyncRequest{ @@ -61,9 +63,9 @@ func (n *nodeRemoteDiffHandler) PartitionSync(ctx context.Context, req *nodesync var ranges = make([]ldiff.Range, len(req.Ranges)) for i, r := range req.Ranges { ranges[i] = ldiff.Range{ - From: r.From, - To: r.To, - Limit: int(r.Limit), + From: r.From, + To: r.To, + Elements: r.Elements, } } diff --git a/nodesync/nodesyncproto/nodesync.pb.go b/nodesync/nodesyncproto/nodesync.pb.go index 5006ad21..2833ab9a 100644 --- a/nodesync/nodesyncproto/nodesync.pb.go +++ b/nodesync/nodesyncproto/nodesync.pb.go @@ -52,9 +52,10 @@ func (ErrCodes) EnumDescriptor() ([]byte, []int) { // PartitionSyncRange presenting a request for one range type PartitionSyncRange struct { - From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"` - To uint64 `protobuf:"varint,2,opt,name=to,proto3" json:"to,omitempty"` - Limit uint32 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` + From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"` + To uint64 `protobuf:"varint,2,opt,name=to,proto3" json:"to,omitempty"` + Limit uint32 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` + Elements bool `protobuf:"varint,4,opt,name=elements,proto3" json:"elements,omitempty"` } func (m *PartitionSyncRange) Reset() { *m = PartitionSyncRange{} } @@ -111,6 +112,13 @@ func (m *PartitionSyncRange) GetLimit() uint32 { return 0 } +func (m *PartitionSyncRange) GetElements() bool { + if m != nil { + return m.Elements + } + return false +} + // PartitionSyncResult presenting a response for one range type PartitionSyncResult struct { Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` @@ -443,38 +451,39 @@ func init() { } var fileDescriptor_e54c508e329d960e = []byte{ - // 491 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0x41, 0x6f, 0xd3, 0x30, - 0x14, 0xae, 0xb3, 0xb2, 0x66, 0xaf, 0xdb, 0x88, 0xbc, 0x01, 0x51, 0x05, 0x21, 0xe4, 0x42, 0x05, - 0xa8, 0x9b, 0xba, 0x03, 0x12, 0x27, 0x44, 0x95, 0x43, 0x2f, 0x03, 0x79, 0x02, 0x21, 0x6e, 0x26, - 0x76, 0x59, 0xa4, 0xd4, 0xce, 0x6c, 0x57, 0xa2, 0x77, 0x7e, 0x00, 0x7f, 0x84, 0xff, 0xc1, 0x71, - 0x47, 0x8e, 0xa8, 0xbd, 0xf0, 0x33, 0x90, 0xdd, 0xa4, 0xb4, 0xd5, 0x56, 0x2e, 0xc9, 0xfb, 0x5e, - 0xde, 0xfb, 0xf2, 0x7d, 0x9f, 0x13, 0x78, 0x21, 0x24, 0xe3, 0x7a, 0x2a, 0xb2, 0x93, 0xba, 0x28, - 0x95, 0x34, 0xf2, 0xc4, 0x5d, 0xf5, 0xb2, 0xd9, 0x73, 0x18, 0xb7, 0xa9, 0x98, 0x9e, 0x4b, 0xc6, - 0x2f, 0xa6, 0x22, 0x4b, 0xce, 0x01, 0xbf, 0xa3, 0xca, 0xe4, 0x26, 0x97, 0xc2, 0x36, 0x08, 0x15, - 0x5f, 0x38, 0xc6, 0xd0, 0x1c, 0x29, 0x39, 0x0e, 0x51, 0x8c, 0xba, 0x4d, 0xe2, 0x6a, 0x7c, 0x08, - 0x9e, 0x91, 0xa1, 0xe7, 0x3a, 0x9e, 0x91, 0xf8, 0x18, 0xee, 0x14, 0xf9, 0x38, 0x37, 0xe1, 0x4e, - 0x8c, 0xba, 0x07, 0x64, 0x01, 0x92, 0x6f, 0x08, 0x8e, 0xd6, 0x09, 0xb9, 0x9e, 0x14, 0xc6, 0x32, - 0x5e, 0x52, 0x7d, 0xe9, 0x18, 0xf7, 0x89, 0xab, 0xf1, 0x00, 0x7c, 0x5e, 0xf0, 0x31, 0x17, 0x46, - 0x87, 0x5e, 0xbc, 0xd3, 0x6d, 0xf7, 0x9f, 0xf6, 0x56, 0xb4, 0xf5, 0x6e, 0xe0, 0x49, 0x17, 0xf3, - 0x64, 0xb9, 0x68, 0x65, 0x64, 0x72, 0x22, 0x96, 0x32, 0x1c, 0x48, 0x5e, 0x43, 0xe7, 0xf6, 0x6d, - 0x6b, 0x25, 0x67, 0x4e, 0xca, 0x1e, 0xf1, 0x72, 0xe6, 0xc4, 0x71, 0xca, 0x9c, 0xb9, 0x3d, 0xe2, - 0xea, 0xe4, 0x0a, 0x8e, 0x37, 0x18, 0xae, 0x26, 0x5c, 0x1b, 0x1c, 0x43, 0xbb, 0xac, 0xfb, 0x43, - 0x56, 0x25, 0xb4, 0xda, 0xc2, 0x2f, 0x61, 0x57, 0xd9, 0x14, 0x6b, 0x53, 0x8f, 0xb7, 0x98, 0xb2, - 0x73, 0xa4, 0x1a, 0x4f, 0x2e, 0xe0, 0xde, 0xa6, 0xe8, 0x52, 0x0a, 0xcd, 0xf1, 0x2b, 0x68, 0x29, - 0x67, 0x40, 0x87, 0xc8, 0x51, 0xc6, 0xff, 0xcb, 0x89, 0xd4, 0x0b, 0xc9, 0x73, 0xb8, 0x3b, 0x90, - 0x05, 0x5b, 0xb5, 0x10, 0x42, 0x4b, 0x97, 0x34, 0xe3, 0xc3, 0x3a, 0x83, 0x1a, 0x26, 0x1f, 0x21, - 0xf8, 0x37, 0x5c, 0xbd, 0xbc, 0x03, 0xfe, 0x28, 0x2f, 0xb8, 0xa0, 0x63, 0x5e, 0x05, 0xb4, 0xc4, - 0x36, 0x38, 0x46, 0x0d, 0x75, 0xd9, 0xef, 0x13, 0x57, 0xbb, 0x03, 0x51, 0xd9, 0x59, 0x3f, 0x6c, - 0x56, 0x07, 0x62, 0xc1, 0xb3, 0x14, 0xfc, 0x54, 0xa9, 0x81, 0xfd, 0x12, 0xf1, 0x21, 0xc0, 0x7b, - 0xc1, 0xbf, 0x96, 0x3c, 0x33, 0x9c, 0x05, 0x0d, 0xfc, 0x00, 0x8e, 0xd2, 0x0a, 0x0d, 0xa4, 0x54, - 0x2c, 0x17, 0xd4, 0x48, 0x15, 0x20, 0x1c, 0x40, 0x3b, 0x55, 0x4a, 0xaa, 0xb7, 0xa3, 0x91, 0xe6, - 0x26, 0xf8, 0xd3, 0xea, 0xff, 0x40, 0xe0, 0xd7, 0xbe, 0xf1, 0x07, 0x38, 0x58, 0xb3, 0x8e, 0x9f, - 0x6c, 0x8b, 0xc5, 0x79, 0xef, 0x24, 0x5b, 0x93, 0x5b, 0x38, 0x1e, 0x82, 0x5f, 0xa7, 0x80, 0x1f, - 0xae, 0xcd, 0x6f, 0x24, 0xd9, 0x79, 0x74, 0xcb, 0xd3, 0x05, 0xd1, 0x29, 0x7a, 0x73, 0xfa, 0x73, - 0x16, 0xa1, 0xeb, 0x59, 0x84, 0x7e, 0xcf, 0x22, 0xf4, 0x7d, 0x1e, 0x35, 0xae, 0xe7, 0x51, 0xe3, - 0xd7, 0x3c, 0x6a, 0x7c, 0xba, 0x7f, 0xf3, 0x3f, 0xfb, 0x79, 0xd7, 0xdd, 0xce, 0xfe, 0x06, 0x00, - 0x00, 0xff, 0xff, 0xac, 0x38, 0x6f, 0xec, 0xd4, 0x03, 0x00, 0x00, + // 503 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0xc1, 0x6e, 0xd3, 0x4c, + 0x10, 0xce, 0xa6, 0xf9, 0x1b, 0x77, 0xd2, 0xf6, 0xb7, 0xb6, 0x05, 0x2c, 0x0b, 0x8c, 0xf1, 0x85, + 0x08, 0x50, 0x5a, 0xa5, 0x07, 0x24, 0x4e, 0x88, 0xc8, 0x87, 0x5c, 0x00, 0x6d, 0x05, 0x42, 0xdc, + 0x16, 0xef, 0x86, 0x1a, 0x39, 0xbb, 0xee, 0xee, 0x46, 0x22, 0x77, 0x1e, 0x80, 0x17, 0xe1, 0x3d, + 0x38, 0xf6, 0xc8, 0x11, 0x25, 0x17, 0x1e, 0x03, 0xed, 0xc6, 0x4e, 0x93, 0xa8, 0x0d, 0x17, 0x7b, + 0xbe, 0xf1, 0xec, 0xcc, 0xf7, 0x7d, 0x3b, 0x86, 0x67, 0x42, 0x32, 0xae, 0xa7, 0x22, 0x3b, 0xa9, + 0x83, 0x52, 0x49, 0x23, 0x4f, 0xdc, 0x53, 0x2f, 0x93, 0x3d, 0x87, 0x71, 0x87, 0x8a, 0xe9, 0x6b, + 0xc9, 0xf8, 0xf9, 0x54, 0x64, 0xc9, 0x17, 0xc0, 0x6f, 0xa9, 0x32, 0xb9, 0xc9, 0xa5, 0xb0, 0x09, + 0x42, 0xc5, 0x67, 0x8e, 0x31, 0xb4, 0x46, 0x4a, 0x8e, 0x03, 0x14, 0xa3, 0x6e, 0x8b, 0xb8, 0x18, + 0x1f, 0x42, 0xd3, 0xc8, 0xa0, 0xe9, 0x32, 0x4d, 0x23, 0xf1, 0x31, 0xfc, 0x57, 0xe4, 0xe3, 0xdc, + 0x04, 0x3b, 0x31, 0xea, 0x1e, 0x90, 0x05, 0xc0, 0x21, 0x78, 0xbc, 0xe0, 0x63, 0x2e, 0x8c, 0x0e, + 0x5a, 0x31, 0xea, 0x7a, 0x64, 0x89, 0x93, 0x6f, 0x08, 0x8e, 0xd6, 0x87, 0x71, 0x3d, 0x29, 0x8c, + 0x9d, 0x76, 0x41, 0xf5, 0x85, 0x9b, 0xb6, 0x4f, 0x5c, 0x8c, 0x07, 0x2b, 0x7d, 0x9a, 0xf1, 0x4e, + 0xb7, 0xd3, 0x7f, 0xdc, 0x5b, 0xe1, 0xdd, 0xbb, 0xa1, 0x4f, 0xba, 0xa8, 0xbf, 0x1e, 0x68, 0x29, + 0x66, 0x72, 0x22, 0x96, 0x14, 0x1d, 0x48, 0x5e, 0x42, 0x78, 0xfb, 0x69, 0x2b, 0x33, 0x67, 0x8e, + 0xca, 0x1e, 0x69, 0xe6, 0xcc, 0x91, 0xe3, 0x94, 0x39, 0xe1, 0x7b, 0xc4, 0xc5, 0xc9, 0x25, 0x1c, + 0x6f, 0x74, 0xb8, 0x9c, 0x70, 0x6d, 0x70, 0x0c, 0x9d, 0xb2, 0xce, 0x0f, 0x59, 0xe5, 0xde, 0x6a, + 0x0a, 0x3f, 0x87, 0x5d, 0x65, 0x1d, 0xae, 0x45, 0x3d, 0xdc, 0x22, 0xca, 0xd6, 0x91, 0xaa, 0x3c, + 0x39, 0x87, 0x3b, 0x9b, 0xa4, 0x4b, 0x29, 0x34, 0xc7, 0x2f, 0xa0, 0xad, 0x9c, 0x00, 0x1d, 0x20, + 0xd7, 0x32, 0xfe, 0x97, 0x4f, 0xa4, 0x3e, 0x90, 0x3c, 0x85, 0xff, 0x07, 0xb2, 0x60, 0xab, 0x12, + 0x02, 0x68, 0xeb, 0x92, 0x66, 0x7c, 0x58, 0x7b, 0x50, 0xc3, 0xe4, 0x03, 0xf8, 0xd7, 0xc5, 0xd5, + 0xf0, 0x10, 0xbc, 0x51, 0x5e, 0x70, 0x41, 0xc7, 0xbc, 0x32, 0x68, 0x89, 0xad, 0x71, 0x8c, 0x1a, + 0xea, 0xbc, 0xdf, 0x27, 0x2e, 0x76, 0x17, 0xa2, 0xb2, 0xb3, 0xbe, 0x5b, 0x0d, 0x7b, 0x21, 0x16, + 0x3c, 0x49, 0xc1, 0x4b, 0x95, 0x1a, 0xd8, 0x2d, 0xc5, 0x87, 0x00, 0xef, 0x04, 0xff, 0x5a, 0xf2, + 0xcc, 0x70, 0xe6, 0x37, 0xf0, 0x3d, 0x38, 0x4a, 0x2b, 0x34, 0x90, 0x52, 0xb1, 0x5c, 0x50, 0x23, + 0x95, 0x8f, 0xb0, 0x0f, 0x9d, 0x54, 0x29, 0xa9, 0xde, 0x8c, 0x46, 0x9a, 0x1b, 0xff, 0x4f, 0xbb, + 0xff, 0x03, 0x81, 0x57, 0xeb, 0xc6, 0xef, 0xe1, 0x60, 0x4d, 0x3a, 0x7e, 0xb4, 0xcd, 0x16, 0xa7, + 0x3d, 0x4c, 0xb6, 0x3a, 0xb7, 0x50, 0x3c, 0x04, 0xaf, 0x76, 0x01, 0xdf, 0x5f, 0xab, 0xdf, 0x70, + 0x32, 0x7c, 0x70, 0xcb, 0xd7, 0x45, 0xa3, 0x53, 0xf4, 0xea, 0xf4, 0xe7, 0x2c, 0x42, 0x57, 0xb3, + 0x08, 0xfd, 0x9e, 0x45, 0xe8, 0xfb, 0x3c, 0x6a, 0x5c, 0xcd, 0xa3, 0xc6, 0xaf, 0x79, 0xd4, 0xf8, + 0x78, 0xf7, 0xe6, 0xff, 0xf9, 0xd3, 0xae, 0x7b, 0x9d, 0xfd, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x8e, + 0xc4, 0xd1, 0xd9, 0xf0, 0x03, 0x00, 0x00, } func (m *PartitionSyncRange) Marshal() (dAtA []byte, err error) { @@ -497,6 +506,16 @@ func (m *PartitionSyncRange) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Elements { + i-- + if m.Elements { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + } if m.Limit != 0 { i = encodeVarintNodesync(dAtA, i, uint64(m.Limit)) i-- @@ -778,6 +797,9 @@ func (m *PartitionSyncRange) Size() (n int) { if m.Limit != 0 { n += 1 + sovNodesync(uint64(m.Limit)) } + if m.Elements { + n += 2 + } return n } @@ -978,6 +1000,26 @@ func (m *PartitionSyncRange) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Elements", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNodesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Elements = bool(v != 0) default: iNdEx = preIndex skippy, err := skipNodesync(dAtA[iNdEx:]) diff --git a/nodesync/nodesyncproto/protos/nodesync.proto b/nodesync/nodesyncproto/protos/nodesync.proto index 8f864f1f..05c1286f 100644 --- a/nodesync/nodesyncproto/protos/nodesync.proto +++ b/nodesync/nodesyncproto/protos/nodesync.proto @@ -21,6 +21,7 @@ message PartitionSyncRange { uint64 from = 1; uint64 to = 2; uint32 limit = 3; + bool elements = 4; } // PartitionSyncResult presenting a response for one range From 89dc6c5b067bd99d80a1ab78306d0d83d2564452 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 11:59:19 +0100 Subject: [PATCH 2/6] Fix test --- nodesync/nodesync_test.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/nodesync/nodesync_test.go b/nodesync/nodesync_test.go index 12963a45..fac3abd6 100644 --- a/nodesync/nodesync_test.go +++ b/nodesync/nodesync_test.go @@ -2,14 +2,9 @@ package nodesync import ( "context" - "github.com/anyproto/any-sync-node/nodehead" - "github.com/anyproto/any-sync-node/nodehead/mock_nodehead" - "github.com/anyproto/any-sync-node/nodespace" - "github.com/anyproto/any-sync-node/nodespace/mock_nodespace" - "github.com/anyproto/any-sync-node/nodesync/coldsync" - "github.com/anyproto/any-sync-node/nodesync/coldsync/mock_coldsync" - "github.com/anyproto/any-sync-node/nodesync/hotsync" - "github.com/anyproto/any-sync-node/nodesync/hotsync/mock_hotsync" + "testing" + "time" + "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ldiff" @@ -24,8 +19,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "testing" - "time" + + "github.com/anyproto/any-sync-node/nodehead" + "github.com/anyproto/any-sync-node/nodehead/mock_nodehead" + "github.com/anyproto/any-sync-node/nodespace" + "github.com/anyproto/any-sync-node/nodespace/mock_nodespace" + "github.com/anyproto/any-sync-node/nodesync/coldsync" + "github.com/anyproto/any-sync-node/nodesync/coldsync/mock_coldsync" + "github.com/anyproto/any-sync-node/nodesync/hotsync" + "github.com/anyproto/any-sync-node/nodesync/hotsync/mock_hotsync" ) var ctx = context.Background() @@ -85,8 +87,10 @@ func TestNodeSync_Sync(t *testing.T) { for i := 0; i < nodeconf.PartitionCount; i++ { if i == 0 { + // calling twice because we now do a twostep diff + // the same as actually was done with real nodes before fx1.nodeHead.EXPECT().LDiff(i).Return(ld1) - fx2.nodeHead.EXPECT().LDiff(i).Return(ld2) + fx2.nodeHead.EXPECT().LDiff(i).Times(2).Return(ld2) } else { fx1.nodeHead.EXPECT().LDiff(i).Return(emptyLdiff) fx2.nodeHead.EXPECT().LDiff(i).Return(emptyLdiff) From eb90389491b3905219b7569ee62eb08ac93ed148 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 22:44:23 +0100 Subject: [PATCH 3/6] Populate old hashes on start --- go.mod | 2 +- go.sum | 2 ++ nodehead/nodehead.go | 11 +++++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index e7ffa600..5f6e5ce6 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/ahmetb/govvv v0.3.0 github.com/akrylysov/pogreb v0.10.1 - github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308 + github.com/anyproto/any-sync v0.3.11-0.20231201204216-39e28159bb73 github.com/anyproto/go-chash v0.1.0 github.com/gogo/protobuf v1.3.2 github.com/prometheus/client_golang v1.17.0 diff --git a/go.sum b/go.sum index 0257c2ca..1060407f 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/anyproto/any-sync v0.3.10 h1:OaumDhf136XN5gHAsRm5kQm3z0Rvk+VdR3jMwW1L github.com/anyproto/any-sync v0.3.10/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308 h1:lKAbSBPiB8G0HweCMm02obccnIps6AIF/RL3elMhCQo= github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= +github.com/anyproto/any-sync v0.3.11-0.20231201204216-39e28159bb73 h1:cctETuWFXH1XPUXd5iYgdoE2DY6uKMjfB3eSzQPVhB0= +github.com/anyproto/any-sync v0.3.11-0.20231201204216-39e28159bb73/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY= github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY= github.com/anyproto/go-slip10 v1.0.0 h1:uAEtSuudR3jJBOfkOXf3bErxVoxbuKwdoJN55M1i6IA= diff --git a/nodehead/nodehead.go b/nodehead/nodehead.go index 2839f5a5..bf237b0c 100644 --- a/nodehead/nodehead.go +++ b/nodehead/nodehead.go @@ -114,6 +114,17 @@ func (n *nodeHead) loadHeadFromStore(spaceId string) (err error) { if _, err = n.SetHead(spaceId, hash); err != nil { return } + oldHash, err := ss.ReadOldSpaceHash() + if err != nil { + return + } + // that means that the hash was not set before + if oldHash == "" { + oldHash = hash + } + if _, err = n.SetOldHead(spaceId, oldHash); err != nil { + return + } return } From e27c9764f25bb9b656615198fc460d753bcb849b Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 23:10:59 +0100 Subject: [PATCH 4/6] Remove heads on storage delete --- go.mod | 2 +- go.sum | 8 +--- nodehead/mock_nodehead/mock_nodehead.go | 14 ++++++ nodehead/nodehead.go | 20 +++++++++ nodehead/nodehead_test.go | 43 +++++++++++++++++-- .../mock_nodestorage/mock_nodestorage.go | 12 ++++++ nodestorage/storageservice.go | 19 +++++--- 7 files changed, 101 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 5f6e5ce6..208fa2df 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/ahmetb/govvv v0.3.0 github.com/akrylysov/pogreb v0.10.1 - github.com/anyproto/any-sync v0.3.11-0.20231201204216-39e28159bb73 + github.com/anyproto/any-sync v0.3.11-0.20231201214252-51f44b8032bf github.com/anyproto/go-chash v0.1.0 github.com/gogo/protobuf v1.3.2 github.com/prometheus/client_golang v1.17.0 diff --git a/go.sum b/go.sum index 1060407f..7c92ce52 100644 --- a/go.sum +++ b/go.sum @@ -6,12 +6,8 @@ github.com/ahmetb/govvv v0.3.0 h1:YGLGwEyiUwHFy5eh/RUhdupbuaCGBYn5T5GWXp+WJB0= github.com/ahmetb/govvv v0.3.0/go.mod h1:4WRFpdWtc/YtKgPFwa1dr5+9hiRY5uKAL08bOlxOR6s= github.com/akrylysov/pogreb v0.10.1 h1:FqlR8VR7uCbJdfUob916tPM+idpKgeESDXOA1K0DK4w= github.com/akrylysov/pogreb v0.10.1/go.mod h1:pNs6QmpQ1UlTJKDezuRWmaqkgUE2TuU0YTWyqJZ7+lI= -github.com/anyproto/any-sync v0.3.10 h1:OaumDhf136XN5gHAsRm5kQm3z0Rvk+VdR3jMwW1LrnA= -github.com/anyproto/any-sync v0.3.10/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= -github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308 h1:lKAbSBPiB8G0HweCMm02obccnIps6AIF/RL3elMhCQo= -github.com/anyproto/any-sync v0.3.11-0.20231130232324-a8a62b092308/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= -github.com/anyproto/any-sync v0.3.11-0.20231201204216-39e28159bb73 h1:cctETuWFXH1XPUXd5iYgdoE2DY6uKMjfB3eSzQPVhB0= -github.com/anyproto/any-sync v0.3.11-0.20231201204216-39e28159bb73/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= +github.com/anyproto/any-sync v0.3.11-0.20231201214252-51f44b8032bf h1:Qo86yY73aYcAIxU8Kt+r0KftzEsx03N6uo7I4vN26VU= +github.com/anyproto/any-sync v0.3.11-0.20231201214252-51f44b8032bf/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY= github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY= github.com/anyproto/go-slip10 v1.0.0 h1:uAEtSuudR3jJBOfkOXf3bErxVoxbuKwdoJN55M1i6IA= diff --git a/nodehead/mock_nodehead/mock_nodehead.go b/nodehead/mock_nodehead/mock_nodehead.go index 15842c70..1fce442d 100644 --- a/nodehead/mock_nodehead/mock_nodehead.go +++ b/nodehead/mock_nodehead/mock_nodehead.go @@ -54,6 +54,20 @@ func (mr *MockNodeHeadMockRecorder) Close(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockNodeHead)(nil).Close), arg0) } +// DeleteHeads mocks base method. +func (m *MockNodeHead) DeleteHeads(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteHeads", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteHeads indicates an expected call of DeleteHeads. +func (mr *MockNodeHeadMockRecorder) DeleteHeads(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteHeads", reflect.TypeOf((*MockNodeHead)(nil).DeleteHeads), arg0) +} + // GetHead mocks base method. func (m *MockNodeHead) GetHead(arg0 string) (string, error) { m.ctrl.T.Helper() diff --git a/nodehead/nodehead.go b/nodehead/nodehead.go index bf237b0c..13abcca1 100644 --- a/nodehead/nodehead.go +++ b/nodehead/nodehead.go @@ -39,6 +39,7 @@ type NodeHead interface { GetHead(spaceId string) (head string, err error) SetOldHead(spaceId, head string) (part int, err error) GetOldHead(spaceId string) (head string, err error) + DeleteHeads(spaceId string) error ReloadHeadFromStore(spaceId string) error LDiff(partId int) ldiff.Diff Ranges(ctx context.Context, part int, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) @@ -68,6 +69,11 @@ func (n *nodeHead) Init(a *app.App) (err error) { log.Error("can't set old head", zap.Error(e)) } }) + n.spaceStore.OnDeleteStorage(func(_ context.Context, spaceId string) { + if e := n.DeleteHeads(spaceId); e != nil { + log.Error("can't delete space from nodehead", zap.Error(e)) + } + }) if m := a.Component(metric.CName); m != nil { n.registerMetrics(m.(metric.Metric)) } @@ -128,6 +134,17 @@ func (n *nodeHead) loadHeadFromStore(spaceId string) (err error) { return } +func (n *nodeHead) DeleteHeads(spaceId string) error { + n.mu.Lock() + defer n.mu.Unlock() + delete(n.oldHashes, spaceId) + part := n.nodeconf.Partition(spaceId) + if ld, ok := n.partitions[part]; ok { + return ld.RemoveId(spaceId) + } + return nil +} + func (n *nodeHead) SetHead(spaceId, head string) (part int, err error) { part = n.nodeconf.Partition(spaceId) n.mu.Lock() @@ -174,6 +191,9 @@ func (n *nodeHead) GetHead(spaceId string) (hash string, err error) { n.mu.Unlock() el, err := ld.Element(spaceId) if err != nil { + if errors.Is(err, ldiff.ErrElementNotFound) { + return "", ErrSpaceNotFound + } return } return el.Head, nil diff --git a/nodehead/nodehead_test.go b/nodehead/nodehead_test.go index e146447a..ae160243 100644 --- a/nodehead/nodehead_test.go +++ b/nodehead/nodehead_test.go @@ -3,7 +3,10 @@ package nodehead import ( "context" "encoding/hex" - "github.com/anyproto/any-sync-node/nodestorage" + "math" + "os" + "testing" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" @@ -17,9 +20,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "math" - "os" - "testing" + + "github.com/anyproto/any-sync-node/nodestorage" ) var ctx = context.Background() @@ -100,6 +102,39 @@ func TestNodeHead_GetSpaceHash(t *testing.T) { assert.Equal(t, ErrSpaceNotFound, err) } +func TestNodeHead_GetOldSpaceHash(t *testing.T) { + fx := newFixture(t, "") + defer fx.Finish(t) + hash := "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262" + _, err := fx.SetOldHead("space1", hash) + require.NoError(t, err) + + head, err := fx.GetOldHead("space1") + require.NoError(t, err) + assert.Equal(t, hash, head) + + _, err = fx.GetOldHead("not found") + assert.Equal(t, ErrSpaceNotFound, err) +} + +func TestNodeHead_DeleteHeads(t *testing.T) { + fx := newFixture(t, "") + defer fx.Finish(t) + hash := "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262" + _, err := fx.SetHead("space1", hash) + require.NoError(t, err) + _, err = fx.SetOldHead("space1", hash) + require.NoError(t, err) + + err = fx.NodeHead.(*nodeHead).DeleteHeads("space1") + require.NoError(t, err) + + _, err = fx.GetHead("space1") + assert.Equal(t, ErrSpaceNotFound, err) + _, err = fx.GetOldHead("space1") + assert.Equal(t, ErrSpaceNotFound, err) +} + func newFixture(t *testing.T, dataPath string) *fixture { var tmpDir string if dataPath != "" { diff --git a/nodestorage/mock_nodestorage/mock_nodestorage.go b/nodestorage/mock_nodestorage/mock_nodestorage.go index 5df9a568..156bba67 100644 --- a/nodestorage/mock_nodestorage/mock_nodestorage.go +++ b/nodestorage/mock_nodestorage/mock_nodestorage.go @@ -127,6 +127,18 @@ func (mr *MockNodeStorageMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockNodeStorage)(nil).Name)) } +// OnDeleteStorage mocks base method. +func (m *MockNodeStorage) OnDeleteStorage(arg0 func(context.Context, string)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnDeleteStorage", arg0) +} + +// OnDeleteStorage indicates an expected call of OnDeleteStorage. +func (mr *MockNodeStorageMockRecorder) OnDeleteStorage(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnDeleteStorage", reflect.TypeOf((*MockNodeStorage)(nil).OnDeleteStorage), arg0) +} + // OnWriteHash mocks base method. func (m *MockNodeStorage) OnWriteHash(arg0 func(context.Context, string, string)) { m.ctrl.T.Helper() diff --git a/nodestorage/storageservice.go b/nodestorage/storageservice.go index 1f648786..82f9ec7a 100644 --- a/nodestorage/storageservice.go +++ b/nodestorage/storageservice.go @@ -32,6 +32,7 @@ type NodeStorage interface { SpaceStorage(spaceId string) (spacestorage.SpaceStorage, error) TryLockAndDo(spaceId string, do func() error) (err error) AllSpaceIds() (ids []string, err error) + OnDeleteStorage(onDelete func(ctx context.Context, spaceId string)) OnWriteHash(onWrite func(ctx context.Context, spaceId, hash string)) OnWriteOldHash(onWrite func(ctx context.Context, spaceId, hash string)) StoreDir(spaceId string) (path string) @@ -44,12 +45,13 @@ type lockSpace struct { } type storageService struct { - rootPath string - delStorage DeletionStorage - onWriteHash func(ctx context.Context, spaceId, hash string) - onWriteOldHash func(ctx context.Context, spaceId, hash string) - lockedSpaces map[string]*lockSpace - mu sync.Mutex + rootPath string + delStorage DeletionStorage + onWriteHash func(ctx context.Context, spaceId, hash string) + onDeleteStorage func(ctx context.Context, spaceId string) + onWriteOldHash func(ctx context.Context, spaceId, hash string) + lockedSpaces map[string]*lockSpace + mu sync.Mutex } func (s *storageService) Run(ctx context.Context) (err error) { @@ -165,6 +167,7 @@ func (s *storageService) DeleteSpaceStorage(ctx context.Context, spaceId string) } return fmt.Errorf("can't delete datadir '%s': %w", dbPath, err) } + s.onDeleteStorage(ctx, spaceId) return os.RemoveAll(dbPath) }) if err == nil { @@ -224,3 +227,7 @@ func (s *storageService) OnWriteHash(onWrite func(ctx context.Context, spaceId s func (s *storageService) OnWriteOldHash(onWrite func(ctx context.Context, spaceId string, hash string)) { s.onWriteOldHash = onWrite } + +func (s *storageService) OnDeleteStorage(onDelete func(ctx context.Context, spaceId string)) { + s.onDeleteStorage = onDelete +} From cb8af980d4ef7a5c612166517f0f08672c122c95 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 23:33:25 +0100 Subject: [PATCH 5/6] Check onDeleteStorage not nil --- nodestorage/storageservice.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nodestorage/storageservice.go b/nodestorage/storageservice.go index 82f9ec7a..f9cf522c 100644 --- a/nodestorage/storageservice.go +++ b/nodestorage/storageservice.go @@ -167,7 +167,9 @@ func (s *storageService) DeleteSpaceStorage(ctx context.Context, spaceId string) } return fmt.Errorf("can't delete datadir '%s': %w", dbPath, err) } - s.onDeleteStorage(ctx, spaceId) + if s.onDeleteStorage != nil { + s.onDeleteStorage(ctx, spaceId) + } return os.RemoveAll(dbPath) }) if err == nil { From cb1360ef6e21333fd47f17772e10f5c8eb6684d3 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 4 Dec 2023 15:01:28 +0100 Subject: [PATCH 6/6] Update go.mod --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 208fa2df..2c71ee53 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/ahmetb/govvv v0.3.0 github.com/akrylysov/pogreb v0.10.1 - github.com/anyproto/any-sync v0.3.11-0.20231201214252-51f44b8032bf + github.com/anyproto/any-sync v0.3.11 github.com/anyproto/go-chash v0.1.0 github.com/gogo/protobuf v1.3.2 github.com/prometheus/client_golang v1.17.0 diff --git a/go.sum b/go.sum index 7c92ce52..8f4cc752 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/ahmetb/govvv v0.3.0 h1:YGLGwEyiUwHFy5eh/RUhdupbuaCGBYn5T5GWXp+WJB0= github.com/ahmetb/govvv v0.3.0/go.mod h1:4WRFpdWtc/YtKgPFwa1dr5+9hiRY5uKAL08bOlxOR6s= github.com/akrylysov/pogreb v0.10.1 h1:FqlR8VR7uCbJdfUob916tPM+idpKgeESDXOA1K0DK4w= github.com/akrylysov/pogreb v0.10.1/go.mod h1:pNs6QmpQ1UlTJKDezuRWmaqkgUE2TuU0YTWyqJZ7+lI= -github.com/anyproto/any-sync v0.3.11-0.20231201214252-51f44b8032bf h1:Qo86yY73aYcAIxU8Kt+r0KftzEsx03N6uo7I4vN26VU= -github.com/anyproto/any-sync v0.3.11-0.20231201214252-51f44b8032bf/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= +github.com/anyproto/any-sync v0.3.11 h1:Y++bHerO8nAh1b1z/xUAoFFXXJ5UI7duSa0LRvmmFXc= +github.com/anyproto/any-sync v0.3.11/go.mod h1:ZM/hfNni7fdzSYzDqSg8QyO1iIxWskWjYiykT1qaceo= github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY= github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY= github.com/anyproto/go-slip10 v1.0.0 h1:uAEtSuudR3jJBOfkOXf3bErxVoxbuKwdoJN55M1i6IA=