Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3513 from marosmars/reentrant_local_lock
Browse files Browse the repository at this point in the history
Make LocalOnlyLock reentrant
  • Loading branch information
v1r3n authored Mar 11, 2023
2 parents db51392 + eb55096 commit ad1c785
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,16 +34,10 @@ public class LocalOnlyLock implements Lock {

private static final Logger LOGGER = LoggerFactory.getLogger(LocalOnlyLock.class);

private static final CacheLoader<String, Semaphore> LOADER =
new CacheLoader<String, Semaphore>() {
@Override
public Semaphore load(String key) {
return new Semaphore(1, true);
}
};
private static final CacheLoader<String, ReentrantLock> LOADER = key -> new ReentrantLock(true);
private static final ConcurrentHashMap<String, ScheduledFuture<?>> SCHEDULEDFUTURES =
new ConcurrentHashMap<>();
private static final LoadingCache<String, Semaphore> LOCKIDTOSEMAPHOREMAP =
private static final LoadingCache<String, ReentrantLock> LOCKIDTOSEMAPHOREMAP =
Caffeine.newBuilder().build(LOADER);
private static final ThreadGroup THREAD_GROUP = new ThreadGroup("LocalOnlyLock-scheduler");
private static final ThreadFactory THREAD_FACTORY =
Expand All @@ -54,14 +48,14 @@ public Semaphore load(String key) {
@Override
public void acquireLock(String lockId) {
LOGGER.trace("Locking {}", lockId);
LOCKIDTOSEMAPHOREMAP.get(lockId).acquireUninterruptibly();
LOCKIDTOSEMAPHOREMAP.get(lockId).lock();
}

@Override
public boolean acquireLock(String lockId, long timeToTry, TimeUnit unit) {
try {
LOGGER.trace("Locking {} with timeout {} {}", lockId, timeToTry, unit);
return LOCKIDTOSEMAPHOREMAP.get(lockId).tryAcquire(timeToTry, unit);
return LOCKIDTOSEMAPHOREMAP.get(lockId).tryLock(timeToTry, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand All @@ -80,7 +74,7 @@ public boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUn
if (acquireLock(lockId, timeToTry, unit)) {
LOGGER.trace("Releasing {} automatically after {} {}", lockId, leaseTime, unit);
SCHEDULEDFUTURES.put(
lockId, SCHEDULER.schedule(() -> releaseLock(lockId), leaseTime, unit));
lockId, SCHEDULER.schedule(() -> deleteLock(lockId), leaseTime, unit));
return true;
}
return false;
Expand All @@ -97,14 +91,13 @@ private void removeLeaseExpirationJob(String lockId) {
@Override
public void releaseLock(String lockId) {
// Synchronized to prevent race condition between semaphore check and actual release
// The check is here to prevent semaphore getting above 1
// e.g. in case when lease runs out but release is also called
synchronized (LOCKIDTOSEMAPHOREMAP) {
if (LOCKIDTOSEMAPHOREMAP.get(lockId).availablePermits() == 0) {
LOGGER.trace("Releasing {}", lockId);
LOCKIDTOSEMAPHOREMAP.get(lockId).release();
removeLeaseExpirationJob(lockId);
if (LOCKIDTOSEMAPHOREMAP.getIfPresent(lockId) == null) {
return;
}
LOGGER.trace("Releasing {}", lockId);
LOCKIDTOSEMAPHOREMAP.get(lockId).unlock();
removeLeaseExpirationJob(lockId);
}
}

Expand All @@ -115,7 +108,7 @@ public void deleteLock(String lockId) {
}

@VisibleForTesting
LoadingCache<String, Semaphore> cache() {
LoadingCache<String, ReentrantLock> cache() {
return LOCKIDTOSEMAPHOREMAP;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
*/
package com.netflix.conductor.core.sync.local;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand All @@ -22,6 +26,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@Ignore
// Test always times out in CI environment
Expand All @@ -30,38 +35,71 @@ public class LocalOnlyLockTest {
// Lock can be global since it uses global cache internally
private final LocalOnlyLock localOnlyLock = new LocalOnlyLock();

@After
public void tearDown() {
// Clean caches between tests as they are shared globally
localOnlyLock.cache().invalidateAll();
localOnlyLock.scheduledFutures().values().forEach(f -> f.cancel(false));
localOnlyLock.scheduledFutures().clear();
}

@Test
public void testLockUnlock() {
localOnlyLock.acquireLock("a", 100, 1000, TimeUnit.MILLISECONDS);
final boolean a = localOnlyLock.acquireLock("a", 100, 10000, TimeUnit.MILLISECONDS);
assertTrue(a);
assertEquals(localOnlyLock.cache().estimatedSize(), 1);
assertEquals(localOnlyLock.cache().get("a").availablePermits(), 0);
assertEquals(localOnlyLock.cache().get("a").isLocked(), true);
assertEquals(localOnlyLock.scheduledFutures().size(), 1);
localOnlyLock.releaseLock("a");
assertEquals(localOnlyLock.scheduledFutures().size(), 0);
assertEquals(localOnlyLock.cache().get("a").availablePermits(), 1);
assertEquals(localOnlyLock.cache().get("a").isLocked(), false);
localOnlyLock.deleteLock("a");
assertEquals(localOnlyLock.cache().estimatedSize(), 0);
}

@Test(timeout = 10 * 10_000)
public void testLockTimeout() {
localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS);
public void testLockTimeout() throws InterruptedException, ExecutionException {
final ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(
() -> {
localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS);
})
.get();
assertTrue(localOnlyLock.acquireLock("d", 100, 1000, TimeUnit.MILLISECONDS));
assertFalse(localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS));
assertEquals(localOnlyLock.scheduledFutures().size(), 2);
localOnlyLock.releaseLock("c");
executor.submit(
() -> {
localOnlyLock.releaseLock("c");
})
.get();
localOnlyLock.releaseLock("d");
assertEquals(localOnlyLock.scheduledFutures().size(), 0);
}

@Test(timeout = 10 * 10_000)
public void testLockLeaseTime() {
for (int i = 0; i < 10; i++) {
localOnlyLock.acquireLock("a", 1000, 100, TimeUnit.MILLISECONDS);
public void testReleaseFromAnotherThread() throws InterruptedException, ExecutionException {
final ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(
() -> {
localOnlyLock.acquireLock("c", 100, 10000, TimeUnit.MILLISECONDS);
})
.get();
try {
localOnlyLock.releaseLock("c");
} catch (IllegalMonitorStateException e) {
// expected
localOnlyLock.deleteLock("c");
return;
} finally {
executor.submit(
() -> {
localOnlyLock.releaseLock("c");
})
.get();
}
localOnlyLock.acquireLock("a");
assertEquals(0, localOnlyLock.cache().get("a").availablePermits());
localOnlyLock.releaseLock("a");

fail();
}

@Test(timeout = 10 * 10_000)
Expand All @@ -73,15 +111,32 @@ public void testLockLeaseWithRelease() throws Exception {
Thread.sleep(2000);

localOnlyLock.acquireLock("b");
assertEquals(0, localOnlyLock.cache().get("b").availablePermits());
assertEquals(true, localOnlyLock.cache().get("b").isLocked());
localOnlyLock.releaseLock("b");
}

@Test
public void testRelease() {
localOnlyLock.releaseLock("x54as4d2;23'4");
localOnlyLock.releaseLock("x54as4d2;23'4");
assertEquals(1, localOnlyLock.cache().get("x54as4d2;23'4").availablePermits());
assertEquals(false, localOnlyLock.cache().get("x54as4d2;23'4").isLocked());
}

@Test(timeout = 10 * 10_000)
public void testLockLeaseTime() throws InterruptedException {
for (int i = 0; i < 10; i++) {
final Thread thread =
new Thread(
() -> {
localOnlyLock.acquireLock("a", 1000, 100, TimeUnit.MILLISECONDS);
});
thread.start();
thread.join();
}
localOnlyLock.acquireLock("a");
assertTrue(localOnlyLock.cache().get("a").isLocked());
localOnlyLock.releaseLock("a");
localOnlyLock.deleteLock("a");
}

@Test
Expand Down

0 comments on commit ad1c785

Please sign in to comment.