From 766f660ca3b8da2006a72fdb31ca5281c8c915eb Mon Sep 17 00:00:00 2001 From: Shigma Date: Mon, 6 May 2024 14:34:33 +0800 Subject: [PATCH] feat(database): build new span from around --- packages/database/src/channel.ts | 84 +++++++++++++++++++++++--------- packages/database/src/index.ts | 2 + 2 files changed, 63 insertions(+), 23 deletions(-) diff --git a/packages/database/src/channel.ts b/packages/database/src/channel.ts index 3d83fe72..885f8f1e 100644 --- a/packages/database/src/channel.ts +++ b/packages/database/src/channel.ts @@ -10,8 +10,14 @@ export enum SyncStatus { FAILED, } +enum SpanType { + LOCAL, + SYNC, + REMOTE, +} + interface Span { - type: 'synced' | 'syncing' | 'buffer' + type: SpanType front: Endpoint back: Endpoint prev?: Span @@ -25,7 +31,7 @@ interface Span { namespace Span { export function from(data: Message[], reverse: boolean) { if (reverse) data.reverse() - const span = { data, type: 'buffer' } as Span + const span = { data, type: SpanType.LOCAL } as Span span.back = [data[0].uid, data[0].id] span.front = [data[data.length - 1].uid, data[data.length - 1].id] return span @@ -69,11 +75,11 @@ export class SyncChannel { const { syncFlag, id: frontId, uid: frontUid } = data.pop()! const front: Endpoint = [frontUid, frontId] if (syncFlag === SyncFlag.BOTH) { - this._spans.push({ front, back: front, type: 'synced' }) + this._spans.push({ front, back: front, type: SpanType.REMOTE }) } else if (syncFlag === SyncFlag.FRONT) { const { syncFlag, id, uid } = data.pop()! if (syncFlag === SyncFlag.BACK) { - this._spans.push({ front, back: [uid, id], type: 'synced' }) + this._spans.push({ front, back: [uid, id], type: SpanType.REMOTE }) } else { throw new Error('malformed sync flag') } @@ -85,11 +91,11 @@ export class SyncChannel { async queue(session: Session) { const message = Message.from(session.event.message!, session.platform) - if (this._hasLatest && this._spans[0]?.type === 'buffer') { + if (this._hasLatest && this._spans[0]?.type === SpanType.LOCAL) { this._spans[0].data!.push(message) } else { this._spans.unshift({ - type: 'buffer', + type: SpanType.LOCAL, front: [message.uid, message.id], back: [message.uid, message.id], data: [message], @@ -148,7 +154,7 @@ export class SyncChannel { remove(this._spans, span) span.next.back = span.back } else { - span.type = 'synced' + span.type = SpanType.REMOTE delete span.data delete span.prev delete span.next @@ -157,10 +163,14 @@ export class SyncChannel { async getMessageList(id: string, direction: Universal.Direction, limit: number) { let span: Span | undefined, message: Message | undefined + + // condition 1: message in local for (span of this._spans) { message = span.data?.find(message => message.id === id) if (message) break } + + // condition 2: message in database if (!message) { const data = await this.ctx.database .select('satori.message') @@ -172,17 +182,36 @@ export class SyncChannel { if (!span) throw new Error('malformed sync span') } } - if (!span || !message) { + + // condition 3: message not found + if (!message) { const result = await this.bot.getMessageList(this.channelId, id, 'around') - const data = result.data.map(item => Message.from(item, this.bot.platform)) - span = Span.from(data, direction === 'before') - message = data.find(message => message.id === id)! + const index = result.data.findIndex(item => item.id === id) + if (index === -1) throw new Error('malformed message list') + message = Message.from(result.data[index], this.bot.platform) + const data = [message] + let prev: Span | undefined, next: Span | undefined + for (let i = index - 1; i >= 0; i--) { + prev = this._spans.find(span => span.front[1] === result.data[i].id) + if (prev) break + data.unshift(Message.from(result.data[i], this.bot.platform)) + } + for (let i = index + 1; i < result.data.length; i++) { + next = this._spans.find(span => span.back[1] === result.data[i].id) + if (next) break + data.push(Message.from(result.data[i], this.bot.platform)) + } + span = Span.from(data, false) + span.prev = prev + span.next = next + this._spans.push(span) } + if (direction === 'around') { - limit = Math.floor(limit / 2) + 1 + limit = Math.floor(limit / 2) } - const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'before') - const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'after') + const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span!, message, limit + 1, 'before') + const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span!, message, limit + 1, 'after') const [before, after] = await Promise.all([beforeTask, afterTask]) after.shift() before.shift() @@ -193,7 +222,7 @@ export class SyncChannel { } private async syncHistory(next: Span, message: Message | { uid: bigint }, limit: number, direction: 'before' | 'after') { - const buffer: Message[] = [] + const local: Message[] = [] const dir = ({ before: { front: 'front', @@ -214,9 +243,17 @@ export class SyncChannel { $gte: '$lte', }, } as const)[direction] + outer: while (true) { - if ('id' in message && next[dir.front][0] === message.uid) { - buffer.push(message) + if (next.data) { + const index = next.data.findIndex(item => item.uid === message.uid) + if (direction === 'before') { + local.push(...next.data.slice(0, index + 1).reverse()) + } else { + local.push(...next.data.slice(index)) + } + } else if ('id' in message && next[dir.front][0] === message.uid) { + local.push(message) } else { const before = await this.ctx.database .select('satori.message') @@ -228,11 +265,12 @@ export class SyncChannel { }, }) .orderBy('uid', dir.desc) - .limit(limit - buffer.length) + .limit(limit - local.length) .execute() - buffer.push(...before) + local.push(...before) } - if (buffer.length >= limit) return buffer + if (local.length >= limit) return local.slice(0, limit) + let token = next[dir.back][1] const data: Message[] = [] while (token) { @@ -252,8 +290,8 @@ export class SyncChannel { } data.push(Message.from(item, this.bot.platform)) } - if (data.length + buffer.length >= limit) { - buffer.push(...data) + if (data.length + local.length >= limit) { + local.push(...data) break } token = result.next! @@ -262,7 +300,7 @@ export class SyncChannel { _span[dir.next] = next this._spans.push(_span) this.flushSpan(_span) - return buffer.slice(0, limit) + return local.slice(0, limit) } } } diff --git a/packages/database/src/index.ts b/packages/database/src/index.ts index f592a78c..641a6f08 100644 --- a/packages/database/src/index.ts +++ b/packages/database/src/index.ts @@ -132,6 +132,7 @@ class SatoriDatabase extends Service { }) this.ctx.on('message-deleted', async (session) => { + // TODO update local message await this.ctx.database.set('satori.message', { messageId: session.messageId, platform: session.platform, @@ -142,6 +143,7 @@ class SatoriDatabase extends Service { }) this.ctx.on('message-updated', async (session) => { + // TODO update local message await this.ctx.database.set('satori.message', { messageId: session.messageId, platform: session.platform,