Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

github.com/Netflix/Hystrix/issues/118 #119

Merged
merged 1 commit into from
Feb 28, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 164 additions & 6 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1585,7 +1585,7 @@ public ExecutionResult getExecutionResult() {
*/
private static class TryableSemaphore {
private final HystrixProperty<Integer> numberOfPermits;
private static AtomicInteger count = new AtomicInteger(0);
private final AtomicInteger count = new AtomicInteger(0);

public TryableSemaphore(HystrixProperty<Integer> numberOfPermits) {
this.numberOfPermits = numberOfPermits;
Expand Down Expand Up @@ -3281,12 +3281,15 @@ public void testExecutionSemaphoreWithQueue() {

final AtomicBoolean exceptionReceived = new AtomicBoolean();

final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {

@Override
public void run() {
try {
new TestSemaphoreCommand(circuitBreaker, 1, 200).queue().get();
new TestSemaphoreCommand(circuitBreaker, semaphore, 200).queue().get();
} catch (Exception e) {
e.printStackTrace();
exceptionReceived.set(true);
Expand Down Expand Up @@ -3350,12 +3353,15 @@ public void testExecutionSemaphoreWithExecution() {

final AtomicBoolean exceptionReceived = new AtomicBoolean();

final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {

@Override
public void run() {
try {
results.add(new TestSemaphoreCommand(circuitBreaker, 1, 200).execute());
results.add(new TestSemaphoreCommand(circuitBreaker, semaphore, 200).execute());
} catch (Exception e) {
e.printStackTrace();
exceptionReceived.set(true);
Expand Down Expand Up @@ -3469,6 +3475,107 @@ public void run() {
assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

/**
* Tests that semaphores are counted separately for commands with unique keys
*/
@Test
public void testSemaphorePermitsInUse() {
final TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();

// this semaphore will be shared across multiple command instances
final TryableSemaphore sharedSemaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(3));

// used to wait until all commands have started
final CountDownLatch startLatch = new CountDownLatch(sharedSemaphore.numberOfPermits.get() + 1);

// used to signal that all command can finish
final CountDownLatch sharedLatch = new CountDownLatch(1);

final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute();
} catch (Exception e) {
e.printStackTrace();
}
}
});

// creates group of threads each using command sharing a single semaphore

// I create extra threads and commands so that I can verify that some of them fail to obtain a semaphore
final int sharedThreadCount = sharedSemaphore.numberOfPermits.get() * 2;
final Thread[] sharedSemaphoreThreads = new Thread[sharedThreadCount];
for(int i=0; i<sharedThreadCount; i++) {
sharedSemaphoreThreads[i] = new Thread(sharedSemaphoreRunnable);
}

// creates thread using isolated semaphore
final TryableSemaphore isolatedSemaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

final CountDownLatch isolatedLatch = new CountDownLatch(1);

// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Thread isolatedThread = new Thread(new HystrixContextRunnable(new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute();
} catch (Exception e) {
e.printStackTrace();
failureCount.incrementAndGet();
}
}
}));

// verifies no permits in use before starting threads
assertEquals("wrong number of permits for shared semaphore", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore", 0, isolatedSemaphore.getNumberOfPermitsUsed());

for(int i=0; i<sharedThreadCount; i++) {
sharedSemaphoreThreads[i].start();
}
isolatedThread.start();

// waits until all commands have started
try {
startLatch.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

// verifies that all semaphores are in use
assertEquals("wrong number of permits for shared semaphore",
sharedSemaphore.numberOfPermits.get().longValue(), sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore",
isolatedSemaphore.numberOfPermits.get().longValue(), isolatedSemaphore.getNumberOfPermitsUsed());

// signals commands to finish
sharedLatch.countDown();
isolatedLatch.countDown();

try {
for(int i=0; i<sharedThreadCount; i++) {
sharedSemaphoreThreads[i].join();
}
isolatedThread.join();
} catch (Exception e) {
e.printStackTrace();
fail("failed waiting on threads");
}

// verifies no permits in use after finishing threads
assertEquals("wrong number of permits for shared semaphore", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore", 0, isolatedSemaphore.getNumberOfPermitsUsed());

// verifies that some executions failed
final int expectedFailures = sharedSemaphore.getNumberOfPermitsUsed();
assertEquals("failures expected but did not happen", expectedFailures, failureCount.get());
}

/**
* Test that HystrixOwner can be passed in dynamically.
*/
Expand Down Expand Up @@ -5044,7 +5151,10 @@ public void testExecutionHookSuccessfulCommandWithSemaphoreIsolation() {
@Test
public void testExecutionHookFailureWithSemaphoreIsolation() {
/* test with execute() */
TestSemaphoreCommand command = new TestSemaphoreCommand(new TestCircuitBreaker(), 0, 200);
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(0));

TestSemaphoreCommand command = new TestSemaphoreCommand(new TestCircuitBreaker(), semaphore, 200);
try {
command.execute();
fail("we expect a failure");
Expand Down Expand Up @@ -5678,8 +5788,15 @@ private TestSemaphoreCommand(TestCircuitBreaker circuitBreaker, int executionSem
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setCommandPropertiesDefaults(HystrixCommandProperties.Setter.getUnitTestPropertiesSetter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(executionSemaphoreCount))
.setExecutionSemaphore(new TryableSemaphore(HystrixProperty.Factory.asProperty(executionSemaphoreCount))));
.withExecutionIsolationSemaphoreMaxConcurrentRequests(executionSemaphoreCount)));
this.executionSleep = executionSleep;
}

private TestSemaphoreCommand(TestCircuitBreaker circuitBreaker, TryableSemaphore semaphore, long executionSleep) {
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setCommandPropertiesDefaults(HystrixCommandProperties.Setter.getUnitTestPropertiesSetter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))
.setExecutionSemaphore(semaphore));
this.executionSleep = executionSleep;
}

Expand All @@ -5692,7 +5809,48 @@ protected Boolean run() {
}
return true;
}
}

/**
* Semaphore based command that allows caller to use latches to know when it has started and signal when it
* would like the command to finish
*/
private static class LatchedSemaphoreCommand extends TestHystrixCommand<Boolean> {

private final CountDownLatch startLatch, waitLatch;

/**
*
* @param circuitBreaker
* @param semaphore
* @param startLatch this command calls {@link java.util.concurrent.CountDownLatch#countDown()} immediately
* upon running
* @param waitLatch this command calls {@link java.util.concurrent.CountDownLatch#await()} once it starts
* to run. The caller can use the latch to signal the command to finish
*/
private LatchedSemaphoreCommand(TestCircuitBreaker circuitBreaker, TryableSemaphore semaphore,
CountDownLatch startLatch, CountDownLatch waitLatch) {
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setCommandPropertiesDefaults(HystrixCommandProperties.Setter.getUnitTestPropertiesSetter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))
.setExecutionSemaphore(semaphore));
this.startLatch = startLatch;
this.waitLatch = waitLatch;
}

@Override
protected Boolean run() {
// signals caller that run has started
this.startLatch.countDown();

try {
// waits for caller to countDown latch
this.waitLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
}

/**
Expand Down