Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 10: backport the tracking retainable pool from 12 #12041

Merged
merged 18 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,11 @@ public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)

private class TestByteBufferPool extends ArrayByteBufferPool
{
public TestByteBufferPool()
{
super(-1, -1, -1, -1, 0, 0, 0, 0);
}

@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, -2, -2);
}

/**
Expand Down Expand Up @@ -339,4 +339,48 @@ protected void removed(RetainableByteBuffer retainedBuffer)
ArrayByteBufferPool.this.release(retainedBuffer.getBuffer());
}
}

/**
* <p>A variant of {@link ArrayByteBufferPool} that tracks buffer
* acquires/releases of the retained buffers, useful to identify buffer leaks.</p>
* @see ArrayRetainableByteBufferPool.Tracking
*/
public static class Tracking extends ArrayByteBufferPool
{
public Tracking()
{
// Do not call super as we want to change the default value of retainedHeapMemory and retainedDirectMemory.
this(-1, -1, -1, -1, 0, 0);
}

public Tracking(int minCapacity, int factor, int maxCapacity)
{
// Do not call super as we want to change the default value of retainedHeapMemory and retainedDirectMemory.
this(minCapacity, factor, maxCapacity, -1, 0, 0);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
{
// Do not call super as we want to change the default value of retainedHeapMemory and retainedDirectMemory.
this(minCapacity, factor, maxCapacity, maxQueueLength, 0, 0);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
// Set retainedHeapMemory and retainedDirectMemory to the same values as maxHeapMemory and maxDirectMemory respectively
// to default to a retaining pool.
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
}

@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new ArrayRetainableByteBufferPool.Tracking(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
package org.eclipse.jetty.io;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.NanoTime;
Expand Down Expand Up @@ -486,4 +494,162 @@ public String toString()
entries > 0 ? (inUse * 100) / entries : 0);
}
}

/**
* <p>A variant of {@link ArrayRetainableByteBufferPool} that tracks buffer
* acquires/releases, useful to identify buffer leaks.</p>
* <p>Use {@link #getLeaks()} when the system is idle to get
* the {@link Buffer}s that have been leaked, which contain
* the stack trace information of where the buffer was acquired.</p>
*/
public static class Tracking extends ArrayRetainableByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(Tracking.class);

private final Set<Buffer> buffers = ConcurrentHashMap.newKeySet();

public Tracking()
{
super();
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
{
super(minCapacity, factor, maxCapacity, maxBucketSize);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, bucketIndexFor, bucketCapacity, maxHeapMemory, maxDirectMemory);
}

@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
RetainableByteBuffer buffer = super.acquire(size, direct);
Buffer wrapper = new Buffer(buffer, size);
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", wrapper);
buffers.add(wrapper);
return wrapper;
}

public Set<Buffer> getLeaks()
{
return buffers;
}

public String dumpLeaks()
{
return getLeaks().stream()
.map(Buffer::dump)
.collect(Collectors.joining(System.lineSeparator()));
}

