Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 1.3.0-M1 #1258

Merged
merged 59 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
6e81083
Expand only package.json file during prepareNodePackage, otherwise fi…
qwwdfsad Apr 26, 2019
f8eac76
Rename transform parameters for consistency with stdlib
elizarov Apr 27, 2019
e569bd3
Fix exception types for channels to ensure transparency & reporting (…
elizarov Apr 30, 2019
5ea1bed
Rename TimedRunnable to TimedRunnableObsolete in obsolete kotlinx.cor…
qwwdfsad Apr 30, 2019
6e3faa7
Fixed spurious exception during select.onJoin clause registration
elizarov Apr 24, 2019
9614536
Fixes kotlin-coroutines-test TestCoroutineDispatcher docs
elizarov Apr 30, 2019
b552d2c
Merge remote-tracking branch 'origin/master' into develop
elizarov May 6, 2019
b81f5f0
Knit: fix handling of backquote when making anchor reference
elizarov May 6, 2019
485eab1
Ensure that there are no references to atomicfu in classes
elizarov May 2, 2019
a2499a3
Fixed Dispatcher docs
AOrobator Apr 27, 2019
725addf
Flow documentation improvements
qwwdfsad May 13, 2019
71fa64f
Use downloaded package list for dokka (and knit, transitively) to avo…
qwwdfsad May 15, 2019
c022ab6
Make withContext cancellable on return (instead of atomically cancell…
qwwdfsad May 14, 2019
7af7ede
Add overload with long for CoroutinesTimeout.seconds
qwwdfsad May 13, 2019
b7e93f0
Update Kotlin to 1.3.31
qwwdfsad Apr 29, 2019
641d671
Flow performance improvements: mark crucial Flow DSL (unsafeFlow, col…
qwwdfsad Apr 29, 2019
2596414
Add flowOf(value), use unsafeFlow in trivial flow builders
qwwdfsad Apr 29, 2019
a9f8c0d
Make Flow.fold inlineable
qwwdfsad Apr 30, 2019
c42d338
Reactive scrabble benchmarks
qwwdfsad Apr 30, 2019
c438a0e
Numbers benchmark that exposes some of Flow weaknesses
qwwdfsad Apr 30, 2019
78f3e23
Safe flow benchmark
qwwdfsad May 16, 2019
218dc97
Make ChannelIterator.next non-suspending
qwwdfsad May 16, 2019
e35637a
Fix overflow bug in Flow.drop
May 20, 2019
3fe7bd2
Update docs based on feedback @ I/O
objcode May 16, 2019
da2cf23
spelling
digitalbuddha May 22, 2019
d811d3a
Do not fill in stacktrace in AbortFlowException, update benchmark res…
qwwdfsad May 23, 2019
5627834
Add combineLatest with multiple flow parameters
qwwdfsad May 16, 2019
44e625d
Nested publication validator project to test actual dependencies on k…
qwwdfsad May 24, 2019
1b590e8
Promote ReceiveChannel.consumeEach and ReceiveChannel.consume to expe…
qwwdfsad May 24, 2019
b0f6e05
Improve coroutine exception handling logic
qwwdfsad May 17, 2019
8fbd8b7
Recursively check whether parent handles exception to avoid duplicate…
qwwdfsad May 24, 2019
997d2f2
Ignore cancellation cause thrown from the upstream suspension point i…
qwwdfsad May 24, 2019
f939617
Merge pull request #1161 from Kotlin/flow-performance-improvements
qwwdfsad May 24, 2019
b08d61c
New flow builder: channelFlow (and its alias callbackFlow) and supple…
qwwdfsad May 27, 2019
15ee8a3
SafeCollector rework (#1196)
qwwdfsad May 29, 2019
9b05908
Consolidate NullSurrogate with Symbol and rename it to NULL
elizarov May 23, 2019
c81dc91
Documentation improvements (#1229)
qwwdfsad May 31, 2019
b7e1499
Amortize the cost of coroutine dispatch using message queue in all JS…
qwwdfsad Mar 1, 2019
a3f150e
Use identity hash code on K/N for debug strings
qwwdfsad May 31, 2019
46e41f2
Use nanosleep in runBlocking's delay on Native (#1228)
qwwdfsad May 31, 2019
b9b7d82
Enable R8 optimization of Dispatchers.Main loading
wojtek-kalicinski May 30, 2019
8f6c03a
Fix compilation on 32-bit platforms
qwwdfsad Jun 2, 2019
daf8502
Disable binary compatibility tests in snapshot trains
qwwdfsad Jun 3, 2019
f44942a
Deprecate flowWith operator
qwwdfsad Jun 2, 2019
b73ebdc
Adjust behavior of conflated channel to deliver last value
elizarov May 31, 2019
b77a80c
Flow: decouple buffer size from various operators and fuse
elizarov May 29, 2019
3971df3
Rename flow ChannelFlow.kt file to Channels.kt
elizarov Jun 3, 2019
db52e97
Flow.conflate operator
elizarov Jun 4, 2019
e2a5671
Flow scope (#1227)
qwwdfsad Jun 5, 2019
d15d8d6
Make FastServiceLoader compatible with Java 1.6
qwwdfsad Jun 6, 2019
d5478b6
More operators (#1236)
qwwdfsad Jun 6, 2019
15c7d0f
Mark Flow declarations as experimental
qwwdfsad Jun 6, 2019
aa3d1ae
Deprecate Channel operators
qwwdfsad Jun 6, 2019
253e8eb
Add fast `Semaphore`.
ndkoval Jun 4, 2019
d90eb26
atomicfu version 0.12.8
elizarov Jun 6, 2019
3216825
Use real semaphore in flatten/flatMapMerge
elizarov Jun 6, 2019
18e3a4a
Mark Flow.collect as internal to prevent its direct implementation an…
qwwdfsad Jun 6, 2019
03b9bd1
Merge branch 'master' into develop
qwwdfsad Jun 6, 2019
6139ed3
Version 1.3.0-M1
qwwdfsad Jun 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,43 @@
# Change log for kotlinx.coroutines

## Version 1.3.0-M1

Flow:
* Core `Flow` interfaces and operators are graduated from preview status to experimental.
* Context preservation invariant rework (#1210).
* `channelFlow` and `callbackFlow` replacements for `flowViaChannel` for concurrent flows or callback-based APIs.
* `flow` prohibits emissions from non-scoped coroutines by default and recommends to use `channelFlow` instead to avoid most of the concurrency-related bugs.
* Flow cannot be implemented directly
* `AbstractFlow` is introduced for extension (e.g. for managing state) and ensures all context preservation invariants.
* Buffer size is decoupled from all operators that imply channel usage (#1233)
* `buffer` operator can be used to adjust buffer size of any buffer-dependent operator (e.g. `channelFlow`, `flowOn` and `flatMapMerge`).
* `conflate` operator is introduced.
* Flow performance is significantly improved.
* New operators: `scan`, `scanReduce`, `first`, `emitAll`.
* `flowWith` and `flowViaChannel` are deprecated.
* `retry` ignores cancellation exceptions from upstream when the flow was externally cancelled (#1122).
* `combineLatest` overloads for multiple flows (#1193).
* Fixed numerical overflow in `drop` operator.

Channels:
* `consumeEach` is promoted to experimental API (#1080).
* Conflated channels always deliver the latest value after closing (#332, #1235).
* Non-suspending `ChannelIterator.next` to improve iteration performance (#1162).
* Channel exception types are consistent with `produce` and are no longer swallowed as cancellation exceptions in case of programmatic errors (#957, #1128).
* All operators on channels (that were prone to coroutine leaks) are deprecated in the favor of `Flow`.

General changes:
* Kotlin updated to 1.3.31
* `Semaphore` implementation (#1088)
* Loading of `Dispatchers.Main` is tweaked so the latest version of R8 can completely remove I/O when loading it (#1231).
* Performace of all JS dispatchers is significantly improved (#820).
* `withContext` checks cancellation status on exit to make reasoning about sequential concurrent code easier (#1177).
* Consistent exception handling mechanism for complex hierarchies (#689).
* Convenient overload for `CoroutinesTimeout.seconds` (#1184).
* Fix cancellation bug in onJoin (#1130).
* Prevent internal names clash that caused errors for ProGuard (#1159).
* POSIX's `nanosleep` as `delay` in `runBlocking ` in K/N (#1225).

## Version 1.2.1

Major:
Expand Down
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

[![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
[![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0)
[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.2.1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.2.1)
[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.0-M1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0-M1)

Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
This is a companion version for Kotlin `1.3.30` release.
This is a companion version for Kotlin `1.3.31` release.

```kotlin
suspend fun main() = coroutineScope {
Expand Down Expand Up @@ -81,15 +81,15 @@ Add dependencies (you can also add other modules that you need):
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>1.2.1</version>
<version>1.3.0-M1</version>
</dependency>
```

And make sure that you use the latest Kotlin version:

```xml
<properties>
<kotlin.version>1.3.30</kotlin.version>
<kotlin.version>1.3.31</kotlin.version>
</properties>
```

Expand All @@ -99,15 +99,15 @@ Add dependencies (you can also add other modules that you need):

```groovy
dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M1'
}
```

And make sure that you use the latest Kotlin version:

```groovy
buildscript {
ext.kotlin_version = '1.3.30'
ext.kotlin_version = '1.3.31'
}
```

Expand All @@ -125,15 +125,15 @@ Add dependencies (you can also add other modules that you need):

```groovy
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M1")
}
```

And make sure that you use the latest Kotlin version:

```groovy
plugins {
kotlin("jvm") version "1.3.30"
kotlin("jvm") version "1.3.31"
}
```

Expand All @@ -144,7 +144,7 @@ Make sure that you have either `jcenter()` or `mavenCentral()` in the list of re
Core modules of `kotlinx.coroutines` are also available for
[Kotlin/JS](#js) and [Kotlin/Native](#native).
In common code that should get compiled for different platforms, add dependency to
[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.2.1/jar)
[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0-M1/jar)
(follow the link to get the dependency declaration snippet).

### Android
Expand All @@ -153,7 +153,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android)
module as dependency when using `kotlinx.coroutines` on Android:

```groovy
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.2.1'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-M1'
```

This gives you access to Android [Dispatchers.Main](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-android/kotlinx.coroutines.android/kotlinx.coroutines.-dispatchers/index.html)
Expand All @@ -172,15 +172,15 @@ R8 is a replacement for ProGuard in Android ecosystem, it is enabled by default
### JS

[Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as
[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.2.1/jar)
[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0-M1/jar)
(follow the link to get the dependency declaration snippet).

You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM.

### Native

[Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as
[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.2.1/jar)
[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0-M1/jar)
(follow the link to get the dependency declaration snippet).

Only single-threaded code (JS-style) on Kotlin/Native is currently supported.
Expand Down
49 changes: 45 additions & 4 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
sourceCompatibility = 1.8
targetCompatibility = 1.8

apply plugin: "net.ltgt.apt"
apply plugin: "com.github.johnrengelman.shadow"
Expand All @@ -10,15 +12,50 @@ repositories {
maven { url "https://repo.typesafe.com/typesafe/releases/" }
}

jmh.jmhVersion = '1.21'
compileJmhKotlin {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs += ['-Xjvm-default=enable']
}
}

/*
* Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths,
* and it breaks JMH that tries to post-process these symbols and fails because they are renamed.
*/
task removeRedundantFiles(type: Delete) {
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble//SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"

// Primes
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class"
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class"
}

jmhRunBytecodeGenerator.dependsOn(removeRedundantFiles)

// It is better to use the following to run benchmarks, otherwise you may get unexpected errors:
// ../gradlew --no-daemon cleanJmhJar jmh
// ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark"
jmh {
jmhVersion = '1.21'
duplicateClassesStrategy DuplicatesStrategy.INCLUDE
failOnError = true
resultFormat = 'CSV'
// include = ['.*ChannelProducerConsumer.*']
if (project.hasProperty('jmh')) {
include = ".*" + project.jmh + ".*"
}
// includeTests = false
}

jmhJar {
Expand All @@ -29,8 +66,12 @@ jmhJar {
}

dependencies {
compile "org.openjdk.jmh:jmh-core:1.21"
compile "io.projectreactor:reactor-core:$reactor_vesion"
compile 'io.reactivex.rxjava2:rxjava:2.1.9'
compile "com.github.akarnokd:rxjava2-extensions:0.20.8"

compile "org.openjdk.jmh:jmh-core:1.21"
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
compile project(':kotlinx-coroutines-core')
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.flow.scrabble;

import benchmarks.flow.scrabble.IterableSpliterator;
import benchmarks.flow.scrabble.ShakespearePlaysScrabble;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.functions.Function;
import org.openjdk.jmh.annotations.*;

import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

/**
* Shakespeare plays Scrabble with RxJava 2 Flowable.
* @author José
* @author akarnokd
*/
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class RxJava2PlaysScrabble extends ShakespearePlaysScrabble {

@Benchmark
@Override
public List<Entry<Integer, List<String>>> play() throws Exception {

// Function to compute the score of a given word
Function<Integer, Flowable<Integer>> scoreOfALetter = letter -> Flowable.just(letterScores[letter - 'a']) ;

// score of the same letters in a word
Function<Entry<Integer, LongWrapper>, Flowable<Integer>> letterScore =
entry ->
Flowable.just(
letterScores[entry.getKey() - 'a'] *
Integer.min(
(int)entry.getValue().get(),
scrabbleAvailableLetters[entry.getKey() - 'a']
)
) ;

Function<String, Flowable<Integer>> toIntegerFlowable =
string -> Flowable.fromIterable(IterableSpliterator.of(string.chars().boxed().spliterator())) ;

// Histogram of the letters in a given word
Function<String, Single<HashMap<Integer, LongWrapper>>> histoOfLetters =
word -> toIntegerFlowable.apply(word)
.collect(
() -> new HashMap<>(),
(HashMap<Integer, LongWrapper> map, Integer value) ->
{
LongWrapper newValue = map.get(value) ;
if (newValue == null) {
newValue = () -> 0L ;
}
map.put(value, newValue.incAndSet()) ;
}

) ;

// number of blanks for a given letter
Function<Entry<Integer, LongWrapper>, Flowable<Long>> blank =
entry ->
Flowable.just(
Long.max(
0L,
entry.getValue().get() -
scrabbleAvailableLetters[entry.getKey() - 'a']
)
) ;

// number of blanks for a given word
Function<String, Maybe<Long>> nBlanks =
word -> histoOfLetters.apply(word)
.flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator()))
.flatMap(blank)
.reduce(Long::sum) ;


// can a word be written with 2 blanks?
Function<String, Maybe<Boolean>> checkBlanks =
word -> nBlanks.apply(word)
.flatMap(l -> Maybe.just(l <= 2L)) ;

// score taking blanks into account letterScore1
Function<String, Maybe<Integer>> score2 =
word -> histoOfLetters.apply(word)
.flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator()))
.flatMap(letterScore)
.reduce(Integer::sum) ;

// Placing the word on the board
// Building the streams of first and last letters
Function<String, Flowable<Integer>> first3 =
word -> Flowable.fromIterable(IterableSpliterator.of(word.chars().boxed().limit(3).spliterator())) ;
Function<String, Flowable<Integer>> last3 =
word -> Flowable.fromIterable(IterableSpliterator.of(word.chars().boxed().skip(3).spliterator())) ;


// Stream to be maxed
Function<String, Flowable<Integer>> toBeMaxed =
word -> Flowable.just(first3.apply(word), last3.apply(word))
.flatMap(observable -> observable) ;

// Bonus for double letter
Function<String, Maybe<Integer>> bonusForDoubleLetter =
word -> toBeMaxed.apply(word)
.flatMap(scoreOfALetter)
.reduce(Integer::max) ;

// score of the word put on the board
Function<String, Maybe<Integer>> score3 =
word ->
Maybe.merge(Arrays.asList(
score2.apply(word),
score2.apply(word),
bonusForDoubleLetter.apply(word),
bonusForDoubleLetter.apply(word),
Maybe.just(word.length() == 7 ? 50 : 0)
)
)
.reduce(Integer::sum) ;

Function<Function<String, Maybe<Integer>>, Single<TreeMap<Integer, List<String>>>> buildHistoOnScore =
score -> Flowable.fromIterable(() -> shakespeareWords.iterator())
.filter(scrabbleWords::contains)
.filter(word -> checkBlanks.apply(word).blockingGet())
.collect(
() -> new TreeMap<>(Comparator.reverseOrder()),
(TreeMap<Integer, List<String>> map, String word) -> {
Integer key = score.apply(word).blockingGet() ;
List<String> list = map.get(key) ;
if (list == null) {
list = new ArrayList<>() ;
map.put(key, list) ;
}
list.add(word) ;
}
) ;

// best key / value pairs
List<Entry<Integer, List<String>>> finalList2 =
buildHistoOnScore.apply(score3)
.flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator()))
.take(3)
.collect(
() -> new ArrayList<Entry<Integer, List<String>>>(),
(list, entry) -> {
list.add(entry) ;
}
)
.blockingGet() ;
return finalList2 ;
}
}
Loading