From b2a3ac4c36e3ae45c06c90fcd10c91945acf1c4a Mon Sep 17 00:00:00 2001 From: weizeyuan Date: Mon, 3 Jun 2024 16:28:41 +0800 Subject: [PATCH] fix response close for getRowsUpdated(#1538) --- .../clickhouse/r2dbc/ClickHouseResult.java | 23 +++++++++--------- .../clickhouse/r2dbc/ClickHouseResult091.java | 24 +++++++++---------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java index 8601f1236..f12d6fcf4 100644 --- a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java @@ -20,28 +20,24 @@ public class ClickHouseResult implements Result { private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class); - private final Flux rowSegments; - private final Mono updatedCount; private final Flux segments; ClickHouseResult(ClickHouseResponse response) { - this.rowSegments = Mono.just(response) + Flux rowSegments = Mono.just(response) .flatMapMany(resp -> Flux .fromStream(StreamSupport.stream(resp.records().spliterator(), false) .map(rec -> ClickHousePair.of(resp.getColumns(), rec)))) .map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft())) .map(RowSegment::new); - this.updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary) + Mono updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary) .map(ClickHouseResponseSummary::getProgress) .map(ClickHouseResponseSummary.Progress::getWrittenRows) .map(UpdateCount::new); - this.segments = Flux.concat(this.updatedCount, this.rowSegments); + this.segments = Flux.concat(updatedCount, rowSegments).doOnComplete(response::close); } - ClickHouseResult(Flux rowSegments, Mono updatedCount) { - this.rowSegments = rowSegments; - this.updatedCount = updatedCount; - this.segments = Flux.concat(this.updatedCount, this.rowSegments); + ClickHouseResult(Flux rowSegments) { + this.segments = rowSegments; } /** @@ -51,12 +47,15 @@ public class ClickHouseResult implements Result { */ @Override public Mono getRowsUpdated() { - return updatedCount.map(val -> ((UpdateCount) val).value()); + return this.segments.filter(segment -> segment instanceof UpdateCount) + .cast(UpdateCount.class) + .map(UpdateCount::value) + .reduce(Long::sum); } @Override public Publisher map(BiFunction biFunction) { - return rowSegments.cast(RowSegment.class) + return this.segments.filter(segment -> segment instanceof RowSegment).cast(RowSegment.class) .map(RowSegment::row).handle((row, sink) -> { try { sink.next(biFunction.apply(row, row.getMetadata())); @@ -68,7 +67,7 @@ public Publisher map(BiFunction biFunction @Override public Result filter(Predicate predicate) { - return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate)); + return new ClickHouseResult(segments.filter(predicate)); } @Override diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult091.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult091.java index 4b67bf889..9ce37c921 100644 --- a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult091.java +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult091.java @@ -20,28 +20,24 @@ class ClickHouseResult implements Result { private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class); - private final Flux rowSegments; - private final Mono updatedCount; private final Flux segments; ClickHouseResult(ClickHouseResponse response) { - this.rowSegments = Mono.just(response) + Flux rowSegments = Mono.just(response) .flatMapMany(resp -> Flux .fromStream(StreamSupport.stream(resp.records().spliterator(), false) .map(rec -> ClickHousePair.of(resp.getColumns(), rec)))) .map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft())) .map(RowSegment::new); - this.updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary) + Mono updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary) .map(ClickHouseResponseSummary::getProgress) .map(ClickHouseResponseSummary.Progress::getWrittenRows) .map(UpdateCount::new); - this.segments = Flux.concat(this.updatedCount, this.rowSegments); + this.segments = Flux.concat(updatedCount, rowSegments).doOnComplete(response::close); } - ClickHouseResult(Flux rowSegments, Mono updatedCount) { - this.rowSegments = rowSegments; - this.updatedCount = updatedCount; - this.segments = Flux.concat(this.updatedCount, this.rowSegments); + ClickHouseResult(Flux rowSegments) { + this.segments = rowSegments; } /** @@ -51,12 +47,16 @@ class ClickHouseResult implements Result { */ @Override public Mono getRowsUpdated() { - return updatedCount.map(val -> (int) ((UpdateCount) val).value()); + return this.segments.filter(segment -> segment instanceof UpdateCount) + .cast(UpdateCount.class) + .map(UpdateCount::value) + .reduce(Long::sum) + .map(Math::toIntExact); } @Override public Publisher map(BiFunction biFunction) { - return rowSegments.cast(RowSegment.class) + return this.segments.filter(segment -> segment instanceof RowSegment).cast(RowSegment.class) .map(RowSegment::row).handle((row, sink) -> { try { sink.next(biFunction.apply(row, row.getMetadata())); @@ -68,7 +68,7 @@ public Publisher map(BiFunction biFunction @Override public Result filter(Predicate predicate) { - return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate)); + return new ClickHouseResult(segments.filter(predicate)); } @Override