public class Buffer extends RetainableByteBuffer
{
private final Instant creationInstant = Instant.now();
private final RetainableByteBuffer wrapped;
private final int size;
private final Instant acquireInstant;
private final Throwable acquireStack;
private final List<Throwable> retainStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> releaseStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> overReleaseStacks = new CopyOnWriteArrayList<>();

private Buffer(RetainableByteBuffer wrapped, int size)
{
super(wrapped.getBuffer(), x -> {});
this.wrapped = wrapped;
this.size = size;
this.acquireInstant = Instant.now();
this.acquireStack = new Throwable();
}

public int getSize()
{
return size;
}

public Instant getAcquireInstant()
{
return acquireInstant;
}

public Throwable getAcquireStack()
{
return acquireStack;
}

@Override
protected void acquire()
{
wrapped.acquire();
}

@Override
public boolean isRetained()
{
return wrapped.isRetained();
}

@Override
public void retain()
{
wrapped.retain();
retainStacks.add(new Throwable());
}

@Override
public boolean release()
{
try
{
boolean released = wrapped.release();
if (released)
{
buffers.remove(this);
if (LOG.isDebugEnabled())
LOG.debug("released {}", this);
}
releaseStacks.add(new Throwable());
return released;
}
catch (IllegalStateException e)
{
buffers.add(this);
overReleaseStacks.add(new Throwable());
throw e;
}
}

public String dump()
{
StringWriter w = new StringWriter();
PrintWriter pw = new PrintWriter(w);
getAcquireStack().printStackTrace(pw);
pw.println("\n" + retainStacks.size() + " retain(s)");
for (Throwable retainStack : retainStacks)
{
retainStack.printStackTrace(pw);
}
pw.println("\n" + releaseStacks.size() + " release(s)");
for (Throwable releaseStack : releaseStacks)
{
releaseStack.printStackTrace(pw);
}
pw.println("\n" + overReleaseStacks.size() + " over-release(s)");
for (Throwable overReleaseStack : overReleaseStacks)
{
overReleaseStack.printStackTrace(pw);
}
return String.format("%s@%x of %d bytes on %s wrapping %s acquired at %s", getClass().getSimpleName(), hashCode(), getSize(), getAcquireInstant(), wrapped, w);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQ
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
this(minCapacity, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
this(minCapacity, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory, -2, -2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to document this change in defaults?

Copy link
Contributor Author

@lorban lorban Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say yes, the release notes should indicate that a change in our default pool may have an impact on performance.

You've put your finger on the most sensible part of this PR that we should discuss: what do we want to do for 10/11 in the future with potential buffer leaks? Our options are:

  • change the default to use the less performant pool that is leak-resistant (what this change does)
  • backport the fast and leak resistant retainable pool from 12 (not a trivial job)
  • do nothing (which means spending time tracking reported leaks in releases that are in maintenance mode)

I'm in favor of the 1st option with the following justification: this will minimize the maintenance work of 10/11, the drop in perf is minimal enough that it's only going to affect a small fraction of the user base, it's easy to change your config to revert to the max-perf retainable pool, and upgrading to 12 will give you back the lost perf.

@sbordet @gregw @janbartel WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm in favour of option 3. Let's not change behaviour significantly in a dot release.
So restore the behaviour here but document the configuration changes needed to get the no retained memory behaviour.

}

/**
Expand All @@ -76,8 +76,8 @@ public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQ
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param retainedHeapMemory the max heap memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public boolean isDirect()
* The reason why this method exists on top of {@link #retain()} is to be able to
* have some safety checks that must know why the ref counter is being incremented.
*/
void acquire()
protected void acquire()
{
if (references.getAndUpdate(c -> c == 0 ? 1 : c) != 0)
throw new IllegalStateException("re-pooled while still used " + this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ public void testLogarithmic()
RetainableByteBuffer retain5 = retainablePool.acquire(5, false);
retain5.release();
RetainableByteBuffer retain6 = retainablePool.acquire(6, false);
assertThat(retain6, sameInstance(retain5));
retain6.release();
RetainableByteBuffer retain9 = retainablePool.acquire(9, false);
assertThat(retain9, not(sameInstance(retain5)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool;
import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool.LogarithmicRetainablePool;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
Expand Down Expand Up @@ -64,6 +65,7 @@ public void before() throws Exception
{
_client = new WebSocketClient();
_server = new Server();
_server.addBean(new LogarithmicArrayByteBufferPool(-1, -1, -1, 0, 0, 0, 0));
_connector = new ServerConnector(_server);
_server.addConnector(_connector);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
Expand Down Expand Up @@ -81,6 +82,7 @@ public void configure(JettyWebSocketServletFactory factory)
public void start() throws Exception
{
server = new Server();
server.addBean(new LogarithmicArrayByteBufferPool(-1, -1, -1, 0, 0, 0, 0));
connector = new ServerConnector(server);
server.addConnector(connector);

Expand Down
Loading