Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…eX#438)

Issue ReactiveX#273: Added remove method to AbstractRegistry.
Issue ReactiveX#273: Added replace method to AbstractRegistry.
Issue ReactiveX#327: MicroMeter tagged Metric classes are automatically updated when an entry is added, removed or replaced in AbstractRegistry.
  • Loading branch information
RobWin authored Apr 28, 2019
1 parent 3665c70 commit 6e4cb87
Show file tree
Hide file tree
Showing 51 changed files with 1,078 additions and 485 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,19 @@ private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> imple

@Override
public ThreadPoolBulkheadEventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) {
registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
registerConsumer(BulkheadOnCallPermittedEvent.class.getSimpleName(), onCallPermittedEventConsumer);
return this;
}

@Override
public ThreadPoolBulkheadEventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) {
registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
registerConsumer(BulkheadOnCallRejectedEvent.class.getSimpleName(), onCallRejectedEventConsumer);
return this;
}

@Override
public ThreadPoolBulkheadEventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
registerConsumer(BulkheadOnCallFinishedEvent.class.getSimpleName(), onCallFinishedEventConsumer);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.core.AbstractRegistry;
import io.github.resilience4j.core.registry.AbstractRegistry;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import io.vavr.collection.Array;
import io.vavr.collection.Seq;
Expand Down Expand Up @@ -62,7 +62,7 @@ public InMemoryBulkheadRegistry(BulkheadConfig defaultConfig) {
*/
@Override
public Seq<Bulkhead> getAllBulkheads() {
return Array.ofAll(targetMap.values());
return Array.ofAll(entryMap.values());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import io.github.resilience4j.core.AbstractRegistry;
import io.github.resilience4j.core.registry.AbstractRegistry;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import io.vavr.collection.Array;
import io.vavr.collection.Seq;
Expand Down Expand Up @@ -62,7 +62,7 @@ public InMemoryThreadPoolBulkheadRegistry(ThreadPoolBulkheadConfig defaultConfig
*/
@Override
public Seq<ThreadPoolBulkhead> getAllBulkheads() {
return Array.ofAll(targetMap.values());
return Array.ofAll(entryMap.values());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,19 @@ private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> imple

@Override
public EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) {
registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
registerConsumer(BulkheadOnCallPermittedEvent.class.getSimpleName(), onCallPermittedEventConsumer);
return this;
}

@Override
public EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) {
registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
registerConsumer(BulkheadOnCallRejectedEvent.class.getSimpleName(), onCallRejectedEventConsumer);
return this;
}

@Override
public EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
registerConsumer(BulkheadOnCallFinishedEvent.class.getSimpleName(), onCallFinishedEventConsumer);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,26 @@

import org.junit.Before;
import org.junit.Test;
import org.mockito.BDDMockito;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import static org.assertj.core.api.BDDAssertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;


public class BulkheadRegistryTest {

private BulkheadConfig config;
private BulkheadRegistry registry;
private Logger LOGGER;
private Consumer<Bulkhead> post_consumer = circuitBreaker -> LOGGER.info("invoking the post consumer1");

@Before
public void setUp() {
LOGGER = mock(Logger.class);
// registry with default config
registry = BulkheadRegistry.ofDefaults();
registry.registerPostCreationConsumer(post_consumer);
// registry with custom config
config = BulkheadConfig.custom()
.maxConcurrentCalls(100)
Expand All @@ -71,7 +66,6 @@ public void shouldReturnTheCorrectName() {
assertThat(bulkhead.getName()).isEqualTo("test");
assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(25);
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(25);
BDDMockito.then(LOGGER).should(times(1)).info("invoking the post consumer1");
}

@Test
Expand All @@ -82,7 +76,6 @@ public void shouldBeTheSameInstance() {

assertThat(bulkhead1).isSameAs(bulkhead2);
assertThat(registry.getAllBulkheads()).hasSize(1);
BDDMockito.then(LOGGER).should(times(1)).info("invoking the post consumer1");
}

@Test
Expand All @@ -93,7 +86,6 @@ public void shouldBeNotTheSameInstance() {

assertThat(bulkhead1).isNotSameAs(bulkhead2);
assertThat(registry.getAllBulkheads()).hasSize(2);
BDDMockito.then(LOGGER).should(times(2)).info("invoking the post consumer1");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ private class CacheEventProcessor extends EventProcessor<CacheEvent> implements

@Override
public EventPublisher onCacheHit(EventConsumer<CacheOnHitEvent> eventConsumer) {
registerConsumer(CacheOnHitEvent.class, eventConsumer);
registerConsumer(CacheOnHitEvent.class.getSimpleName(), eventConsumer);
return this;
}

@Override
public EventPublisher onCacheMiss(EventConsumer<CacheOnMissEvent> eventConsumer) {
registerConsumer(CacheOnMissEvent.class, eventConsumer);
registerConsumer(CacheOnMissEvent.class.getSimpleName(), eventConsumer);
return this;
}

@Override
public EventPublisher onError(EventConsumer<CacheOnErrorEvent> eventConsumer) {
registerConsumer(CacheOnErrorEvent.class, eventConsumer);
registerConsumer(CacheOnErrorEvent.class.getSimpleName(), eventConsumer);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,37 +333,37 @@ Clock getClock() {
private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
@Override
public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
registerConsumer(CircuitBreakerOnSuccessEvent.class, onSuccessEventConsumer);
registerConsumer(CircuitBreakerOnSuccessEvent.class.getSimpleName(), onSuccessEventConsumer);
return this;
}

@Override
public EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> onErrorEventConsumer) {
registerConsumer(CircuitBreakerOnErrorEvent.class, onErrorEventConsumer);
registerConsumer(CircuitBreakerOnErrorEvent.class.getSimpleName(), onErrorEventConsumer);
return this;
}

@Override
public EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> onStateTransitionEventConsumer) {
registerConsumer(CircuitBreakerOnStateTransitionEvent.class, onStateTransitionEventConsumer);
registerConsumer(CircuitBreakerOnStateTransitionEvent.class.getSimpleName(), onStateTransitionEventConsumer);
return this;
}

@Override
public EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> onResetEventConsumer) {
registerConsumer(CircuitBreakerOnResetEvent.class, onResetEventConsumer);
registerConsumer(CircuitBreakerOnResetEvent.class.getSimpleName(), onResetEventConsumer);
return this;
}

@Override
public EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> onIgnoredErrorEventConsumer) {
registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class, onIgnoredErrorEventConsumer);
registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class.getSimpleName(), onIgnoredErrorEventConsumer);
return this;
}

