Skip to content

Commit

Permalink
feat: add readonly document store
Browse files Browse the repository at this point in the history
Additionally:
- renames various `observe*`-functions to `listen*`
- renames createLocalDataset to createOptimisticStore
- adds robust handling of listener events not arriving or arriving out of order
- adds "id set"-listener to support subscribing to the set of document ids whose documents match a given filter
  • Loading branch information
bjoerge committed Nov 28, 2024
1 parent 5a5fea9 commit 8b4d306
Show file tree
Hide file tree
Showing 16 changed files with 699 additions and 172 deletions.
142 changes: 17 additions & 125 deletions examples/web/App.tsx
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
import {
createClient,
type MutationEvent as ClientMutationEvent,
type ReconnectEvent,
type SanityClient,
type WelcomeEvent,
} from '@sanity/client'
import {createClient} from '@sanity/client'
import {CollapseIcon, ExpandIcon} from '@sanity/icons'
import {
createIfNotExists,
del,
type Mutation,
type Path,
type SanityDocumentBase,
SanityEncoder,
} from '@sanity/mutate'
import {
createLocalDataset,
type ListenerEvent,
type ListenerSyncEvent,
createDocumentEventListener,
createDocumentLoader,
createOptimisticStore,
createSharedListener,
type MutationGroup,
type RemoteDocumentEvent,
type SanityMutation,
} from '@sanity/mutate/_unstable_store'
import {
draft,
Expand Down Expand Up @@ -54,22 +47,8 @@ import {
TabPanel,
Text,
} from '@sanity/ui'
import {type RawPatch} from 'mendoza'
import {Fragment, type ReactNode, useCallback, useEffect, useState} from 'react'
import {
concatMap,
defer,
filter,
from,
map,
merge,
type Observable,
of,
share,
shareReplay,
tap,
timer,
} from 'rxjs'
import {concatMap, filter, from, merge, tap} from 'rxjs'
import styled from 'styled-components'

import {DocumentView} from './DocumentView'
Expand Down Expand Up @@ -173,65 +152,6 @@ function renderInput<Props extends InputProps<SanityAny>>(
const personDraft = draft(person)
type PersonDraft = Infer<typeof personDraft>

/** Options for {@link createSharedListener}. */
interface SharedListenerOptions {
  /**
   * Delay (ms) to keep the underlying listener connection alive after the last
   * subscriber unsubscribes. When unset, the connection is torn down immediately
   * on refcount zero.
   */
  shutdownDelay?: number
  /**
   * Whether mutation payloads should be included in listener events.
   * When falsy, `includeMutations: false` is passed to the client; when truthy,
   * the option is omitted so the client default applies — TODO confirm the
   * client default includes mutation payloads.
   */
  includeMutations?: boolean
}

/**
 * Creates a single multiplexed listener stream over all non-system documents
 * (`_id` not in the `_.**` path) for the given client.
 *
 * The returned observable merges:
 * - a replayed 'welcome' event (while it is the latest connection-state event),
 *   which downstream subscribers typically map to an initial fetch,
 * - all 'mutation' events from the listener,
 * - 'reconnect' events emitted when the connection is lost.
 *
 * @param client - the Sanity client to listen through
 * @param options - see {@link SharedListenerOptions}
 */
function createSharedListener(
  client: SanityClient,
  options: SharedListenerOptions = {},
) {
  const {shutdownDelay, includeMutations} = options

  // With a shutdown delay configured, keep the upstream connection open for
  // that long after the last subscriber leaves; otherwise tear down at once.
  const resetOnRefCountZero = shutdownDelay ? () => timer(shutdownDelay) : true

  const events$ = client
    .listen(
      '*[!(_id in path("_.**"))]',
      {},
      {
        events: ['welcome', 'mutation', 'reconnect'],
        includeResult: false,
        includePreviousRevision: false,
        visibility: 'transaction',
        effectFormat: 'mendoza',
        // Unless mutation payloads were explicitly requested, ask the server
        // to omit them from listener events.
        ...(includeMutations ? {} : {includeMutations: false}),
      },
    )
    .pipe(share({resetOnRefCountZero}))

  // Emitted in case the connection is lost
  const reconnect$ = events$.pipe(
    filter((event): event is ReconnectEvent => event.type === 'reconnect'),
  )

  // Emitted when the listener is (re)connected
  const welcome$ = events$.pipe(
    filter((event): event is WelcomeEvent => event.type === 'welcome'),
  )

  // Mutation events coming from the listener
  const mutation$ = events$.pipe(
    filter((event): event is ClientMutationEvent => event.type === 'mutation'),
  )

  // Replay the most recent connection-state event, whether the connection was
  // disconnected ('reconnect') or established / re-established ('welcome').
  const latestConnection$ = merge(welcome$, reconnect$).pipe(
    shareReplay({bufferSize: 1, refCount: true}),
  )

  // Re-emit the welcome event only while it is the latest connection state.
  const welcomeReplay$ = latestConnection$.pipe(
    filter(connectionEvent => connectionEvent.type === 'welcome'),
  )

  return merge(welcomeReplay$, mutation$, reconnect$)
}

const sanityClient = createClient({
projectId: import.meta.env.VITE_SANITY_API_PROJECT_ID,
dataset: import.meta.env.VITE_SANITY_API_DATASET,
Expand All @@ -240,47 +160,19 @@ const sanityClient = createClient({
token: import.meta.env.VITE_SANITY_API_TOKEN,
})

const listener = createSharedListener(sanityClient)
const sharedListener = createSharedListener({
client: sanityClient,
})

const RECONNECT_EVENT: ReconnectEvent = {type: 'reconnect'}
const loadDocument = createDocumentLoader({client: sanityClient})

/**
 * Subscribes to the shared listener and narrows it to a per-document event
 * stream in the store's `ListenerEvent` shape.
 *
 * - 'reconnect' is forwarded as the shared reconnect marker,
 * - 'welcome' triggers a fetch of the current server document, emitted as a
 *   'sync' event,
 * - 'mutation' events for this document are forwarded with their mendoza
 *   effects and revision bookkeeping.
 *
 * @param documentId - id of the document to observe
 */
function observe(documentId: string) {
  return defer(() => listener).pipe(
    filter(
      (event): event is WelcomeEvent | ClientMutationEvent | ReconnectEvent => {
        // Connection-state events are always relevant…
        if (event.type === 'welcome' || event.type === 'reconnect') {
          return true
        }
        // …mutations only when they target this document.
        return event.type === 'mutation' && event.documentId === documentId
      },
    ),
    concatMap((event): Observable<ListenerEvent> => {
      if (event.type === 'reconnect') {
        return of(RECONNECT_EVENT)
      }
      if (event.type === 'welcome') {
        // (Re)connected: fetch current server state and emit it as a sync event
        return sanityClient.observable.getDocument(documentId).pipe(
          map(
            (doc: undefined | SanityDocumentBase): ListenerSyncEvent => ({
              type: 'sync',
              document: doc,
            }),
          ),
        )
      }
      // Mutation on this document: forward in the store's listener-event shape
      return of({
        type: 'mutation' as const,
        documentId: event.documentId,
        transactionId: event.transactionId,
        transition: 'update',
        effects: event.effects as {apply: RawPatch},
        previousRev: event.previousRev!,
        resultRev: event.resultRev!,
        mutations: event.mutations as SanityMutation[],
      })
    }),
  )
}
const listenDocument = createDocumentEventListener({
loadDocument,
listenerEvents: sharedListener,
})

const datastore = createLocalDataset({
observe,
const datastore = createOptimisticStore({
listen: listenDocument,
submit: transactions => {
return from(transactions).pipe(
concatMap(transaction =>
Expand Down Expand Up @@ -324,7 +216,7 @@ function App() {
}, [])
useEffect(() => {
const sub = datastore
.observeEvents(documentId)
.listenEvents(documentId)
.pipe(
tap(event => {
setDocumentState(current => {
Expand Down
38 changes: 19 additions & 19 deletions src/store/__test__/localDataset.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import {concat, delay, NEVER, of, take} from 'rxjs'
import {describe, expect, test} from 'vitest'

import {createLocalDataset} from '../createLocalDataset'
import {createOptimisticStore} from '../createOptimisticStore'
import {allValuesFrom, collectNotifications, sleep} from './helpers'

describe('observing documents', () => {
test('observing a document that does not exist on the backend', async () => {
const store = createLocalDataset({
observe: id => of({type: 'sync', id, document: undefined}),
const store = createOptimisticStore({
listen: id => of({type: 'sync', id, document: undefined}),
submit: () => NEVER,
})
await expect(
allValuesFrom(store.observeEvents('foo').pipe(take(1))),
allValuesFrom(store.listenEvents('foo').pipe(take(1))),
).resolves.toEqual([
{
type: 'sync',
Expand All @@ -24,13 +24,13 @@ describe('observing documents', () => {
})
test('observing a document that exist on the backend', async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createLocalDataset({
observe: id =>
const store = createOptimisticStore({
listen: id =>
of({type: 'sync', id, document: doc} as const).pipe(delay(10)),
submit: () => NEVER,
})
await expect(
allValuesFrom(store.observeEvents(doc._id).pipe(take(1))),
allValuesFrom(store.listenEvents(doc._id).pipe(take(1))),
).resolves.toEqual([
{
type: 'sync',
Expand All @@ -47,16 +47,16 @@ describe('observing documents', () => {

test("observing a document that doesn't exist initially, but later is created", async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createLocalDataset({
observe: id =>
const store = createOptimisticStore({
listen: id =>
concat(
of({type: 'sync', id, document: undefined} as const),
of({type: 'sync', id, document: doc} as const).pipe(delay(10)),
),
submit: () => NEVER,
})
await expect(
allValuesFrom(store.observeEvents(doc._id).pipe(take(2))),
allValuesFrom(store.listenEvents(doc._id).pipe(take(2))),
).resolves.toEqual([
{
type: 'sync',
Expand All @@ -80,13 +80,13 @@ describe('observing documents', () => {
})
describe('local mutations', () => {
test('mutating a document that does not exist on the backend', () => {
const store = createLocalDataset({
observe: id => of({type: 'sync', id, document: undefined}),
const store = createOptimisticStore({
listen: id => of({type: 'sync', id, document: undefined}),
submit: () => NEVER,
})

const {emissions, unsubscribe} = collectNotifications(
store.observeEvents('foo'),
store.listenEvents('foo'),
)

store.mutate([{type: 'create', document: {_id: 'foo', _type: 'foo'}}])
Expand Down Expand Up @@ -137,8 +137,8 @@ describe('local mutations', () => {

test("observing a document that doesn't exist initially, but later is created locally", async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createLocalDataset({
observe: id =>
const store = createOptimisticStore({
listen: id =>
concat(
of({type: 'sync', id, document: undefined} as const).pipe(delay(10)),
of({type: 'sync', id, document: doc} as const),
Expand All @@ -147,7 +147,7 @@ describe('local mutations', () => {
})

const {emissions, unsubscribe} = collectNotifications(
store.observeEvents('foo'),
store.listenEvents('foo'),
)

store.mutate([{type: 'createIfNotExists', document: doc}])
Expand Down Expand Up @@ -248,14 +248,14 @@ describe('local mutations', () => {

test("error when creating a document locally using 'create', when it turns out later that it exists on the server ", async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createLocalDataset({
observe: id =>
const store = createOptimisticStore({
listen: id =>
concat(of({type: 'sync', id, document: doc} as const).pipe(delay(10))),
submit: () => NEVER,
})

const {emissions, unsubscribe} = collectNotifications(
store.observeEvents('foo'),
store.listenEvents('foo'),
)

// this will go through at first, but then we'll get an error an instant later during rebase after the document is loaded from the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export interface LocalDatasetBackend {
* After that, it should emit mutation events, error events or sync events
* @param id
*/
observe: (id: string) => Observable<ListenerEvent>
listen: (id: string) => Observable<ListenerEvent>
submit: (mutationGroups: Transaction[]) => Observable<SubmitResult>
}

Expand All @@ -70,7 +70,9 @@ const EMPTY_ARRAY: any[] = []
* Creates a local dataset that allows subscribing to documents by id and submitting mutations to be optimistically applied
* @param backend
*/
export function createLocalDataset(backend: LocalDatasetBackend): LocalDataset {
export function createOptimisticStore(
backend: LocalDatasetBackend,
): LocalDataset {
const local = createDocumentMap()
const remote = createDocumentMap()
const memoize = createReplayMemoizer(1000)
Expand All @@ -91,7 +93,7 @@ export function createLocalDataset(backend: LocalDatasetBackend): LocalDataset {
}

function getRemoteEvents(id: string) {
return backend.observe(id).pipe(
return backend.listen(id).pipe(
filter(
(event): event is Exclude<ListenerEvent, ReconnectEvent> =>
event.type !== 'reconnect',
Expand Down Expand Up @@ -176,7 +178,7 @@ export function createLocalDataset(backend: LocalDatasetBackend): LocalDataset {
)
}

function observeEvents(id: string) {
function listenEvents(id: string) {
return defer(() =>
memoize(id, merge(getLocalEvents(id), getRemoteEvents(id))),
)
Expand Down Expand Up @@ -237,9 +239,9 @@ export function createLocalDataset(backend: LocalDatasetBackend): LocalDataset {
})
return results
},
observeEvents,
observe: id =>
observeEvents(id).pipe(
listenEvents: listenEvents,
listen: id =>
listenEvents(id).pipe(
map(event =>
event.type === 'optimistic' ? event.after : event.after.local,
),
Expand Down
Loading

0 comments on commit 8b4d306

Please sign in to comment.