Kotlin multiplatform implementation of Reactive Extensions.
Should you have any questions or feedback welcome to the Kotlin Slack channel: #reaktive
Recommended minimum Gradle version is 5.3. Please read first the documentation about metadata publishing mode.
Add Bintray repository into your root build.gradle file:
repositories {
maven {
url "https://dl.bintray.com/badoo/maven"
}
}
There are a number of modules published:
reaktive
- the main Reaktive library (multiplatform)reaktive-annotations
- collection of annotations (mutiplatform)reaktive-testing
- testing utilities (multiplatform)utils
- some utilities likeClock
,AtomicReference
,Lock
, etc. (multiplatform)coroutines-interop
- Kotlin coroutines interoperability helpers (multiplatform)rxjava2-interop
- RxJava2 interoperability helpers (JVM and Android)rxjava3-interop
- RxJava3 interoperability helpers (JVM and Android)
Kotlin common (root publication):
implementation 'com.badoo.reaktive:<module-name>:<latest-version>'
JVM:
implementation 'com.badoo.reaktive:<module-name>-jvm:<latest-version>'
Android (debug and release):
implementation 'com.badoo.reaktive:<module-name>-android:<latest-version>'
iOS 32:
implementation 'com.badoo.reaktive:<module-name>-ios32:<latest-version>'
iOS 64:
implementation 'com.badoo.reaktive:<module-name>-ios64:<latest-version>'
iOS sim:
implementation 'com.badoo.reaktive:<module-name>-iossim:<latest-version>'
macOS x64:
implementation 'com.badoo.reaktive:<module-name>-macosx64:<latest-version>'
watchOS ARM32
implementation 'com.badoo.reaktive:<module-name>-watchosarm32:<latest-version>'
watchOS ARM64
implementation 'com.badoo.reaktive:<module-name>-watchosarm64:<latest-version>'
watchOS sim
implementation 'com.badoo.reaktive:<module-name>-watchossim:<latest-version>'
tvOS ARM64
implementation 'com.badoo.reaktive:<module-name>-tvosarm64:<latest-version>'
tvOS sim
implementation 'com.badoo.reaktive:<module-name>-tvossim:<latest-version>'
JavaScript:
implementation 'com.badoo.reaktive:<module-name>-js:<latest-version>'
Linux x64:
implementation 'com.badoo.reaktive:<module-name>-linuxx64:<latest-version>'
Linux ARM 32 hfp:
implementation 'com.badoo.reaktive:<module-name>-linuxarm32hfp:<latest-version>'
implementation 'com.badoo.reaktive:<module-name>:<latest-version>'
kotlin {
sourceSets {
commonMain {
dependencies {
implementation 'com.badoo.reaktive:reaktive:<latest-version>'
implementation 'com.badoo.reaktive:reaktive-annotations:<latest-version>'
implementation 'com.badoo.reaktive:coroutines-interop:<latest-version>'
}
}
commonTest {
dependencies {
implementation 'com.badoo.reaktive:reaktive-testing:<latest-version>'
}
}
}
}
- Multiplatform: JVM, Android, iOS, macOS, watchOS, tvOS, JavaScript, Linux X64, Linux ARM 32 hfp
- Schedulers support:
computationScheduler
- fixed thread pool equal to a number of coresioScheduler
- unbound thread pool with caching policynewThreadScheduler
- creates a new thread for each unit of worksingleScheduler
- executes tasks on a single shared background threadtrampolineScheduler
- queues tasks and executes them on one of the participating threadsmainScheduler
- executes tasks on main thread
- True multithreading for Kotlin/Native (there are some limitations)
- Thread local subscriptions without freezing for Kotlin/Native
- Supported sources:
Observable
,Maybe
,Single
,Completable
- Subjects:
PublishSubject
,BehaviorSubject
,ReplaySubject
,UnicastSubject
- Interoperability with Kotlin Coroutines: conversions between coroutines (including Flow) and Reaktive
- Interoperability with RxJava2 and RxJava3: conversion of sources between Reaktive and RxJava, ability to reuse RxJava's schedulers
Kotlin Native memory model and concurrency are very special. In general shared mutable state between threads is not allowed. Since Reaktive supports multithreading in Kotlin Native, please read the following documents before using it:
Object detachment is relatively difficult to achieve and is very error-prone when the objects are created from outside and are not fully managed by the library. This is why Reaktive prefers frozen state. Here are some hints:
- Any callback (and any captured objects) submitted to a Scheduler will be frozen
subscribeOn
freezes both its upstream source and downstream observer, all the Disposables (upstream's and downstream's) are frozen as well, all the values (including errors) are not frozen by the operatorobserveOn
freezes only its downstream observer and all the values (including errors) passed through it, plus all the Disposables, upstream source is not frozen by the operator- Other operators that use scheduler (like
debounce
,timer
,delay
, etc.) behave same asobserveOn
in most of the cases
Sometimes freezing is not acceptable, e.g. we might want to load some data in background and then update the UI. Obviously UI can not be frozen. With Reaktive it is possible to achieve such a behaviour in two ways:
Use threadLocal
operator:
val values = mutableListOf<Any>()
var isFinished = false
observable<Any> { emitter ->
// Background job
}
.subscribeOn(ioScheduler)
.observeOn(mainScheduler)
.threadLocal()
.doOnBeforeNext { values += it } // Callback is not frozen, we can updated the mutable list
.doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
.subscribe()
Set isThreadLocal
flag to true
in subscribe
operator:
val values = mutableListOf<Any>()
var isComplete = false
observable<Any> { emitter ->
// Background job
}
.subscribeOn(ioScheduler)
.observeOn(mainScheduler)
.subscribe(
isThreadLocal = true,
onNext = { values += it }, // Callback is not frozen, we can updated the mutable list
onComplete = { isComplete = true } // Callback is not frozen, we can change the flag
)
In both cases subscription (subscribe
call) must be performed on the Main thread.
This functionality is provided by the coroutines-interop
module which is published in two versions:
coroutines-interop:<version>
is based on stablekotlinx.coroutines
coroutines-interop:<version>-nmtc
is based on work-in-progress multi-threadedkotlinx.coroutines
There are few important limitations:
- Neither
Job
norCoroutineContext
can be frozen (until release of the multi-threaded coroutines). - Because of the first limitation all
xxxFromCoroutine {}
builders andFlow.asObservable()
converter are executed insiderunBlocking
block in Kotlin/Native and should be subscribed on a backgroundScheduler
. - Ktor does not work well in multithreaded environment in Kotlin/Native (it may crash), so please don't mix Ktor and "stable"
coroutines-interop
.
Consider the following example for corutines-interop
:
singleFromCoroutine {
/*
* This block will be executed inside `runBlocking` in Kotlin/Native.
* Please avoid using Ktor here, it may crash.
*/
}
.subscribeOn(ioScheduler)
.observeOn(mainScheduler)
.subscribe { /* Get the result here */ }
We recommend to avoid using Ktor in Kotlin/Native multithreaded environment until multithreaded coroutines, but if you really need consider the following function:
fun <T> singleFromCoroutineUnsafe(mainContext: CoroutineContext, block: suspend CoroutineScope.() -> T): Single<T> =
single { emitter ->
GlobalScope
.launch(mainContext) {
try {
emitter.onSuccess(block())
} catch (e: Throwable) {
emitter.onError(e)
}
}
.asDisposable()
.also(emitter::setDisposable)
}
Now you can use this function together with Ktor but make sure you are doing this always on Main thread, neither subscribeOn
nor observeOn
nor any other thread switch are allowed.
The multi-threaded kotlinx.coroutines
variant lifts some unpleasant restrictions:
- Both
Job
andCoroutineContext
can be frozen.
So there is one crucial difference:
- All
xxxFromCoroutine {}
builders andFlow.asObservable()
converter are executed asynchronously in all targets (including Kotlin/Native), so can be subscribed on any scheduler.
Limitations:
- Because multi-threaded coroutines are work-in-progress, there are possible issues.
- Ktor can be used out of the box, but still can not be frozen, so main thread only.
Converters Scheduler.asCoroutineDispatcher()
and CoroutineContext.asScheduler()
are available only in JVM and JS currently.
Reaktive provides an easy way to manage subscriptions: DisposableScope.
Take a look at the following examples:
val scope =
disposableScope {
observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed
doOnDispose {
// Will be called when the scope is disposed
}
someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed
}
// At some point later
scope.dispose()
class MyPresenter(
private val view: MyView,
private val longRunningAction: Completable
) : DisposableScope by DisposableScope() {
init {
doOnDispose {
// Will be called when the presenter is disposed
}
}
fun load() {
view.showProgressBar()
// Subscription will be disposed when the presenter is disposed
longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)
}
}
class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
MyPresenter(...).scope()
}
override fun onDestroy() {
dispose()
super.onDestroy()
}
}
Unlike coroutines Reaktive streams can be directly used from Swift. But since generics for interfaces are not exported to Swift, Reaktive provides a workaround.
You can wrap any Reaktive stream into a Wrapper
class:
class SharedDataSource {
fun load(): SingleWrapper<String> =
singleFromFunction {
// A long running operation
"A result"
}
.subscribeOn(ioScheduler)
.observeOn(mainScheduler)
.wrap()
}
Now if you enable Objective-C generics you will be able to use it from Swift:
let dataSource = SharedDataSource()
let disposable = dataSource
.load()
.subscribe(isThreadLocal: false, onSubscribe: nil, onError: nil) { (value: NSString) in print(value) }
// At some point later
disposable.dispose()
Wrappers
are available for Observable
, Single
, Maybe
and Completable
.