Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to debounce updates and debounce awareness updates #12

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
196 changes: 172 additions & 24 deletions src/client/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,26 @@ export interface ProviderConfiguration {
* (Optional) Add the authentication data
*/
auth?: { [key: string]: any }
/**
* The time over which to debounce document updates before syncing
*/
debounceTime?: number
/**
* The maximum time to wait before debouncing updates
*/
maxWaitForDebouncingUpdates?: number
/**
* The time over which to debounce document awareness updates before syncing
*/
debounceAwarenessTime?: number
/**
* The maximum time to wait before debouncing awareness updates
*/
maxWaitForDebouncingAwareness?: number
/**
* Notify pending state when debouncing
*/
onPending?: (pending: boolean) => void
}

/**
Expand Down Expand Up @@ -64,6 +84,67 @@ export class SocketIOProvider extends Observable<string> {
* @type {Socket}
*/
public socket: Socket
/**
* The time over which to debounce document updates before syncing
* @type {number}
* @private
*/
public readonly debounceTime?: number
/**
* The maximum time to wait before debouncing updates
* @type {number}
* @private
*/
public readonly maxWaitForDebouncingUpdates?: number
/**
* The maximum time to wait before debouncing awareness updates
* @type {number}
* @private
*/
public readonly maxWaitForDebouncingAwareness?: number
/**
* The time over which to debounce document awareness updates before syncing
* @type {number}
* @private
*/
public readonly debounceAwarenessTime?: number
/**
* The timer used to debounce document updates
* @type {ReturnType<typeof setTimeout>}
* @private
*/
private updateTimer?: ReturnType<typeof setTimeout>
/**
* The unix timestamp of the last update,
* used to ensure updates are sent within maxWaitForDebouncingUpdates
* @type {number}
* @private
*/
private lastUpdate?: number
/**
* The unix timestamp of the last awareness update,
* used to ensure updates are sent within maxWaitForDebouncingAwareness
* @type {number}
* @private
*/
private lastAwarenessUpdate?: number
/**
* The timer used to debounce document awareness updates
* @type {ReturnType<typeof setTimeout>}
* @private
*/
private updateAwarenessTimer?: ReturnType<typeof setTimeout>
/**
* Notify pending state when debouncing
* @type {((pending: boolean) => void) | undefined}
*/
public onPending: ((pending: boolean) => void) | undefined
/**
* The pending debouncing updates
* @type {Uint8Array[]}
* @private
*/
private pendingUpdates: Uint8Array[]
/**
* The yjs document
* @type {Y.Doc}
Expand Down Expand Up @@ -101,7 +182,7 @@ export class SocketIOProvider extends Observable<string> {
* @type {Partial<ManagerOptions & SocketOptions> | undefined}
* @private
*/
private readonly _socketIoOptions: Partial<ManagerOptions & SocketOptions> | undefined;
private readonly _socketIoOptions: Partial<ManagerOptions & SocketOptions> | undefined

