Skip to content
This repository has been archived by the owner on Dec 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #82 from babylonhealth/orbit-2-performance
Browse files Browse the repository at this point in the history
[PDT-159] Orbit2 performance improvements
  • Loading branch information
Mikolaj Leszczynski authored Jun 11, 2020
2 parents edb9bac + 3698cf3 commit 16d8315
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 139 deletions.
25 changes: 25 additions & 0 deletions .idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions buildSrc/src/main/kotlin/DependencyManagement.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ object Versions {

const val kotlin = "1.3.72"
const val coroutines = "1.3.5"
const val coroutineExtensions = "0.0.4"

const val androidLifecycles = "2.2.0"
const val androidLifecyclesSavedState = "2.2.0"
Expand Down Expand Up @@ -62,8 +61,6 @@ object ProjectDependencies {
"org.jetbrains.kotlinx:kotlinx-coroutines-core:${Versions.coroutines}"
const val kotlinCoroutinesRx2 =
"org.jetbrains.kotlinx:kotlinx-coroutines-rx2:${Versions.coroutines}"
const val kotlinCoroutineExtensions =
"com.github.akarnokd:kotlin-flow-extensions:${Versions.coroutineExtensions}"
const val kotlinTest = "org.jetbrains.kotlin:kotlin-test:${Versions.kotlin}"

// Android libraries
Expand Down
1 change: 0 additions & 1 deletion orbit-2-core/orbit-2-core_build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ plugins {
dependencies {
implementation(kotlin("stdlib-jdk8"))
implementation(ProjectDependencies.kotlinCoroutines)
implementation(ProjectDependencies.kotlinCoroutineExtensions)

// Testing
GroupedDependencies.testsImplementationJUnit5.forEach { testImplementation(it) }
Expand Down
14 changes: 7 additions & 7 deletions orbit-2-core/src/main/java/com/babylon/orbit2/BasePlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,21 @@ object BasePlugin : OrbitPlugin {
}
is SideEffect<*, *, *> -> flow.onEach {
with(operator as SideEffect<S, SE, E>) {
createContext(it).let {
createContext(it).let { context ->
SideEffectContext(
it.state,
it.event,
context.state,
context.event,
containerContext.postSideEffect
)
}
.block()
}
}
is Reduce -> flow.onEach {
is Reduce -> flow.onEach { event ->
with(operator) {
containerContext.setState {
createContext(it).block() as S
}
containerContext.setState.send(
createContext(event).block() as S
)
}
}
else -> flow
Expand Down
3 changes: 2 additions & 1 deletion orbit-2-core/src/main/java/com/babylon/orbit2/OrbitPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.babylon.orbit2

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow

interface OrbitPlugin {
Expand All @@ -29,7 +30,7 @@ interface OrbitPlugin {

class ContainerContext<S : Any, SE : Any>(
val backgroundDispatcher: CoroutineDispatcher,
val setState: suspend (() -> S) -> Unit,
val setState: SendChannel<S>,
val postSideEffect: (SE) -> Unit
)
}
57 changes: 22 additions & 35 deletions orbit-2-core/src/main/java/com/babylon/orbit2/RealContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,80 +16,67 @@

package com.babylon.orbit2

import hu.akarnokd.kotlin.flow.replay
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.Executors

@ExperimentalCoroutinesApi
@FlowPreview
open class RealContainer<STATE : Any, SIDE_EFFECT : Any>(
initialState: STATE,
settings: Container.Settings,
orbitDispatcher: CoroutineDispatcher = DEFAULT_DISPATCHER,
backgroundDispatcher: CoroutineDispatcher = Dispatchers.IO
) : Container<STATE, SIDE_EFFECT> {
override val currentState: STATE
get() = stateChannel.value
private val scope = CoroutineScope(orbitDispatcher)
private val stateChannel = ConflatedBroadcastChannel(initialState)
private val sideEffectChannel = Channel<SIDE_EFFECT>(Channel.RENDEZVOUS)
private val scope = CoroutineScope(orbitDispatcher)
private val stateMutex = Mutex()
private val sideEffectMutex = Mutex()
private val pluginContext = OrbitPlugin.ContainerContext(
backgroundDispatcher = backgroundDispatcher,
setState = stateChannel,
postSideEffect = { event: SIDE_EFFECT ->
scope.launch {
// Ensure side effect ordering
sideEffectMutex.withLock {
sideEffectChannel.send(event)
}
}
}
)

override val currentState: STATE
get() = stateChannel.value

override val stateStream: Stream<STATE> =
stateChannel.asFlow().distinctUntilChanged().replay(1) { it }.asStream()
override val stateStream = stateChannel.asStateStream { currentState }

override val sideEffectStream: Stream<SIDE_EFFECT> =
if (settings.sideEffectCaching) {
sideEffectChannel.asCachingStream(scope)
} else {
sideEffectChannel.asStream(scope)
sideEffectChannel.asNonCachingStream()
}

override fun orbit(
init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>
) {
override fun orbit(init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>) {
scope.launch {
collectFlow(init)
}
}

private val pluginContext = OrbitPlugin.ContainerContext<STATE, SIDE_EFFECT>(
backgroundDispatcher = backgroundDispatcher,
setState = {
scope.launch {
stateMutex.withLock {
val reduced = it()
stateChannel.send(reduced)
}
}.join()
},
postSideEffect = { event: SIDE_EFFECT ->
scope.launch {
sideEffectMutex.withLock {
sideEffectChannel.send(event)
}
}
}
)

@Suppress("UNCHECKED_CAST")
suspend fun collectFlow(
init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>
) {
suspend fun collectFlow(init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>) {
Builder<STATE, SIDE_EFFECT, Unit>()
.init().stack.fold(flowOf(Unit)) { flow: Flow<Any>, operator: Operator<STATE, *> ->
Orbit.plugins.fold(flow) { flow2: Flow<Any>, plugin: OrbitPlugin ->
Expand Down
80 changes: 31 additions & 49 deletions orbit-2-core/src/main/java/com/babylon/orbit2/StreamExtensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,93 +19,79 @@ package com.babylon.orbit2
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.broadcast
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger

internal fun <T> Flow<T>.asStream(): Stream<T> {
@ExperimentalCoroutinesApi
internal fun <T> BroadcastChannel<T>.asStateStream(initial: () -> T): Stream<T> {
return object : Stream<T> {
override fun observe(lambda: (T) -> Unit): Closeable {
val scope = CoroutineScope(Dispatchers.Unconfined)
scope.launch {
this@asStream.collect {
lambda(it)
val sub = this@asStateStream.openSubscription()

CoroutineScope(Dispatchers.Unconfined).launch {
var lastState = initial()
lambda(lastState)

for (state in sub) {
if (state != lastState) {
lastState = state
lambda(state)
}
}
}
return Closeable { scope.cancel() }
return Closeable { sub.cancel() }
}
}
}

internal fun <T> Channel<T>.asStream(originalScope: CoroutineScope): Stream<T> {
return object : Stream<T> {
private val broadcastChannel = originalScope.broadcast(
capacity = 1024,
start = CoroutineStart.DEFAULT
) {
for (item in this@asStream) {
if (isActive) {
send(item)
} else {
break
}
}
}
@ExperimentalCoroutinesApi
internal fun <T> Channel<T>.asNonCachingStream(): Stream<T> {
val broadcastChannel = this.broadcast(start = CoroutineStart.DEFAULT)

return object : Stream<T> {
override fun observe(lambda: (T) -> Unit): Closeable {
val scope = CoroutineScope(Dispatchers.Unconfined)
val receiveChannel = broadcastChannel.openSubscription()
scope.launch {
CoroutineScope(Dispatchers.Unconfined).launch {
for (item in receiveChannel) {
lambda(item)
}
}
return Closeable {
receiveChannel.cancel()
scope.cancel()
}
}
}
}

@ExperimentalCoroutinesApi
internal fun <T> Channel<T>.asCachingStream(originalScope: CoroutineScope): Stream<T> {
return object : Stream<T> {
private val channels = mutableSetOf<ReceiveChannel<T>>()
private val subCount = AtomicInteger(0)
private val buffer = mutableListOf<T>()
private val bufferMutex = Mutex()
private val channel = BroadcastChannel<T>(Channel.BUFFERED)

init {
originalScope.launch {
for (item in this@asCachingStream) {
bufferMutex.withLock {
if (channels.isEmpty()) {
buffer.add(item)
} else {
channel.send(item)
}
if (subCount.get() == 0) {
buffer.add(item)
} else {
channel.send(item)
}
}
}
}

override fun observe(lambda: (T) -> Unit): Closeable {
val scope = CoroutineScope(Dispatchers.Unconfined)
val receiveChannel = channel.openSubscription()

scope.launch {
bufferMutex.withLock {
channels += receiveChannel
CoroutineScope(Dispatchers.Unconfined).launch {
if (subCount.compareAndSet(0, 1)) {
buffer.forEach { buffered ->
channel.send(buffered)
}
Expand All @@ -116,12 +102,8 @@ internal fun <T> Channel<T>.asCachingStream(originalScope: CoroutineScope): Stre
}
}
return Closeable {
runBlocking {
bufferMutex.withLock {
channels.remove(receiveChannel)
receiveChannel.cancel()
}
}
receiveChannel.cancel()
subCount.decrementAndGet()
}
}
}
Expand Down
Loading

0 comments on commit 16d8315

Please sign in to comment.