Skip to content
This repository has been archived by the owner on Dec 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #122 from babylonhealth/stream-revamp
Browse files Browse the repository at this point in the history
Rethinking supported connection methods
  • Loading branch information
Mikolaj Leszczynski authored Sep 24, 2020
2 parents 74e76f6 + 40801f6 commit 5acbb68
Show file tree
Hide file tree
Showing 59 changed files with 831 additions and 639 deletions.
14 changes: 13 additions & 1 deletion .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ from scratch.

### Orbit ❤️ Android

- Subscribe to state and side effects through LiveData
- Subscribe to state and side effects through Flow
- ViewModel support, along with SavedState!

### Testing 🤖
Expand Down Expand Up @@ -159,9 +159,13 @@ projects as well as lifecycle independent services.

## Connecting to a ViewModel

Now we need to wire up the `ViewModel` to our UI. Orbit provides various methods
of connecting via optional modules. For Android, the most convenient way to
connect is via `LiveData`, as it manages subscription disposal automatically.
Now we need to wire up the `ViewModel` to our UI. We expose coroutine
`Flow`s through which one can conveniently subscribe to updates.
Alternatively you can convert these to your preferred type using
externally provided extension methods e.g.
[asLiveData](https://developer.android.com/reference/kotlin/androidx/lifecycle/package-summary#(kotlinx.coroutines.flow.Flow).asLiveData(kotlin.coroutines.CoroutineContext,%20kotlin.Long))
or
[asObservable](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-observable.html).

``` kotlin
class CalculatorActivity: AppCompatActivity() {
Expand All @@ -174,10 +178,12 @@ class CalculatorActivity: AppCompatActivity() {
addButton.setOnClickListener { viewModel.add(1234) }
subtractButton.setOnClickListener { viewModel.subtract(1234) }

// NOTE: Live data support is provided by the live data module:
// com.babylon.orbit2:orbit-livedata
viewModel.container.state.observe(this, Observer { render(it) })
viewModel.container.sideEffect.observe(this, Observer { handleSideEffect(it) })
lifecycleScope.launchWhenCreated {
viewModel.container.stateFlow.collect { render(it) }
}
lifecycleScope.launchWhenCreated {
viewModel.container.sideEffectFlow.collect { handleSideEffect(it) }
}
}

private fun render(state: CalculatorState) {
Expand Down
83 changes: 61 additions & 22 deletions orbit-2-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ It provides all the basic parts of Orbit.
- [Architecture](#architecture)
- [Orbit concepts](#orbit-concepts)
- [Side effects](#side-effects)
- [Limitations](#limitations)
- [Including the module](#including-the-module)
- [Orbit container](#orbit-container)
- [Subscribing to the container](#subscribing-to-the-container)
- [ContainerHost](#containerhost)
- [Core Orbit operators](#core-orbit-operators)
- [Transform](#transform)
Expand Down Expand Up @@ -76,6 +78,17 @@ The UI does not have to be aware of all side effects (e.g. why should the UI
care if you send analytics events?). As such you can have side effects that do
not post any event back to the UI.

Side effects are cached if there are no observers, guaranteeing critical
events such as navigation are delivered after re-subscription.

#### Limitations

`Container.sideEffectFlow` is designed to be collected by only one
observer. This ensures that side effect caching works in a predictable
way. If your particular use case requires multi-casting use `broadcast`
on the side effect flow, but be aware that caching will not work for the
resulting `BroadcastChannel`.

## Including the module

Orbit 2 is a modular framework. You will need this module to get started!
Expand All @@ -93,6 +106,43 @@ Orbit MVI system. It retains the state, allows you to listen to side effects and
state updates and allows you to modify the state through the `orbit` function
which executes Orbit operators of your desired business logic.

### Subscribing to the container

[Container](src/main/java/com/babylon/orbit2/Container.kt) exposes flows
that emit updates to the container state and side effects.

- State emissions are conflated
- Side effects are cached by default if no observers are listening. This
can be changed via
[Container Settings](src/main/java/com/babylon/orbit2/Container.kt#Settings)

``` kotlin
data class ExampleState(val seen: List<String> = emptyList())

sealed class ExampleSideEffect {
data class Toast(val text: String)
}

fun main() {
// create a container
val container = container<ExampleState, ExampleSideEffect>(ExampleState())

// subscribe to updates
// For Android, use `lifecycleScope.launchWhenCreated` instead
CoroutineScope(Dispatchers.Main).launch {
container.stateFlow.collect {
// do something with the state
}
}
CoroutineScope(Dispatchers.Main).launch {
container.sideEffectFlow.collect {
// do something with the side effect
}
}
}

```

### ContainerHost

A [ContainerHost](src/main/java/com/babylon/orbit2/ContainerHost.kt) is not
Expand Down Expand Up @@ -122,16 +172,10 @@ Operators are invoked via the `orbit` function in a
commonly, a [Container](src/main/java/com/babylon/orbit2/Container.kt) directly)

For more information about which threads these operators run on please see
[threading](#threading).
[Threading](#threading).

``` kotlin
data class ExampleState(val seen: List<String> = emptyList())

sealed class ExampleSideEffect {
data class Toast(val text: String)
}

class ExampleViewModel : ContainerHost<ExampleState, ExampleSideEffect>, ViewModel() {
class Example : ContainerHost<ExampleState, ExampleSideEffect> {
override val container = container<ExampleState, ExampleSideEffect>(ExampleState())

fun example(number: Int) = orbit {
Expand All @@ -152,7 +196,7 @@ easily.
### Transform

``` kotlin
class ExampleViewModel : ContainerHost<ExampleState, ExampleSideEffect> {
class Example : ContainerHost<ExampleState, ExampleSideEffect> {
...

fun example(number: Int) = orbit {
Expand All @@ -177,7 +221,7 @@ a backend API or subscribe to a stream of location updates.
### Reduce

``` kotlin
class ExampleViewModel : ContainerHost<ExampleState, ExampleSideEffect> {
class Example : ContainerHost<ExampleState, ExampleSideEffect> {
...

fun example(number: Int) = orbit {
Expand All @@ -204,7 +248,7 @@ upstream reduction has completed.
### Side effect

``` kotlin
class ExampleViewModel : ContainerHost<ExampleState, ExampleSideEffect> {
class Example : ContainerHost<ExampleState, ExampleSideEffect> {
...

fun example(number: Int) = orbit {
Expand Down Expand Up @@ -253,7 +297,7 @@ Examples of using the exposed fields:

``` kotlin
perform("Toast the current state")
class ExampleViewModel : ContainerHost<ExampleState, ExampleSideEffect> {
class Example : ContainerHost<ExampleState, ExampleSideEffect> {
...

fun anotherExample(number: Int) = orbit {
Expand All @@ -268,7 +312,7 @@ class ExampleViewModel : ContainerHost<ExampleState, ExampleSideEffect> {

``` kotlin
perform("Toast the current state")
class ExampleViewModel : ContainerHost<ExampleState, ExampleSideEffect>, ViewModel() {
class Example : ContainerHost<ExampleState, ExampleSideEffect> {
override val container = container<ExampleState, ExampleSideEffect>(ExampleState()) {
onCreate()
}
Expand Down Expand Up @@ -302,15 +346,10 @@ done within particular `transform` blocks e.g. `transformSuspend`.
- `transform` and `transformX` calls execute in an `IO` thread so as not to
block the Orbit [Container](src/main/java/com/babylon/orbit2/Container.kt)
from accepting further events.
- Updates delivered via `Container.stateStream` and `Container.sideEffectStream`
come in on the same thread you call `Stream.connect` on. However the
connection to the stream has to be manually managed and cancelled. To make it
more convenient to consume a `Stream` we have created wrappers to turn it into
e.g. `LiveData`.

## Error handling

It is good practice to handle all of your errors within your flows. Orbit
does not provide any built-in exception handling because it cannot make
assumptions about how you respond to errors, avoiding putting your system in an
undefined state.
It is good practice to handle all of your errors within your flows.
Orbit does not provide any built-in exception handling because it cannot
make assumptions about how you respond to errors, avoiding putting your
system in an undefined state.
1 change: 1 addition & 0 deletions orbit-2-core/orbit-2-core_build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {

// Testing
testImplementation(project(":orbit-2-test"))
testImplementation(ProjectDependencies.kotlinCoroutinesTest)
GroupedDependencies.testsImplementation.forEach { testImplementation(it) }
testRuntimeOnly(ProjectDependencies.junitJupiterEngine)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object BaseDslPlugin : OrbitDslPlugin {
}
is Reduce -> flow.onEach { event ->
containerContext.withIdling(operator) {
containerContext.setState.send(
containerContext.setState(
createContext(event).block() as S
)
}
Expand Down
37 changes: 33 additions & 4 deletions orbit-2-core/src/main/java/com/babylon/orbit2/Container.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.babylon.orbit2

import com.babylon.orbit2.idling.IdlingResource
import com.babylon.orbit2.idling.NoopIdlingResource
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow

/**
* The heart of the Orbit MVI system. Represents an MVI container with its input and outputs.
Expand All @@ -33,17 +35,43 @@ interface Container<STATE : Any, SIDE_EFFECT : Any> {
*/
val currentState: STATE

/**
* A [Flow] of state updates. Emits the latest state upon subscription and serves only distinct
* values (through equality comparison).
*/
val stateFlow: Flow<STATE>

/**
* A [Flow] of one-off side effects posted from [Builder.sideEffect]. Caches side effects when there are no collectors.
* The size of the cache can be controlled via Container [Settings] and determines if and when the orbit thread suspends when you
* post a side effect. The default is unlimited. You don't have to touch this unless you are posting many side effects which could result in
* [OutOfMemoryError].
*
* This is designed to be collected by one observer only in order to ensure that side effect caching works in a predictable way.
* If your particular use case requires multi-casting use `broadcast` on this [Flow], but be aware that caching will not work for the
* resulting `BroadcastChannel`.
*/
val sideEffectFlow: Flow<SIDE_EFFECT>

/**
* A [Stream] of state updates. Emits the latest state upon subscription and serves only distinct
* values (only changed states are emitted) by default.
* Emissions come in on the main coroutine dispatcher if installed, with the default dispatcher as the fallback. However,
* the connection to the stream has to be manually managed and cancelled when appropriate.
*/
@Suppress("DEPRECATION")
@Deprecated("stateStream is deprecated and will be removed in Orbit 1.2.0, use stateFlow instead")
val stateStream: Stream<STATE>

/**
* A [Stream] of one-off side effects posted from [Builder.sideEffect].
* Depending on the [Settings] this container has been instantiated with, can support
* side effect caching when there are no listeners (default).
* Emissions come in on the main coroutine dispatcher if installed, with the default dispatcher as the fallback. However,
* the connection to the stream has to be manually managed and cancelled when appropriate.
*/
@Suppress("DEPRECATION")
@Deprecated("sideEffectStream is deprecated and will be removed in Orbit 1.2.0, use sideEffectFlow instead")
val sideEffectStream: Stream<SIDE_EFFECT>

/**
Expand All @@ -59,12 +87,13 @@ interface Container<STATE : Any, SIDE_EFFECT : Any> {
/**
* Represents additional settings to create the container with.
*
* @property sideEffectCaching When true the side effects are cached when there are no
* subscribers, to be emitted later upon first subscription.
* On by default.
* @property sideEffectBufferSize Defines how many side effects can be buffered before the container suspends. If you are
* sending many side effects and getting out of memory exceptions this can be turned down to suspend the container instead.
* Unlimited by default.
* @property idlingRegistry The registry used by the container for signalling idling for UI tests
*/
class Settings(
val sideEffectCaching: Boolean = true,
val sideEffectBufferSize: Int = Channel.UNLIMITED,
val idlingRegistry: IdlingResource = NoopIdlingResource()
)
}
52 changes: 52 additions & 0 deletions orbit-2-core/src/main/java/com/babylon/orbit2/FlowExtensions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020 Babylon Partners Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.babylon.orbit2

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import java.io.Closeable
import kotlin.coroutines.EmptyCoroutineContext

@ExperimentalCoroutinesApi
@Suppress("DEPRECATION")
internal fun <T> Flow<T>.asStream(): Stream<T> {
return object : Stream<T> {
override fun observe(lambda: (T) -> Unit): Closeable {

val job = CoroutineScope(streamCollectionDispatcher).launch {
this@asStream.collect {
lambda(it)
}
}

return Closeable { job.cancel() }
}
}
}

private val streamCollectionDispatcher
get() = try {
Dispatchers.Main.also {
it.isDispatchNeeded(EmptyCoroutineContext) // Try to perform an operation on the dispatcher
}
} catch (ias: IllegalStateException) {
Dispatchers.Default
}
Loading

0 comments on commit 5acbb68

Please sign in to comment.