Using jersey-blocking: LEAK: ByteBuf.release() was not called before it's garbage-collected #206

Closed
venkatraghavang opened this issue Mar 26, 2015 · 4 comments

Comments

@venkatraghavang

First seen in the 2.2.00-ALPHA7 release. Two issues: (1) A memory leak on HTTP requests through jersey-blocking. (2) The InputStream is not flushed properly and returns 0 bytes to the Jersey providers under concurrent requests.

(1) The leak is similar to the one reported in RxNetty Bug#115. The fixes from #200 and #122 did not help. Output with the following leak detection setting:

-Dio.netty.leakDetectionLevel=paranoid

2015-03-26 10:43:01 ERROR ResourceLeakDetector:171 - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 2
#2:
    io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:463)
    netflix.karyon.transport.util.HttpContentInputStream$3.read(HttpContentInputStream.java:90)
    netflix.karyon.transport.util.HttpContentInputStream.blockingRead(HttpContentInputStream.java:127)
    netflix.karyon.transport.util.HttpContentInputStream.read(HttpContentInputStream.java:81)
    org.codehaus.jackson.impl.ByteSourceBootstrapper.ensureLoaded(ByteSourceBootstrapper.java:499)
    org.codehaus.jackson.impl.ByteSourceBootstrapper.detectEncoding(ByteSourceBootstrapper.java:129)
    org.codehaus.jackson.impl.ByteSourceBootstrapper.constructParser(ByteSourceBootstrapper.java:216)
    org.codehaus.jackson.JsonFactory._createJsonParser(JsonFactory.java:785)
    org.codehaus.jackson.JsonFactory.createJsonParser(JsonFactory.java:561)
    org.codehaus.jackson.jaxrs.JacksonJsonProvider.readFrom(JacksonJsonProvider.java:405)
    com.sun.jersey.json.impl.provider.entity.JacksonProviderProxy.readFrom(JacksonProviderProxy.java:139)
    com.sun.jersey.spi.container.ContainerRequest.getEntity(ContainerRequest.java:490)
    com.sun.jersey.server.impl.model.method.dispatch.EntityParamDispatchProvider$EntityInjectable.getValue(EntityParamDispatchProvider.java:123)
    com.sun.jersey.server.impl.inject.InjectableValuesProvider.getInjectableValues(InjectableValuesProvider.java:86)
    com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$EntityParamInInvoker.getParams(AbstractResourceMethodDispatchProvider.java:153)
    com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:203)
    com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
    com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
    com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
    com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
    com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
    com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
    com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
    com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
    netflix.karyon.jersey.blocking.JerseyBasedRouter$1.call(JerseyBasedRouter.java:70)
    netflix.karyon.jersey.blocking.JerseyBasedRouter$1.call(JerseyBasedRouter.java:1)
    rx.Observable$1.call(Observable.java:144)
    rx.Observable$1.call(Observable.java:136)
    rx.Observable.unsafeSubscribe(Observable.java:7195)
    rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:45)
    java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:482)
    java.util.concurrent.FutureTask.run(FutureTask.java:273)
    java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:189)
    java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:303)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1170)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:640)
    java.lang.Thread.run(Thread.java:853)
#1:
    io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:547)
    netflix.karyon.transport.util.HttpContentInputStream$1.onNext(HttpContentInputStream.java:46)
    netflix.karyon.transport.util.HttpContentInputStream$1.onNext(HttpContentInputStream.java:1)
    rx.Observable$34.onNext(Observable.java:7168)
    rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
    io.reactivex.netty.protocol.http.UnicastContentSubject$PassThruObserver.onNext(UnicastContentSubject.java:359)
    rx.observers.SerializedObserver.onNext(SerializedObserver.java:159)
    io.reactivex.netty.protocol.http.UnicastContentSubject.onNext(UnicastContentSubject.java:311)
    io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.invokeContentOnNext(ServerRequestResponseConverter.java:160)
    io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:96)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32)
    io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324)
    io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36)
    io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    java.lang.Thread.run(Thread.java:853)
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:259)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:141)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:75)
    netflix.karyon.transport.util.HttpContentInputStream.<init>(HttpContentInputStream.java:29)
    netflix.karyon.jersey.blocking.NettyToJerseyBridge.bridgeRequest(NettyToJerseyBridge.java:55)
    netflix.karyon.jersey.blocking.JerseyBasedRouter.handle(JerseyBasedRouter.java:61)
    netflix.karyon.jersey.blocking.JerseyBasedRouter.handle(JerseyBasedRouter.java:1)
    netflix.karyon.transport.interceptor.InterceptorExecutor$ChainSubscriber.onCompleted(InterceptorExecutor.java:211)
    rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
    rx.Subscriber.setProducer(Subscriber.java:137)
    rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
    rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    rx.Observable.unsafeSubscribe(Observable.java:7195)
    netflix.karyon.transport.interceptor.InterceptorExecutor$ChainSubscriber.handleResult(InterceptorExecutor.java:223)
    netflix.karyon.transport.interceptor.InterceptorExecutor$ChainSubscriber.onCompleted(InterceptorExecutor.java:209)
    rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
    rx.Subscriber.setProducer(Subscriber.java:137)
    rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
    rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    rx.Observable$1.call(Observable.java:144)
    rx.Observable$1.call(Observable.java:136)
    rx.Observable.subscribe(Observable.java:7284)
    io.reactivex.netty.protocol.http.server.HttpConnectionHandler$1$1.onNext(HttpConnectionHandler.java:112)
    io.reactivex.netty.protocol.http.server.HttpConnectionHandler$1$1.onNext(HttpConnectionHandler.java:62)
    rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
    rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
    rx.observers.SerializedObserver.onNext(SerializedObserver.java:159)
    rx.subjects.SerializedSubject.onNext(SerializedSubject.java:64)
    io.reactivex.netty.pipeline.ObservableAdapter.channelRead(ObservableAdapter.java:40)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:89)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32)
    io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324)
    io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36)
    io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    java.lang.Thread.run(Thread.java:853)

(2) During a load test, a custom object mapper sees 0 available bytes to read, even though the content is written to the contentBuffer and read correctly in HttpContentInputStream. As a workaround, I've been trying to introduce a delay in the subscription in the JerseyBasedRouter handle() function, but I suspect this will be resolved once the fixes from #199 are merged.

I'm still looking into the code to see what I can find. The tests were run against a simple servlet that consumes JSON.
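For illustration of issue (2), here is a minimal sketch of a blocking hand-off in which the writer and the reader always go through the same lock, so the reader waits instead of observing 0 available bytes. `HandOffBuffer` and its methods are hypothetical names for this sketch; it is not Karyon's HttpContentInputStream and may not match how #199 addresses the problem.

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical hand-off buffer for illustration; not part of Karyon or RxNetty.
final class HandOffBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean completed;

    // Writer side: append a chunk and wake any waiting reader, all under the lock.
    void write(byte[] chunk) {
        lock.lock();
        try {
            pending.write(chunk, 0, chunk.length);
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Writer side: mark end of content under the same lock.
    void complete() {
        lock.lock();
        try {
            completed = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Reader side: block until bytes arrive or the writer completes; -1 means end of stream.
    int read(byte[] dst, int off, int len) throws InterruptedException {
        if (len == 0) {
            return 0;
        }
        lock.lock();
        try {
            while (pending.size() == 0 && !completed) {
                changed.await();
            }
            if (pending.size() == 0) {
                return -1;
            }
            byte[] all = pending.toByteArray();
            int size = Math.min(len, all.length);
            System.arraycopy(all, 0, dst, off, size);
            // Keep the unread remainder for the next call.
            pending.reset();
            pending.write(all, size, all.length - size);
            return size;
        } finally {
            lock.unlock();
        }
    }
}

final class HandOffDemo {
    public static void main(String[] args) throws InterruptedException {
        HandOffBuffer buf = new HandOffBuffer();
        Thread writer = new Thread(() -> {
            buf.write(new byte[] {1, 2, 3});
            buf.complete();
        });
        writer.start();
        byte[] dst = new byte[8];
        int total = 0;
        int n;
        while ((n = buf.read(dst, total, dst.length - total)) != -1) {
            total += n;
        }
        writer.join();
        System.out.println("bytes read: " + total);
    }
}
```

The wait loop re-checks its condition after every wake-up, which is the usual way to guard against spurious wake-ups with a `Condition`.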

@venkatraghavang
Author

Update:

It looks like both issues arise from inconsistent locking in HttpContentInputStream. While issue (2) will be resolved once the #199 fixes are merged, issue (1) requires an additional fix to ObservableInputStreamAdapter.

The leak happens because the byte buffer is not released after read(..), here:

  public int read(final byte[] b, final int off, final int len) throws IOException {
    if( b == null ) {
      throw new NullPointerException( "Null buffer" );
    }

    if( len < 0 || off < 0 || len > b.length - off ) {
      throw new IndexOutOfBoundsException( "Invalid index" ); 
    }

    if( len == 0 ) {
      return 0;
    }

    lock.lock();

    try {
      if( !await() ) {
        return -1;
      }

      int size = Math.min( len, buffer.readableBytes() );

      buffer.readBytes( b, off, size );

      return size;
    }
    finally {
      lock.unlock();
      buffer.release(); //Release ByteBuf
    }
  }

I'm not sure this is the right place to handle it, but it resolves the leak.

@maksimlikharev
Contributor

Doesn't release() decrement the reference count and effectively request deallocation of the object?
If so, I would think we have to release at the end of processing.
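To illustrate the concern: in a reference-counted buffer, release() decrements the count, and once it reaches zero the buffer is freed and later access fails. The sketch below uses a hypothetical `CountedBuffer` class (a single-threaded stand-in, not Netty's ByteBuf) to show a second partial read failing after an early release.

```java
// Hypothetical reference-counted buffer for illustration; not Netty's ByteBuf.
// Single-threaded sketch: no atomic operations are used.
final class CountedBuffer {
    private final byte[] data;
    private int readerIndex;
    private int refCnt = 1; // a new buffer starts with one reference

    CountedBuffer(byte[] data) {
        this.data = data.clone();
    }

    int refCnt() {
        return refCnt;
    }

    // Decrements the count; returns true when this call freed the buffer.
    boolean release() {
        ensureAccessible();
        refCnt--;
        return refCnt == 0;
    }

    // Copies up to len bytes into dst and advances the reader index.
    int read(byte[] dst, int off, int len) {
        ensureAccessible();
        int size = Math.min(len, data.length - readerIndex);
        System.arraycopy(data, readerIndex, dst, off, size);
        readerIndex += size;
        return size;
    }

    private void ensureAccessible() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
    }
}

final class RefCountDemo {
    public static void main(String[] args) {
        CountedBuffer buf = new CountedBuffer(new byte[] {1, 2, 3, 4});
        byte[] dst = new byte[2];
        buf.read(dst, 0, 2); // first partial read succeeds
        buf.release();       // releasing here drops the count to 0
        try {
            buf.read(dst, 0, 2); // second read touches a freed buffer
        } catch (IllegalStateException e) {
            System.out.println("second read failed: " + e.getMessage());
        }
    }
}
```

A consumer such as a JSON parser usually calls read(..) several times per request, which is why the release has to wait until processing is finished.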

@maksimlikharev
Contributor

I would do the release in

public void close() throws IOException

and also, if not already released, in

protected void finalize() throws Throwable

Now a logistical question: I can make the change, but I'm not sure about the status of that PR or where to make the change (I will also have to write a unit test to cover this scenario).

@NiteshKant
Contributor

@maksimlikharev I replied to your PR (apologies for the delay)

Your proposal to release the ByteBuf in close() makes sense. I would not add the release in finalize(), since it is understandable that resources leak if close() is not called on the stream.
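A minimal sketch of the close()-based approach, under stated assumptions: `ReleasableBytes` and `ReleasingInputStream` are hypothetical names invented for this example, and the real change in Karyon may look different. The stream releases its buffer exactly once, on the first close(), and partial reads before that keep working.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical resource with an explicit release step (stand-in for a pooled buffer).
final class ReleasableBytes {
    private final byte[] data;
    private int pos;
    private int releaseCalls;

    ReleasableBytes(byte[] data) {
        this.data = data.clone();
    }

    int readableBytes() {
        return data.length - pos;
    }

    void readBytes(byte[] dst, int off, int len) {
        System.arraycopy(data, pos, dst, off, len);
        pos += len;
    }

    void release() {
        releaseCalls++;
    }

    int releaseCalls() {
        return releaseCalls;
    }
}

// Hypothetical stream that releases its buffer in close(), not after each read.
final class ReleasingInputStream extends InputStream {
    private final ReleasableBytes buffer;
    private boolean closed;

    ReleasingInputStream(ReleasableBytes buffer) {
        this.buffer = buffer;
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException("Invalid index");
        }
        if (len == 0) {
            return 0;
        }
        int size = Math.min(len, buffer.readableBytes());
        if (size == 0) {
            return -1; // nothing left to read
        }
        buffer.readBytes(b, off, size);
        return size;
    }

    @Override
    public void close() {
        // Idempotent: only the first close() releases the buffer.
        if (!closed) {
            closed = true;
            buffer.release();
        }
    }
}

final class CloseReleaseDemo {
    public static void main(String[] args) throws IOException {
        ReleasableBytes bytes = new ReleasableBytes(new byte[] {10, 20, 30});
        byte[] dst = new byte[2];
        try (ReleasingInputStream in = new ReleasingInputStream(bytes)) {
            while (in.read(dst, 0, dst.length) != -1) {
                // partial reads keep working because nothing is released yet
            }
        }
        System.out.println("release calls: " + bytes.releaseCalls());
    }
}
```

With try-with-resources the caller gets the release without extra code; a caller that never closes the stream still leaks, which is the trade-off accepted above.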
