From 5ce2aa3d7b83f6e0ac4433c4c6a34662dbc7b10f Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Tue, 16 Apr 2024 19:41:52 +0900 Subject: [PATCH] Move Client.Watch inside Client.Attach (#803) In this commit, the existing Watch() function has been renamed to runWatchLoop(), similar to JS, and moved inside Attach(). runWatchLoop() creates a watch channel and inserts it into the client's attachment. Additionally, a Subscribe() function has been added, which simply retrieves the watch channel from the client's attachment corresponding to the document key and hands it over to the user. Throughout this process, we've ensured that users can disconnect the watch at any time via context and included logic to handle race conditions and various document states properly. Unfortunately, we haven't considered reconnection in case of watch connection failure. This is something that needs to be addressed. --------- Co-authored-by: karockai --- client/client.go | 75 ++++++++++++++++++++++----- test/bench/grpc_bench_test.go | 4 +- test/integration/admin_test.go | 5 +- test/integration/auth_webhook_test.go | 2 +- test/integration/document_test.go | 42 +++++++-------- test/integration/presence_test.go | 27 ++++------ test/integration/server_test.go | 2 +- 7 files changed, 96 insertions(+), 61 deletions(-) diff --git a/client/client.go b/client/client.go index f1673a9ac..747d958f0 100644 --- a/client/client.go +++ b/client/client.go @@ -28,6 +28,7 @@ import ( "os" "path/filepath" "strings" + "sync/atomic" "connectrpc.com/connect" "github.com/rs/xid" @@ -72,12 +73,22 @@ var ( // ErrInitializationNotReceived occurs when the first response of the watch stream is not received. ErrInitializationNotReceived = errors.New("initialization is not received") + + // ErrAlreadySubscribed occurs when the client is already subscribed to the document. + ErrAlreadySubscribed = errors.New("already subscribed") ) // Attachment represents the document attached. type Attachment struct { doc *document.Document docID types.ID + + // TODO(krapie): We need to consider the case where a client opens multiple subscriptions for the same document. + isSubscribed atomic.Bool + + rch <-chan WatchResponse + watchCtx context.Context + closeWatchStream context.CancelFunc } // Client is a normal client that can communicate with the server. @@ -322,6 +333,15 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ... docID: types.ID(res.Msg.DocumentId), } + watchCtx, cancelFunc := context.WithCancel(ctx) + c.attachments[doc.Key()].watchCtx = watchCtx + c.attachments[doc.Key()].closeWatchStream = cancelFunc + + err = c.runWatchLoop(watchCtx, doc) + if err != nil { + return err + } + return nil } @@ -346,6 +366,8 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ... return ErrDocumentNotAttached } + attachment.closeWatchStream() + if err := doc.Update(func(root *json.Object, p *presence.Presence) error { p.Clear() return nil @@ -406,21 +428,36 @@ func (c *Client) Sync(ctx context.Context, options ...SyncOptions) error { return nil } -// Watch subscribes to events on a given documentIDs. +// Subscribe subscribes to events on a given document. +func (c *Client) Subscribe( + doc *document.Document, +) (<-chan WatchResponse, context.CancelFunc, error) { + attachment, ok := c.attachments[doc.Key()] + if !ok { + return nil, nil, ErrDocumentNotAttached + } + + if !attachment.isSubscribed.CompareAndSwap(false, true) { + return nil, nil, ErrAlreadySubscribed + } + + return attachment.rch, attachment.closeWatchStream, nil +} + +// runWatchLoop subscribes to events on a given documentIDs. // If an error occurs before stream initialization, the second response, error, -// is returned. If the context "ctx" is canceled or timed out, returned channel +// is returned. If the context "watchCtx" is canceled or timed out, returned channel // is closed, and "WatchResponse" from this closed channel has zero events and // nil "Err()". -func (c *Client) Watch( +func (c *Client) runWatchLoop( ctx context.Context, doc *document.Document, -) (<-chan WatchResponse, error) { +) error { attachment, ok := c.attachments[doc.Key()] if !ok { - return nil, ErrDocumentNotAttached + return ErrDocumentNotAttached } - rch := make(chan WatchResponse) stream, err := c.client.WatchDocument( ctx, withShardKey(connect.NewRequest(&api.WatchDocumentRequest{ @@ -429,40 +466,52 @@ func (c *Client) Watch( }, ), c.options.APIKey, doc.Key().String())) if err != nil { - return nil, err + return err } // NOTE(hackerwins): We need to receive the first response to initialize - // the watch stream. Watch should be blocked until the first response is + // the watch stream. runWatchLoop should be blocked until the first response is // received. if !stream.Receive() { - return nil, ErrInitializationNotReceived + return ErrInitializationNotReceived } if _, err := handleResponse(stream.Msg(), doc); err != nil { - return nil, err + return err } if err = stream.Err(); err != nil { - return nil, err + return err } + rch := make(chan WatchResponse) + attachment.rch = rch + go func() { for stream.Receive() { pbResp := stream.Msg() resp, err := handleResponse(pbResp, doc) if err != nil { rch <- WatchResponse{Err: err} + ctx.Done() close(rch) return } - if resp == nil { + if resp == nil || !attachment.isSubscribed.Load() { continue } rch <- *resp } if err = stream.Err(); err != nil { + attachment.isSubscribed.Store(false) rch <- WatchResponse{Err: err} + ctx.Done() close(rch) + + // If watch stream is disconnected, we re-establish the watch stream. + err = c.runWatchLoop(ctx, doc) + if err != nil { + return + } return } }() @@ -504,7 +553,7 @@ func (c *Client) Watch( } }() - return rch, nil + return nil } func handleResponse( diff --git a/test/bench/grpc_bench_test.go b/test/bench/grpc_bench_test.go index 39bb924ca..55107eb6f 100644 --- a/test/bench/grpc_bench_test.go +++ b/test/bench/grpc_bench_test.go @@ -221,9 +221,9 @@ func BenchmarkRPC(b *testing.B) { }) assert.NoError(b, err) - rch1, err := c1.Watch(ctx, d1) + rch1, _, err := c1.Subscribe(d1) assert.NoError(b, err) - rch2, err := c2.Watch(ctx, d2) + rch2, _, err := c2.Subscribe(d2) assert.NoError(b, err) done1 := make(chan bool) diff --git a/test/integration/admin_test.go b/test/integration/admin_test.go index b02fa8748..096ba5172 100644 --- a/test/integration/admin_test.go +++ b/test/integration/admin_test.go @@ -83,15 +83,14 @@ func TestAdmin(t *testing.T) { t.Run("document event propagation on removal test", func(t *testing.T) { ctx := context.Background() - watchCtx, cancel := context.WithCancel(ctx) - defer cancel() // 01. c1 attaches and watches d1. d1 := document.New(helper.TestDocKey(t)) assert.NoError(t, c1.Attach(ctx, d1)) wg := sync.WaitGroup{} wg.Add(1) - rch, err := c1.Watch(watchCtx, d1) + rch, cancel, err := c1.Subscribe(d1) + defer cancel() assert.NoError(t, err) go func() { defer wg.Done() diff --git a/test/integration/auth_webhook_test.go b/test/integration/auth_webhook_test.go index 8f196b8bf..a37a6bf41 100644 --- a/test/integration/auth_webhook_test.go +++ b/test/integration/auth_webhook_test.go @@ -177,7 +177,7 @@ func TestProjectAuthWebhook(t *testing.T) { err = cli.Attach(ctx, doc) assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) - _, err = cli.Watch(ctx, doc) + _, _, err = cli.Subscribe(doc) assert.Equal(t, client.ErrDocumentNotAttached, err) }) } diff --git a/test/integration/document_test.go b/test/integration/document_test.go index a163ab758..d3633373d 100644 --- a/test/integration/document_test.go +++ b/test/integration/document_test.go @@ -149,7 +149,7 @@ func TestDocument(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - _, err := c1.Watch(ctx, d1) + _, _, err := c1.Subscribe(d1) assert.ErrorIs(t, err, client.ErrDocumentNotAttached) err = c1.Attach(ctx, d1) @@ -163,8 +163,9 @@ func TestDocument(t *testing.T) { // 01. cli1 watches doc1. wg.Add(1) - rch, err := c1.Watch(ctx, d1) + rch, _, err := c1.Subscribe(d1) assert.NoError(t, err) + go func() { defer wg.Done() @@ -402,23 +403,21 @@ func TestDocument(t *testing.T) { t.Run("removed document removal with watching test", func(t *testing.T) { ctx := context.Background() - watchCtx, cancel := context.WithCancel(ctx) - defer cancel() // 01. c1 creates d1 without attaching. d1 := document.New(helper.TestDocKey(t)) - _, err := c1.Watch(watchCtx, d1) + _, _, err := c1.Subscribe(d1) assert.ErrorIs(t, err, client.ErrDocumentNotAttached) // 02. c1 attaches d1 and watches it. assert.NoError(t, c1.Attach(ctx, d1)) - _, err = c1.Watch(watchCtx, d1) + _, _, err = c1.Subscribe(d1) assert.NoError(t, err) // 03. c1 removes d1 and watches it. assert.NoError(t, c1.Remove(ctx, d1)) assert.Equal(t, d1.Status(), document.StatusRemoved) - _, err = c1.Watch(watchCtx, d1) + _, _, err = c1.Subscribe(d1) assert.ErrorIs(t, err, client.ErrDocumentNotAttached) }) @@ -437,19 +436,19 @@ func TestDocument(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) assert.NoError(t, c1.Attach(ctx, d1)) - rch1, err := c1.Watch(ctx, d1) + rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) d2 := document.New(helper.TestDocKey(t)) assert.NoError(t, c2.Attach(ctx, d2)) - rch2, err := c2.Watch(ctx, d2) + rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) d2.SubscribeBroadcastEvent("mention", handler) d3 := document.New(helper.TestDocKey(t)) assert.NoError(t, c3.Attach(ctx, d3)) - rch3, err := c3.Watch(ctx, d3) + rch3, _, err := c3.Subscribe(d3) assert.NoError(t, err) d3.SubscribeBroadcastEvent("mention", handler) @@ -499,13 +498,13 @@ func TestDocument(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) assert.NoError(t, c1.Attach(ctx, d1)) - rch1, err := c1.Watch(ctx, d1) + rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) d2 := document.New(helper.TestDocKey(t)) assert.NoError(t, c2.Attach(ctx, d2)) - rch2, err := c2.Watch(ctx, d2) + rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) d2.SubscribeBroadcastEvent("mention", handler) @@ -557,14 +556,14 @@ func TestDocument(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) assert.NoError(t, c1.Attach(ctx, d1)) - rch1, err := c1.Watch(ctx, d1) + rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) // c2 doesn't subscribe to the "mention" broadcast event. d2 := document.New(helper.TestDocKey(t)) assert.NoError(t, c2.Attach(ctx, d2)) - rch2, err := c2.Watch(ctx, d2) + rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) // The unsubscriber c2 broadcasts the "mention" event. @@ -603,7 +602,7 @@ func TestDocument(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) assert.NoError(t, c1.Attach(ctx, d1)) - _, err := c1.Watch(ctx, d1) + _, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", nil) @@ -626,13 +625,13 @@ func TestDocument(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) assert.NoError(t, c1.Attach(ctx, d1)) - rch1, err := c1.Watch(ctx, d1) + rch1, _, err := c1.Subscribe(d1) assert.NoError(t, err) d1.SubscribeBroadcastEvent("mention", handler) d2 := document.New(helper.TestDocKey(t)) assert.NoError(t, c2.Attach(ctx, d2)) - rch2, err := c2.Watch(ctx, d2) + rch2, _, err := c2.Subscribe(d2) assert.NoError(t, err) d2.SubscribeBroadcastEvent("mention", handler) @@ -711,9 +710,8 @@ func TestDocumentWithProjects(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) err = c1.Attach(ctx, d1) assert.NoError(t, err) - watch1Ctx, cancel1 := context.WithCancel(ctx) + rch, cancel1, err := c1.Subscribe(d1) defer cancel1() - rch, err := c1.Watch(watch1Ctx, d1) assert.NoError(t, err) go func() { @@ -750,8 +748,7 @@ func TestDocumentWithProjects(t *testing.T) { }) d2 := document.New(helper.TestDocKey(t)) assert.NoError(t, c2.Attach(ctx, d2)) - watch2Ctx, cancel2 := context.WithCancel(ctx) - _, err = c2.Watch(watch2Ctx, d2) + _, cancel2, err := c2.Subscribe(d2) assert.NoError(t, err) // c2 updates the document, so c1 receives a documents changed event. @@ -764,8 +761,7 @@ func TestDocumentWithProjects(t *testing.T) { // d3 is in another project, so c1 and c2 should not receive events. d3 := document.New(helper.TestDocKey(t)) assert.NoError(t, c3.Attach(ctx, d3)) - watch3Ctx, cancel3 := context.WithCancel(ctx) - _, err = c3.Watch(watch3Ctx, d3) + _, cancel3, err := c3.Subscribe(d3) assert.NoError(t, err) assert.NoError(t, d3.Update(func(root *json.Object, p *presence.Presence) error { root.SetString("key3", "value3") diff --git a/test/integration/presence_test.go b/test/integration/presence_test.go index e2b37aa69..9f0a8e891 100644 --- a/test/integration/presence_test.go +++ b/test/integration/presence_test.go @@ -139,9 +139,8 @@ func TestPresence(t *testing.T) { wgEvents := sync.WaitGroup{} wgEvents.Add(1) - watch1Ctx, cancel1 := context.WithCancel(ctx) + wrch, cancel1, err := c1.Subscribe(d1) defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) assert.NoError(t, err) go func() { defer func() { @@ -177,8 +176,7 @@ func TestPresence(t *testing.T) { c2.ID().String(): {}, }, }) - watch2Ctx, cancel2 := context.WithCancel(ctx) - _, err = c2.Watch(watch2Ctx, d2) + _, cancel2, err := c2.Subscribe(d2) assert.NoError(t, err) // 04. Update the second client's presence. @@ -224,9 +222,8 @@ func TestPresence(t *testing.T) { wgEvents := sync.WaitGroup{} wgEvents.Add(1) - watch1Ctx, cancel1 := context.WithCancel(ctx) + wrch, cancel1, err := c1.Subscribe(d1) defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) assert.NoError(t, err) go func() { defer func() { @@ -263,9 +260,8 @@ func TestPresence(t *testing.T) { c2.ID().String(): {}, }, }) - watch2Ctx, cancel2 := context.WithCancel(ctx) + _, cancel2, err := c2.Subscribe(d2) defer cancel2() - _, err = c2.Watch(watch2Ctx, d2) assert.NoError(t, err) // 04. Update the second client's presence. @@ -313,9 +309,8 @@ func TestPresence(t *testing.T) { wgEvents := sync.WaitGroup{} wgEvents.Add(1) - watch1Ctx, cancel1 := context.WithCancel(ctx) + wrch, cancel1, err := c1.Subscribe(d1) defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) assert.NoError(t, err) go func() { defer func() { @@ -352,9 +347,8 @@ func TestPresence(t *testing.T) { c2.ID().String(): {}, }, }) - watch2Ctx, cancel2 := context.WithCancel(ctx) + _, cancel2, err := c2.Subscribe(d2) defer cancel2() - _, err = c2.Watch(watch2Ctx, d2) assert.NoError(t, err) // 04. Update the second client's presence. @@ -405,9 +399,8 @@ func TestPresence(t *testing.T) { wgEvents := sync.WaitGroup{} wgEvents.Add(1) - watch1Ctx, cancel1 := context.WithCancel(ctx) + wrch, cancel1, err := c1.Subscribe(d1) defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) assert.NoError(t, err) go func() { defer func() { @@ -452,13 +445,11 @@ func TestPresence(t *testing.T) { c2.ID().String(): d2.MyPresence(), }, }) - watch2Ctx, cancel2 := context.WithCancel(ctx) - _, err = c2.Watch(watch2Ctx, d2) + _, cancel2, err := c2.Subscribe(d2) assert.NoError(t, err) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) - watch3Ctx, cancel3 := context.WithCancel(ctx) - _, err = c2.Watch(watch3Ctx, d3) + _, cancel3, err := c2.Subscribe(d3) assert.NoError(t, err) // 05. The second client unwatch the documents attached by itself. diff --git a/test/integration/server_test.go b/test/integration/server_test.go index 93c53eb99..30dc03d5e 100644 --- a/test/integration/server_test.go +++ b/test/integration/server_test.go @@ -45,7 +45,7 @@ func TestServer(t *testing.T) { assert.NoError(t, cli.Attach(ctx, doc)) wg := sync.WaitGroup{} - wrch, err := cli.Watch(ctx, doc) + wrch, _, err := cli.Subscribe(doc) assert.NoError(t, err) go func() {