Skip to content

Commit

Permalink
Merge pull request #170 from NiteshKant/master
Browse files Browse the repository at this point in the history
Fixes issue #115
  • Loading branch information
NiteshKant committed Jul 4, 2014
2 parents 059ad96 + 8c67cd7 commit 2d9ba0c
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.protocol.http.client;

import io.netty.buffer.ByteBuf;
Expand All @@ -31,6 +32,7 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.client.ClientMetricsEvent;
import io.reactivex.netty.client.ConnectionReuseEvent;
import io.reactivex.netty.metrics.Clock;
Expand Down Expand Up @@ -189,6 +191,8 @@ private void invokeContentOnNext(Object nextObject) {
contentSubject.onNext(nextObject);
} catch (ClassCastException e) {
contentSubject.onError(e);
} finally {
ReferenceCountUtil.release(nextObject);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.server.ServerMetricsEvent;
Expand Down Expand Up @@ -137,6 +138,8 @@ private static void invokeContentOnNext(Object nextObject, PublishSubject conten
contentSubject.onNext(nextObject);
} catch (ClassCastException e) {
contentSubject.onError(e);
} finally {
ReferenceCountUtil.release(nextObject);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.protocol.http.client;

import io.netty.buffer.ByteBuf;
Expand All @@ -30,7 +31,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -106,23 +106,10 @@ public Observable<Void> call(Long aLong) {
}

public Observable<Void> handlePost(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
return request.getContent().last().onErrorResumeNext(
new Func1<Throwable, Observable<ByteBuf>>() {
@Override
public Observable<ByteBuf> call(Throwable throwable) {
if (throwable instanceof NoSuchElementException) {
return Observable.from(Unpooled.EMPTY_BUFFER);
}
return Observable.error(throwable);
}
}).flatMap(new Func1<ByteBuf, Observable<Void>>() {
return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
@Override
public Observable<Void> call(ByteBuf byteBuf) {
if (byteBuf.isReadable()) {
return response.writeAndFlush(byteBuf);
} else {
return response.writeStringAndFlush(SINGLE_ENTITY_BODY);
}
return response.writeAndFlush(byteBuf.retain());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.protocol.http.server;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -107,8 +108,14 @@ public Observable<ByteBuf> call(
HttpClientResponse<ByteBuf> response) {
return response.getContent();
}
}).toBlocking()
.toFuture().get(1, TimeUnit.MINUTES);
})
.map(new Func1<ByteBuf, ByteBuf>() {
@Override
public ByteBuf call(ByteBuf byteBuf) {
return byteBuf.retain();
}
}).toBlocking().toFuture().get(1, TimeUnit.MINUTES);
Assert.assertEquals("Unexpected Content.", WELCOME_SERVER_MSG, response.toString(Charset.defaultCharset()));
response.release();
}
}

0 comments on commit 2d9ba0c

Please sign in to comment.