Skip to content

Commit

Permalink
ENG-6679: Introduce support for change feeds
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor committed Sep 19, 2024
1 parent db5c395 commit 987b21e
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 4 deletions.
126 changes: 126 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ try:
except FaunaException as e:
    print('error occurred with stream: ', e)
```

### Stream options

The client configuration sets default options for the ``stream()``
Expand All @@ -408,6 +409,131 @@ options = StreamOptions(
client.stream(fql('Product.all().toStream()'), options)
```

## Change Feeds

<!-- TODO: turn "Changefeeds" into a link when available. -->

The driver supports Changefeeds.

### Start a changefeed

Change feeds work by iteratively polling a [Fauna Stream](https://docs.fauna.com/fauna/current/learn/streaming).
To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).

To create a change feed, pass the stream token to the ``change_feed()`` method:

```python
from fauna import fql
from fauna.client import Client

client = Client()

response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']

client.change_feed(streamToken)
```

You can also pass a query that produces a stream token directly to
``change_feed()``:

```python
query = fql('Product.all().changesOn(.price, .quantity)')

client.change_feed(query)
```

### Iterate on a changefeed

``change_feed()`` returns an iterator that emits pages of events on every
iteration. You can use a generator expression to iterate through the pages:

```python
query = fql('Product.all().changesOn(.price, .quantity)')

with client.change_feed(query) as feed:
for page in feed:
print('Page stats: ', page.stats)

for event in page.events:
eventType = event['type']
if (eventType == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
print('Remove event: ', event)
## ...
```

Alternatively, you can iterate through events instead of pages with
``flatten()``:

```python
query = fql('Product.all().changesOn(.price, .quantity)')

with client.change_feed(query) as feed:
for event in feed.flatten():
eventType = event['type']
## ...
```

The changefeed iterator will stop once there are no more events to poll.

### Error handling

If a non-retryable error occurs when opening or processing a changefeed, Fauna
raises a ``FaunaException``:

```python
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

try:
with client.change_feed(fql(
'Product.all().changesOn(.price, .quantity)'
)) as feed:
for event in feed.flatten():
print(event)
# ...
except FaunaException as e:
    print('error occurred with stream: ', e)
```

### Changefeed options

The client configuration sets default options for the ``change_feed()``
method.

You can pass a ``ChangeFeedOptions`` object to override these defaults:

```python
options = ChangeFeedOptions(
max_attempts=3,
max_backoff=20,
query_timeout=timedelta(seconds=5),
page_size=None,
cursor=None,
start_ts=None,
)

client.change_feed(fql('Product.all().toStream()'), options)
```

## Setup

