Skip to content

Commit

Permalink
Move Client.Watch inside Client.Attach (#803)
Browse files Browse the repository at this point in the history
In this commit, the existing Watch() function has been renamed to
runWatchLoop(), similar to JS, and moved inside Attach().
runWatchLoop() creates a watch channel and inserts it into the
client's attachment.

Additionally, a Subscribe() function has been added, which simply
retrieves the watch channel from the client's attachment corresponding
to the document key and hands it over to the user. Throughout this
process, we've ensured that users can disconnect the watch at any time
via context and included logic to handle race conditions and various
document states properly.

Unfortunately, we haven't considered reconnection in case of watch
connection failure. This is something that needs to be addressed.

---------

Co-authored-by: karockai <karockai@gmail.com>
  • Loading branch information
krapie and karockai authored Apr 16, 2024
1 parent f31e4bf commit 5ce2aa3
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 61 deletions.
75 changes: 62 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"

"connectrpc.com/connect"
"github.com/rs/xid"
Expand Down Expand Up @@ -72,12 +73,22 @@ var (

// ErrInitializationNotReceived occurs when the first response of the watch stream is not received.
ErrInitializationNotReceived = errors.New("initialization is not received")

// ErrAlreadySubscribed occurs when the client is already subscribed to the document.
ErrAlreadySubscribed = errors.New("already subscribed")
)

// Attachment represents the document attached.
type Attachment struct {
doc *document.Document
docID types.ID

// TODO(krapie): We need to consider the case where a client opens multiple subscriptions for the same document.
isSubscribed atomic.Bool

rch <-chan WatchResponse
watchCtx context.Context
closeWatchStream context.CancelFunc
}

// Client is a normal client that can communicate with the server.
Expand Down Expand Up @@ -322,6 +333,15 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
docID: types.ID(res.Msg.DocumentId),
}

watchCtx, cancelFunc := context.WithCancel(ctx)
c.attachments[doc.Key()].watchCtx = watchCtx
c.attachments[doc.Key()].closeWatchStream = cancelFunc

err = c.runWatchLoop(watchCtx, doc)
if err != nil {
return err
}

return nil
}

Expand All @@ -346,6 +366,8 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ...
return ErrDocumentNotAttached
}

attachment.closeWatchStream()

