Skip to content

Commit

Permalink
Now we no longer depend on the GC to free native memory on Linux.
Browse files Browse the repository at this point in the history
JNA is freeing 192 KiB of per NuProcess native buffers on GC Finalizer / Reference Queue processing. In cases with very low GC activity, this is essentially a memory leak, potentially causing native memory issues.

  * out/err/in buffers are freed onExit,
  * event needed for Epoll #registerProcess and ##queueWrite is allocated and freed in the method,
  * tweak in reusing of IntByReference for duration of epoll/kqueue processor,
  * updated JNA library to 5.13.0, to get #close on Memory object (added in 5.12.0).
  • Loading branch information
avrecko committed Feb 13, 2024
1 parent e13abba commit 3eed929
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.11.0</version>
<version>5.13.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public abstract class BaseEventProcessor<T extends BasePosixProcess> implements
private CyclicBarrier startBarrier;
private AtomicBoolean isRunning;

protected final IntByReference exitCodePointer = new IntByReference();

static {
LINGER_TIME_MS = Math.max(1000, Integer.getInteger("com.zaxxer.nuprocess.lingerTimeMs", 2500));

Expand Down Expand Up @@ -128,6 +130,7 @@ public void shutdown()
process.onExit(Integer.MAX_VALUE - 1);
LibC.waitpid(process.getPid(), exitCode, LibC.WNOHANG);
}
Util.close(exitCode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,16 @@ public void onExit(int statusCode)
}
finally {
exitPending.countDown();
// Once the last reference to the buffer is gone, Java will finalize the buffer
// and release the native memory we allocated in initializeBuffers().
outBufferMemory.close();
errBufferMemory.close();
inBufferMemory.close();
outBufferMemory = null;
errBufferMemory = null;
inBufferMemory = null;
outBuffer = null;
errBuffer = null;
inBuffer = null;
processHandler = null;
Memory.purge();
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/zaxxer/nuprocess/internal/Util.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.zaxxer.nuprocess.internal;

import com.sun.jna.Memory;
import com.sun.jna.Pointer;
import com.sun.jna.PointerType;

public class Util {
public static void close(PointerType pointerType) {
if (pointerType == null) {
return;
}
close(pointerType.getPointer());
}

public static void close(Pointer p) {
if (p instanceof Memory) {
((Memory) p).close();
}
}

}
6 changes: 6 additions & 0 deletions src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;

import com.sun.jna.*;
import com.zaxxer.nuprocess.internal.Util;

class EpollEvent
{
Expand Down Expand Up @@ -64,6 +65,11 @@ int size() {
return size;
}

public void close()
{
Util.close(getPointer());
}

public static class EpollEventPrototype extends Structure
{
/*
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.zaxxer.nuprocess.internal.LibC;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.logging.Level;
Expand All @@ -35,7 +36,6 @@
*/
public class LinuxProcess extends BasePosixProcess
{
private final EpollEvent epollEvent;

static {
LibEpoll.sigignore(LibEpoll.SIGPIPE);
Expand All @@ -58,7 +58,6 @@ private enum LaunchMechanism {
LinuxProcess(NuProcessHandler processListener) {
super(processListener);

epollEvent = new EpollEvent();
}

@Override
Expand Down Expand Up @@ -117,17 +116,6 @@ public void run(List<String> command, String[] environment, Path cwd)
}
}

/**
* An {@link EpollEvent} struct, which may be used when registering for events for this process. Each process has
* its own struct to avoid concurrency issues in {@link ProcessEpoll#registerProcess} when multiple processes are
* registered at once (e.g. multiple threads are all starting new processes concurrently).
*
* @return this process's {@link EpollEvent} struct
*/
EpollEvent getEpollEvent() {
return epollEvent;
}

private void prepareProcess(List<String> command, String[] environment, Path cwd) throws IOException
{
String[] cmdarray = command.toArray(new String[0]);
Expand Down Expand Up @@ -230,4 +218,5 @@ private static byte[] toEnvironmentBlock(String[] environment) {

return block;
}

}
25 changes: 16 additions & 9 deletions src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import java.util.List;

import com.sun.jna.Native;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
import com.zaxxer.nuprocess.internal.Util;

import static com.zaxxer.nuprocess.internal.LibC.WIFEXITED;
import static com.zaxxer.nuprocess.internal.LibC.WEXITSTATUS;
Expand Down Expand Up @@ -86,6 +86,7 @@ public void registerProcess(LinuxProcess process)
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;
EpollEvent event = new EpollEvent();
try {
stdinFd = process.getStdin().acquire();
stdoutFd = process.getStdout().acquire();
Expand All @@ -96,7 +97,6 @@ public void registerProcess(LinuxProcess process)
fildesToProcessMap.put(stdoutFd, process);
fildesToProcessMap.put(stderrFd, process);

EpollEvent event = process.getEpollEvent();
event.setEvents(LibEpoll.EPOLLIN);
event.setFileDescriptor(stdoutFd);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_ADD, stdoutFd, event.getPointer());
Expand All @@ -114,6 +114,7 @@ public void registerProcess(LinuxProcess process)
}
}
finally {
event.close();
if (stdinFd != Integer.MIN_VALUE) {
process.getStdin().release();
}
Expand All @@ -133,13 +134,14 @@ public void queueWrite(LinuxProcess process)
return;
}

EpollEvent event = null;
try {
int stdin = process.getStdin().acquire();
if (stdin == -1) {
return;
}

EpollEvent event = process.getEpollEvent();
event = new EpollEvent();
event.setEvents(LibEpoll.EPOLLOUT | LibEpoll.EPOLLONESHOT | LibEpoll.EPOLLRDHUP | LibEpoll.EPOLLHUP);
event.setFileDescriptor(stdin);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_MOD, stdin, event.getPointer());
Expand All @@ -154,6 +156,9 @@ public void queueWrite(LinuxProcess process)
}
finally {
process.getStdin().release();
if (event != null) {
event.close();
}
}
}

