Skip to content

Commit

Permalink
FIT testing: Convert stream exceptions
Browse files Browse the repository at this point in the history
Change-Id: Id98ab4f0a6f8f8b1ea8f666a62a5d44e3342bfc0
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/183717
Tested-by: David Nault <david.nault@couchbase.com>
Reviewed-by: Graham Pople <graham.pople@couchbase.com>
  • Loading branch information
dnault committed Dec 5, 2022
1 parent a6b94d0 commit 73674c1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ class KotlinSdkCommandExecutor(
perRun,
request.streamConfig.streamId,
request.streamConfig,
) { documentOrId ->
processScanResult(request, documentOrId)
}
{ documentOrId -> processScanResult(request, documentOrId) },
{ throwable -> convertException(throwable) }
)

perRun.streamerOwner().addAndStart(streamer)
result.setStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ public class FluxStreamer<T> extends Streamer<T> {
private final Flux<T> results;
private final AtomicReference<BaseSubscriber> subscriberRef = new AtomicReference<>();

public FluxStreamer(Flux<T> results, PerRun perRun, String streamId, Config streamConfig, Function<T, Result> convert) {
super(perRun, streamId, streamConfig, convert);
public FluxStreamer(Flux<T> results,
PerRun perRun,
String streamId,
Config streamConfig,
Function<T, Result> convertResult,
Function<Throwable, com.couchbase.client.protocol.shared.Exception> convertException) {
super(perRun, streamId, streamConfig, convertResult, convertException);
this.results = results;
}

Expand All @@ -35,7 +40,7 @@ public boolean isCreated() {
public void run() {
AtomicBoolean done = new AtomicBoolean(false);

BaseSubscriber<T> subscriber = new BaseSubscriber<T>() {
BaseSubscriber subscriber = new BaseSubscriber<T>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
if (streamConfig.hasAutomatically()) {
Expand All @@ -49,7 +54,7 @@ protected void hookOnSubscribe(Subscription subscription) {
protected void hookOnNext(T value) {
logger.info("Flux streamer {} sending one", streamId);

perRun.resultsStream().enqueue(convert.apply(value));
perRun.resultsStream().enqueue(convertResult.apply(value));
streamed.incrementAndGet();
}

Expand Down Expand Up @@ -79,8 +84,9 @@ protected void hookOnError(Throwable throwable) {

perRun.resultsStream().enqueue(Result.newBuilder()
.setStream(com.couchbase.client.protocol.streams.Signal.newBuilder()
// todo convert error
.setError(com.couchbase.client.protocol.streams.Error.newBuilder().setStreamId(streamId)))
.setError(com.couchbase.client.protocol.streams.Error.newBuilder()
.setException(convertException.apply(throwable))
.setStreamId(streamId)))
.build());
}

Expand Down

0 comments on commit 73674c1

Please sign in to comment.