diff --git a/kotlin-fit-performer/src/main/kotlin/com/couchbase/client/performer/kotlin/KotlinSdkCommandExecutor.kt b/kotlin-fit-performer/src/main/kotlin/com/couchbase/client/performer/kotlin/KotlinSdkCommandExecutor.kt index 511cbc7d5..ed9de00a9 100644 --- a/kotlin-fit-performer/src/main/kotlin/com/couchbase/client/performer/kotlin/KotlinSdkCommandExecutor.kt +++ b/kotlin-fit-performer/src/main/kotlin/com/couchbase/client/performer/kotlin/KotlinSdkCommandExecutor.kt @@ -292,9 +292,9 @@ class KotlinSdkCommandExecutor( perRun, request.streamConfig.streamId, request.streamConfig, - ) { documentOrId -> - processScanResult(request, documentOrId) - } + { documentOrId -> processScanResult(request, documentOrId) }, + { throwable -> convertException(throwable) } + ) perRun.streamerOwner().addAndStart(streamer) result.setStream( diff --git a/kotlin-fit-performer/src/main/kotlin/com/couchbase/stream/FluxStreamer.java b/kotlin-fit-performer/src/main/kotlin/com/couchbase/stream/FluxStreamer.java index 1f5ac37d5..842f247ff 100644 --- a/kotlin-fit-performer/src/main/kotlin/com/couchbase/stream/FluxStreamer.java +++ b/kotlin-fit-performer/src/main/kotlin/com/couchbase/stream/FluxStreamer.java @@ -21,8 +21,13 @@ public class FluxStreamer extends Streamer { private final Flux results; private final AtomicReference subscriberRef = new AtomicReference<>(); - public FluxStreamer(Flux results, PerRun perRun, String streamId, Config streamConfig, Function convert) { - super(perRun, streamId, streamConfig, convert); + public FluxStreamer(Flux results, + PerRun perRun, + String streamId, + Config streamConfig, + Function convertResult, + Function convertException) { + super(perRun, streamId, streamConfig, convertResult, convertException); this.results = results; } @@ -35,7 +40,7 @@ public boolean isCreated() { public void run() { AtomicBoolean done = new AtomicBoolean(false); - BaseSubscriber subscriber = new BaseSubscriber() { + BaseSubscriber subscriber = new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription subscription) { if (streamConfig.hasAutomatically()) { @@ -49,7 +54,7 @@ protected void hookOnSubscribe(Subscription subscription) { protected void hookOnNext(T value) { logger.info("Flux streamer {} sending one", streamId); - perRun.resultsStream().enqueue(convert.apply(value)); + perRun.resultsStream().enqueue(convertResult.apply(value)); streamed.incrementAndGet(); } @@ -79,8 +84,9 @@ protected void hookOnError(Throwable throwable) { perRun.resultsStream().enqueue(Result.newBuilder() .setStream(com.couchbase.client.protocol.streams.Signal.newBuilder() - // todo convert error - .setError(com.couchbase.client.protocol.streams.Error.newBuilder().setStreamId(streamId))) + .setError(com.couchbase.client.protocol.streams.Error.newBuilder() + .setException(convertException.apply(throwable)) + .setStreamId(streamId))) .build()); }