Skip to content

Commit

Permalink
Change multiple docs to single in DocEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Mar 17, 2023
1 parent 1713c2c commit 2285241
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 256 deletions.
10 changes: 5 additions & 5 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,16 @@ func FromDocEvent(docEvent *api.DocEvent) (*sync.DocEvent, error) {
return nil, err
}

documentIDs, err := FromDocumentIDs(docEvent.DocumentIds)
documentIDs, err := FromDocumentID(docEvent.DocumentId)
if err != nil {
return nil, err
}

return &sync.DocEvent{
Type: eventType,
Publisher: *client,
DocumentIDs: documentIDs,
DocumentKeys: FromDocumentKeys(docEvent.DocumentKeys),
Type: eventType,
Publisher: *client,
DocumentID: documentIDs,
DocumentKey: FromDocumentKey(docEvent.DocumentKey),
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ func ToDocEvent(docEvent sync.DocEvent) (*api.DocEvent, error) {
}

return &api.DocEvent{
Type: eventType,
Publisher: ToClient(docEvent.Publisher),
DocumentIds: ToDocumentIDs(docEvent.DocumentIDs),
DocumentKeys: ToDocumentKeys(docEvent.DocumentKeys),
Type: eventType,
Publisher: ToClient(docEvent.Publisher),
DocumentId: ToDocumentID(docEvent.DocumentID),
DocumentKey: ToDocumentKey(docEvent.DocumentKey),
}, nil
}

