Skip to content

Commit

Permalink
Done with TransportWrapperTest.java
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 1, 2023
1 parent 0f16819 commit 43c0ea7
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 47 deletions.
70 changes: 29 additions & 41 deletions cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class TransportWrapper {

private final Transport transport;

private final Map<Member, Integer> addressIndexByMember = new ConcurrentHashMap<>();
private final Map<Member, AtomicInteger> addressIndexByMember = new ConcurrentHashMap<>();

public TransportWrapper(Transport transport) {
this.transport = transport;
Expand All @@ -27,27 +27,21 @@ public TransportWrapper(Transport transport) {
* @return mono result
*/
public Mono<Message> requestResponse(Member member, Message request) {
final List<Address> addresses = member.addresses();
final AtomicInteger currentIndex =
addressIndexByMember.computeIfAbsent(member, m -> new AtomicInteger());
return Mono.defer(
() -> {
final List<Address> addresses = member.addresses();
final int numRetries = addresses.size() - 1;
final Integer index = addressIndexByMember.getOrDefault(member, 0);
final AtomicInteger currentIndex = new AtomicInteger(index);

return Mono.defer(
() -> {
int increment = currentIndex.getAndIncrement();

if (increment == addresses.size()) {
increment = 0;
currentIndex.set(1);
}

final Address address = addresses.get(increment);
return transport.requestResponse(address, request);
})
.retry(numRetries);
});
() -> {
synchronized (this) {
if (currentIndex.get() == addresses.size()) {
currentIndex.set(0);
}
final Address address = addresses.get(currentIndex.getAndIncrement());
return transport.requestResponse(address, request);
}
})
.retry(addresses.size() - 1)
.doOnError(throwable -> addressIndexByMember.remove(member, currentIndex));
}

/**
Expand All @@ -58,26 +52,20 @@ public Mono<Message> requestResponse(Member member, Message request) {
* @return mono result
*/
public Mono<Void> send(Member member, Message request) {
final List<Address> addresses = member.addresses();
final AtomicInteger currentIndex =
addressIndexByMember.computeIfAbsent(member, m -> new AtomicInteger());
return Mono.defer(
() -> {
final List<Address> addresses = member.addresses();
final int numRetries = addresses.size() - 1;
final Integer index = addressIndexByMember.getOrDefault(member, 0);
final AtomicInteger currentIndex = new AtomicInteger(index);

return Mono.defer(
() -> {
int increment = currentIndex.getAndIncrement();

if (increment == addresses.size()) {
increment = 0;
currentIndex.set(1);
}

final Address address = addresses.get(increment);
return transport.send(address, request);
})
.retry(numRetries);
});
() -> {
synchronized (this) {
if (currentIndex.get() == addresses.size()) {
currentIndex.set(0);
}
final Address address = addresses.get(currentIndex.getAndIncrement());
return transport.send(address, request);
}
})
.retry(addresses.size() - 1)
.doOnError(throwable -> addressIndexByMember.remove(member, currentIndex));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -75,12 +76,12 @@ static void populateBuilder(Builder<Arguments> builder, int size) {
}
}

private Map<Member, Integer> addressIndexByMember()
private Map<Member, AtomicInteger> addressIndexByMember()
throws NoSuchFieldException, IllegalAccessException {
final Field field = TransportWrapper.class.getDeclaredField("addressIndexByMember");
field.setAccessible(true);
//noinspection unchecked
return (Map<Member, Integer>) field.get(transportWrapper);
return (Map<Member, AtomicInteger>) field.get(transportWrapper);
}

@ParameterizedTest
Expand All @@ -94,7 +95,7 @@ void requestResponseShouldWorkByRoundRobin(int size, int startIndex, int success
}

if (startIndex > 0) {
addressIndexByMember().put(member, startIndex);
addressIndexByMember().put(member, new AtomicInteger(startIndex));
}

for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -162,7 +163,7 @@ void requestResponseShouldFailByRoundRobin(int size, int startIndex, int ignore)
}

if (startIndex > 0) {
addressIndexByMember().put(member, startIndex);
addressIndexByMember().put(member, new AtomicInteger(startIndex));
}

for (int i = 0; i < size; i++) {
Expand All @@ -186,7 +187,7 @@ void sendShouldWorkByRoundRobin(int size, int startIndex, int successIndex) thro
}

if (startIndex > 0) {
addressIndexByMember().put(member, startIndex);
addressIndexByMember().put(member, new AtomicInteger(startIndex));
}

for (int i = 0; i < size; i++) {
Expand All @@ -212,7 +213,7 @@ void sendShouldFailByRoundRobin(int size, int startIndex, int ignore) throws Exc
}

if (startIndex > 0) {
addressIndexByMember().put(member, startIndex);
addressIndexByMember().put(member, new AtomicInteger(startIndex));
}

for (int i = 0; i < size; i++) {
Expand Down

0 comments on commit 43c0ea7

Please sign in to comment.