Skip to content

Commit

Permalink
ENG-6679: Introduce support for change feeds
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor committed Sep 24, 2024
1 parent db5c395 commit 1048efd
Show file tree
Hide file tree
Showing 5 changed files with 461 additions and 4 deletions.
162 changes: 162 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ try:
except FaunaException as e:
    print('error occurred with stream: ', e)
```

### Stream options

The client configuration sets default options for the ``stream()``
Expand All @@ -408,6 +409,167 @@ options = StreamOptions(
client.stream(fql('Product.all().toStream()'), options)
```

## Change Feeds

<!-- TODO: turn "Change Feeds" into a link when available. -->

The driver supports Change Feeds.

### Request a Change Feed

A Change Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming),
represented by a stream token, for events.

To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).

To get paginated events for the stream, pass the stream token to
``change_feed()``:

```python
from fauna import fql
from fauna.client import Client

client = Client()

response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']

client.change_feed(streamToken)
```

You can also pass a query that produces a stream token directly to
``change_feed()``:

```python
query = fql('Product.all().changesOn(.price, .quantity)')

client.change_feed(query)
```

### Iterate on a Change Feed

``change_feed()`` returns an iterator that emits pages of events. You can use a
``for`` loop to iterate through the pages:

```python
query = fql('Product.all().changesOn(.price, .quantity)')

with client.change_feed(query) as feed:
for page in feed:
print('Page stats: ', page.stats)

for event in page:
eventType = event['type']
if (eventType == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
print('Remove event: ', event)
## ...
```

Alternatively, you can iterate through events instead of pages with
``flatten()``:

```python
query = fql('Product.all().changesOn(.price, .quantity)')

with client.change_feed(query) as feed:
for event in feed.flatten():
eventType = event['type']
## ...
```

The change feed iterator stops when there are no more events to poll.

### Error handling

If a non-retryable error occurs when opening or processing a change feed, Fauna
raises a ``FaunaException``:

```python
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

try:
feed = client.change_feed(fql(
'Product.all().changesOn(.price, .quantity)'
))
for event in feed.flatten():
print(event)
# ...
except FaunaException as e:
    print('error occurred with change feed: ', e)
```

Errors can be raised at two different places:

1. At the ``change_feed`` method call;
2. At the page iteration.

This distinction allows for users to ignore errors originating from event
processing. For example:

```python
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

# Imagine if there are some products with details = null.
# The ones without details will fail due to the toUpperCase call.
feed = client.change_feed(fql(
'Product.all().map(.details.toUpperCase()).toStream()'
))

for page in feed:
try:
for event in page:
print(event)
# ...
except FaunaException as e:
# Pages will stop at the first error encountered.
# Therefore, it's safe to handle event failures
# and then pull more pages.
print('error occurred with event processing: ', e)
```

### Change Feed options

The client configuration sets default options for the ``change_feed()``
method.

You can pass a ``ChangeFeedOptions`` object to override these defaults:

```python
options = ChangeFeedOptions(
max_attempts=3,
max_backoff=20,
query_timeout=timedelta(seconds=5),
page_size=None,
cursor=None,
start_ts=None,
)

client.change_feed(fql('Product.all().toStream()'), options)
```

## Setup

