Skip to content

Commit

Permalink
feat(database): support span.type
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed May 5, 2024
1 parent e27ee3a commit 3654016
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 144 deletions.
238 changes: 99 additions & 139 deletions packages/database/src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
import { Context, Logger, remove, Session, Universal } from '@satorijs/satori'
import { Bot, Context, Logger, remove, Session, Universal } from '@satorijs/satori'
import { Flatten, Query } from 'minato'
import { Message, SyncFlag } from '.'

const logger = new Logger('sync')

export enum SyncStatus {
INIT,
SYNCED,
READY,
FAILED,
}

interface Span {
type: 'synced' | 'syncing' | 'buffer'
front: Endpoint
back: Endpoint
prev?: Span
prevTask?: Promise<void>
next?: Span
nextTask?: Promise<void>
data?: Message[]
task?: Promise<void>
latest?: boolean
dataTask?: Promise<void>
}

namespace Span {
export function from(data: Message[], reverse: boolean) {
if (reverse) data.reverse()
const span = { data } as Span
const span = { data, type: 'buffer' } as Span
span.back = [data[0].uid, data[0].id]
span.front = [data[data.length - 1].uid, data[data.length - 1].id]
return span
Expand All @@ -33,27 +35,27 @@ namespace Span {
type Endpoint = [bigint, string]

export class SyncChannel {
public data: SyncChannel.Data
/** 消息同步区间,倒序存放 */
public _spans: Span[] = []
public status = SyncStatus.INIT
private _spans: Span[] = []
private _status = SyncStatus.INIT

private _buffer: Message[] = []
private _initTask?: Promise<void>
private _queueTask = Promise.resolve()

private _hasLatest = false
private _baseQuery: Query.Expr<Flatten<Message>>

constructor(private ctx: Context, platform: string, guildId: string, channelId: string) {
this.data = { platform, guildId, channelId }
this._baseQuery = {
platform,
'channel.id': channelId,
}
constructor(private ctx: Context, public bot: Bot, public guildId: string, public channelId: string) {
this._baseQuery = { platform: bot.platform, 'channel.id': channelId }
this._initTask ||= this._init().then(() => {
this._status = SyncStatus.READY
}, (error) => {
logger.warn(error)
this._status = SyncStatus.FAILED
})
}

private async init() {
logger.debug('init channel %s %s %s', this.data.platform, this.data.guildId, this.data.channelId)
private async _init() {
logger.debug('init channel %s %s %s', this.bot.platform, this.guildId, this.channelId)
const data = await this.ctx.database
.select('satori.message')
.where({
Expand All @@ -67,74 +69,74 @@ export class SyncChannel {
const { syncFlag, id: frontId, uid: frontUid } = data.pop()!
const front: Endpoint = [frontUid, frontId]
if (syncFlag === SyncFlag.BOTH) {
this._spans.push({ front, back: front })
this._spans.push({ front, back: front, type: 'synced' })
} else if (syncFlag === SyncFlag.FRONT) {
const { syncFlag, id, uid } = data.pop()!
if (syncFlag === SyncFlag.BACK) {
this._spans.push({ front, back: [uid, id] })
this._spans.push({ front, back: [uid, id], type: 'synced' })
} else {
throw new Error('malformed sync flag')
}
} else {
throw new Error('malformed sync flag')
}
}
this.status = SyncStatus.SYNCED
}

accept(session: Session) {
if (!this.data.assignee) {
this.data.assignee = session.selfId
} else if (this.data.assignee !== session.selfId) {
return true
}

if (session.event.channel?.name) {
this.data.channelName = session.event.channel.name
}
}

async queue(session: Session) {
if (this.accept(session)) return
this._buffer.push(Message.from(session.event.message!, session.platform))
const message = Message.from(session.event.message!, session.platform)
if (this._hasLatest && this._spans[0]?.type === 'buffer') {
this._spans[0].data!.push(message)
} else {
this._spans.unshift({
type: 'buffer',
front: [message.uid, message.id],
back: [message.uid, message.id],
data: [message],
prev: this._hasLatest ? this._spans[0] : undefined,
})
}
this._hasLatest = true
await this._initTask
if (this._status === SyncStatus.FAILED) return
try {
if (this.status === SyncStatus.INIT) {
await (this._initTask ||= this.init())
}
if (this.status === SyncStatus.SYNCED) {
return this._queueTask = this._queueTask.then(() => this.flush())
}
await (this._queueTask = this._queueTask.then(async () => {
await this._flushSpan(this._spans[0], true)
}))
} catch (error) {
logger.warn(error)
this.status = SyncStatus.FAILED
this._status = SyncStatus.FAILED
}
}

private async flushSpan(span: Span) {
if (!span.data) return
return span.task ||= this._flushSpan(span)
return span.dataTask ||= this._flushSpan(span)
}

private async _flushSpan(span: Span) {
private async _flushSpan(span: Span, isLastest = false) {
if (!span.data) return
const data: Partial<Message>[] = span.data.slice()
if (span.next) {
data.unshift({
uid: span.next.front[0],
syncFlag: span.next.front[0] === span.next.back[0] ? SyncFlag.FRONT : SyncFlag.NONE,
})
} else {
span.data[span.data.length - 1].syncFlag = SyncFlag.FRONT
}
if (span.prev) {
data.unshift({
uid: span.prev.back[0],
syncFlag: span.prev.front[0] === span.prev.back[0] ? SyncFlag.BACK : SyncFlag.NONE,
})
} else {
span.data[0].syncFlag = span.data[0].syncFlag ? SyncFlag.BOTH : SyncFlag.BACK
}
await this.ctx.database.upsert('satori.message', data)
do {
const data: Partial<Message>[] = isLastest ? span.data.splice(0) : span.data.slice()
if (span.next) {
data.unshift({
uid: span.next.front[0],
syncFlag: span.next.front[0] === span.next.back[0] ? SyncFlag.FRONT : SyncFlag.NONE,
})
} else {
span.data[span.data.length - 1].syncFlag = SyncFlag.FRONT
}
if (span.prev) {
data.unshift({
uid: span.prev.back[0],
syncFlag: span.prev.front[0] === span.prev.back[0] ? SyncFlag.BACK : SyncFlag.NONE,
})
} else {
span.data[0].syncFlag = span.data[0].syncFlag ? SyncFlag.BOTH : SyncFlag.BACK
}
await this.ctx.database.upsert('satori.message', data)
// eslint-disable-next-line no-unmodified-loop-condition
} while (isLastest && span.data.length)
if (span.prev && span.next) {
remove(this._spans, span)
remove(this._spans, span.next)
Expand All @@ -145,83 +147,53 @@ export class SyncChannel {
} else if (span.next) {
remove(this._spans, span)
span.next.back = span.back
}
delete span.data
delete span.prev
delete span.next
}

private async flush() {
while (this._buffer.length) {
const data = this._buffer.splice(0)
if (this._spans[0]?.latest) {
const { front, back } = this._spans[0]
const last = data.pop()!
await this.ctx.database.upsert('satori.message', [
{ uid: front, syncFlag: front === back ? SyncFlag.BACK : SyncFlag.NONE },
...data,
{ ...last, syncFlag: SyncFlag.FRONT },
])
this._spans[0].front = [last.uid, last.id]
} else {
const back = data.pop()!
let front = data.shift()
if (front) {
await this.ctx.database.upsert('satori.message', [
{ ...front, syncFlag: SyncFlag.BACK },
...data,
{ ...back, syncFlag: SyncFlag.FRONT },
])
} else {
front = back
await this.ctx.database.upsert('satori.message', [
{ ...back, syncFlag: SyncFlag.BOTH },
])
}
this._spans.unshift({
front: [front.uid, front.id],
back: [back.uid, back.id],
latest: true,
})
}
} else {
span.type = 'synced'
delete span.data
delete span.prev
delete span.next
}
}

async getMessageList(id: string, direction: Universal.Direction, limit: number) {
if (this._buffer.some(message => message.id === id)) {
// TODO
} else {
const [message] = await this.ctx.database
let span: Span | undefined, message: Message | undefined
for (span of this._spans) {
message = span.data?.find(message => message.id === id)
if (message) break
}
if (!message) {
const data = await this.ctx.database
.select('satori.message')
.where({ ...this._baseQuery, id })
.execute()
if (message) {
const span = this._spans.find(span => span.front[0] <= message.uid && message.uid <= span.back[0])
if (data[0]) {
message = data[0]
span = this._spans.find(span => span.front[0] <= data[0].uid && data[0].uid <= span.back[0])
if (!span) throw new Error('malformed sync span')
if (direction === 'around') {
limit = Math.floor(limit / 2) + 1
}
const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'before')
const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'after')
const [before, after] = await Promise.all([beforeTask, afterTask])
after.shift()
before.shift()
before.reverse()
if (direction === 'after') return after
if (direction === 'before') return before
return [...before, message, ...after]
} else {
const { channelId, platform, assignee } = this.data
const bot = this.ctx.bots[`${platform}:${assignee}`]
const result = await bot.getMessageList(channelId, id, 'around')
}
}
if (!span || !message) {
const result = await this.bot.getMessageList(this.channelId, id, 'around')
const data = result.data.map(item => Message.from(item, this.bot.platform))
span = Span.from(data, direction === 'before')
message = data.find(message => message.id === id)!
}
if (direction === 'around') {
limit = Math.floor(limit / 2) + 1
}
const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'before')
const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'after')
const [before, after] = await Promise.all([beforeTask, afterTask])
after.shift()
before.shift()
before.reverse()
if (direction === 'after') return after
if (direction === 'before') return before
return [...before, message, ...after]
}

private async syncHistory(next: Span, message: Message | { uid: bigint }, limit: number, direction: 'before' | 'after') {
const buffer: Message[] = []
const { channelId, platform, assignee } = this.data
const bot = this.ctx.bots[`${platform}:${assignee}`]
const dir = ({
before: {
front: 'front',
Expand Down Expand Up @@ -264,7 +236,7 @@ export class SyncChannel {
let token = next[dir.back][1]
const data: Message[] = []
while (token) {
const result = await bot.getMessageList(channelId, token, direction)
const result = await this.bot.getMessageList(this.channelId, token, direction)
if (direction === 'before') result.data.reverse()
for (const item of result.data) {
const prev = this._spans.find(span => span[dir.front][1] === item.id)
Expand All @@ -278,7 +250,7 @@ export class SyncChannel {
message = { uid: prev[dir.front][0] }
continue outer
}
data.push(Message.from(item, platform))
data.push(Message.from(item, this.bot.platform))
}
if (data.length + buffer.length >= limit) {
buffer.push(...data)
Expand All @@ -294,15 +266,3 @@ export class SyncChannel {
}
}
}

export namespace SyncChannel {
export interface Data {
platform: string
guildId: string
channelId: string
assignee?: string
guildName?: string
channelName?: string
avatar?: string
}
}
7 changes: 2 additions & 5 deletions packages/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
const { platform, guildId, channelId } = session
if (session.bot.hidden) return
const key = platform + '/' + guildId + '/' + channelId
this._channels[key] ||= new SyncChannel(this.ctx, session.platform, session.guildId, session.channelId)
this._channels[key] ||= new SyncChannel(this.ctx, session.bot, session.guildId, session.channelId)
this._channels[key].queue(session)
})

Expand Down Expand Up @@ -173,10 +173,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
tasks.push((async () => {
for await (const channel of bot.getChannelIter(guild.id)) {
const key = bot.platform + '/' + guild.id + '/' + channel.id
this._channels[key] ||= new SyncChannel(this.ctx, bot.platform, guild.id, channel.id)
this._channels[key].data.assignee = bot.selfId
this._channels[key].data.guildName = guild.name
this._channels[key].data.channelName = channel.name
this._channels[key] ||= new SyncChannel(this.ctx, bot, guild.id, channel.id)
}
})())
}
Expand Down

0 comments on commit 3654016

Please sign in to comment.