diff --git a/affinity/src/main/java/net/openhft/affinity/LockCheck.java b/affinity/src/main/java/net/openhft/affinity/LockCheck.java index cfe4e78f6..bd692fbd7 100644 --- a/affinity/src/main/java/net/openhft/affinity/LockCheck.java +++ b/affinity/src/main/java/net/openhft/affinity/LockCheck.java @@ -109,15 +109,10 @@ static int getProcessForCpu(int core) throws IOException { return EMPTY_PID; } - static void updateCpu(int cpu) { + static void updateCpu(int cpu) throws IOException { if (!canOSSupportOperation()) return; - try { - replacePid(cpu, getPID()); - } catch (IOException e) { - LOGGER.warn("Failed to update lock file for cpu " + cpu, e); - e.printStackTrace(); - } + replacePid(cpu, getPID()); } public static void releaseLock(int cpu) { diff --git a/affinity/src/main/java/net/openhft/affinity/LockInventory.java b/affinity/src/main/java/net/openhft/affinity/LockInventory.java index 7523e5e28..8d62d72f5 100644 --- a/affinity/src/main/java/net/openhft/affinity/LockInventory.java +++ b/affinity/src/main/java/net/openhft/affinity/LockInventory.java @@ -22,6 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.NavigableMap; import java.util.TreeMap; @@ -69,9 +70,22 @@ private static boolean isAnyCpu(final int cpuId) { return cpuId == AffinityLock.ANY_CPU; } - private static void updateLockForCurrentThread(final boolean bind, final AffinityLock al, final boolean b) { - al.assignCurrentThread(bind, b); - LockCheck.updateCpu(al.cpuId()); + /** + * Update the lock for the current thread + * + * @param bind Whether to also bind the thread to the core + * @param al The lock to update + * @param wholeCore Whether to bind the whole core + * @return true if the lock was acquired, false otherwise + */ + private static boolean updateLockForCurrentThread(final boolean bind, final AffinityLock al, final boolean wholeCore) { + try { + LockCheck.updateCpu(al.cpuId()); + al.assignCurrentThread(bind, wholeCore); + return true; + } catch (IOException e) { + return false; + } } public final synchronized CpuLayout getCpuLayout() { @@ -106,8 +120,9 @@ public final synchronized AffinityLock acquireLock(boolean bind, int cpuId, Affi final boolean specificCpuRequested = !isAnyCpu(cpuId); if (specificCpuRequested && cpuId != 0) { final AffinityLock required = logicalCoreLocks[cpuId]; - if (required.canReserve(true) && anyStrategyMatches(cpuId, cpuId, strategies)) { - updateLockForCurrentThread(bind, required, false); + if (required.canReserve(true) + && anyStrategyMatches(cpuId, cpuId, strategies) + && updateLockForCurrentThread(bind, required, false)) { return required; } LOGGER.warn("Unable to acquire lock on CPU {} for thread {}, trying to find another CPU", @@ -119,8 +134,9 @@ public final synchronized AffinityLock acquireLock(boolean bind, int cpuId, Affi // if you have only one core, this library is not appropriate in any case. for (int i = logicalCoreLocks.length - 1; i > 0; i--) { AffinityLock al = logicalCoreLocks[i]; - if (al.canReserve(false) && (isAnyCpu(cpuId) || strategy.matches(cpuId, al.cpuId()))) { - updateLockForCurrentThread(bind, al, false); + if (al.canReserve(false) + && (isAnyCpu(cpuId) || strategy.matches(cpuId, al.cpuId())) + && updateLockForCurrentThread(bind, al, false)) { return al; } } @@ -136,8 +152,8 @@ public final synchronized AffinityLock tryAcquireLock(boolean bind, int cpuId) { return null; final AffinityLock required = logicalCoreLocks[cpuId]; - if (required.canReserve(true)) { - updateLockForCurrentThread(bind, required, false); + if (required.canReserve(true) + && updateLockForCurrentThread(bind, required, false)) { return required; } @@ -156,8 +172,9 @@ public final synchronized AffinityLock acquireCore(boolean bind, int cpuId, Affi continue LOOP; final AffinityLock al = als[0]; - updateLockForCurrentThread(bind, al, true); - return al; + if (updateLockForCurrentThread(bind, al, true)) { + return al; + } } } diff --git a/affinity/src/main/java/net/openhft/affinity/lockchecker/FileBasedLockChecker.java b/affinity/src/main/java/net/openhft/affinity/lockchecker/FileBasedLockChecker.java index 37224e104..88e74c9c6 100644 --- a/affinity/src/main/java/net/openhft/affinity/lockchecker/FileBasedLockChecker.java +++ b/affinity/src/main/java/net/openhft/affinity/lockchecker/FileBasedLockChecker.java @@ -78,6 +78,7 @@ public String getMetaInfo(int id) throws IOException { @NotNull protected File toFile(int id) { + assert id >= 0; return new File(tmpDir(), "cpu-" + id + ".lock"); } } diff --git a/affinity/src/main/java/net/openhft/affinity/lockchecker/FileLockBasedLockChecker.java b/affinity/src/main/java/net/openhft/affinity/lockchecker/FileLockBasedLockChecker.java index 10f2a2203..1ef74f198 100644 --- a/affinity/src/main/java/net/openhft/affinity/lockchecker/FileLockBasedLockChecker.java +++ b/affinity/src/main/java/net/openhft/affinity/lockchecker/FileLockBasedLockChecker.java @@ -11,6 +11,7 @@ import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; @@ -29,7 +30,7 @@ public class FileLockBasedLockChecker extends FileBasedLockChecker { private static final String OS = System.getProperty("os.name").toLowerCase(); private static final LockChecker instance = new FileLockBasedLockChecker(); - private static final HashSet openOptions = new HashSet<>(Arrays.asList(CREATE_NEW, WRITE, READ, SYNC)); + private static final HashSet openOptions = new HashSet<>(Arrays.asList(CREATE, WRITE, READ, SYNC)); private static final FileAttribute> fileAttr = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-rw-rw-")); private final LockReference[] locks = new LockReference[MAX_CPUS_SUPPORTED]; @@ -47,11 +48,6 @@ public boolean isLockFree(int id) { } private boolean isLockFree(File file, int id) { - //if no file exists - nobody has the lock for sure - if (!file.exists()) { - return true; - } - //do we have the lock already? LockReference existingLock = locks[id]; if (existingLock != null) { @@ -60,23 +56,27 @@ private boolean isLockFree(File file, int id) { //does another process have the lock? try { - FileChannel fc = FileChannel.open(file.toPath(), WRITE); - FileLock fileLock = fc.tryLock(); - if (fileLock == null) { - return false; + // only take a shared lock to test if the file is locked, + // this means processes testing the lock concurrently + // won't interfere with each other + try (FileChannel fc = FileChannel.open(file.toPath(), READ); + FileLock fileLock = fc.tryLock(0, Long.MAX_VALUE, true)) { + if (fileLock == null) { + return false; + } } - } catch (IOException | OverlappingFileLockException e) { + // file is present but no process is holding the lock + return true; + } catch (OverlappingFileLockException e) { + // another process has the lock + return false; + } catch (NoSuchFileException e) { + // the file doesn't exist, nobody has the lock + return true; + } catch (IOException e) { LOGGER.error(String.format("Exception occurred whilst trying to check lock on file %s : %s%n", file.getAbsolutePath(), e)); + return true; // maybe we should re-throw? } - - //file is present but nobody has it locked - delete it - boolean deleted = file.delete(); - if (deleted) - LOGGER.info(String.format("Deleted %s as nobody has the lock", file.getAbsolutePath())); - else - LOGGER.warn(String.format("Nobody has the lock on %s. Delete failed", file.getAbsolutePath())); - - return true; } @Override @@ -117,7 +117,6 @@ public boolean releaseLock(int id) { locks[id] = null; lock.lock.release(); lock.channel.close(); - toFile(id).delete(); return true; } catch (IOException e) { LOGGER.error(String.format("Couldn't release lock for id %d due to exception: %s%n", id, e.getMessage())); @@ -157,13 +156,14 @@ private String readMetaInfoFromLockFileChannel(File lockFile, FileChannel lockFi @NotNull @Override protected File toFile(int id) { + assert id >= 0; File file = super.toFile(id); try { if (file.exists() && OS.startsWith("linux")) { Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("rwxrwxrwx")); } } catch (IOException e) { - LOGGER.warn("Unable to set file permissions \"rwxrwxrwx\" for {} due to {}", file.toString(), e); + LOGGER.warn("Unable to set file permissions \"rwxrwxrwx\" for {} due to {}", file, e); } return file; } diff --git a/affinity/src/test/java/net/openhft/affinity/AffinityLockTest.java b/affinity/src/test/java/net/openhft/affinity/AffinityLockTest.java index c4f8aa769..2452f5380 100644 --- a/affinity/src/test/java/net/openhft/affinity/AffinityLockTest.java +++ b/affinity/src/test/java/net/openhft/affinity/AffinityLockTest.java @@ -247,20 +247,6 @@ public void shouldReturnLockForSpecifiedCpu() { assertEquals(AffinityLock.BASE_AFFINITY, Affinity.getAffinity()); } - @Test - public void lockFilesShouldBeRemovedOnRelease() { - if (!Utilities.ISLINUX) { - return; - } - final AffinityLock lock = AffinityLock.acquireLock(); - - assertThat(Files.exists(Paths.get(lockChecker.doToFile(lock.cpuId()).getAbsolutePath())), is(true)); - - lock.release(); - - assertThat(Files.exists(Paths.get(lockChecker.doToFile(lock.cpuId()).getAbsolutePath())), is(false)); - } - private void displayStatus() { System.out.println(Thread.currentThread() + " on " + Affinity.getCpu() + "\n" + AffinityLock.dumpLocks()); } diff --git a/affinity/src/test/java/net/openhft/affinity/FileLockLockCheckTest.java b/affinity/src/test/java/net/openhft/affinity/FileLockLockCheckTest.java index 1a41b2e47..1cbb2b424 100644 --- a/affinity/src/test/java/net/openhft/affinity/FileLockLockCheckTest.java +++ b/affinity/src/test/java/net/openhft/affinity/FileLockLockCheckTest.java @@ -53,6 +53,7 @@ public void before() { @After public void after() { + LockCheck.releaseLock(cpu); System.setProperty("java.io.tmpdir", TMP); } diff --git a/affinity/src/test/java/net/openhft/affinity/LockCheckTest.java b/affinity/src/test/java/net/openhft/affinity/LockCheckTest.java index 595269bb7..88ff61beb 100644 --- a/affinity/src/test/java/net/openhft/affinity/LockCheckTest.java +++ b/affinity/src/test/java/net/openhft/affinity/LockCheckTest.java @@ -55,6 +55,7 @@ public void before() { @After public void after() { System.setProperty("java.io.tmpdir", TMP); + LockCheck.releaseLock(cpu); } @Test diff --git a/affinity/src/test/java/net/openhft/affinity/MultiProcessAffinityTest.java b/affinity/src/test/java/net/openhft/affinity/MultiProcessAffinityTest.java index 07e5d52f8..5884b684c 100644 --- a/affinity/src/test/java/net/openhft/affinity/MultiProcessAffinityTest.java +++ b/affinity/src/test/java/net/openhft/affinity/MultiProcessAffinityTest.java @@ -11,8 +11,9 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.lang.String.format; import static net.openhft.affinity.LockCheck.IS_LINUX; @@ -70,42 +71,104 @@ public void shouldNotAcquireLockOnCoresLockedByOtherProcesses() throws IOExcepti } } - @Ignore("https://github.com/OpenHFT/Java-Thread-Affinity/issues/81") @Test public void shouldAllocateCoresCorrectlyUnderContention() throws IOException, InterruptedException { - final int numberOfLockers = Math.max(8, Runtime.getRuntime().availableProcessors()); + Assume.assumeTrue(IS_LINUX); + final int numberOfLockers = Math.max(2, Math.min(8, Runtime.getRuntime().availableProcessors())) / 2; List lockers = new ArrayList<>(); + LOGGER.info("Running test with {} locker processes", numberOfLockers); for (int i = 0; i < numberOfLockers; i++) { - lockers.add(ProcessRunner.runClass(RepeatedAffinityLocker.class, "last", "100")); + lockers.add(ProcessRunner.runClass(RepeatedAffinityLocker.class, + new String[]{"-Djava.io.tmpdir=" + folder.getRoot().getAbsolutePath()}, + new String[]{"last", "30", "2"})); } for (int i = 0; i < numberOfLockers; i++) { - if (!lockers.get(i).waitFor(10, TimeUnit.SECONDS)) { - throw new IllegalStateException("Locker process didn't end in time"); + final Process process = lockers.get(i); + if (!process.waitFor(20, TimeUnit.SECONDS)) { + ProcessRunner.printProcessOutput("Stalled locking process", process); + fail("Locker process didn't end in time"); + } + if (process.exitValue() != 0) { + ProcessRunner.printProcessOutput("Failed locking process", process); + fail("At least one of the locking processes failed, see output above"); } - assertEquals(0, lockers.get(i).exitValue()); + assertEquals(0, process.exitValue()); + } + } + + @Test + public void shouldBeAbleToAcquireLockLeftByOtherProcess() throws IOException, InterruptedException { + Assume.assumeTrue(IS_LINUX); + final Process process = ProcessRunner.runClass(AffinityLockerThatDoesNotReleaseProcess.class, + new String[]{"-Djava.io.tmpdir=" + folder.getRoot().getAbsolutePath()}, + new String[]{"last"}); + if (!process.waitFor(5, TimeUnit.SECONDS)) { + ProcessRunner.printProcessOutput("locker process", process); + fail("Locker process timed out"); + } + if (process.exitValue() != 0) { + ProcessRunner.printProcessOutput("locker process", process); + fail("Locker process failed"); + } + // We should be able to acquire the lock despite the other process not explicitly releasing it + try (final AffinityLock acquired = AffinityLock.acquireLock("last")) { + assertEquals(AffinityLock.PROCESSORS - 1, acquired.cpuId()); } } /** * Repeatedly acquires and releases a lock on the specified core */ - static class RepeatedAffinityLocker { + static class RepeatedAffinityLocker implements Callable { + private static final Logger LOGGER = LoggerFactory.getLogger(RepeatedAffinityLocker.class); private static final long PID = LockCheck.getPID(); + private final int iterations; + private final String cpuIdToLock; - public static void main(String[] args) throws IOException, InterruptedException { + public static void main(String[] args) throws InterruptedException, ExecutionException { String cpuIdToLock = args[0]; int iterations = Integer.parseInt(args[1]); + int threads = Integer.parseInt(args[2]); + + LOGGER.info("Acquiring lock with {} threads, {} iterations", threads, iterations); + ExecutorService executorService = Executors.newFixedThreadPool(threads); + final List> futures = executorService.invokeAll(IntStream.range(0, threads) + .mapToObj(tid -> new RepeatedAffinityLocker(cpuIdToLock, iterations)) + .collect(Collectors.toList())); + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Executor service didn't shut down"); + } + } + + public RepeatedAffinityLocker(String cpuIdToLock, int iterations) { + this.iterations = iterations; + this.cpuIdToLock = cpuIdToLock; + } + @Override + public Void call() throws Exception { for (int i = 0; i < iterations; i++) { + LOGGER.info("******* Starting iteration {} at {}", i, LocalDateTime.now()); try (final AffinityLock affinityLock = AffinityLock.acquireLock(cpuIdToLock)) { - long lockPID = Long.parseLong(FileLockBasedLockChecker.getInstance().getMetaInfo(affinityLock.cpuId())); - if (lockPID != PID) { - throw new IllegalStateException(format("PID in lock file is not mine (lockPID=%d, myPID=%d)", lockPID, PID)); + if (affinityLock.isAllocated()) { + final String metaInfo = FileLockBasedLockChecker.getInstance().getMetaInfo(affinityLock.cpuId()); + LOGGER.info("Meta info is: " + metaInfo); + long lockPID = Long.parseLong(metaInfo); + if (lockPID != PID) { + throw new IllegalStateException(format("PID in lock file is not mine (lockPID=%d, myPID=%d)", lockPID, PID)); + } + Thread.sleep(ThreadLocalRandom.current().nextInt(50)); + } else { + LOGGER.info("Couldn't get a lock"); } - Thread.sleep(ThreadLocalRandom.current().nextInt(50)); } } + return null; } } @@ -129,4 +192,18 @@ public static void main(String[] args) { } } } + + /** + * Acquires a lock then ends + */ + static class AffinityLockerThatDoesNotReleaseProcess { + private static final Logger LOGGER = LoggerFactory.getLogger(AffinityLockerProcess.class); + + public static void main(String[] args) { + String cpuIdToLock = args[0]; + + final AffinityLock affinityLock = AffinityLock.acquireLock(cpuIdToLock); + LOGGER.info("Got affinity lock " + affinityLock + " at " + LocalDateTime.now() + ", CPU=" + affinityLock.cpuId()); + } + } }