Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tech/engine tests #250

Merged
merged 6 commits into from
Nov 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion chat-engine/src/testFixtures/kotlin/fixture/Fixtures.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,15 @@ fun anImageMeta(
fun aRoomState(
roomOverview: RoomOverview = aRoomOverview(),
events: List<RoomEvent> = listOf(aRoomMessageEvent()),
) = RoomState(roomOverview, events)
) = RoomState(roomOverview, events)

fun aRoomInvite(
from: RoomMember = aRoomMember(),
roomId: RoomId = aRoomId(),
inviteMeta: RoomInvite.InviteMeta = RoomInvite.InviteMeta.DirectMessage,
) = RoomInvite(from, roomId, inviteMeta)

fun aTypingEvent(
roomId: RoomId = aRoomId(),
members: List<RoomMember> = listOf(aRoomMember())
) = Typing(roomId, members)
23 changes: 23 additions & 0 deletions core/src/main/kotlin/app/dapk/st/core/extensions/FlowExtensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,26 @@ suspend fun <T> Flow<T>.firstOrNull(count: Int, predicate: suspend (T) -> Boolea

return result
}

inline fun <T1, T2, T3, T4, T5, T6, R> combine(
flow: Flow<T1>,
flow2: Flow<T2>,
flow3: Flow<T3>,
flow4: Flow<T4>,
flow5: Flow<T5>,
flow6: Flow<T6>,
crossinline transform: suspend (T1, T2, T3, T4, T5, T6) -> R
): Flow<R> {
return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6) { args: Array<*> ->
@Suppress("UNCHECKED_CAST")
transform(
args[0] as T1,
args[1] as T2,
args[2] as T3,
args[3] as T4,
args[4] as T5,
args[5] as T6,
)
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package app.dapk.st.engine

import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.common.RoomMember
import app.dapk.st.matrix.common.UserId
import app.dapk.st.matrix.common.asString
import app.dapk.st.matrix.message.MessageService
import app.dapk.st.matrix.room.RoomService

internal typealias DirectoryMergeWithLocalEchosUseCase = suspend (OverviewState, UserId, Map<RoomId, List<MessageService.LocalEcho>>) -> OverviewState

internal class DirectoryMergeWithLocalEchosUseCaseImpl(
private val roomService: RoomService,
) : DirectoryMergeWithLocalEchosUseCase {

override suspend fun invoke(overview: OverviewState, selfId: UserId, echos: Map<RoomId, List<MessageService.LocalEcho>>): OverviewState {
return when {
echos.isEmpty() -> overview
else -> overview.map {
when (val roomEchos = echos[it.roomId]) {
null -> it
else -> it.mergeWithLocalEchos(
member = roomService.findMember(it.roomId, selfId) ?: RoomMember(
selfId,
null,
avatarUrl = null,
),
echos = roomEchos,
)
}
}
}
}

private fun RoomOverview.mergeWithLocalEchos(member: RoomMember, echos: List<MessageService.LocalEcho>): RoomOverview {
val latestEcho = echos.maxByOrNull { it.timestampUtc }
return if (latestEcho != null && latestEcho.timestampUtc > (this.lastMessage?.utcTimestamp ?: 0)) {
this.copy(
lastMessage = RoomOverview.LastMessage(
content = when (val message = latestEcho.message) {
is MessageService.Message.TextMessage -> message.content.body.asString()
is MessageService.Message.ImageMessage -> "\uD83D\uDCF7"
},
utcTimestamp = latestEcho.timestampUtc,
author = member,
)
)
} else {
this
}
}

}
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
package app.dapk.st.engine

import app.dapk.st.matrix.common.*
import app.dapk.st.core.extensions.combine
import app.dapk.st.matrix.common.CredentialsStore
import app.dapk.st.matrix.message.MessageService
import app.dapk.st.matrix.room.RoomService
import app.dapk.st.matrix.sync.RoomStore
import app.dapk.st.matrix.sync.SyncService
import app.dapk.st.matrix.sync.SyncService.SyncEvent.Typing
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map

internal class DirectoryUseCase(
private val syncService: SyncService,
private val messageService: MessageService,
private val roomService: RoomService,
private val credentialsStore: CredentialsStore,
private val roomStore: RoomStore,
private val mergeLocalEchosUseCase: DirectoryMergeWithLocalEchosUseCase,
) {

fun state(): Flow<DirectoryState> {
return flow { emit(credentialsStore.credentials()!!.userId) }.flatMapMerge { userId ->
return flow { emit(credentialsStore.credentials()!!.userId) }.flatMapConcat { userId ->
combine(
overviewDatasource(),
syncService.startSyncing(),
syncService.overview().map { it.map { it.engine() } },
messageService.localEchos(),
roomStore.observeUnreadCountById(),
syncService.events(),
roomStore.observeMuted(),
) { overviewState, localEchos, unread, events, muted ->
overviewState.mergeWithLocalEchos(localEchos, userId).map { roomOverview ->
) { _, overviewState, localEchos, unread, events, muted ->
mergeLocalEchosUseCase.invoke(overviewState, userId, localEchos).map { roomOverview ->
DirectoryItem(
overview = roomOverview,
unreadCount = UnreadCount(unread[roomOverview.roomId] ?: 0),
Expand All @@ -36,50 +40,4 @@ internal class DirectoryUseCase(
}
}
}

private fun overviewDatasource() = combine(
syncService.startSyncing(),
syncService.overview().map { it.map { it.engine() } }
) { _, overview -> overview }.filterNotNull()

private suspend fun OverviewState.mergeWithLocalEchos(localEchos: Map<RoomId, List<MessageService.LocalEcho>>, userId: UserId): OverviewState {
return when {
localEchos.isEmpty() -> this
else -> this.map {
when (val roomEchos = localEchos[it.roomId]) {
null -> it
else -> it.mergeWithLocalEchos(
member = roomService.findMember(it.roomId, userId) ?: RoomMember(
userId,
null,
avatarUrl = null,
),
echos = roomEchos,
)
}
}
}
}

private fun RoomOverview.mergeWithLocalEchos(member: RoomMember, echos: List<MessageService.LocalEcho>): RoomOverview {
val latestEcho = echos.maxByOrNull { it.timestampUtc }
return if (latestEcho != null && latestEcho.timestampUtc > (this.lastMessage?.utcTimestamp ?: 0)) {
this.copy(
lastMessage = RoomOverview.LastMessage(
content = when (val message = latestEcho.message) {
is MessageService.Message.TextMessage -> message.content.body.asString()
is MessageService.Message.ImageMessage -> "\uD83D\uDCF7"
},
utcTimestamp = latestEcho.timestampUtc,
author = member,
)
)
} else {
this
}
}

}



Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app.dapk.st.engine

import app.dapk.st.matrix.sync.SyncService
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.map

class InviteUseCase(
Expand All @@ -14,6 +13,6 @@ class InviteUseCase(
private fun invitesDatasource() = combine(
syncService.startSyncing(),
syncService.invites().map { it.map { it.engine() } }
) { _, invites -> invites }.filterNotNull()
) { _, invites -> invites }

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app.dapk.st.engine
import app.dapk.st.core.Base64
import app.dapk.st.core.BuildMeta
import app.dapk.st.core.CoroutineDispatchers
import app.dapk.st.core.JobBag
import app.dapk.st.core.extensions.ErrorTracker
import app.dapk.st.matrix.MatrixClient
import app.dapk.st.matrix.MatrixTaskRunner
Expand Down Expand Up @@ -172,14 +173,14 @@ class MatrixEngine internal constructor(
DirectoryUseCase(
matrix.syncService(),
matrix.messageService(),
matrix.roomService(),
credentialsStore,
roomStore
roomStore,
DirectoryMergeWithLocalEchosUseCaseImpl(matrix.roomService()),
)
}
val timelineUseCase = unsafeLazy {
val matrix = lazyMatrix.value
val mergeWithLocalEchosUseCase = MergeWithLocalEchosUseCaseImpl(LocalEchoMapper(MetaMapper()))
val mergeWithLocalEchosUseCase = TimelineMergeWithLocalEchosUseCaseImpl(LocalEchoMapper(MetaMapper()))
val timeline = TimelineUseCaseImpl(matrix.syncService(), matrix.messageService(), matrix.roomService(), mergeWithLocalEchosUseCase)
ReadMarkingTimeline(roomStore, credentialsStore, timeline, matrix.roomService())
}
Expand All @@ -190,7 +191,16 @@ class MatrixEngine internal constructor(
}

val mediaDecrypter = unsafeLazy { MatrixMediaDecrypter(base64) }
val pushHandler = unsafeLazy { MatrixPushHandler(backgroundScheduler, credentialsStore, lazyMatrix.value.syncService(), roomStore) }
val pushHandler = unsafeLazy {
MatrixPushHandler(
backgroundScheduler,
credentialsStore,
lazyMatrix.value.syncService(),
roomStore,
coroutineDispatchers,
JobBag(),
)
}

val invitesUseCase = unsafeLazy { InviteUseCase(lazyMatrix.value.syncService()) }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package app.dapk.st.engine

import app.dapk.st.core.AppLogTag
import app.dapk.st.core.CoroutineDispatchers
import app.dapk.st.core.JobBag
import app.dapk.st.core.log
import app.dapk.st.matrix.common.CredentialsStore
import app.dapk.st.matrix.common.EventId
Expand All @@ -9,17 +11,20 @@ import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.message.BackgroundScheduler
import app.dapk.st.matrix.sync.RoomStore
import app.dapk.st.matrix.sync.SyncService
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull

private var previousJob: Job? = null

@OptIn(DelicateCoroutinesApi::class)
class MatrixPushHandler(
private val backgroundScheduler: BackgroundScheduler,
private val credentialsStore: CredentialsStore,
private val syncService: SyncService,
private val roomStore: RoomStore,
private val dispatchers: CoroutineDispatchers,
private val jobBag: JobBag,
) : PushHandler {

override fun onNewToken(payload: JsonString) {
Expand All @@ -35,13 +40,12 @@ class MatrixPushHandler(

override fun onMessageReceived(eventId: EventId?, roomId: RoomId?) {
log(AppLogTag.PUSH, "push received")
previousJob?.cancel()
previousJob = GlobalScope.launch {
jobBag.replace(MatrixPushHandler::class, dispatchers.global.launch {
when (credentialsStore.credentials()) {
null -> log(AppLogTag.PUSH, "push ignored due to missing api credentials")
else -> doSync(roomId, eventId)
}
}
})
}

private suspend fun doSync(roomId: RoomId?, eventId: EventId?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import app.dapk.st.matrix.common.EventId
import app.dapk.st.matrix.common.RoomMember
import app.dapk.st.matrix.message.MessageService

internal typealias MergeWithLocalEchosUseCase = (RoomState, RoomMember, List<MessageService.LocalEcho>) -> RoomState
internal typealias TimelineMergeWithLocalEchosUseCase = (RoomState, RoomMember, List<MessageService.LocalEcho>) -> RoomState

internal class MergeWithLocalEchosUseCaseImpl(
internal class TimelineMergeWithLocalEchosUseCaseImpl(
private val localEventMapper: LocalEchoMapper,
) : MergeWithLocalEchosUseCase {
) : TimelineMergeWithLocalEchosUseCase {

override fun invoke(roomState: RoomState, member: RoomMember, echos: List<MessageService.LocalEcho>): RoomState {
val echosByEventId = echos.associateBy { it.eventId }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import app.dapk.st.matrix.common.EventId
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.common.UserId
import app.dapk.st.matrix.room.RoomService
import app.dapk.st.matrix.sync.RoomEvent
import app.dapk.st.matrix.sync.RoomStore
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
Expand All @@ -24,7 +22,7 @@ class ReadMarkingTimeline(
val credentials = credentialsStore.credentials()!!
roomStore.markRead(roomId)
emit(credentials)
}.flatMapMerge { credentials ->
}.flatMapConcat { credentials ->
var lastKnownReadEvent: EventId? = null
observeTimelineUseCase.invoke(roomId, credentials.userId).distinctUntilChanged().onEach { state ->
state.latestMessageEventFromOthers(self = credentials.userId)?.let {
Expand All @@ -37,8 +35,9 @@ class ReadMarkingTimeline(
}
}

private suspend fun updateRoomReadStateAsync(latestReadEvent: EventId, state: MessengerPageState, isReadReceiptsDisabled: Boolean): Deferred<*> {
return coroutineScope {
@Suppress("DeferredResultUnused")
private suspend fun updateRoomReadStateAsync(latestReadEvent: EventId, state: MessengerPageState, isReadReceiptsDisabled: Boolean) {
coroutineScope {
async {
runCatching {
roomService.markFullyRead(state.roomState.roomOverview.roomId, latestReadEvent, isPrivate = isReadReceiptsDisabled)
Expand All @@ -48,10 +47,9 @@ class ReadMarkingTimeline(
}
}

}

private fun MessengerPageState.latestMessageEventFromOthers(self: UserId) = this.roomState.events
.filterIsInstance<RoomEvent.Message>()
.filterNot { it.author.id == self }
.firstOrNull()
?.eventId
private fun MessengerPageState.latestMessageEventFromOthers(self: UserId) = this.roomState.events
.filterIsInstance<RoomEvent.Message>()
.filterNot { it.author.id == self }
.firstOrNull()
?.eventId
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class TimelineUseCaseImpl(
private val syncService: SyncService,
private val messageService: MessageService,
private val roomService: RoomService,
private val mergeWithLocalEchosUseCase: MergeWithLocalEchosUseCase
private val timelineMergeWithLocalEchosUseCase: TimelineMergeWithLocalEchosUseCase,
) : ObserveTimelineUseCase {

override fun invoke(roomId: RoomId, userId: UserId): Flow<MessengerPageState> {
Expand All @@ -30,7 +30,7 @@ internal class TimelineUseCaseImpl(
roomState = when {
localEchos.isEmpty() -> roomState
else -> {
mergeWithLocalEchosUseCase.invoke(
timelineMergeWithLocalEchosUseCase.invoke(
roomState,
roomService.findMember(roomId, userId) ?: userId.toFallbackMember(),
localEchos,
Expand Down
Loading