Expand Down
348 changes: 170 additions & 178 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ enum DocEventType {
message DocEvent {
DocEventType type = 1;
Client publisher = 2;
repeated string document_ids = 3;
repeated string document_keys = 4;
string document_id = 3;
string document_key = 4;
}

message ClientDocEvent {
Expand Down
2 changes: 1 addition & 1 deletion server/admin/cluster_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *clusterServer) BroadcastEvent(
if err := s.backend.Coordinator.UpdatePresence(
ctx,
&docEvent.Publisher,
docEvent.DocumentIDs[0],
docEvent.DocumentID,
); err != nil {
return nil, err
}
Expand Down
79 changes: 39 additions & 40 deletions server/backend/sync/memory/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,49 +161,48 @@ func (m *PubSub) Publish(
m.subscriptionsMapMu.RLock()
defer m.subscriptionsMapMu.RUnlock()

for idx, documentID := range event.DocumentIDs {
if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(`Publish(%s,%s) Start`, documentID.String(), publisherID.String())
}
documentID := event.DocumentID
if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(`Publish(%s,%s) Start`, documentID.String(), publisherID.String())
}

if subs, ok := m.subscriptionsMapByDocID[documentID]; ok {
for _, sub := range subs.Map() {
if sub.Subscriber().ID.Compare(publisherID) == 0 {
continue
}

if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(
`Publish %s(%s,%s) to %s`,
event.Type,
documentID.String(),
publisherID.String(),
sub.SubscriberID(),
)
}

watchDocEvent := sync.ClientDocEvent{
Type: event.Type,
Publisher: event.Publisher,
DocumentKey: event.DocumentKeys[idx],
}
// NOTE: When a subscription is being closed by a subscriber,
// the subscriber may not receive messages.
select {
case sub.Events() <- watchDocEvent:
case <-gotime.After(100 * gotime.Millisecond):
logging.From(ctx).Warnf(
`Publish(%s,%s) to %s timeout`,
documentID.String(),
publisherID.String(),
sub.SubscriberID(),
)
}
if subs, ok := m.subscriptionsMapByDocID[documentID]; ok {
for _, sub := range subs.Map() {
if sub.Subscriber().ID.Compare(publisherID) == 0 {
continue
}

if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(
`Publish %s(%s,%s) to %s`,
event.Type,
documentID.String(),
publisherID.String(),
sub.SubscriberID(),
)
}

watchDocEvent := sync.ClientDocEvent{
Type: event.Type,
Publisher: event.Publisher,
DocumentKey: event.DocumentKey,
}
// NOTE: When a subscription is being closed by a subscriber,
// the subscriber may not receive messages.
select {
case sub.Events() <- watchDocEvent:
case <-gotime.After(100 * gotime.Millisecond):
logging.From(ctx).Warnf(
`Publish(%s,%s) to %s timeout`,
documentID.String(),
publisherID.String(),
sub.SubscriberID(),
)
}
}
if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(`Publish(%s,%s) End`, documentID.String(), publisherID.String())
}
}
if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(`Publish(%s,%s) End`, documentID.String(), publisherID.String())
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/backend/sync/memory/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func TestPubSub(t *testing.T) {
id := types.ID(t.Name() + "id")
docKey := key.Key(t.Name() + "docKey")
docEvent := sync.DocEvent{
Type: types.DocumentsWatchedEvent,
Publisher: actorB,
DocumentIDs: []types.ID{id},
DocumentKeys: []key.Key{docKey},
Type: types.DocumentsWatchedEvent,
Publisher: actorB,
DocumentID: id,
DocumentKey: docKey,
}
clientDocEvent := sync.ClientDocEvent{
Type: types.DocumentsWatchedEvent,
Expand Down
8 changes: 4 additions & 4 deletions server/backend/sync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ func (s *Subscription) ID() string {

// DocEvent represents events that occur related to the document.
type DocEvent struct {
Type types.DocEventType
Publisher types.Client
DocumentIDs []types.ID
DocumentKeys []key.Key
Type types.DocEventType
Publisher types.Client
DocumentID types.ID
DocumentKey key.Key
}

// ClientDocEvent represents doc events that should be sent to the client.
Expand Down
8 changes: 4 additions & 4 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ func PushPull(
ctx,
publisherID,
sync.DocEvent{
Type: types.DocumentsChangedEvent,
Publisher: types.Client{ID: publisherID},
DocumentIDs: []types.ID{docInfo.ID},
DocumentKeys: []key.Key{docInfo.Key},
Type: types.DocumentsChangedEvent,
Publisher: types.Client{ID: publisherID},
DocumentID: docInfo.ID,
DocumentKey: docInfo.Key,
},
)

Expand Down
26 changes: 12 additions & 14 deletions server/rpc/yorkie_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,17 +553,16 @@ func (s *yorkieServer) UpdatePresence(
}
documentKey := converter.FromDocumentKey(req.DocumentKey)

// TODO(hackerwins): We need change documents to document.
err = s.backend.Coordinator.UpdatePresence(ctx, cli, documentID)
if err != nil {
return nil, err
}

s.backend.Coordinator.Publish(ctx, cli.ID, sync.DocEvent{
Type: types.DocumentsWatchedEvent,
Publisher: *cli,
DocumentIDs: []types.ID{documentID},
DocumentKeys: []key.Key{documentKey},
Type: types.DocumentsWatchedEvent,
Publisher: *cli,
DocumentID: documentID,
DocumentKey: documentKey,
})

return &api.UpdatePresenceResponse{}, nil
Expand All @@ -575,7 +574,6 @@ func (s *yorkieServer) watchDoc(
documentID types.ID,
documentKey key.Key,
) (*sync.Subscription, []types.Client, error) {
// TODO(hackerwins): We need change documents to document.
subscription, peers, err := s.backend.Coordinator.Subscribe(
ctx,
client,
Expand All @@ -590,10 +588,10 @@ func (s *yorkieServer) watchDoc(
ctx,
subscription.Subscriber().ID,
sync.DocEvent{
Type: types.DocumentsWatchedEvent,
Publisher: subscription.Subscriber(),
DocumentIDs: []types.ID{documentID},
DocumentKeys: []key.Key{documentKey},
Type: types.DocumentsWatchedEvent,
Publisher: subscription.Subscriber(),
DocumentID: documentID,
DocumentKey: documentKey,
},
)

Expand All @@ -611,10 +609,10 @@ func (s *yorkieServer) unwatchDoc(
ctx,
subscription.Subscriber().ID,
sync.DocEvent{
Type: types.DocumentsUnwatchedEvent,
Publisher: subscription.Subscriber(),
DocumentIDs: []types.ID{documentID},
DocumentKeys: []key.Key{documentKey},
Type: types.DocumentsUnwatchedEvent,
Publisher: subscription.Subscriber(),
DocumentID: documentID,
DocumentKey: documentKey,
},
)
}

1 comment on commit 2285241

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go Benchmark

Benchmark suite Current: 2285241 Previous: 57d784c Ratio
BenchmarkDocument/constructor_test 1235 ns/op 752 B/op 12 allocs/op 1340 ns/op 752 B/op 12 allocs/op 0.92
BenchmarkDocument/status_test 617 ns/op 720 B/op 10 allocs/op 712.7 ns/op 720 B/op 10 allocs/op 0.87
BenchmarkDocument/equals_test 7225 ns/op 5072 B/op 85 allocs/op 7713 ns/op 5072 B/op 85 allocs/op 0.94
BenchmarkDocument/nested_update_test 20830 ns/op 11033 B/op 235 allocs/op 22351 ns/op 11033 B/op 235 allocs/op 0.93
BenchmarkDocument/delete_test 27703 ns/op 14161 B/op 310 allocs/op 29943 ns/op 14162 B/op 310 allocs/op 0.93
BenchmarkDocument/object_test 9743 ns/op 5793 B/op 97 allocs/op 10310 ns/op 5792 B/op 97 allocs/op 0.95
BenchmarkDocument/array_test 40240 ns/op 10889 B/op 251 allocs/op 38517 ns/op 10889 B/op 251 allocs/op 1.04
BenchmarkDocument/text_test 37356 ns/op 14058 B/op 456 allocs/op 41513 ns/op 14058 B/op 456 allocs/op 0.90
BenchmarkDocument/text_composition_test 37822 ns/op 17538 B/op 461 allocs/op 40381 ns/op 17538 B/op 461 allocs/op 0.94
BenchmarkDocument/rich_text_test 98232 ns/op 36007 B/op 1108 allocs/op 105974 ns/op 36014 B/op 1108 allocs/op 0.93
BenchmarkDocument/counter_test 19781 ns/op 9058 B/op 212 allocs/op 21176 ns/op 9057 B/op 212 allocs/op 0.93
BenchmarkDocument/text_edit_gc_100 3949069 ns/op 1552608 B/op 17149 allocs/op 4235403 ns/op 1552772 B/op 17148 allocs/op 0.93
BenchmarkDocument/text_edit_gc_1000 320004722 ns/op 136630164 B/op 210677 allocs/op 345600036 ns/op 136649740 B/op 210758 allocs/op 0.93
BenchmarkDocument/text_split_gc_100 4658348 ns/op 2217451 B/op 16579 allocs/op 5017804 ns/op 2217295 B/op 16577 allocs/op 0.93
BenchmarkDocument/text_split_gc_1000 375972154 ns/op 214835282 B/op 211313 allocs/op 403719911 ns/op 214863165 B/op 211438 allocs/op 0.93
BenchmarkDocument/text_delete_all_10000 15879047 ns/op 5904955 B/op 41124 allocs/op 20215492 ns/op 5904946 B/op 41126 allocs/op 0.79
BenchmarkDocument/text_delete_all_100000 193506273 ns/op 53845528 B/op 415997 allocs/op 256564675 ns/op 53914644 B/op 416114 allocs/op 0.75
BenchmarkDocument/text_100 299702 ns/op 117748 B/op 5064 allocs/op 329749 ns/op 117749 B/op 5064 allocs/op 0.91
BenchmarkDocument/text_1000 3237190 ns/op 1152348 B/op 50068 allocs/op 3541237 ns/op 1152356 B/op 50068 allocs/op 0.91
BenchmarkDocument/array_1000 1676152 ns/op 1102049 B/op 11854 allocs/op 1873575 ns/op 1102022 B/op 11854 allocs/op 0.89
BenchmarkDocument/array_10000 18375021 ns/op 9906236 B/op 120705 allocs/op 20216385 ns/op 9906585 B/op 120707 allocs/op 0.91
BenchmarkDocument/array_gc_100 177302 ns/op 97414 B/op 1226 allocs/op 188009 ns/op 97404 B/op 1226 allocs/op 0.94
BenchmarkDocument/array_gc_1000 1949193 ns/op 1169834 B/op 12890 allocs/op 2039826 ns/op 1169787 B/op 12890 allocs/op 0.96
BenchmarkDocument/counter_1000 266671 ns/op 197878 B/op 6490 allocs/op 301226 ns/op 197879 B/op 6490 allocs/op 0.89
BenchmarkDocument/counter_10000 2871664 ns/op 2164795 B/op 69497 allocs/op 3428664 ns/op 2164800 B/op 69497 allocs/op 0.84
BenchmarkDocument/object_1000 1822731 ns/op 1450589 B/op 9902 allocs/op 2012194 ns/op 1450576 B/op 9902 allocs/op 0.91
BenchmarkDocument/object_10000 19817731 ns/op 12368372 B/op 101204 allocs/op 24305701 ns/op 12368628 B/op 101206 allocs/op 0.82
BenchmarkRPC/client_to_server 788360772 ns/op 24775600 B/op 415406 allocs/op 951991694 ns/op 24700956 B/op 414215 allocs/op 0.83
BenchmarkRPC/client_to_client_via_server 5023277001 ns/op 44364640 B/op 717689 allocs/op 4991291393 ns/op 44390144 B/op 718098 allocs/op 1.01
BenchmarkRPC/attach_large_document 1681180637 ns/op 2135297208 B/op 13976 allocs/op 1733731725 ns/op 2125791336 B/op 13951 allocs/op 0.97
BenchmarkRPC/adminCli_to_server 521054130 ns/op 19073776 B/op 315173 allocs/op 605932650 ns/op 19060772 B/op 314965 allocs/op 0.86
BenchmarkLocker 123.4 ns/op 16 B/op 1 allocs/op 135.9 ns/op 16 B/op 1 allocs/op 0.91
BenchmarkLockerParallel 106.2 ns/op 0 B/op 0 allocs/op 166.8 ns/op 0 B/op 0 allocs/op 0.64
BenchmarkLockerMoreKeys 373.7 ns/op 14 B/op 0 allocs/op 357.7 ns/op 14 B/op 0 allocs/op 1.04
BenchmarkSync/memory_sync_10_test 6889 ns/op 1341 B/op 39 allocs/op 7992 ns/op 1339 B/op 39 allocs/op 0.86
BenchmarkSync/memory_sync_100_test 59191 ns/op 9125 B/op 301 allocs/op 74876 ns/op 8772 B/op 279 allocs/op 0.79
BenchmarkSync/memory_sync_1000_test 606386 ns/op 84535 B/op 2753 allocs/op 743812 ns/op 81489 B/op 2564 allocs/op 0.82
BenchmarkSync/memory_sync_10000_test 6368434 ns/op 866354 B/op 27993 allocs/op 8018558 ns/op 865876 B/op 26886 allocs/op 0.79
BenchmarkSync/etcd_sync_100_test 0.2874 ns/op 0 B/op 0 allocs/op 0.2943 ns/op 0 B/op 0 allocs/op 0.98
BenchmarkTextEditing 23959594901 ns/op 8436397728 B/op 19836390 allocs/op 28894853557 ns/op 8436118528 B/op 19834840 allocs/op 0.83

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.