Skip to content

Commit

Permalink
feat(database): sync span on demand
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed May 7, 2024
1 parent 152ff1e commit 05086e8
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 111 deletions.
114 changes: 38 additions & 76 deletions packages/database/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ export class SyncChannel {
public _spans: Span[] = []
public _query: { platform: string; 'channel.id': string }

public hasLatest = false
public hasEarliest = false

private _initTask?: Promise<void>
private _hasLatest = false

constructor(public ctx: Context, public bot: Bot, public guildId: string, public channelId: string) {
this._query = { platform: bot.platform, 'channel.id': channelId }
Expand Down Expand Up @@ -57,33 +59,37 @@ export class SyncChannel {
let right = this._spans.length
while (left < right) {
const mid = Math.floor((left + right) / 2)
if (this._spans[mid].front[0] < sid) {
if (this._spans[mid].back[0] <= sid) {
right = mid
} else {
left = mid
left = mid + 1
}
}
return left
}

private insert(data: Message[], index?: number) {
insert(data: Message[], links: Span.PrevNext<Span> = {}, forced: Span.PrevNext<boolean> = {}) {
if (!data.length && !links.prev && !links.next) {
throw new Error('unexpected empty span')
}
const back: Span.Endpoint = [data[0].sid, data[0].id]
const front: Span.Endpoint = [data[data.length - 1].sid, data[data.length - 1].id]
const span = new Span(this, Span.Type.LOCAL, front, back, data)
if (typeof index !== 'number') {
index = this.binarySearch(front[0])
}
const index = this.binarySearch(back[0])
this._spans.splice(index, 0, span)
span.link('before', links.prev)
span.merge('before')
span.link('after', links.next)
span.merge('after')
span.flush(forced)
return span
}

async queue(session: Session) {
const prev = this._hasLatest ? this._spans[0] : undefined
const prev = this.hasLatest ? this._spans[0] : undefined
const message = Message.from(session.event.message!, session.platform, 'after', prev?.front[0])
const span = this.insert([message], 0)
span.link('prev', prev)
this._hasLatest = true
span.flush()
this.hasLatest = true
this.insert([message], { prev }, { prev: true, next: true })
}

// TODO handle default limit
Expand Down Expand Up @@ -115,8 +121,9 @@ export class SyncChannel {
.where({ ...this._query, id })
.execute()
if (data[0]) {
const span = this._spans.find(span => span.front[0] <= data[0].sid && data[0].sid <= span.back[0])
if (!span) throw new Error('malformed sync span')
const { sid } = data[0]
const span = this._spans[this.binarySearch(sid)]
if (!span || span.back[0] > sid || span.front[0] < sid) throw new Error('malformed sync span')
return [span, data[0]]
}

Expand All @@ -127,7 +134,7 @@ export class SyncChannel {
let index: number
// TODO handle special case
// 1. data length = 0
// 2. next undefined
// 2. next undefined (final)
const result = await this.bot.getMessageList(this.channelId, id, direction, limit, 'asc')
if (direction === 'around') {
index = result.data.findIndex(item => item.id === id)
Expand Down Expand Up @@ -155,11 +162,7 @@ export class SyncChannel {
}

if (data.length) {
span = this.insert(data)
span.prev = prev
if (span.prev) span.prev.next = span
span.next = next
if (span.next) span.next.prev = span
span = this.insert(data, { prev, next })
} else {
span = prev ?? next!
}
Expand All @@ -173,87 +176,46 @@ export class SyncChannel {
return [span, message!, exclusive]
}

private async getHistory(span: Span, message: MessageLike, limit: number, direction: 'before' | 'after') {
private async getHistory(span: Span, message: MessageLike, limit: number, dir: Span.Direction) {
const buffer: Message[] = []
const dir = ({
before: {
front: 'back',
back: 'front',
prev: 'next',
next: 'prev',
asc: 'desc',
push: 'unshift',
$lte: '$gte',
$gte: '$lte',
},
after: {
front: 'front',
back: 'back',
prev: 'prev',
next: 'next',
asc: 'asc',
push: 'push',
$lte: '$lte',
$gte: '$gte',
},
} as const)[direction]
const w = Span.words[dir]

while (true) {
if (span.data) {
const index = span.data.findIndex(item => item.sid === message.sid)
if (direction === 'before') {
if (dir === 'before') {
buffer.unshift(...span.data.slice(0, index + 1))
} else {
buffer.push(...span.data.slice(index))
}
} else if ('id' in message && span[dir.front][0] === message.sid) {
buffer[dir.push](message)
} else if ('id' in message && span[w.front][0] === message.sid) {
buffer[w.push](message)
} else {
const before = await this.ctx.database
.select('satori.message')
.where({
...this._query,
sid: {
[dir.$gte]: message.sid,
[dir.$lte]: span[dir.front][0],
[w.$gte]: message.sid,
[w.$lte]: span[w.front][0],
},
})
.orderBy('sid', dir.asc)
.orderBy('sid', w.order)
.limit(limit - buffer.length)
.execute()
if (direction === 'before') before.reverse()
buffer[dir.push](...before)
if (dir === 'before') before.reverse()
buffer[w.push](...before)
}
if (buffer.length >= limit) break

span[dir.next] ??= await (span[`${dir.next}Task`] ??= (async (prev: Span) => {
const data: Message[] = []
const result = await this.bot.getMessageList(this.channelId, prev[dir.front][1], direction, limit - buffer.length, dir.asc)
let next: Span | undefined, last: Message | undefined
for (const item of result.data) {
next = this._spans.find(span => span[dir.back][1] === item.id)
if (next) break
last = Message.from(item, this.bot.platform, direction, last?.sid)
data[dir.push](last)
}
if (data.length) {
// TODO sync new span
const span = this.insert(data)
span.link(dir.prev, prev)
span.link(dir.next, next)
return span
} else {
// FIXME sync edge case?
return next!
}
})(span))
const next = span[w.next] ?? await (span[`${w.next}Task`] ??= span.extend(dir, limit - buffer.length))

if (!span[dir.next]) break
span = span[dir.next]!
message = { sid: span[dir.back][0] }
if (!next) break
span = next
message = { sid: span[w.back][0] }
}

if (direction === 'before') {
if (dir === 'before') {
return buffer.slice(-limit)
} else {
return buffer.slice(0, limit)
Expand Down
17 changes: 12 additions & 5 deletions packages/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
if (session.bot.hidden) return
const key = platform + '/' + guildId + '/' + channelId
this._channels[key] ||= new SyncChannel(this.ctx, session.bot, session.guildId, session.channelId)
this._channels[key].queue(session)
if (this._channels[key].bot === session.bot) this._channels[key].queue(session)
})

this.ctx.on('message-deleted', async (session) => {
Expand All @@ -111,20 +111,27 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
})

this.ctx.on('bot-status-updated', async (bot) => {
this.onBotOnline(bot)
this.updateBot(bot)
})

this.ctx.bots.forEach(async (bot) => {
this.onBotOnline(bot)
this.updateBot(bot)
})
}

async stop() {
this.stopped = true
}

private async onBotOnline(bot: Bot) {
if (bot.status !== Universal.Status.ONLINE || bot.hidden || !bot.getMessageList || !bot.getGuildList) return
private async updateBot(bot: Bot) {
if (bot.hidden || !await bot.supports('message.list') || !await bot.supports('guild.list')) return
if (bot.status !== Universal.Status.ONLINE) {
for (const channel of Object.values(this._channels)) {
if (channel.bot !== bot) continue
channel.hasLatest = false
}
return
}
const tasks: Promise<any>[] = []
for await (const guild of bot.getGuildIter()) {
const key = bot.platform + '/' + guild.id
Expand Down
106 changes: 76 additions & 30 deletions packages/database/src/span.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { SyncChannel } from './channel'

export class Span {
prev?: Span
prevTask?: Promise<Span>
prevTask?: Promise<Span | undefined>
prevData?: Message[]
next?: Span
nextTask?: Promise<Span>
nextTask?: Promise<Span | undefined>
nextData?: Message[]
syncTask?: Promise<void>

constructor(
Expand All @@ -18,41 +20,34 @@ export class Span {
public data?: Message[],
) {}

link(dir: 'prev' | 'next', span?: Span) {
this[dir] = span
if (span) span[dir === 'prev' ? 'next' : 'prev'] = this
link(dir: Span.Direction, span?: Span) {
const w = Span.words[dir]
this[w.next] = span
if (span) span[w.prev] = this
}

mergeNext() {
if (this.next?.type !== this.type) return false
remove(this.channel._spans, this.next)
this.data?.push(...this.next.data!)
this.front = this.next.front
this.nextTask = this.next.nextTask
this.link('next', this.next.next)
merge(dir: Span.Direction) {
const w = Span.words[dir]
const next = this[w.next]
if (next?.type !== this.type) return false
remove(this.channel._spans, next)
this.data?.[w.push](...next.data!)
this[w.front] = next[w.front]
this[w.task] = next[w.task]
this.link(dir, next[w.next])
return true
}

mergePrev() {
if (this.prev?.type !== this.type) return false
remove(this.channel._spans, this.prev)
this.data?.unshift(...this.prev.data!)
this.back = this.prev.back
this.prevTask = this.prev.prevTask
this.link('prev', this.prev.prev)
return true
}

async flush() {
async flush(forced: Span.PrevNext<boolean> = {}) {
if (this.type !== Span.Type.LOCAL) throw new Error('expect local span')
while (this.mergeNext());
while (this.mergePrev());
if (!forced.prev && !this.prev && !(this === this.channel._spans.at(0) && this.channel.hasEarliest)) return
if (!forced.next && !this.next && !(this === this.channel._spans.at(-1) && this.channel.hasLatest)) return
await Promise.all([this.prev?.syncTask, this.next?.syncTask])
if (!this.channel._spans.includes(this)) return
return this.syncTask ||= this.sync()
}

async sync() {
private async sync() {
this.type = Span.Type.SYNC
await this.channel.ctx.database.upsert('satori.message', (row) => {
const data: Update<Message>[] = clone(this.data!)
Expand All @@ -63,7 +58,7 @@ export class Span {
flag: $.bitAnd(row.flag, $.bitNot(Message.Flag.BACK)),
})
} else {
data[data.length - 1].flag |= Message.Flag.FRONT
data.at(-1)!.flag |= Message.Flag.FRONT
}
if (this.prev?.type === Span.Type.REMOTE) {
data.unshift({
Expand All @@ -72,23 +67,74 @@ export class Span {
flag: $.bitAnd(row.flag, $.bitNot(Message.Flag.FRONT)),
})
} else {
data[0].flag |= Message.Flag.BACK
data.at(0)!.flag |= Message.Flag.BACK
}
return data
}, ['sid', 'channel.id', 'platform'])
this.type = Span.Type.REMOTE
delete this.data
this.mergeNext()
this.mergePrev()
this.merge('after')
this.merge('before')
}

async extend(dir: Span.Direction, limit: number) {
const w = Span.words[dir]
const result = await this.channel.bot.getMessageList(this.channel.channelId, this[w.front][1], dir, limit, w.order)
const data: Message[] = []
let next: Span | undefined, last: Message | undefined
for (const item of result.data) {
next = this.channel._spans.find(span => span[w.back][1] === item.id)
if (next) break
last = Message.from(item, this.channel.bot.platform, dir, last?.sid)
data[w.push](last)
}
if (dir === 'before' && !result.next) this.channel.hasEarliest = true
if (data.length || next) {
return this.channel.insert(data, {
[w.prev]: this,
[w.next]: next,
})
}
}
}

export namespace Span {
export type Direction = 'before' | 'after'
export type Endpoint = [bigint, string]

export enum Type {
LOCAL,
SYNC,
REMOTE,
}

export interface PrevNext<T> {
prev?: T
next?: T
}

export const words = {
before: {
prev: 'next',
next: 'prev',
push: 'unshift',
front: 'back',
back: 'front',
task: 'prevTask',
order: 'desc',
$lte: '$gte',
$gte: '$lte',
},
after: {
prev: 'prev',
next: 'next',
push: 'push',
front: 'front',
back: 'back',
task: 'nextTask',
order: 'asc',
$lte: '$lte',
$gte: '$gte',
},
} as const
}

0 comments on commit 05086e8

Please sign in to comment.