Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Event Feeds #174

Merged
merged 24 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ For supported functions, see
[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) in
the API reference.

## Event Feeds (beta)

The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds). See [example](event_feed_example_test.go).

## Debug logging

To enable debug logging set the `FAUNA_DEBUG` environment variable to an integer for the value of the desired [slog.Level](https://pkg.go.dev/log/slog#Level).
Expand Down
66 changes: 58 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Client struct {
maxBackoff time.Duration

// lazily cached URLs
queryURL, streamURL *url.URL
queryURL, streamURL, feedURL *url.URL

logger Logger
}
Expand Down Expand Up @@ -197,10 +197,6 @@ func (c *Client) parseQueryURL() (*url.URL, error) {
}
}

if c.queryURL == nil {
return nil, fmt.Errorf("query url is not set")
}

return c.queryURL, nil
}

Expand All @@ -213,11 +209,19 @@ func (c *Client) parseStreamURL() (*url.URL, error) {
}
}

if c.streamURL == nil {
return nil, fmt.Errorf("stream url is not set")
return c.streamURL, nil
}

func (c *Client) parseFeedURL() (*url.URL, error) {
if c.feedURL == nil {
if feedURL, err := url.Parse(c.url); err != nil {
return nil, err
} else {
c.feedURL = feedURL.JoinPath("feed", "1")
}
}

return c.streamURL, nil
return c.feedURL, nil
}

func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) {
Expand Down Expand Up @@ -423,3 +427,49 @@ func (c *Client) String() string {
func (c *Client) setHeader(key, val string) {
c.headers[key] = val
}

// Feed opens an event feed from the event source
func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) {
feedOpts, err := parseFeedOptions(opts...)
if err != nil {
return nil, err
}

return newEventFeed(c, stream, feedOpts)
}

// FeedFromQuery opens an event feed from a query
func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, error) {
feedOpts, err := parseFeedOptions(opts...)
if err != nil {
return nil, err
}
if feedOpts.Cursor != nil {
return nil, fmt.Errorf("cannot use EventFeedCursor with FeedFromQuery")
}

res, err := c.Query(query)
pnwpedro marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

eventSource, ok := res.Data.(EventSource)
if !ok {
return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data)
}

return newEventFeed(c, eventSource, feedOpts)
}

func parseFeedOptions(opts ...FeedOptFn) (*feedOptions, error) {
feedOpts := feedOptions{}
for _, optFn := range opts {
optFn(&feedOpts)
}

if feedOpts.StartTS != nil && feedOpts.Cursor != nil {
return nil, fmt.Errorf("cannot use EventFeedStartTime and EventFeedCursor together")
}

return &feedOpts, nil
}
20 changes: 20 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,23 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string {

return strings.ReplaceAll(params.Encode(), "&", ",")
}

// FeedOptFn function to set options on the [fauna.EventFeed]
type FeedOptFn func(req *feedOptions)

// EventFeedCursor set the cursor for the [fauna.EventFeed]
// cannot be used with [EventFeedStartTime] or in [fauna.Client.FeedFromQuery]
func EventFeedCursor(cursor string) FeedOptFn {
return func(req *feedOptions) { req.Cursor = &cursor }
}

// EventFeedStartTime set the start time for the [fauna.EventFeed]
// cannot be used with [EventFeedCursor]
func EventFeedStartTime(ts int64) FeedOptFn {
return func(req *feedOptions) { req.StartTS = &ts }
}

// EventFeedPageSize set the page size for the [fauna.EventFeed]
func EventFeedPageSize(ts int) FeedOptFn {
return func(req *feedOptions) { req.PageSize = &ts }
}
94 changes: 94 additions & 0 deletions event_feed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package fauna

import (
"encoding/json"
)

// EventFeed represents an event feed subscription.
type EventFeed struct {
client *Client

source EventSource

decoder *json.Decoder

opts *feedOptions
lastCursor string
}

type feedOptions struct {
PageSize *int
Cursor *string
StartTS *int64
}

func newEventFeed(client *Client, source EventSource, opts *feedOptions) (*EventFeed, error) {
feed := &EventFeed{
pnwpedro marked this conversation as resolved.
Show resolved Hide resolved
client: client,
source: source,
opts: opts,
}

return feed, nil
}

func (ef *EventFeed) newFeedRequest() (*feedRequest, error) {
req := feedRequest{
apiRequest: apiRequest{
ef.client.ctx,
ef.client.headers,
},
Source: ef.source,
Cursor: ef.lastCursor,
}
if ef.opts.StartTS != nil {
req.StartTS = *ef.opts.StartTS
}
if ef.opts.Cursor != nil {
req.Cursor = *ef.opts.Cursor
}
if ef.opts.PageSize != nil {
req.PageSize = *ef.opts.PageSize
}

return &req, nil
}

func (ef *EventFeed) open() error {
req, err := ef.newFeedRequest()
if err != nil {
return err
}

byteStream, err := req.do(ef.client)
if err != nil {
return err
}

ef.decoder = json.NewDecoder(byteStream)

return nil
}

// FeedPage represents the response from [fauna.EventFeed.Next]
type FeedPage struct {
Events []Event `json:"events"`
Cursor string `json:"cursor"`
HasNext bool `json:"has_next"`
Stats Stats `json:"stats"`
}

// Next retrieves the next FeedPage from the [fauna.EventFeed]
func (ef *EventFeed) Next(page *FeedPage) error {
if err := ef.open(); err != nil {
return err
}

if err := ef.decoder.Decode(&page); err != nil {
return err
}

ef.lastCursor = page.Cursor

return nil
}
48 changes: 48 additions & 0 deletions event_feed_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package fauna_test

import (
"fmt"
"log"

"github.com/fauna/fauna-go/v3"
)

func ExampleEventFeed_Next() {
client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal))

query, queryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete()
Collection.create({ name: "EventFeedTest" })
EventFeedTest.all().eventSource()`, nil)
if queryErr != nil {
log.Fatal(queryErr.Error())
}

feed, feedErr := client.FeedFromQuery(query)
if feedErr != nil {
log.Fatal(feedErr.Error())
}

addOne, _ := fauna.FQL(`EventFeedTest.create({ foo: 'bar' })`, nil)
_, addOneErr := client.Query(addOne)
if addOneErr != nil {
log.Fatal(addOneErr.Error())
}

for {
var page fauna.FeedPage
eventErr := feed.Next(&page)
if eventErr != nil {
log.Fatal(eventErr.Error())
}

for _, event := range page.Events {
fmt.Println(event.Type)
}

if !page.HasNext {
break
}
}

// Output: add
}
Loading
Loading