if err := doc.Update(func(root *json.Object, p *presence.Presence) error {
p.Clear()
return nil
Expand Down Expand Up @@ -406,21 +428,36 @@ func (c *Client) Sync(ctx context.Context, options ...SyncOptions) error {
return nil
}

// Watch subscribes to events on a given documentIDs.
// Subscribe subscribes to events on a given document.
func (c *Client) Subscribe(
doc *document.Document,
) (<-chan WatchResponse, context.CancelFunc, error) {
attachment, ok := c.attachments[doc.Key()]
if !ok {
return nil, nil, ErrDocumentNotAttached
}

if !attachment.isSubscribed.CompareAndSwap(false, true) {
return nil, nil, ErrAlreadySubscribed
}

return attachment.rch, attachment.closeWatchStream, nil
}

// runWatchLoop subscribes to events on a given documentIDs.
// If an error occurs before stream initialization, the second response, error,
// is returned. If the context "ctx" is canceled or timed out, returned channel
// is returned. If the context "watchCtx" is canceled or timed out, returned channel
// is closed, and "WatchResponse" from this closed channel has zero events and
// nil "Err()".
func (c *Client) Watch(
func (c *Client) runWatchLoop(
ctx context.Context,
doc *document.Document,
) (<-chan WatchResponse, error) {
) error {
attachment, ok := c.attachments[doc.Key()]
if !ok {
return nil, ErrDocumentNotAttached
return ErrDocumentNotAttached
}

rch := make(chan WatchResponse)
stream, err := c.client.WatchDocument(
ctx,
withShardKey(connect.NewRequest(&api.WatchDocumentRequest{
Expand All @@ -429,40 +466,52 @@ func (c *Client) Watch(
},
), c.options.APIKey, doc.Key().String()))
if err != nil {
return nil, err
return err
}

// NOTE(hackerwins): We need to receive the first response to initialize
// the watch stream. Watch should be blocked until the first response is
// the watch stream. runWatchLoop should be blocked until the first response is
// received.
if !stream.Receive() {
return nil, ErrInitializationNotReceived
return ErrInitializationNotReceived
}
if _, err := handleResponse(stream.Msg(), doc); err != nil {
return nil, err
return err
}
if err = stream.Err(); err != nil {
return nil, err
return err
}

rch := make(chan WatchResponse)
attachment.rch = rch

go func() {
for stream.Receive() {
pbResp := stream.Msg()
resp, err := handleResponse(pbResp, doc)
if err != nil {
rch <- WatchResponse{Err: err}
ctx.Done()
close(rch)
return
}
if resp == nil {
if resp == nil || !attachment.isSubscribed.Load() {
continue
}

rch <- *resp
}
if err = stream.Err(); err != nil {
attachment.isSubscribed.Store(false)
rch <- WatchResponse{Err: err}
ctx.Done()
close(rch)

// If watch stream is disconnected, we re-establish the watch stream.
err = c.runWatchLoop(ctx, doc)
if err != nil {
return
}
return
}
}()
Expand Down Expand Up @@ -504,7 +553,7 @@ func (c *Client) Watch(
}
}()

return rch, nil
return nil
}

func handleResponse(
Expand Down
4 changes: 2 additions & 2 deletions test/bench/grpc_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ func BenchmarkRPC(b *testing.B) {
})
assert.NoError(b, err)

rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(b, err)
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(b, err)

done1 := make(chan bool)
Expand Down
5 changes: 2 additions & 3 deletions test/integration/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,14 @@ func TestAdmin(t *testing.T) {

t.Run("document event propagation on removal test", func(t *testing.T) {
ctx := context.Background()
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()

// 01. c1 attaches and watches d1.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
wg := sync.WaitGroup{}
wg.Add(1)
rch, err := c1.Watch(watchCtx, d1)
rch, cancel, err := c1.Subscribe(d1)
defer cancel()
assert.NoError(t, err)
go func() {
defer wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion test/integration/auth_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestProjectAuthWebhook(t *testing.T) {
err = cli.Attach(ctx, doc)
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))

_, err = cli.Watch(ctx, doc)
_, _, err = cli.Subscribe(doc)
assert.Equal(t, client.ErrDocumentNotAttached, err)
})
}
Expand Down
42 changes: 19 additions & 23 deletions test/integration/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestDocument(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))

_, err := c1.Watch(ctx, d1)
_, _, err := c1.Subscribe(d1)
assert.ErrorIs(t, err, client.ErrDocumentNotAttached)

err = c1.Attach(ctx, d1)
Expand All @@ -163,8 +163,9 @@ func TestDocument(t *testing.T) {

// 01. cli1 watches doc1.
wg.Add(1)
rch, err := c1.Watch(ctx, d1)
rch, _, err := c1.Subscribe(d1)
assert.NoError(t, err)

go func() {
defer wg.Done()

Expand Down Expand Up @@ -402,23 +403,21 @@ func TestDocument(t *testing.T) {

t.Run("removed document removal with watching test", func(t *testing.T) {
ctx := context.Background()
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()

// 01. c1 creates d1 without attaching.
d1 := document.New(helper.TestDocKey(t))
_, err := c1.Watch(watchCtx, d1)
_, _, err := c1.Subscribe(d1)
assert.ErrorIs(t, err, client.ErrDocumentNotAttached)

// 02. c1 attaches d1 and watches it.
assert.NoError(t, c1.Attach(ctx, d1))
_, err = c1.Watch(watchCtx, d1)
_, _, err = c1.Subscribe(d1)
assert.NoError(t, err)

// 03. c1 removes d1 and watches it.
assert.NoError(t, c1.Remove(ctx, d1))
assert.Equal(t, d1.Status(), document.StatusRemoved)
_, err = c1.Watch(watchCtx, d1)
_, _, err = c1.Subscribe(d1)
assert.ErrorIs(t, err, client.ErrDocumentNotAttached)
})

Expand All @@ -437,19 +436,19 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)
d2.SubscribeBroadcastEvent("mention", handler)

d3 := document.New(helper.TestDocKey(t))
assert.NoError(t, c3.Attach(ctx, d3))
rch3, err := c3.Watch(ctx, d3)
rch3, _, err := c3.Subscribe(d3)
assert.NoError(t, err)
d3.SubscribeBroadcastEvent("mention", handler)

Expand Down Expand Up @@ -499,13 +498,13 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)
d2.SubscribeBroadcastEvent("mention", handler)

Expand Down Expand Up @@ -557,14 +556,14 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

// c2 doesn't subscribe to the "mention" broadcast event.
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)

// The unsubscriber c2 broadcasts the "mention" event.
Expand Down Expand Up @@ -603,7 +602,7 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
_, err := c1.Watch(ctx, d1)
_, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", nil)

Expand All @@ -626,13 +625,13 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)
d2.SubscribeBroadcastEvent("mention", handler)

Expand Down Expand Up @@ -711,9 +710,8 @@ func TestDocumentWithProjects(t *testing.T) {
d1 := document.New(helper.TestDocKey(t))
err = c1.Attach(ctx, d1)
assert.NoError(t, err)
watch1Ctx, cancel1 := context.WithCancel(ctx)
rch, cancel1, err := c1.Subscribe(d1)
defer cancel1()
rch, err := c1.Watch(watch1Ctx, d1)
assert.NoError(t, err)

go func() {
Expand Down Expand Up @@ -750,8 +748,7 @@ func TestDocumentWithProjects(t *testing.T) {
})
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
watch2Ctx, cancel2 := context.WithCancel(ctx)
_, err = c2.Watch(watch2Ctx, d2)
_, cancel2, err := c2.Subscribe(d2)
assert.NoError(t, err)

// c2 updates the document, so c1 receives a documents changed event.
Expand All @@ -764,8 +761,7 @@ func TestDocumentWithProjects(t *testing.T) {
// d3 is in another project, so c1 and c2 should not receive events.
d3 := document.New(helper.TestDocKey(t))
assert.NoError(t, c3.Attach(ctx, d3))
watch3Ctx, cancel3 := context.WithCancel(ctx)
_, err = c3.Watch(watch3Ctx, d3)
_, cancel3, err := c3.Subscribe(d3)
assert.NoError(t, err)
assert.NoError(t, d3.Update(func(root *json.Object, p *presence.Presence) error {
root.SetString("key3", "value3")
Expand Down
Loading

0 comments on commit 5ce2aa3

Please sign in to comment.