Skip to content

Commit

Permalink
Make lock acquisition atomic, Fixes #81
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktindall committed Oct 21, 2021
1 parent e8ff714 commit 345865a
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 68 deletions.
9 changes: 2 additions & 7 deletions affinity/src/main/java/net/openhft/affinity/LockCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,10 @@ static int getProcessForCpu(int core) throws IOException {
return EMPTY_PID;
}

static void updateCpu(int cpu) {
static void updateCpu(int cpu) throws IOException {
if (!canOSSupportOperation())
return;
try {
replacePid(cpu, getPID());
} catch (IOException e) {
LOGGER.warn("Failed to update lock file for cpu " + cpu, e);
e.printStackTrace();
}
replacePid(cpu, getPID());
}

public static void releaseLock(int cpu) {
Expand Down
39 changes: 28 additions & 11 deletions affinity/src/main/java/net/openhft/affinity/LockInventory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;

Expand Down Expand Up @@ -69,9 +70,22 @@ private static boolean isAnyCpu(final int cpuId) {
return cpuId == AffinityLock.ANY_CPU;
}

private static void updateLockForCurrentThread(final boolean bind, final AffinityLock al, final boolean b) {
al.assignCurrentThread(bind, b);
LockCheck.updateCpu(al.cpuId());
/**
 * Update the lock for the current thread.
 * <p>
 * The lock file is updated (via {@link LockCheck#updateCpu(int)}) before the lock is
 * assigned to the current thread, so the thread is only assigned when the lock file
 * update succeeded.
 *
 * @param bind      Whether to also bind the thread to the core
 * @param al        The lock to update
 * @param wholeCore Whether to bind the whole core
 * @return true if the lock was acquired, false if updating the lock file failed
 */
private static boolean updateLockForCurrentThread(final boolean bind, final AffinityLock al, final boolean wholeCore) {
    final int cpuId = al.cpuId();
    try {
        LockCheck.updateCpu(cpuId);
        al.assignCurrentThread(bind, wholeCore);
        return true;
    } catch (IOException e) {
        // Callers treat false as "try another CPU", but the cause must not be lost:
        // record it so that unexpected I/O problems remain diagnosable.
        LOGGER.info("Unable to update lock file for CPU " + cpuId, e);
        return false;
    }
}

public final synchronized CpuLayout getCpuLayout() {
Expand Down Expand Up @@ -106,8 +120,9 @@ public final synchronized AffinityLock acquireLock(boolean bind, int cpuId, Affi
final boolean specificCpuRequested = !isAnyCpu(cpuId);
if (specificCpuRequested && cpuId != 0) {
final AffinityLock required = logicalCoreLocks[cpuId];
if (required.canReserve(true) && anyStrategyMatches(cpuId, cpuId, strategies)) {
updateLockForCurrentThread(bind, required, false);
if (required.canReserve(true)
&& anyStrategyMatches(cpuId, cpuId, strategies)
&& updateLockForCurrentThread(bind, required, false)) {
return required;
}
LOGGER.warn("Unable to acquire lock on CPU {} for thread {}, trying to find another CPU",
Expand All @@ -119,8 +134,9 @@ public final synchronized AffinityLock acquireLock(boolean bind, int cpuId, Affi
// if you have only one core, this library is not appropriate in any case.
for (int i = logicalCoreLocks.length - 1; i > 0; i--) {
AffinityLock al = logicalCoreLocks[i];
if (al.canReserve(false) && (isAnyCpu(cpuId) || strategy.matches(cpuId, al.cpuId()))) {
updateLockForCurrentThread(bind, al, false);
if (al.canReserve(false)
&& (isAnyCpu(cpuId) || strategy.matches(cpuId, al.cpuId()))
&& updateLockForCurrentThread(bind, al, false)) {
return al;
}
}
Expand All @@ -136,8 +152,8 @@ public final synchronized AffinityLock tryAcquireLock(boolean bind, int cpuId) {
return null;

final AffinityLock required = logicalCoreLocks[cpuId];
if (required.canReserve(true)) {
updateLockForCurrentThread(bind, required, false);
if (required.canReserve(true)
&& updateLockForCurrentThread(bind, required, false)) {
return required;
}

Expand All @@ -156,8 +172,9 @@ public final synchronized AffinityLock acquireCore(boolean bind, int cpuId, Affi
continue LOOP;

final AffinityLock al = als[0];
updateLockForCurrentThread(bind, al, true);
return al;
if (updateLockForCurrentThread(bind, al, true)) {
return al;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public String getMetaInfo(int id) throws IOException {

/**
 * Resolves the lock file used for the given id.
 *
 * @param id the id to build the file name from; asserted to be non-negative
 * @return the file {@code cpu-<id>.lock} located under {@code tmpDir()}
 */
@NotNull
protected File toFile(int id) {
    assert id >= 0;
    final String lockFileName = "cpu-" + id + ".lock";
    return new File(tmpDir(), lockFileName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
Expand All @@ -29,7 +30,7 @@ public class FileLockBasedLockChecker extends FileBasedLockChecker {
private static final String OS = System.getProperty("os.name").toLowerCase();

private static final LockChecker instance = new FileLockBasedLockChecker();
private static final HashSet<StandardOpenOption> openOptions = new HashSet<>(Arrays.asList(CREATE_NEW, WRITE, READ, SYNC));
private static final HashSet<StandardOpenOption> openOptions = new HashSet<>(Arrays.asList(CREATE, WRITE, READ, SYNC));
private static final FileAttribute<Set<PosixFilePermission>> fileAttr = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-rw-rw-"));
private final LockReference[] locks = new LockReference[MAX_CPUS_SUPPORTED];

Expand All @@ -47,11 +48,6 @@ public boolean isLockFree(int id) {
}

private boolean isLockFree(File file, int id) {
//if no file exists - nobody has the lock for sure
if (!file.exists()) {
return true;
}

//do we have the lock already?
LockReference existingLock = locks[id];
if (existingLock != null) {
Expand All @@ -60,23 +56,27 @@ private boolean isLockFree(File file, int id) {

//does another process have the lock?
try {
FileChannel fc = FileChannel.open(file.toPath(), WRITE);
FileLock fileLock = fc.tryLock();
if (fileLock == null) {
return false;
// only take a shared lock to test if the file is locked,
// this means processes testing the lock concurrently
// won't interfere with each other
try (FileChannel fc = FileChannel.open(file.toPath(), READ);
FileLock fileLock = fc.tryLock(0, Long.MAX_VALUE, true)) {
if (fileLock == null) {
return false;
}
}
} catch (IOException | OverlappingFileLockException e) {
// file is present but no process is holding the lock
return true;
} catch (OverlappingFileLockException e) {
// another process has the lock
return false;
} catch (NoSuchFileException e) {
// the file doesn't exist, nobody has the lock
return true;
} catch (IOException e) {
LOGGER.error(String.format("Exception occurred whilst trying to check lock on file %s : %s%n", file.getAbsolutePath(), e));
return true; // maybe we should re-throw?
}

//file is present but nobody has it locked - delete it
boolean deleted = file.delete();
if (deleted)
LOGGER.info(String.format("Deleted %s as nobody has the lock", file.getAbsolutePath()));
else
LOGGER.warn(String.format("Nobody has the lock on %s. Delete failed", file.getAbsolutePath()));

return true;
}

@Override
Expand Down Expand Up @@ -117,7 +117,6 @@ public boolean releaseLock(int id) {
locks[id] = null;
lock.lock.release();
lock.channel.close();
toFile(id).delete();
return true;
} catch (IOException e) {
LOGGER.error(String.format("Couldn't release lock for id %d due to exception: %s%n", id, e.getMessage()));
Expand Down Expand Up @@ -157,13 +156,14 @@ private String readMetaInfoFromLockFileChannel(File lockFile, FileChannel lockFi
@NotNull
@Override
protected File toFile(int id) {
assert id >= 0;
File file = super.toFile(id);
try {
if (file.exists() && OS.startsWith("linux")) {
Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("rwxrwxrwx"));
}
} catch (IOException e) {
LOGGER.warn("Unable to set file permissions \"rwxrwxrwx\" for {} due to {}", file.toString(), e);
LOGGER.warn("Unable to set file permissions \"rwxrwxrwx\" for {} due to {}", file, e);
}
return file;
}
Expand Down
14 changes: 0 additions & 14 deletions affinity/src/test/java/net/openhft/affinity/AffinityLockTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,6 @@ public void shouldReturnLockForSpecifiedCpu() {
assertEquals(AffinityLock.BASE_AFFINITY, Affinity.getAffinity());
}

@Test
public void lockFilesShouldBeRemovedOnRelease() {
if (!Utilities.ISLINUX) {
return;
}
final AffinityLock lock = AffinityLock.acquireLock();

assertThat(Files.exists(Paths.get(lockChecker.doToFile(lock.cpuId()).getAbsolutePath())), is(true));

lock.release();

assertThat(Files.exists(Paths.get(lockChecker.doToFile(lock.cpuId()).getAbsolutePath())), is(false));
}

private void displayStatus() {
System.out.println(Thread.currentThread() + " on " + Affinity.getCpu() + "\n" + AffinityLock.dumpLocks());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void before() {

/**
 * Test teardown: calls {@code LockCheck.releaseLock} for {@code cpu}, then sets the
 * {@code java.io.tmpdir} system property to {@code TMP}.
 */
@After
public void after() {
LockCheck.releaseLock(cpu);
System.setProperty("java.io.tmpdir", TMP);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void before() {
/**
 * Test teardown: sets the {@code java.io.tmpdir} system property to {@code TMP}, then
 * calls {@code LockCheck.releaseLock} for {@code cpu}.
 */
@After
public void after() {
System.setProperty("java.io.tmpdir", TMP);
LockCheck.releaseLock(cpu);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.lang.String.format;
import static net.openhft.affinity.LockCheck.IS_LINUX;
Expand Down Expand Up @@ -70,42 +71,104 @@ public void shouldNotAcquireLockOnCoresLockedByOtherProcesses() throws IOExcepti
}
}

@Ignore("https://github.com/OpenHFT/Java-Thread-Affinity/issues/81")
@Test
public void shouldAllocateCoresCorrectlyUnderContention() throws IOException, InterruptedException {
final int numberOfLockers = Math.max(8, Runtime.getRuntime().availableProcessors());
Assume.assumeTrue(IS_LINUX);
final int numberOfLockers = Math.max(2, Math.min(8, Runtime.getRuntime().availableProcessors())) / 2;
List<Process> lockers = new ArrayList<>();
LOGGER.info("Running test with {} locker processes", numberOfLockers);
for (int i = 0; i < numberOfLockers; i++) {
lockers.add(ProcessRunner.runClass(RepeatedAffinityLocker.class, "last", "100"));
lockers.add(ProcessRunner.runClass(RepeatedAffinityLocker.class,
new String[]{"-Djava.io.tmpdir=" + folder.getRoot().getAbsolutePath()},
new String[]{"last", "30", "2"}));
}
for (int i = 0; i < numberOfLockers; i++) {
if (!lockers.get(i).waitFor(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("Locker process didn't end in time");
final Process process = lockers.get(i);
if (!process.waitFor(20, TimeUnit.SECONDS)) {
ProcessRunner.printProcessOutput("Stalled locking process", process);
fail("Locker process didn't end in time");
}
if (process.exitValue() != 0) {
ProcessRunner.printProcessOutput("Failed locking process", process);
fail("At least one of the locking processes failed, see output above");
}
assertEquals(0, lockers.get(i).exitValue());
assertEquals(0, process.exitValue());
}
}

/**
 * Runs a helper process that acquires the "last" lock and ends without releasing it,
 * then checks that this process can still acquire that lock. Only runs on Linux.
 */
@Test
public void shouldBeAbleToAcquireLockLeftByOtherProcess() throws IOException, InterruptedException {
    Assume.assumeTrue(IS_LINUX);

    // The helper process is pointed at the same temporary folder via java.io.tmpdir
    final String[] jvmArgs = {"-Djava.io.tmpdir=" + folder.getRoot().getAbsolutePath()};
    final String[] programArgs = {"last"};
    final Process locker = ProcessRunner.runClass(AffinityLockerThatDoesNotReleaseProcess.class, jvmArgs, programArgs);

    // exitValue() is only consulted once the process is known to have finished
    final boolean finishedInTime = locker.waitFor(5, TimeUnit.SECONDS);
    if (!finishedInTime || locker.exitValue() != 0) {
        ProcessRunner.printProcessOutput("locker process", locker);
        fail(finishedInTime ? "Locker process failed" : "Locker process timed out");
    }

    // We should be able to acquire the lock despite the other process not explicitly releasing it
    try (final AffinityLock lock = AffinityLock.acquireLock("last")) {
        assertEquals(AffinityLock.PROCESSORS - 1, lock.cpuId());
    }
}

/**
* Repeatedly acquires and releases a lock on the specified core
*/
static class RepeatedAffinityLocker {
static class RepeatedAffinityLocker implements Callable<Void> {

private static final Logger LOGGER = LoggerFactory.getLogger(RepeatedAffinityLocker.class);
private static final long PID = LockCheck.getPID();
private final int iterations;
private final String cpuIdToLock;

public static void main(String[] args) throws IOException, InterruptedException {
public static void main(String[] args) throws InterruptedException, ExecutionException {
String cpuIdToLock = args[0];
int iterations = Integer.parseInt(args[1]);
int threads = Integer.parseInt(args[2]);

LOGGER.info("Acquiring lock with {} threads, {} iterations", threads, iterations);
ExecutorService executorService = Executors.newFixedThreadPool(threads);
final List<Future<Void>> futures = executorService.invokeAll(IntStream.range(0, threads)
.mapToObj(tid -> new RepeatedAffinityLocker(cpuIdToLock, iterations))
.collect(Collectors.toList()));
for (Future<Void> future : futures) {
future.get();
}
executorService.shutdown();
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Executor service didn't shut down");
}
}

public RepeatedAffinityLocker(String cpuIdToLock, int iterations) {
this.iterations = iterations;
this.cpuIdToLock = cpuIdToLock;
}

@Override
public Void call() throws Exception {
for (int i = 0; i < iterations; i++) {
LOGGER.info("******* Starting iteration {} at {}", i, LocalDateTime.now());
try (final AffinityLock affinityLock = AffinityLock.acquireLock(cpuIdToLock)) {
long lockPID = Long.parseLong(FileLockBasedLockChecker.getInstance().getMetaInfo(affinityLock.cpuId()));
if (lockPID != PID) {
throw new IllegalStateException(format("PID in lock file is not mine (lockPID=%d, myPID=%d)", lockPID, PID));
if (affinityLock.isAllocated()) {
final String metaInfo = FileLockBasedLockChecker.getInstance().getMetaInfo(affinityLock.cpuId());
LOGGER.info("Meta info is: " + metaInfo);
long lockPID = Long.parseLong(metaInfo);
if (lockPID != PID) {
throw new IllegalStateException(format("PID in lock file is not mine (lockPID=%d, myPID=%d)", lockPID, PID));
}
Thread.sleep(ThreadLocalRandom.current().nextInt(50));
} else {
LOGGER.info("Couldn't get a lock");
}
Thread.sleep(ThreadLocalRandom.current().nextInt(50));
}
}
return null;
}
}

Expand All @@ -129,4 +192,18 @@ public static void main(String[] args) {
}
}
}

/**
 * Acquires an affinity lock on the CPU named by the first program argument and then
 * lets {@code main} return without ever releasing it.
 */
static class AffinityLockerThatDoesNotReleaseProcess {
    // Bound to this class (previously AffinityLockerProcess) so log lines are attributed to the right helper
    private static final Logger LOGGER = LoggerFactory.getLogger(AffinityLockerThatDoesNotReleaseProcess.class);

    /**
     * @param args {@code args[0]} is the CPU id description passed to {@code AffinityLock.acquireLock}
     */
    public static void main(String[] args) {
        String cpuIdToLock = args[0];

        // Intentionally not closed/released: the lock is left behind when the process ends
        final AffinityLock affinityLock = AffinityLock.acquireLock(cpuIdToLock);
        LOGGER.info("Got affinity lock {} at {}, CPU={}", affinityLock, LocalDateTime.now(), affinityLock.cpuId());
    }
}
}

0 comments on commit 345865a

Please sign in to comment.