Skip to content

Commit

Permalink
FIT performers - handle errors on stream in FluxStreamer
Browse files Browse the repository at this point in the history
A previous fix converts stream errors correctly for
IteratorBasedStreamer, but missed FluxStreamer.

Change-Id: I9779a487877ad289c06d551c26c740a024da6b53
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/183711
Tested-by: Build Bot <build@couchbase.com>
Tested-by: <matthew.bray@couchbase.com>
Reviewed-by: <matthew.bray@couchbase.com>
  • Loading branch information
programmatix committed Dec 5, 2022
1 parent 1a3f728 commit da53970
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
public abstract class IteratorBasedStreamer<T> extends Streamer<T> {
protected volatile int demanded = 0;
protected volatile boolean cancelled = false;
protected final Function<Throwable, com.couchbase.client.protocol.shared.Exception> convertException;

public IteratorBasedStreamer(PerRun perRun,
String streamId,
Config streamConfig,
Function<T, Result> convertResult,
Function<Throwable, com.couchbase.client.protocol.shared.Exception> convertException) {
super(perRun, streamId, streamConfig, convertResult);
this.convertException = convertException;
super(perRun, streamId, streamConfig, convertResult, convertException);
}

protected abstract T next();
Expand All @@ -30,7 +28,7 @@ public IteratorBasedStreamer(PerRun perRun,

private void enqueueNext() {
T next = next();
Result result = convert.apply(next);
Result result = convertResult.apply(next);
perRun.resultsStream().enqueue(result);
int streamedNow = streamed.incrementAndGet();
logger.info("Streamer {} got and enqueued an item, has sent {}", streamId, streamedNow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ abstract public class Streamer<T> extends Thread {
protected final PerRun perRun;
protected final String streamId;
protected final Config streamConfig;
protected final Function<T, Result> convert;
protected final Function<T, Result> convertResult;
protected final Function<Throwable, com.couchbase.client.protocol.shared.Exception> convertException;
protected final AtomicInteger streamed = new AtomicInteger(0);

public Streamer(PerRun perRun,
String streamId,
Config streamConfig,
Function<T, Result> convert) {
Function<T, Result> convertResult,
Function<Throwable, com.couchbase.client.protocol.shared.Exception> convertException) {
this.perRun = perRun;
this.streamId = streamId;
this.streamConfig = streamConfig;
this.convert = convert;
this.convertResult = convertResult;
this.convertException = convertException;
}

public String streamId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
else results = collection.scan(scanType);
result.setElapsedNanos(System.nanoTime() - start);
var streamer = new FluxStreamer<ScanResult>(results, perRun, request.getStreamConfig().getStreamId(), request.getStreamConfig(),
(ScanResult r) -> {
return processScanResult(request, r);
});
(ScanResult r) -> processScanResult(request, r),
(Throwable err) -> convertException(err));
perRun.streamerOwner().addAndStart(streamer);
result.setStream(com.couchbase.client.protocol.streams.Signal.newBuilder()
.setCreated(com.couchbase.client.protocol.streams.Created.newBuilder()
Expand All @@ -176,4 +175,4 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
protected Exception convertException(Throwable raw) {
return convertExceptionShared(raw);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ public class FluxStreamer<T> extends Streamer<T> {
private final Flux<T> results;
private final AtomicReference<BaseSubscriber> subscriberRef = new AtomicReference<>();

public FluxStreamer(Flux<T> results, PerRun perRun, String streamId, Config streamConfig, Function<T, Result> convert) {
super(perRun, streamId, streamConfig, convert);
public FluxStreamer(Flux<T> results,
PerRun perRun,
String streamId,
Config streamConfig,
Function<T, Result> convertResult,
Function<Throwable, com.couchbase.client.protocol.shared.Exception> convertException) {
super(perRun, streamId, streamConfig, convertResult, convertException);
this.results = results;
}

Expand All @@ -49,7 +54,7 @@ protected void hookOnSubscribe(Subscription subscription) {
protected void hookOnNext(T value) {
logger.info("Flux streamer {} sending one", streamId);

perRun.resultsStream().enqueue(convert.apply(value));
perRun.resultsStream().enqueue(convertResult.apply(value));
streamed.incrementAndGet();
}

Expand Down Expand Up @@ -79,8 +84,9 @@ protected void hookOnError(Throwable throwable) {

perRun.resultsStream().enqueue(Result.newBuilder()
.setStream(com.couchbase.client.protocol.streams.Signal.newBuilder()
// todo convert error
.setError(com.couchbase.client.protocol.streams.Error.newBuilder().setStreamId(streamId)))
.setError(com.couchbase.client.protocol.streams.Error.newBuilder()
.setException(convertException.apply(throwable))
.setStreamId(streamId)))
.build());
}

Expand Down

0 comments on commit da53970

Please sign in to comment.