```bash
Expand Down
6 changes: 4 additions & 2 deletions docker/feature-flags.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"fqlx_typecheck_default": true,
"persisted_fields": true,
"changes_by_collection_index": true,
"fql2_streams": true
"fql2_streams": true,
"change_feeds": true
}
},
{
Expand All @@ -20,7 +21,8 @@
"fqlx_typecheck_default": true,
"persisted_fields": true,
"changes_by_collection_index": true,
"fql2_streams": true
"fql2_streams": true,
"change_feeds": true
}
}
]
Expand Down
2 changes: 1 addition & 1 deletion fauna/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .client import Client, QueryOptions, StreamOptions
from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions
from .endpoints import Endpoints
from .headers import Header
145 changes: 144 additions & 1 deletion fauna/client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterator, Mapping, Optional, Union
from typing import Any, Dict, Iterator, Mapping, Optional, Union, List

import fauna
from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
Expand Down Expand Up @@ -70,6 +70,27 @@ class StreamOptions:
status_events: bool = False


@dataclass
class ChangeFeedOptions:
  """
  A dataclass representing options available for a change feed.

  * max_attempts - The maximum number of times to attempt a change feed query when a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
  * start_ts - The starting timestamp of the change feed, exclusive. If set, Fauna will return events starting after
    the timestamp.
  * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
    When both ``cursor`` and ``start_ts`` are set, ``cursor`` takes precedence and ``start_ts`` is ignored.
  * page_size - The desired number of events per page.

  Any field left as ``None`` falls back to the client-level default for that
  setting (or is simply omitted from the request, for the paging fields).
  """
  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  query_timeout: Optional[timedelta] = None
  page_size: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None


class Client:

def __init__(
Expand Down Expand Up @@ -438,6 +459,49 @@ def stream(
return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
self._max_attempts, self._max_backoff, opts, token)

def change_feed(
    self,
    fql: Union[StreamToken, Query],
    opts: Optional[ChangeFeedOptions] = None
) -> "ChangeFeedIterator":
  """
  Opens a change feed in Fauna and returns an iterator that consumes Fauna events.

  :param fql: A StreamToken, or a Query that returns a StreamToken.
  :param opts: (Optional) Change feed options.
  :return: a :class:`ChangeFeedIterator`
  :raises ClientError: Invalid options provided
  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  # Build the default lazily instead of using a mutable default argument,
  # which would be a single shared instance across all calls.
  if opts is None:
    opts = ChangeFeedOptions()

  # A Query is executed eagerly; its result must be a StreamToken.
  if isinstance(fql, Query):
    token = self.query(fql).data
  else:
    token = fql

  if not isinstance(token, StreamToken):
    err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
    raise TypeError(err_msg)

  headers = self._headers.copy()
  headers[_Header.Format] = "tagged"
  headers[_Header.Authorization] = self._auth.bearer()

  # A per-call query timeout overrides the client-level default.
  if opts.query_timeout is not None:
    query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
    headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
  elif self._query_timeout_ms is not None:
    headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

  return ChangeFeedIterator(self._session, headers,
                            self._endpoint + "/changefeed/1",
                            self._max_attempts, self._max_backoff, opts, token)

def _check_protocol(self, response_json: Any, status_code):
# TODO: Logic to validate wire protocol belongs elsewhere.
should_raise = False
Expand Down Expand Up @@ -574,6 +638,85 @@ def close(self):
self._stream.close()


class ChangeFeedPage:
  """A single page of change feed events, together with its cursor and stats.

  Iterating the page yields each event in order; encountering an event of
  type ``"error"`` aborts iteration by raising the corresponding FaunaError.
  """

  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
    # Events are private; consume them by iterating the page.
    self._events = events
    self.cursor = cursor
    self.stats = stats

  def __len__(self):
    """Return the number of events in this page."""
    return len(self._events)

  def __iter__(self) -> Iterator[Any]:
    """Yield events one at a time, raising on the first error event."""
    for item in self._events:
      self._raise_if_error(item)
      yield item

  @staticmethod
  def _raise_if_error(event: Any) -> None:
    # Error events are surfaced as exceptions rather than yielded.
    if event["type"] == "error":
      FaunaError.parse_error_and_throw(event, 400)


class ChangeFeedIterator:
  """A class to provide an iterator on top of change feed pages."""

  def __init__(self, http: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: ChangeFeedOptions, token: StreamToken):
    self._http = http
    self._headers = headers
    self._endpoint = endpoint
    # Per-feed options take precedence over the client-wide retry settings.
    self._max_attempts = opts.max_attempts or max_attempts
    self._max_backoff = opts.max_backoff or max_backoff
    # Request body sent on every poll. It is mutated as pages are consumed:
    # the cursor advances and start_ts is dropped after the first page.
    self._request = {"token": token.token}
    self._is_done = False

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    # cursor and start_ts are mutually exclusive starting points;
    # cursor takes precedence when both are provided.
    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts

  def __iter__(self) -> Iterator[ChangeFeedPage]:
    # Re-iterating resumes from the last stored cursor, not from the start.
    self._is_done = False
    return self

  def __next__(self) -> ChangeFeedPage:
    if self._is_done:
      raise StopIteration

    # Each page fetch is retried (up to _max_attempts, backing off to
    # _max_backoff seconds) before the error propagates to the caller.
    retryable = Retryable[Any](self._max_attempts, self._max_backoff, self._next_page)
    return retryable.run().response

  def _next_page(self) -> ChangeFeedPage:
    # POST the current request state and decode one page of events.
    with self._http.request(
        method="POST",
        url=self._endpoint,
        headers=self._headers,
        data=self._request,
    ) as response:
      status_code = response.status_code()
      decoded = FaunaDecoder.decode(response.json())

      if status_code > 399:
        FaunaError.parse_error_and_throw(decoded, status_code)

      # Advance pagination state before returning the page, so a retry or
      # the next call continues from the server-provided cursor.
      self._is_done = not decoded["has_next"]
      self._request["cursor"] = decoded["cursor"]

      # start_ts only applies to the first request; once a cursor exists,
      # it must not be sent again.
      if "start_ts" in self._request:
        del self._request["start_ts"]

      return ChangeFeedPage(decoded["events"],
                            decoded["cursor"],
                            QueryStats(decoded["stats"]))

  def flatten(self) -> Iterator:
    """A generator that yields events instead of pages of events."""
    for page in self:
      for event in page:
        yield event

class QueryIterator:
"""A class to provider an iterator on top of Fauna queries."""

Expand Down
Loading

0 comments on commit 1048efd

Please sign in to comment.