Expand All @@ -167,6 +172,9 @@ public void run()
// the handler's onExit is called before LinuxProcess.run returns.
waitForDeadPool();
}

Util.close(exitCodePointer);
triggeredEvent.close();
}

@Override
Expand All @@ -183,6 +191,7 @@ public void closeStdin(LinuxProcess process)
}
}


@Override
public boolean process()
{
Expand Down Expand Up @@ -306,8 +315,7 @@ private void cleanupProcess(LinuxProcess linuxProcess, int stdinFd, int stdoutFd
return;
}

IntByReference ret = new IntByReference();
int rc = LibC.waitpid(linuxProcess.getPid(), ret, LibC.WNOHANG);
int rc = LibC.waitpid(linuxProcess.getPid(), exitCodePointer, LibC.WNOHANG);

if (rc == 0) {
deadPool.add(linuxProcess);
Expand All @@ -316,7 +324,7 @@ else if (rc < 0) {
linuxProcess.onExit((Native.getLastError() == LibC.ECHILD) ? Integer.MAX_VALUE : Integer.MIN_VALUE);
}
else {
handleExit(linuxProcess, ret.getValue());
handleExit(linuxProcess, exitCodePointer.getValue());
}
}

Expand All @@ -326,11 +334,10 @@ private void checkDeadPool()
return;
}

IntByReference ret = new IntByReference();
Iterator<LinuxProcess> iterator = deadPool.iterator();
while (iterator.hasNext()) {
LinuxProcess process = iterator.next();
int rc = LibC.waitpid(process.getPid(), ret, LibC.WNOHANG);
int rc = LibC.waitpid(process.getPid(), exitCodePointer, LibC.WNOHANG);
if (rc == 0) {
continue;
}
Expand All @@ -341,7 +348,7 @@ private void checkDeadPool()
continue;
}

handleExit(process, ret.getValue());
handleExit(process, exitCodePointer.getValue());
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.sun.jna.Memory;
import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
import com.zaxxer.nuprocess.internal.Util;
import com.zaxxer.nuprocess.osx.LibKevent.Kevent;
import com.zaxxer.nuprocess.osx.LibKevent.TimeSpec;

Expand Down Expand Up @@ -186,6 +187,12 @@ public void queueWrite(OsxProcess process)
LibC.kill(JAVA_PID, LibC.SIGUSR2);
}

@Override
public void run() {
super.run();
Util.close(exitCodePointer);
}

@Override
public void closeStdin(OsxProcess process)
{
Expand Down Expand Up @@ -393,7 +400,7 @@ private void checkWaitWrites()

private void cleanupProcess(OsxProcess osxProcess)
{
LibC.waitpid(osxProcess.getPid(), new IntByReference(), LibC.WNOHANG);
LibC.waitpid(osxProcess.getPid(), exitCodePointer, LibC.WNOHANG);

// If this is the last process in the map, this thread will cleanly shut down.
pidToProcessMap.remove(osxProcess.getPid());
Expand Down
80 changes: 80 additions & 0 deletions src/test/java/com/zaxxer/nuprocess/ThreadLoadTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.zaxxer.nuprocess;

import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* Yet another threaded test.
*/
public class ThreadLoadTest {

@Test
public void testStartLoad() throws InterruptedException {
int durationInMs = 15_000;
long cutOfTime = System.currentTimeMillis() + durationInMs;
int nrOfThreads = (Runtime.getRuntime().availableProcessors() * 2);
CountDownLatch latch = new CountDownLatch(nrOfThreads);
AtomicLong runCountCode0 = new AtomicLong();
AtomicLong runCountCodeNon0 = new AtomicLong();
AtomicLong problems = new AtomicLong();
for (int i = 0; i < nrOfThreads; i++) {
startNewThread(latch, cutOfTime, runCountCode0, runCountCodeNon0, problems);
}

Assert.assertTrue(latch.await(durationInMs + 1_000, TimeUnit.MILLISECONDS));
System.out.println("runCount 0 = " + runCountCode0.get());
System.out.println("runCount non-0 = " + runCountCodeNon0.get());
System.out.println("problems = " + problems.get());
}

private void startNewThread(final CountDownLatch latch, final long cutOfTime, final AtomicLong zeroExit, final AtomicLong nonZeroExit, AtomicLong problems) {
new Thread(new Runnable() {
public void run() {
while (System.currentTimeMillis() < cutOfTime) {
final int randomInt = ThreadLocalRandom.current().nextInt(10_000);
final String text = "foo" + randomInt;
long startTime = System.nanoTime();

final NuProcess start = new NuProcessBuilder(new NuAbstractProcessHandler() {
public void onPreStart(final NuProcess nuProcess) {
super.onPreStart(nuProcess);
}

@Override
public void onStart(NuProcess nuProcess) {
}

@Override
public void onStdout(ByteBuffer buffer, boolean closed) {

}

public void onExit(final int statusCode) {
if (statusCode == 0) {
zeroExit.incrementAndGet();
} else {
nonZeroExit.incrementAndGet();
}
}
}, "echo", text).start();
try {
start.waitFor(10, TimeUnit.DAYS);

// System.out.println("Took " + ((System.nanoTime() - startTime) / 1000000));
// start.wantWrite();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

latch.countDown();
}
}).start();
}
}

0 comments on commit 3eed929

Please sign in to comment.