From 05086e86a667bfe81aa0eb7d034a038c08550cd0 Mon Sep 17 00:00:00 2001 From: Shigma Date: Tue, 7 May 2024 14:51:55 +0800 Subject: [PATCH] feat(database): sync span on demand --- packages/database/src/channel.ts | 114 +++++++++++-------------------- packages/database/src/index.ts | 17 +++-- packages/database/src/span.ts | 106 ++++++++++++++++++++-------- 3 files changed, 126 insertions(+), 111 deletions(-) diff --git a/packages/database/src/channel.ts b/packages/database/src/channel.ts index d9ee703b..1be9b3da 100644 --- a/packages/database/src/channel.ts +++ b/packages/database/src/channel.ts @@ -16,8 +16,10 @@ export class SyncChannel { public _spans: Span[] = [] public _query: { platform: string; 'channel.id': string } + public hasLatest = false + public hasEarliest = false + private _initTask?: Promise - private _hasLatest = false constructor(public ctx: Context, public bot: Bot, public guildId: string, public channelId: string) { this._query = { platform: bot.platform, 'channel.id': channelId } @@ -57,33 +59,37 @@ export class SyncChannel { let right = this._spans.length while (left < right) { const mid = Math.floor((left + right) / 2) - if (this._spans[mid].front[0] < sid) { + if (this._spans[mid].back[0] <= sid) { right = mid } else { - left = mid + left = mid + 1 } } return left } - private insert(data: Message[], index?: number) { + insert(data: Message[], links: Span.PrevNext = {}, forced: Span.PrevNext = {}) { + if (!data.length && !links.prev && !links.next) { + throw new Error('unexpected empty span') + } const back: Span.Endpoint = [data[0].sid, data[0].id] const front: Span.Endpoint = [data[data.length - 1].sid, data[data.length - 1].id] const span = new Span(this, Span.Type.LOCAL, front, back, data) - if (typeof index !== 'number') { - index = this.binarySearch(front[0]) - } + const index = this.binarySearch(back[0]) this._spans.splice(index, 0, span) + span.link('before', links.prev) + span.merge('before') + span.link('after', links.next) + span.merge('after') + span.flush(forced) return span } async queue(session: Session) { - const prev = this._hasLatest ? this._spans[0] : undefined + const prev = this.hasLatest ? this._spans[0] : undefined const message = Message.from(session.event.message!, session.platform, 'after', prev?.front[0]) - const span = this.insert([message], 0) - span.link('prev', prev) - this._hasLatest = true - span.flush() + this.hasLatest = true + this.insert([message], { prev }, { prev: true, next: true }) } // TODO handle default limit @@ -115,8 +121,9 @@ export class SyncChannel { .where({ ...this._query, id }) .execute() if (data[0]) { - const span = this._spans.find(span => span.front[0] <= data[0].sid && data[0].sid <= span.back[0]) - if (!span) throw new Error('malformed sync span') + const { sid } = data[0] + const span = this._spans[this.binarySearch(sid)] + if (!span || span.back[0] > sid || span.front[0] < sid) throw new Error('malformed sync span') return [span, data[0]] } @@ -127,7 +134,7 @@ export class SyncChannel { let index: number // TODO handle special case // 1. data length = 0 - // 2. next undefined + // 2. next undefined (final) const result = await this.bot.getMessageList(this.channelId, id, direction, limit, 'asc') if (direction === 'around') { index = result.data.findIndex(item => item.id === id) @@ -155,11 +162,7 @@ export class SyncChannel { } if (data.length) { - span = this.insert(data) - span.prev = prev - if (span.prev) span.prev.next = span - span.next = next - if (span.next) span.next.prev = span + span = this.insert(data, { prev, next }) } else { span = prev ?? next! } @@ -173,87 +176,46 @@ export class SyncChannel { return [span, message!, exclusive] } - private async getHistory(span: Span, message: MessageLike, limit: number, direction: 'before' | 'after') { + private async getHistory(span: Span, message: MessageLike, limit: number, dir: Span.Direction) { const buffer: Message[] = [] - const dir = ({ - before: { - front: 'back', - back: 'front', - prev: 'next', - next: 'prev', - asc: 'desc', - push: 'unshift', - $lte: '$gte', - $gte: '$lte', - }, - after: { - front: 'front', - back: 'back', - prev: 'prev', - next: 'next', - asc: 'asc', - push: 'push', - $lte: '$lte', - $gte: '$gte', - }, - } as const)[direction] + const w = Span.words[dir] while (true) { if (span.data) { const index = span.data.findIndex(item => item.sid === message.sid) - if (direction === 'before') { + if (dir === 'before') { buffer.unshift(...span.data.slice(0, index + 1)) } else { buffer.push(...span.data.slice(index)) } - } else if ('id' in message && span[dir.front][0] === message.sid) { - buffer[dir.push](message) + } else if ('id' in message && span[w.front][0] === message.sid) { + buffer[w.push](message) } else { const before = await this.ctx.database .select('satori.message') .where({ ...this._query, sid: { - [dir.$gte]: message.sid, - [dir.$lte]: span[dir.front][0], + [w.$gte]: message.sid, + [w.$lte]: span[w.front][0], }, }) - .orderBy('sid', dir.asc) + .orderBy('sid', w.order) .limit(limit - buffer.length) .execute() - if (direction === 'before') before.reverse() - buffer[dir.push](...before) + if (dir === 'before') before.reverse() + buffer[w.push](...before) } if (buffer.length >= limit) break - span[dir.next] ??= await (span[`${dir.next}Task`] ??= (async (prev: Span) => { - const data: Message[] = [] - const result = await this.bot.getMessageList(this.channelId, prev[dir.front][1], direction, limit - buffer.length, dir.asc) - let next: Span | undefined, last: Message | undefined - for (const item of result.data) { - next = this._spans.find(span => span[dir.back][1] === item.id) - if (next) break - last = Message.from(item, this.bot.platform, direction, last?.sid) - data[dir.push](last) - } - if (data.length) { - // TODO sync new span - const span = this.insert(data) - span.link(dir.prev, prev) - span.link(dir.next, next) - return span - } else { - // FIXME sync edge case? - return next! - } - })(span)) + const next = span[w.next] ?? await (span[`${w.next}Task`] ??= span.extend(dir, limit - buffer.length)) - if (!span[dir.next]) break - span = span[dir.next]! - message = { sid: span[dir.back][0] } + if (!next) break + span = next + message = { sid: span[w.back][0] } } - if (direction === 'before') { + if (dir === 'before') { return buffer.slice(-limit) } else { return buffer.slice(0, limit) diff --git a/packages/database/src/index.ts b/packages/database/src/index.ts index d994cccc..0c38bfa3 100644 --- a/packages/database/src/index.ts +++ b/packages/database/src/index.ts @@ -85,7 +85,7 @@ class SatoriDatabase extends Service { if (session.bot.hidden) return const key = platform + '/' + guildId + '/' + channelId this._channels[key] ||= new SyncChannel(this.ctx, session.bot, session.guildId, session.channelId) - this._channels[key].queue(session) + if (this._channels[key].bot === session.bot) this._channels[key].queue(session) }) this.ctx.on('message-deleted', async (session) => { @@ -111,11 +111,11 @@ class SatoriDatabase extends Service { }) this.ctx.on('bot-status-updated', async (bot) => { - this.onBotOnline(bot) + this.updateBot(bot) }) this.ctx.bots.forEach(async (bot) => { - this.onBotOnline(bot) + this.updateBot(bot) }) } @@ -123,8 +123,15 @@ class SatoriDatabase extends Service { this.stopped = true } - private async onBotOnline(bot: Bot) { - if (bot.status !== Universal.Status.ONLINE || bot.hidden || !bot.getMessageList || !bot.getGuildList) return + private async updateBot(bot: Bot) { + if (bot.hidden || !await bot.supports('message.list') || !await bot.supports('guild.list')) return + if (bot.status !== Universal.Status.ONLINE) { + for (const channel of Object.values(this._channels)) { + if (channel.bot !== bot) continue + channel.hasLatest = false + } + return + } const tasks: Promise[] = [] for await (const guild of bot.getGuildIter()) { const key = bot.platform + '/' + guild.id diff --git a/packages/database/src/span.ts b/packages/database/src/span.ts index d9debabb..3f7d1eff 100644 --- a/packages/database/src/span.ts +++ b/packages/database/src/span.ts @@ -5,9 +5,11 @@ import { SyncChannel } from './channel' export class Span { prev?: Span - prevTask?: Promise + prevTask?: Promise + prevData?: Message[] next?: Span - nextTask?: Promise + nextTask?: Promise + nextData?: Message[] syncTask?: Promise constructor( @@ -18,41 +20,34 @@ export class Span { public data?: Message[], ) {} - link(dir: 'prev' | 'next', span?: Span) { - this[dir] = span - if (span) span[dir === 'prev' ? 'next' : 'prev'] = this + link(dir: Span.Direction, span?: Span) { + const w = Span.words[dir] + this[w.next] = span + if (span) span[w.prev] = this } - mergeNext() { - if (this.next?.type !== this.type) return false - remove(this.channel._spans, this.next) - this.data?.push(...this.next.data!) - this.front = this.next.front - this.nextTask = this.next.nextTask - this.link('next', this.next.next) + merge(dir: Span.Direction) { + const w = Span.words[dir] + const next = this[w.next] + if (next?.type !== this.type) return false + remove(this.channel._spans, next) + this.data?.[w.push](...next.data!) + this[w.front] = next[w.front] + this[w.task] = next[w.task] + this.link(dir, next[w.next]) return true } - mergePrev() { - if (this.prev?.type !== this.type) return false - remove(this.channel._spans, this.prev) - this.data?.unshift(...this.prev.data!) - this.back = this.prev.back - this.prevTask = this.prev.prevTask - this.link('prev', this.prev.prev) - return true - } - - async flush() { + async flush(forced: Span.PrevNext = {}) { if (this.type !== Span.Type.LOCAL) throw new Error('expect local span') - while (this.mergeNext()); - while (this.mergePrev()); + if (!forced.prev && !this.prev && !(this === this.channel._spans.at(0) && this.channel.hasEarliest)) return + if (!forced.next && !this.next && !(this === this.channel._spans.at(-1) && this.channel.hasLatest)) return await Promise.all([this.prev?.syncTask, this.next?.syncTask]) if (!this.channel._spans.includes(this)) return return this.syncTask ||= this.sync() } - async sync() { + private async sync() { this.type = Span.Type.SYNC await this.channel.ctx.database.upsert('satori.message', (row) => { const data: Update[] = clone(this.data!) @@ -63,7 +58,7 @@ export class Span { flag: $.bitAnd(row.flag, $.bitNot(Message.Flag.BACK)), }) } else { - data[data.length - 1].flag |= Message.Flag.FRONT + data.at(-1)!.flag |= Message.Flag.FRONT } if (this.prev?.type === Span.Type.REMOTE) { data.unshift({ @@ -72,18 +67,39 @@ export class Span { flag: $.bitAnd(row.flag, $.bitNot(Message.Flag.FRONT)), }) } else { - data[0].flag |= Message.Flag.BACK + data.at(0)!.flag |= Message.Flag.BACK } return data }, ['sid', 'channel.id', 'platform']) this.type = Span.Type.REMOTE delete this.data - this.mergeNext() - this.mergePrev() + this.merge('after') + this.merge('before') + } + + async extend(dir: Span.Direction, limit: number) { + const w = Span.words[dir] + const result = await this.channel.bot.getMessageList(this.channel.channelId, this[w.front][1], dir, limit, w.order) + const data: Message[] = [] + let next: Span | undefined, last: Message | undefined + for (const item of result.data) { + next = this.channel._spans.find(span => span[w.back][1] === item.id) + if (next) break + last = Message.from(item, this.channel.bot.platform, dir, last?.sid) + data[w.push](last) + } + if (dir === 'before' && !result.next) this.channel.hasEarliest = true + if (data.length || next) { + return this.channel.insert(data, { + [w.prev]: this, + [w.next]: next, + }) + } } } export namespace Span { + export type Direction = 'before' | 'after' export type Endpoint = [bigint, string] export enum Type { @@ -91,4 +107,34 @@ export namespace Span { SYNC, REMOTE, } + + export interface PrevNext { + prev?: T + next?: T + } + + export const words = { + before: { + prev: 'next', + next: 'prev', + push: 'unshift', + front: 'back', + back: 'front', + task: 'prevTask', + order: 'desc', + $lte: '$gte', + $gte: '$lte', + }, + after: { + prev: 'prev', + next: 'next', + push: 'push', + front: 'front', + back: 'back', + task: 'nextTask', + order: 'asc', + $lte: '$lte', + $gte: '$gte', + }, + } as const }