```bash
Expand Down
6 changes: 4 additions & 2 deletions docker/feature-flags.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"fqlx_typecheck_default": true,
"persisted_fields": true,
"changes_by_collection_index": true,
"fql2_streams": true
"fql2_streams": true,
"change_feeds": true
}
},
{
Expand All @@ -20,7 +21,8 @@
"fqlx_typecheck_default": true,
"persisted_fields": true,
"changes_by_collection_index": true,
"fql2_streams": true
"fql2_streams": true,
"change_feeds": true
}
}
]
Expand Down
2 changes: 1 addition & 1 deletion fauna/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .client import Client, QueryOptions, StreamOptions
from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions
from .endpoints import Endpoints
from .headers import Header
142 changes: 141 additions & 1 deletion fauna/client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterator, Mapping, Optional, Union
from typing import Any, Dict, Iterator, Mapping, Optional, Union, List

import fauna
from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
Expand Down Expand Up @@ -70,6 +70,27 @@ class StreamOptions:
status_events: bool = False


@dataclass
class ChangeFeedOptions:
  """
  A dataclass representing options available for a change feed.

  * max_attempts - The maximum number of times to attempt a change feed query when a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
  * start_ts - The starting timestamp of the change feed, exclusive. If set, Fauna will return events starting after
    the timestamp. Ignored when ``cursor`` is also set.
  * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor,
    and takes precedence over ``start_ts``.
  * page_size - The desired number of events per page.

  Any option left as ``None`` falls back to the client-level default.
  """
  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  query_timeout: Optional[timedelta] = None
  page_size: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None


class Client:

def __init__(
Expand Down Expand Up @@ -438,6 +459,49 @@ def stream(
return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
self._max_attempts, self._max_backoff, opts, token)

def change_feed(
    self,
    fql: Union[StreamToken, Query],
    opts: Optional[ChangeFeedOptions] = None
) -> "ChangeFeedIterator":
  """
  Opens a change feed in Fauna and returns an iterator that consumes Fauna events.

  :param fql: A StreamToken, or a Query that returns a StreamToken.
  :param opts: (Optional) Change feed options.

  :return: a :class:`ChangeFeedIterator`

  :raises ClientError: Invalid options provided
  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  # Build defaults per call instead of sharing one ChangeFeedOptions
  # instance across all calls via a default argument.
  if opts is None:
    opts = ChangeFeedOptions()

  # A Query must first be evaluated to obtain its stream token.
  if isinstance(fql, Query):
    token = self.query(fql).data
  else:
    token = fql

  if not isinstance(token, StreamToken):
    err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
    raise TypeError(err_msg)

  headers = self._headers.copy()
  headers[_Header.Format] = "tagged"
  headers[_Header.Authorization] = self._auth.bearer()

  # A per-call timeout wins over the client-level default, when set.
  if opts.query_timeout is not None:
    query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
    headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
  elif self._query_timeout_ms is not None:
    headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

  return ChangeFeedIterator(self._session, headers,
                            self._endpoint + "/changefeed/1",
                            self._max_attempts, self._max_backoff, opts, token)

def _check_protocol(self, response_json: Any, status_code):
# TODO: Logic to validate wire protocol belongs elsewhere.
should_raise = False
Expand Down Expand Up @@ -574,6 +638,82 @@ def close(self):
self._stream.close()


@dataclass
class ChangeFeedPage:
  """A single page of change feed events along with its pagination state."""
  events: List[Any]  # decoded events contained in this page
  cursor: str  # cursor to resume polling after this page
  stats: QueryStats  # query statistics reported by Fauna for this page

  def __iter__(self) -> Iterator[Any]:
    # Iterating a page walks its events in order.
    yield from self.events


class ChangeFeedIterator:
  """An iterator over change feed pages, polling Fauna lazily on demand."""

  def __init__(self, http: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: ChangeFeedOptions, token: StreamToken):
    self._http = http
    self._headers = headers
    self._endpoint = endpoint
    # Per-feed options take precedence over the client-level defaults.
    self._max_attempts = opts.max_attempts or max_attempts
    self._max_backoff = opts.max_backoff or max_backoff
    self._request = {"token": token.token}
    self._is_done = False

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    # An explicit cursor takes precedence over a starting timestamp.
    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts

  def __iter__(self) -> Iterator[ChangeFeedPage]:
    # Re-arm the iterator; polling resumes from the last seen cursor.
    self._is_done = False
    return self

  def __next__(self) -> ChangeFeedPage:
    if self._is_done:
      raise StopIteration

    # Retry transient failures with backoff before surfacing the page.
    attempt = Retryable[Any](self._max_attempts, self._max_backoff,
                             self._next_page)
    return attempt.run().response

  def _next_page(self) -> ChangeFeedPage:
    with self._http.request(
        method="POST",
        url=self._endpoint,
        headers=self._headers,
        data=self._request,
    ) as response:
      code = response.status_code()
      decoded = FaunaDecoder.decode(response.json())

      if code > 399:
        FaunaError.parse_error_and_throw(decoded, code)

      self._is_done = not decoded["has_next"]
      self._request["cursor"] = decoded["cursor"]

      # Only the first request may carry start_ts; subsequent pages
      # resume from the returned cursor instead.
      self._request.pop("start_ts", None)

      events = decoded["events"]
      # Error events abort iteration by raising the decoded Fauna error.
      for event in events:
        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

      return ChangeFeedPage(events=events,
                            cursor=decoded["cursor"],
                            stats=QueryStats(decoded["stats"]))

  def flatten(self) -> Iterator:
    """A generator that yields individual events instead of pages of events."""
    for page in self:
      yield from page

class QueryIterator:
"""A class to provider an iterator on top of Fauna queries."""

Expand Down
Loading

0 comments on commit 987b21e

Please sign in to comment.