From b1a279e76bae65a4dd5741ece7a81a1f4971479d Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Thu, 3 Jul 2014 01:06:48 -0700 Subject: [PATCH 1/2] Fixes issue #115 Content callbacks for client response and server request were not getting released. --- .../protocol/http/client/ClientRequestResponseConverter.java | 4 ++++ .../protocol/http/server/ServerRequestResponseConverter.java | 3 +++ 2 files changed, 7 insertions(+) diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java index d488a278..62d00ef3 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.client; import io.netty.buffer.ByteBuf; @@ -31,6 +32,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; import io.reactivex.netty.client.ClientMetricsEvent; import io.reactivex.netty.client.ConnectionReuseEvent; import io.reactivex.netty.metrics.Clock; @@ -189,6 +191,8 @@ private void invokeContentOnNext(Object nextObject) { contentSubject.onNext(nextObject); } catch (ClassCastException e) { contentSubject.onError(e); + } finally { + ReferenceCountUtil.release(nextObject); } } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java index 08474bed..d6b06d09 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java @@ -29,6 +29,7 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.ReferenceCountUtil; import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsSubject; import io.reactivex.netty.server.ServerMetricsEvent; @@ -137,6 +138,8 @@ private static void invokeContentOnNext(Object nextObject, PublishSubject conten contentSubject.onNext(nextObject); } catch (ClassCastException e) { contentSubject.onError(e); + } finally { + ReferenceCountUtil.release(nextObject); } } } From 8c67cd73c1d19b4ee263ca88c04f5ce00105b6e5 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Thu, 3 Jul 2014 23:41:32 -0700 Subject: [PATCH 2/2] Fixed failing tests --- .../http/client/RequestProcessor.java | 19 +++---------------- .../protocol/http/server/Http10Test.java | 11 +++++++++-- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java index a3b97a0f..daf2a67d 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.client; import io.netty.buffer.ByteBuf; @@ -30,7 +31,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -106,23 +106,10 @@ public Observable call(Long aLong) { } public Observable handlePost(final HttpServerRequest request, final HttpServerResponse response) { - return request.getContent().last().onErrorResumeNext( - new Func1>() { - @Override - public Observable call(Throwable throwable) { - if (throwable instanceof NoSuchElementException) { - return Observable.from(Unpooled.EMPTY_BUFFER); - } - return Observable.error(throwable); - } - }).flatMap(new Func1>() { + return request.getContent().flatMap(new Func1>() { @Override public Observable call(ByteBuf byteBuf) { - if (byteBuf.isReadable()) { - return response.writeAndFlush(byteBuf); - } else { - return response.writeStringAndFlush(SINGLE_ENTITY_BODY); - } + return response.writeAndFlush(byteBuf.retain()); } }); } diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/Http10Test.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/Http10Test.java index 905a662a..f419e7b8 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/Http10Test.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/Http10Test.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.server; import io.netty.buffer.ByteBuf; @@ -107,8 +108,14 @@ public Observable call( HttpClientResponse response) { return response.getContent(); } - }).toBlocking() - .toFuture().get(1, TimeUnit.MINUTES); + }) + .map(new Func1() { + @Override + public ByteBuf call(ByteBuf byteBuf) { + return byteBuf.retain(); + } + }).toBlocking().toFuture().get(1, TimeUnit.MINUTES); Assert.assertEquals("Unexpected Content.", WELCOME_SERVER_MSG, response.toString(Charset.defaultCharset())); + response.release(); } }