Skip to content

Commit

Permalink
feat(database): build new span from around
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed May 6, 2024
1 parent 3654016 commit 766f660
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 23 deletions.
84 changes: 61 additions & 23 deletions packages/database/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ export enum SyncStatus {
FAILED,
}

enum SpanType {
LOCAL,
SYNC,
REMOTE,
}

interface Span {
type: 'synced' | 'syncing' | 'buffer'
type: SpanType
front: Endpoint
back: Endpoint
prev?: Span
Expand All @@ -25,7 +31,7 @@ interface Span {
namespace Span {
export function from(data: Message[], reverse: boolean) {
if (reverse) data.reverse()
const span = { data, type: 'buffer' } as Span
const span = { data, type: SpanType.LOCAL } as Span
span.back = [data[0].uid, data[0].id]
span.front = [data[data.length - 1].uid, data[data.length - 1].id]
return span
Expand Down Expand Up @@ -69,11 +75,11 @@ export class SyncChannel {
const { syncFlag, id: frontId, uid: frontUid } = data.pop()!
const front: Endpoint = [frontUid, frontId]
if (syncFlag === SyncFlag.BOTH) {
this._spans.push({ front, back: front, type: 'synced' })
this._spans.push({ front, back: front, type: SpanType.REMOTE })
} else if (syncFlag === SyncFlag.FRONT) {
const { syncFlag, id, uid } = data.pop()!
if (syncFlag === SyncFlag.BACK) {
this._spans.push({ front, back: [uid, id], type: 'synced' })
this._spans.push({ front, back: [uid, id], type: SpanType.REMOTE })
} else {
throw new Error('malformed sync flag')
}
Expand All @@ -85,11 +91,11 @@ export class SyncChannel {

async queue(session: Session) {
const message = Message.from(session.event.message!, session.platform)
if (this._hasLatest && this._spans[0]?.type === 'buffer') {
if (this._hasLatest && this._spans[0]?.type === SpanType.LOCAL) {
this._spans[0].data!.push(message)
} else {
this._spans.unshift({
type: 'buffer',
type: SpanType.LOCAL,
front: [message.uid, message.id],
back: [message.uid, message.id],
data: [message],
Expand Down Expand Up @@ -148,7 +154,7 @@ export class SyncChannel {
remove(this._spans, span)
span.next.back = span.back
} else {
span.type = 'synced'
span.type = SpanType.REMOTE
delete span.data
delete span.prev
delete span.next
Expand All @@ -157,10 +163,14 @@ export class SyncChannel {

async getMessageList(id: string, direction: Universal.Direction, limit: number) {
let span: Span | undefined, message: Message | undefined

// condition 1: message in local
for (span of this._spans) {
message = span.data?.find(message => message.id === id)
if (message) break
}

// condition 2: message in database
if (!message) {
const data = await this.ctx.database
.select('satori.message')
Expand All @@ -172,17 +182,36 @@ export class SyncChannel {
if (!span) throw new Error('malformed sync span')
}
}
if (!span || !message) {

// condition 3: message not found
if (!message) {
const result = await this.bot.getMessageList(this.channelId, id, 'around')
const data = result.data.map(item => Message.from(item, this.bot.platform))
span = Span.from(data, direction === 'before')
message = data.find(message => message.id === id)!
const index = result.data.findIndex(item => item.id === id)
if (index === -1) throw new Error('malformed message list')
message = Message.from(result.data[index], this.bot.platform)
const data = [message]
let prev: Span | undefined, next: Span | undefined
for (let i = index - 1; i >= 0; i--) {
prev = this._spans.find(span => span.front[1] === result.data[i].id)
if (prev) break
data.unshift(Message.from(result.data[i], this.bot.platform))
}
for (let i = index + 1; i < result.data.length; i++) {
next = this._spans.find(span => span.back[1] === result.data[i].id)
if (next) break
data.push(Message.from(result.data[i], this.bot.platform))
}
span = Span.from(data, false)
span.prev = prev
span.next = next
this._spans.push(span)
}

if (direction === 'around') {
limit = Math.floor(limit / 2) + 1
limit = Math.floor(limit / 2)
}
const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'before')
const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'after')
const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span!, message, limit + 1, 'before')
const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span!, message, limit + 1, 'after')
const [before, after] = await Promise.all([beforeTask, afterTask])
after.shift()
before.shift()
Expand All @@ -193,7 +222,7 @@ export class SyncChannel {
}

private async syncHistory(next: Span, message: Message | { uid: bigint }, limit: number, direction: 'before' | 'after') {
const buffer: Message[] = []
const local: Message[] = []
const dir = ({
before: {
front: 'front',
Expand All @@ -214,9 +243,17 @@ export class SyncChannel {
$gte: '$lte',
},
} as const)[direction]

outer: while (true) {
if ('id' in message && next[dir.front][0] === message.uid) {
buffer.push(message)
if (next.data) {
const index = next.data.findIndex(item => item.uid === message.uid)
if (direction === 'before') {
local.push(...next.data.slice(0, index + 1).reverse())
} else {
local.push(...next.data.slice(index))
}
} else if ('id' in message && next[dir.front][0] === message.uid) {
local.push(message)
} else {
const before = await this.ctx.database
.select('satori.message')
Expand All @@ -228,11 +265,12 @@ export class SyncChannel {
},
})
.orderBy('uid', dir.desc)
.limit(limit - buffer.length)
.limit(limit - local.length)
.execute()
buffer.push(...before)
local.push(...before)
}
if (buffer.length >= limit) return buffer
if (local.length >= limit) return local.slice(0, limit)

let token = next[dir.back][1]
const data: Message[] = []
while (token) {
Expand All @@ -252,8 +290,8 @@ export class SyncChannel {
}
data.push(Message.from(item, this.bot.platform))
}
if (data.length + buffer.length >= limit) {
buffer.push(...data)
if (data.length + local.length >= limit) {
local.push(...data)
break
}
token = result.next!
Expand All @@ -262,7 +300,7 @@ export class SyncChannel {
_span[dir.next] = next
this._spans.push(_span)
this.flushSpan(_span)
return buffer.slice(0, limit)
return local.slice(0, limit)
}
}
}
2 changes: 2 additions & 0 deletions packages/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
})

this.ctx.on('message-deleted', async (session) => {
// TODO update local message
await this.ctx.database.set('satori.message', {
messageId: session.messageId,
platform: session.platform,
Expand All @@ -142,6 +143,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
})

this.ctx.on('message-updated', async (session) => {
// TODO update local message
await this.ctx.database.set('satori.message', {
messageId: session.messageId,
platform: session.platform,
Expand Down

0 comments on commit 766f660

Please sign in to comment.