Docs - add Streaming Updates page #1049

Merged
2 changes: 1 addition & 1 deletion docs/api/rtk-query/createApi.mdx
@@ -150,7 +150,7 @@ export const defaultSerializeQueryArgs: SerializeQueryArgs<any> = ({
```
- `onCacheEntryAdded` _(optional)_ - Available to both [queries](../../usage/rtk-query/queries.mdx) and [mutations](../../usage/rtk-query/mutations.mdx)

- Can be used for TODO
- Can be used for [streaming updates](../../usage/rtk-query/streaming-updates.mdx).
- ```ts title="Mutation onCacheEntryAdded signature" no-transpile
  async function onCacheEntryAdded(
    arg: QueryArg,
7 changes: 4 additions & 3 deletions docs/api/rtk-query/created-api/cache-management.mdx
@@ -23,6 +23,7 @@ const updateQueryResult = (
interface PatchCollection {
  patches: Patch[];
  inversePatches: Patch[];
  undo: () => void;
}
```

@@ -37,9 +38,9 @@ A Redux thunk action creator that, when dispatched, creates and applies a set of

The thunk action creator accepts three arguments: the name of the endpoint we are updating (such as `'getPost'`), any relevant query arguments, and a callback function. The callback receives an Immer-wrapped `draft` of the current state, and may modify the draft to match the expected results after the mutation completes successfully.

The thunk returns an object containing `{patches: Patch[], inversePatches: Patch[]}`, generated using Immer's [`produceWithPatches` method](https://immerjs.github.io/immer/patches).
The thunk returns an object containing `{patches: Patch[], inversePatches: Patch[], undo: () => void}`. The `patches` and `inversePatches` are generated using Immer's [`produceWithPatches` method](https://immerjs.github.io/immer/patches).

This is typically used as the first step in implementing optimistic updates. The generated `inversePatches` can be passed between the `onStart` and `onError` callbacks in a mutation definition by assigning the `inversePatches` array to the provided `context` object, and then the updates can be reverted by calling `patchQueryResult(endpointName, args, inversePatches)`.
This is typically used as the first step in implementing optimistic updates. The generated `inversePatches` can be used to revert the updates by calling `dispatch(patchQueryResult(endpointName, args, inversePatches))`. Alternatively, the `undo` method can be called directly to achieve the same effect.
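
As a rough sketch, assuming a generated `api` with a hypothetical `getPost` query endpoint and that this utility is accessed as `api.util.updateQueryResult`, an optimistic update and rollback might look like this:

```ts no-transpile
// Hypothetical `getPost` endpoint and `{ id: 1 }` argument, for illustration only
const { patches, inversePatches, undo } = dispatch(
  api.util.updateQueryResult('getPost', { id: 1 }, (draft) => {
    draft.title = 'An optimistic new title'
  })
)

// If the mutation later fails, roll the cache back:
undo()
```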

### `patchQueryResult`

@@ -59,7 +60,7 @@ A Redux thunk that applies a JSON diff/patch array to the cached data for a give

The thunk accepts three arguments: the name of the endpoint we are updating (such as `'getPost'`), any relevant query arguments, and a JSON diff/patch array as produced by Immer's `produceWithPatches`.

This is typically used as the second step in implementing optimistic updates. If a request fails, the optimistically-applied changes can be reverted by dispatching `patchQueryResult` with the `inversePatches` that were generated by `updateQueryResult` earlier.
This is typically used as the second step in implementing optimistic updates. If a request fails, the optimistically-applied changes can be reverted by dispatching `patchQueryResult` with the `inversePatches` that were generated by `updateQueryResult` earlier, or by calling the `undo` method returned from dispatching `updateQueryResult`.
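
A minimal sketch of such a revert, again assuming a hypothetical `getPost` endpoint and that `inversePatches` was captured from an earlier `updateQueryResult` dispatch:

```ts no-transpile
// `inversePatches` is assumed to have been returned by `updateQueryResult`
dispatch(api.util.patchQueryResult('getPost', { id: 1 }, inversePatches))
```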

### `prefetch`

8 changes: 4 additions & 4 deletions docs/usage/rtk-query/cached-data.mdx
@@ -390,10 +390,10 @@ The matrix below shows examples of which invalidated tags will affect and invali
<table>
<thead>
<tr>
<th class="diagonal-cell">
<div class="diagonal-cell--content">
<div class="diagonal-cell--topRight">Provided</div>
<div class="diagonal-cell--bottomLeft">Invalidated</div>
<th className="diagonal-cell">
<div className="diagonal-cell--content">
<div className="diagonal-cell--topRight">Provided</div>
<div className="diagonal-cell--bottomLeft">Invalidated</div>
</div>
</th>
<th>
230 changes: 230 additions & 0 deletions docs/usage/rtk-query/streaming-updates.mdx
@@ -0,0 +1,230 @@
---
id: streaming-updates
title: Streaming Updates
sidebar_label: Streaming Updates
hide_title: true
---

# Streaming Updates

RTK Query gives you the ability to receive **streaming updates** for persistent queries. This enables a query to establish an ongoing connection to the server (typically using WebSockets), and apply updates to the cached data as they are received from the server.

Streaming updates allow the API to reflect real-time changes to the back-end data, such as new entries being created or important properties being updated.

To enable streaming updates for a query, pass the asynchronous `onCacheEntryAdded` function to the query, including the logic for how to update the query when streamed data is received. See [anatomy of an endpoint](../../api/rtk-query/createApi#anatomy-of-an-endpoint) for more details.

## When to use streaming updates

In most cases, updates to query data should be handled by [`polling`](./polling) intermittently on an interval, by using [`cache invalidation`](./cached-data#advanced-invalidation-with-abstract-tag-ids) to invalidate data based on tags associated with queries & mutations, or with [`refetchOnMountOrArgChange`](../../api/rtk-query/createApi#refetchonmountorargchange) to fetch fresh data when a component using the data mounts.
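
For reference, a minimal polling setup (the `useGetPostsQuery` hook here is hypothetical) looks like this:

```ts no-transpile
// Re-fetch the data every 30 seconds for as long as a component is subscribed
const { data } = useGetPostsQuery(undefined, { pollingInterval: 30000 })
```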

However, streaming updates are particularly useful for scenarios involving:

- _Small, frequent changes to large objects_. Rather than repeatedly polling for a large object, the object can be fetched with an initial query, and streaming updates can update individual properties as changes are received.
- _External event-driven updates_. Where data may be changed by the server or by other external users, and real-time updates are expected to be shown to an active user, polling alone would result in periods of stale data between queries, causing state to easily get out of sync. Streaming updates can update all active clients as the changes occur, rather than waiting for the next interval to elapse.

Example use cases that benefit from streaming updates are:

- GraphQL subscriptions
- Real-time chat applications
- Real-time multiplayer games
- Collaborative document editing with multiple concurrent users

### Example - Websocket Chat API

```ts
// file: schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
  // in real code this would check `message` to ensure it is a `Message`
  return true
}

// file: api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
  id: number
  channel: Channel
  userName: string
  text: string
}

export const api = createApi({
  baseQuery: fetchBaseQuery({ baseUrl: '/' }),
  endpoints: (build) => ({
    getMessages: build.query<Message[], Channel>({
      query: (channel) => `messages/${channel}`,
      // highlight-start
      async onCacheEntryAdded(
        arg,
        { updateCacheEntry, firstValueResolved, cleanup }
      ) {
        // create a websocket connection when the cache subscription starts
        const ws = new WebSocket('ws://localhost:8080')
        try {
          // wait for the initial query to resolve before proceeding
          await firstValueResolved

          // when data is received from the socket connection to the server,
          // if it is a message and for the appropriate channel,
          // update our query result with the received message
          const listener = (event: MessageEvent) => {
            const data = JSON.parse(event.data)
            if (!isMessage(data) || data.channel !== arg) return

            updateCacheEntry((draft) => {
              draft.push(data)
            })
          }

          ws.addEventListener('message', listener)
        } catch {
          // no-op in case `cleanup` resolves before `firstValueResolved`,
          // in which case `firstValueResolved` will throw
        }
        // cleanup will resolve when the cache subscription is no longer active
        await cleanup
        // perform cleanup steps once the `cleanup` promise resolves
        ws.close()
      },
      // highlight-end
    }),
  }),
})

export const { useGetMessagesQuery } = api
```

#### What to expect

When the `getMessages` query is triggered (e.g. via a component mounting with the `useGetMessagesQuery()` hook), a `cache entry` will be added based on the serialized arguments for the endpoint. The associated query will be fired off based on the `query` property to fetch the initial data for the cache. Meanwhile, the asynchronous `onCacheEntryAdded` callback will begin and create a new WebSocket connection. Once the response for the initial query is received, the cache will be populated with the response data, and the `firstValueResolved` promise will resolve. After awaiting the `firstValueResolved` promise, the `message` event listener is added to the WebSocket connection, and it updates the cache data whenever an appropriate message is received.

When there are no more active subscriptions to the data (e.g. when the subscribed components remain unmounted for a sufficient amount of time), the `cleanup` promise will resolve, allowing the remaining code to run and close the websocket connection. RTK Query will also remove the associated data from the cache.

If a query for the corresponding cache entry runs later, it will overwrite the whole cache entry, and the streaming update listeners will continue to work on the updated data.
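
From a component's perspective, nothing extra is needed to consume the streamed data; subscribing with the generated hook drives this entire lifecycle. A minimal sketch (the `useChatMessages` wrapper is purely illustrative):

```ts no-transpile
function useChatMessages() {
  // Subscribing adds the cache entry, which kicks off `onCacheEntryAdded` and
  // opens the WebSocket; streamed messages show up in `data` as they arrive
  const { data: messages, isLoading } = useGetMessagesQuery('redux')
  return { messages, isLoading }
}
```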

### Example - Websocket Chat API with a transformed response shape

```ts
// file: schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
  // in real code this would check `message` to ensure it is a `Message`
  return true
}

// file: api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter, EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
  id: number
  channel: Channel
  userName: string
  text: string
}

// highlight-start
const messagesAdapter = createEntityAdapter<Message>()
// highlight-end
export const api = createApi({
  baseQuery: fetchBaseQuery({ baseUrl: '/' }),
  endpoints: (build) => ({
    // highlight-start
    getMessages: build.query<EntityState<Message>, Channel>({
      // highlight-end
      query: (channel) => `messages/${channel}`,
      // highlight-start
      transformResponse(response: Message[]) {
        return messagesAdapter.addMany(
          messagesAdapter.getInitialState(),
          response
        )
      },
      // highlight-end
      async onCacheEntryAdded(
        arg,
        { updateCacheEntry, firstValueResolved, cleanup }
      ) {
        const ws = new WebSocket('ws://localhost:8080')
        try {
          await firstValueResolved

          const listener = (event: MessageEvent) => {
            const data = JSON.parse(event.data)
            if (!isMessage(data) || data.channel !== arg) return

            updateCacheEntry((draft) => {
              // highlight-start
              messagesAdapter.upsertOne(draft, data)
              // highlight-end
            })
          }

          ws.addEventListener('message', listener)
        } catch {}
        await cleanup
        ws.close()
      },
    }),
  }),
})

export const { useGetMessagesQuery } = api
```

This example demonstrates how the [previous example](#example---websocket-chat-api) can be altered to allow for transforming the response shape when adding data to the cache.

In this example, the data is transformed from this shape:

```ts no-transpile
[
  {
    id: 0,
    channel: 'redux',
    userName: 'Mark',
    text: 'Welcome to #redux!',
  },
  {
    id: 1,
    channel: 'redux',
    userName: 'Lenz',
    text: 'Glad to be here!',
  },
]
```

To this:

```ts no-transpile
{
  // The unique IDs of each item. Must be strings or numbers
  ids: [0, 1],
  // A lookup table mapping entity IDs to the corresponding entity objects
  entities: {
    0: {
      id: 0,
      channel: "redux",
      userName: "Mark",
      text: "Welcome to #redux!",
    },
    1: {
      id: 1,
      channel: "redux",
      userName: "Lenz",
      text: "Glad to be here!",
    },
  },
}
```

A key point to keep in mind is that updates to the cached data within the `onCacheEntryAdded` callback must respect the transformed data shape that will be present in the cache. The example shows how [`createEntityAdapter`](../../api/createEntityAdapter) can be used for the initial `transformResponse`, and again when streamed updates are received to upsert received items into the cached data, while maintaining the normalized state structure.
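
As an illustrative follow-up (not part of the example above), the same adapter's selectors can be reused on the component side to read the normalized cache data back out:

```ts no-transpile
// `getSelectors()` with no argument returns selectors that accept the
// `EntityState<Message>` value directly, i.e. the transformed cache data
const { selectAll: selectAllMessages } = messagesAdapter.getSelectors()

const { data } = useGetMessagesQuery('redux')
const messages = data ? selectAllMessages(data) : []
```
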
3 changes: 2 additions & 1 deletion website/sidebars.json
@@ -36,8 +36,9 @@
"usage/rtk-query/error-handling",
"usage/rtk-query/conditional-fetching",
"usage/rtk-query/pagination",
"usage/rtk-query/polling",
"usage/rtk-query/prefetching",
"usage/rtk-query/polling",
"usage/rtk-query/streaming-updates",
"usage/rtk-query/optimistic-updates",
"usage/rtk-query/code-splitting",
"usage/rtk-query/code-generation",