/**
* SocketIOProvider constructor
Expand All @@ -117,9 +198,14 @@ export class SocketIOProvider extends Observable<string> {
awareness = new AwarenessProtocol.Awareness(doc),
resyncInterval = -1,
disableBc = false,
auth = {}
}: ProviderConfiguration,
socketIoOptions: Partial<ManagerOptions & SocketOptions> | undefined = undefined) {
auth = {},
debounceTime,
maxWaitForDebouncingUpdates,
debounceAwarenessTime,
maxWaitForDebouncingAwareness,
onPending
}: ProviderConfiguration,
socketIoOptions: Partial<ManagerOptions & SocketOptions> | undefined = undefined) {
super()
while (url[url.length - 1] === '/') {
url = url.slice(0, url.length - 1)
Expand All @@ -131,7 +217,7 @@ export class SocketIOProvider extends Observable<string> {

this._broadcastChannel = `${url}/${roomName}`
this.disableBc = disableBc
this._socketIoOptions = socketIoOptions;
this._socketIoOptions = socketIoOptions

this.socket = io(`${this.url}/yjs|${roomName}`, {
autoConnect: false,
Expand All @@ -140,6 +226,18 @@ export class SocketIOProvider extends Observable<string> {
auth: auth,
...socketIoOptions
})
this.debounceTime = debounceTime
this.maxWaitForDebouncingUpdates = maxWaitForDebouncingUpdates ?? debounceTime
this.debounceAwarenessTime = debounceAwarenessTime
this.maxWaitForDebouncingAwareness = maxWaitForDebouncingAwareness ?? debounceAwarenessTime
this.onPending = onPending
this.pendingUpdates = []

this.initSyncListeners()

this.initAwarenessListeners()

this.initSystemListeners()

this.doc.on('update', this.onUpdateDoc)

Expand All @@ -149,12 +247,6 @@ export class SocketIOProvider extends Observable<string> {

this.socket.on('connect_error', (error) => this.onSocketConnectionError(error))

this.initSyncListeners()

this.initAwarenessListeners()

this.initSystemListeners()

awareness.on('update', this.awarenessUpdate)

if (autoConnect) this.connect()
Expand Down Expand Up @@ -333,29 +425,58 @@ export class SocketIOProvider extends Observable<string> {
if (typeof window !== 'undefined') window.removeEventListener('beforeunload', this.beforeUnloadHandler)
else if (typeof process !== 'undefined') process.off('exit', this.beforeUnloadHandler)
this.awareness.off('update', this.awarenessUpdate)
this.awareness.destroy();
this.awareness.destroy()
this.doc.off('update', this.onUpdateDoc)
super.destroy()
}

private readonly onUpdateDocInner = (update: Uint8Array, origin: SocketIOProvider): void => {
this.socket.emit('sync-update', update)
if (this.bcconnected) {
bc.publish(this._broadcastChannel, {
type: 'sync-update',
data: update
}, this)
}
}

/**
* This function is executed when the document is updated, if the instance that
* emit the change is not this, it emit the changes by socket and broadcast channel.
* @private
* @param {Uint8Array} update Document update
* @param {SocketIOProvider} origin The SocketIOProvider instance that emits the change.
* @type {(update: Uint8Array, origin: SocketIOProvider) => void}
*/
private readonly onUpdateDoc = (update: Uint8Array, origin: SocketIOProvider): void => {
if (origin !== this) {
this.socket.emit('sync-update', update)
if (this.bcconnected) {
bc.publish(this._broadcastChannel, {
type: 'sync-update',
data: update
}, this)
}
if (origin === this) {
return
}
if (this.debounceTime === undefined) {
this.onUpdateDocInner(update, origin)
}
if (this.maxWaitForDebouncingUpdates !== undefined && this.lastUpdate !== undefined && Date.now() - this.lastUpdate >= this.maxWaitForDebouncingUpdates) {
// Ensure updates are sent at least once every maxWaitForDebouncingUpdates
this.pendingUpdates.push(update)
clearTimeout(this.updateTimer)
this.updateTimer = undefined
this.lastUpdate = Date.now()
const mergedUpdate = Y.mergeUpdates(this.pendingUpdates)
this.onUpdateDocInner(mergedUpdate, origin)
this.onPending?.(false)
}
if (this.updateTimer !== undefined) {
this.onPending?.(true)
this.pendingUpdates.push(update)
clearTimeout(this.updateTimer)
} else {
this.pendingUpdates = [update]
}
this.updateTimer = setTimeout(() => {
this.lastUpdate = Date.now()
const mergedUpdate = Y.mergeUpdates(this.pendingUpdates)
this.onUpdateDocInner(mergedUpdate, origin)
this.onPending?.(false)
this.updateTimer = undefined
}, this.debounceTime)
}

/**
Expand All @@ -369,13 +490,12 @@ export class SocketIOProvider extends Observable<string> {
}

/**
* This function is executed when the local awareness changes and this broadcasts the changes per socket and broadcast channel.
* @private
* @param {{ added: number[], updated: number[], removed: number[] }} awarenessChanges The clients added, updated and removed
* @param {SocketIOProvider | null} origin The SocketIOProvider instance that emits the change.
* @type {({ added, updated, removed }: { added: number[], updated: number[], removed: number[] }, origin: SocketIOProvider | null) => void}
*/
private readonly awarenessUpdate = ({ added, updated, removed }: AwarenessChange, origin: SocketIOProvider | null): void => {
private readonly awarenessUpdateInner = ({ added, updated, removed }: AwarenessChange, origin: SocketIOProvider | null): void => {
const changedClients = added.concat(updated).concat(removed)
this.socket.emit('awareness-update', AwarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients))
if (this.bcconnected) {
Expand All @@ -386,6 +506,34 @@ export class SocketIOProvider extends Observable<string> {
}
}

/**
* This function is executed when the local awareness changes and this broadcasts the changes per socket and broadcast channel.
* @private
* @param {{ added: number[], updated: number[], removed: number[] }} awarenessChanges The clients added, updated and removed
* @param {SocketIOProvider | null} origin The SocketIOProvider instance that emits the change.
* @type {({ added, updated, removed }: { added: number[], updated: number[], removed: number[] }, origin: SocketIOProvider | null) => void}
*/
private readonly awarenessUpdate = (awarenessChange: AwarenessChange, origin: SocketIOProvider | null): void => {
if (this.debounceAwarenessTime === undefined) {
this.awarenessUpdateInner(awarenessChange, origin)
}
if (this.updateAwarenessTimer !== undefined) {
clearTimeout(this.updateAwarenessTimer)
}
if (this.maxWaitForDebouncingAwareness !== undefined && this.lastAwarenessUpdate !== undefined && Date.now() - this.lastAwarenessUpdate >= this.maxWaitForDebouncingAwareness) {
// Ensure waiting no longer than `debounceAwarenessTime` for an awareness update
clearTimeout(this.updateAwarenessTimer)
this.updateAwarenessTimer = undefined
this.lastAwarenessUpdate = Date.now()
this.awarenessUpdateInner(awarenessChange, origin)
}
this.updateAwarenessTimer = setTimeout(() => {
this.lastAwarenessUpdate = Date.now()
this.awarenessUpdateInner(awarenessChange, origin)
this.updateAwarenessTimer = undefined
}, this.debounceAwarenessTime)
}

/**
* This function is executed when the windows will be unloaded or the process will be closed and this
* will remove the local client from awareness.
Expand Down
18 changes: 15 additions & 3 deletions src/server/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ export class Document extends Y.Doc {
*/
public name: string
/**
* The socket connection
* The namespace connection
* @type {Namespace}
* @private
*/
private readonly namespace: Namespace
/**
* Indicator as to whether to send document updates only to local WebSockets
* @type {boolean}
* @private
*/
private readonly localOnly?: boolean
/**
* The document awareness
* @type {Awareness}
Expand All @@ -58,12 +64,14 @@ export class Document extends Y.Doc {
* @constructor
* @param {string} name Name for the document
* @param {Namespace} namespace The namespace connection
* @param {boolean} localOnly Indicator as to whether to send document updates only to local WebSockets
* @param {Callbacks} callbacks The document callbacks
*/
constructor (name: string, namespace: Namespace, callbacks?: Callbacks) {
constructor (name: string, namespace: Namespace, localOnly?: boolean, callbacks?: Callbacks) {
super({ gc: gcEnabled })
this.name = name
this.namespace = namespace
this.localOnly = localOnly
this.awareness = new AwarenessProtocol.Awareness(this)
this.awareness.setLocalState(null)
this.callbacks = callbacks
Expand All @@ -87,7 +95,11 @@ export class Document extends Y.Doc {
console.warn(error)
}
}
this.namespace.emit('sync-update', update)
if (this.localOnly !== undefined && this.localOnly) {
this.namespace.local.emit('sync-update', update)
} else {
this.namespace.emit('sync-update', update)
}
}

/**
Expand Down
Loading