@Override
public EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> onCallNotPermittedEventConsumer) {
registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class, onCallNotPermittedEventConsumer);
registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class.getSimpleName(), onCallNotPermittedEventConsumer);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.core.AbstractRegistry;
import io.github.resilience4j.core.registry.AbstractRegistry;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import io.vavr.collection.Array;
import io.vavr.collection.Seq;
Expand Down Expand Up @@ -62,7 +62,7 @@ public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
*/
@Override
public Seq<CircuitBreaker> getAllCircuitBreakers() {
return Array.ofAll(targetMap.values());
return Array.ofAll(entryMap.values());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
package io.github.resilience4j.circuitbreaker.internal;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.mock;


public class InMemoryCircuitBreakerRegistryTest {
Expand All @@ -29,26 +25,6 @@ public void setUp() {
LOGGER = mock(Logger.class);
}

@Test
public void testPostConsumerBeingCalled() {
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
Consumer<CircuitBreaker> consumer1 = circuitBreaker -> LOGGER.info("invoking the post consumer1");
Consumer<CircuitBreaker> consumer2 = circuitBreaker -> LOGGER.info("invoking the post consumer2");

circuitBreakerRegistry.registerPostCreationConsumer(consumer1);

circuitBreakerRegistry.circuitBreaker("testCircuitBreaker");
circuitBreakerRegistry.circuitBreaker("testCircuitBreaker2", CircuitBreakerConfig.ofDefaults());
circuitBreakerRegistry.circuitBreaker("testCircuitBreaker3", CircuitBreakerConfig::ofDefaults);

then(LOGGER).should(times(3)).info("invoking the post consumer1");

circuitBreakerRegistry.registerPostCreationConsumer(consumer2);
circuitBreakerRegistry.unregisterPostCreationConsumer(consumer1);
circuitBreakerRegistry.circuitBreaker("testCircuitBreaker4");
then(LOGGER).should(times(1)).info("invoking the post consumer2");
}

@Test
public void testAddCircuitBreakerRegistry() {
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,47 @@

import io.github.resilience4j.core.lang.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class EventProcessor<T> implements EventPublisher<T> {

private boolean consumerRegistered;
@Nullable private EventConsumer<T> onEventConsumer;
private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();
List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();

public boolean hasConsumers(){
return consumerRegistered;
}

@SuppressWarnings("unchecked")
public synchronized <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
consumerRegistered = true;
eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);
public synchronized void registerConsumer(String className, EventConsumer<? extends T> eventConsumer){
this.consumerRegistered = true;
this.eventConsumerMap.compute(className, (k, consumers) -> {
if(consumers == null){
consumers = new ArrayList<>();
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
}else{
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
}
});
}

@SuppressWarnings("unchecked")
public <E extends T> boolean processEvent(E event) {
boolean consumed = false;
EventConsumer<T> onEventConsumer = this.onEventConsumer;
if(onEventConsumer != null){
onEventConsumer.consumeEvent(event);
if(!onEventConsumers.isEmpty()){
onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event));
consumed = true;
}
if(!eventConsumers.isEmpty()){
EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());
if(eventConsumer != null){
eventConsumer.consumeEvent(event);
if(!eventConsumerMap.isEmpty()){
List<EventConsumer<T>> eventConsumers = this.eventConsumerMap.get(event.getClass().getSimpleName());
if(eventConsumers != null && !eventConsumers.isEmpty()){
eventConsumers.forEach(consumer -> consumer.consumeEvent(event));
consumed = true;
}
}
Expand All @@ -59,7 +69,7 @@ public <E extends T> boolean processEvent(E event) {

@Override
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
consumerRegistered = true;
this.onEventConsumer = onEventConsumer;
this.consumerRegistered = true;
this.onEventConsumers.add(onEventConsumer);
}
}
Loading

0 comments on commit 6e4cb87

Please sign in to comment.