Skip to content

Commit

Permalink
Fix duplicate changes when syncing and detaching (#896)
Browse files Browse the repository at this point in the history
This commit addresses the issue of duplicate changes being inserted
when PushPull and Detach occur simultaneously. Previously, there was
logic to filter out duplicate changes in PushPull, but during Detach,
ClientInfo's Checkpoint was set to 0, preventing the filtering of
duplicates.

This commit adjusts the order of updates to filter out duplicate
changes before updating ClientInfo's Checkpoint, resolving the problem.
  • Loading branch information
hackerwins authored Jun 14, 2024
1 parent 23b3662 commit db1b28a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 32 deletions.
34 changes: 27 additions & 7 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ func SnapshotKey(projectID types.ID, docKey key.Key) sync.Key {
return sync.NewKey(fmt.Sprintf("snapshot-%s-%s", projectID, docKey))
}

// PushPullOptions represents the options for PushPull.
type PushPullOptions struct {
// Mode represents the sync mode.
Mode types.SyncMode

// Status represents the status of the document to be updated.
Status document.StatusType
}

// PushPull stores the given changes and returns accumulated changes of the
// given document.
//
Expand All @@ -59,7 +68,7 @@ func PushPull(
clientInfo *database.ClientInfo,
docInfo *database.DocInfo,
reqPack *change.Pack,
mode types.SyncMode,
opts PushPullOptions,
) (*ServerPack, error) {
start := gotime.Now()
defer func() {
Expand All @@ -76,20 +85,31 @@ func PushPull(
be.Metrics.AddPushPullReceivedOperations(reqPack.OperationsLen())

// 02. pull pack: pull changes or a snapshot from the database and create a response pack.
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, mode)
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, opts.Mode)
if err != nil {
return nil, err
}
be.Metrics.AddPushPullSentChanges(respPack.ChangesLen())
be.Metrics.AddPushPullSentOperations(respPack.OperationsLen())
be.Metrics.AddPushPullSnapshotBytes(respPack.SnapshotLen())

// 03. update the client's document and checkpoint.
docRefKey := docInfo.RefKey()
if err := clientInfo.UpdateCheckpoint(docRefKey.DocID, respPack.Checkpoint); err != nil {
return nil, err
if opts.Status == document.StatusRemoved {
if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}
} else if opts.Status == document.StatusDetached {
if err := clientInfo.DetachDocument(docInfo.ID); err != nil {
return nil, err
}
} else {
if err := clientInfo.UpdateCheckpoint(docRefKey.DocID, respPack.Checkpoint); err != nil {
return nil, err
}
}

// 03. store pushed changes, docInfo and checkpoint of the client to DB.
// 04. store pushed changes, docInfo and checkpoint of the client to DB.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
if err := be.DB.CreateChangeInfos(
ctx,
Expand All @@ -107,7 +127,7 @@ func PushPull(
return nil, err
}

// 04. update and find min synced ticket for garbage collection.
// 05. update and find min synced ticket for garbage collection.
// NOTE(hackerwins): Since the client could not receive the response, the
// requested seq(reqPack) is stored instead of the response seq(resPack).
minSyncedTicket, err := be.DB.UpdateAndFindMinSyncedTicket(
Expand Down Expand Up @@ -135,7 +155,7 @@ func PushPull(
minSyncedTicket.ToTestString(),
)

// 05. publish document change event then store snapshot asynchronously.
// 06. publish document change event then store snapshot asynchronously.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
be.Background.AttachGoroutine(func(ctx context.Context) {
publisherID, err := clientInfo.ID.ToActorID()
Expand Down
44 changes: 25 additions & 19 deletions server/rpc/yorkie_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
Expand Down Expand Up @@ -160,7 +161,10 @@ func (s *yorkieServer) AttachDocument(
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,18 +247,18 @@ func (s *yorkieServer) DetachDocument(
return nil, err
}

var status document.StatusType
if req.Msg.RemoveIfNotAttached && !isAttached {
pack.IsRemoved = true
if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}
status = document.StatusRemoved
} else {
if err := clientInfo.DetachDocument(docInfo.ID); err != nil {
return nil, err
}
status = document.StatusDetached
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: status,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,11 +320,6 @@ func (s *yorkieServer) PushPullChanges(
}()
}

syncMode := types.SyncModePushPull
if req.Msg.PushOnly {
syncMode = types.SyncModePushOnly
}

clientInfo, err := clients.FindActiveClientInfo(ctx, s.backend.DB, types.ClientRefKey{
ProjectID: project.ID,
ClientID: types.IDFromActorID(actorID),
Expand All @@ -342,7 +341,15 @@ func (s *yorkieServer) PushPullChanges(
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, syncMode)
syncMode := types.SyncModePushPull
if req.Msg.PushOnly {
syncMode = types.SyncModePushOnly
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: syncMode,
Status: document.StatusAttached,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -531,11 +538,10 @@ func (s *yorkieServer) RemoveDocument(
return nil, err
}

if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusRemoved,
})
if err != nil {
return nil, err
}
Expand Down
30 changes: 24 additions & 6 deletions test/bench/push_pull_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ func benchmarkPushChanges(
assert.NoError(b, err)
b.StartTimer()

_, err = packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)
}
}
Expand All @@ -171,14 +174,20 @@ func benchmarkPullChanges(
}
docInfo, err := documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)

docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
b.StartTimer()

_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)
}
}
Expand Down Expand Up @@ -208,7 +217,10 @@ func benchmarkPushSnapshots(
assert.NoError(b, err)
b.StartTimer()

pulled, err := packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pushPack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)

b.StopTimer()
Expand Down Expand Up @@ -244,14 +256,20 @@ func benchmarkPullSnapshot(
}
docInfo, err := documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)

docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
b.StartTimer()

_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)
}
}
Expand Down

0 comments on commit db1b28a

Please sign in to comment.