diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index 543a26bd8..03cf5d41d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -199,7 +199,7 @@ public CompletionStage runAsync(Query query, TransactionConfig con public CompletionStage runRx( Query query, TransactionConfig config, CompletionStage cursorPublishStage) { ensureSessionIsOpen(); - return ensureNoOpenTxBeforeRunningQuery() + var newResultCursorStage = ensureNoOpenTxBeforeRunningQuery() .thenCompose(ignore -> acquireConnection(mode)) .thenCompose(connection -> { var parameters = query.parameters().asMap(Values::value); @@ -244,9 +244,12 @@ public CompletionStage runRx( } }) .thenCompose(Function.identity()); - resultCursorStage = cursorStage.exceptionally(error -> null); return cursorStage.thenApply(Function.identity()); }); + resultCursorStage = newResultCursorStage + .thenCompose(cursor -> cursor == null ? CompletableFuture.completedFuture(null) : cursorPublishStage) + .exceptionally(throwable -> null); + return newResultCursorStage; } public CompletionStage beginTransactionAsync( diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java index fe47ebc43..2474c3f40 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java @@ -288,11 +288,18 @@ public CompletionStage summaryAsync() { @Override public CompletionStage rollback() { - log.trace("[%d] Rolling back unpublished result", hashCode()); synchronized (this) { - state = State.SUCCEEDED; + log.trace("[%d] Rolling back unpublished result %s state", hashCode(), state); + switch (state) { + case READY -> state = State.SUCCEEDED; + case STREAMING, DISCARDING -> { + return summaryFuture.thenApply(ignored -> null); + } + case FAILED, SUCCEEDED -> { + return CompletableFuture.completedFuture(null); + } + } } - completeSummaryFuture(null, null); var resetFuture = new CompletableFuture(); boltConnection .reset() @@ -319,14 +326,17 @@ public void onComplete() { resetFuture.completeExceptionally(throwable); } }); - return resetFuture.thenCompose(ignored -> boltConnection.close()).exceptionally(throwable -> null); + return resetFuture + .thenCompose(ignored -> boltConnection.close()) + .whenComplete((ignored, throwable) -> completeSummaryFuture(null, null)) + .exceptionally(throwable -> null); } @Override public void onComplete() { - log.trace("[%d] onComplete", hashCode()); Runnable runnable; synchronized (this) { + log.trace("[%d] onComplete", hashCode()); var throwable = interruptSupplier.get(); if (throwable != null) { handleError(throwable, true);