Skip to content

Commit

Permalink
Added cluster-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 31, 2023
1 parent 76d1576 commit b8e42b4
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,11 @@ public InboundSettings inboundSettings(List<Address> destinations) {
public void inboundSettings(List<Address> destinations, boolean shallPass) {
InboundSettings settings = new InboundSettings(shallPass);

destinations.forEach(destination -> {
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
});
destinations.forEach(
destination -> {
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
});
}

/**
Expand Down
36 changes: 36 additions & 0 deletions cluster-tests/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-cluster-parent</artifactId>
<version>2.6.18-SNAPSHOT</version>
</parent>

<artifactId>cluster-tests</artifactId>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scalecube-cluster-testlib</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scalecube-transport-netty</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.scalecube.transport.netty.websocket.WebsocketTransportFactory;
import java.time.Duration;
import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
Expand Down Expand Up @@ -62,14 +61,14 @@ protected Mono<Void> send(Transport transport, Address to, Message msg) {
*/
protected Mono<Void> send(Transport transport, List<Address> to, Message msg) {
return TransportWrapper.send(transport, to, msg)
.doOnError(
th ->
LOGGER.error(
"Failed to send {} to {} from transport: {}, cause: {}",
msg,
to,
transport,
th.toString()));
.doOnError(
th ->
LOGGER.error(
"Failed to send {} to {} from transport: {}, cause: {}",
msg,
to,
transport,
th.toString()));
}

/**
Expand Down
44 changes: 29 additions & 15 deletions cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ public TransportWrapper(Transport transport) {
this.transport = transport;
}

/**
* Execute request response call.
*
* @param member member
* @param request request
* @return mono result
*/
public Mono<Message> requestResponse(Member member, Message request) {
return connections
.compute(
Expand All @@ -36,6 +43,28 @@ public Mono<Message> requestResponse(Member member, Message request) {
.map(result -> result.message);
}

private Mono<Result> requestResponse(List<Address> addresses, Message request) {
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
() -> {
final int index = currentIndex.getAndIncrement();
return transport.requestResponse(addresses.get(index), request);
})
.retry(addresses.size() - 1)
.map(
message -> {
final int index = currentIndex.get();
return new Result(addresses.get(index), message);
});
}

/**
* Execute send call.
*
* @param member member
* @param request request
* @return mono result
*/
public Mono<Void> send(Member member, Message request) {
return connections
.compute(
Expand All @@ -53,21 +82,6 @@ public Mono<Void> send(Member member, Message request) {
.then();
}

private Mono<Result> requestResponse(List<Address> addresses, Message request) {
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
() -> {
final int index = currentIndex.getAndIncrement();
return transport.requestResponse(addresses.get(index), request);
})
.retry(addresses.size() - 1)
.map(
message -> {
final int index = currentIndex.get();
return new Result(addresses.get(index), message);
});
}

private Mono<Result> send(List<Address> addresses, Message request) {
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;

import io.scalecube.cluster.Member;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.cluster.fdetector.PingData.AckType;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.gossip.GossipProtocol;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.nio.ByteBuffer;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public void testSeparateNonEmptyNamespaces() {
.membership(
opts ->
opts.seedMembers(
root.addresses().get(0), root2.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0)))
root.addresses().get(0),
root2.addresses().get(0),
bob.addresses().get(0),
carol.addresses().get(0)))
.startAwait();

Cluster eve =
Expand Down Expand Up @@ -162,15 +165,20 @@ public void testSimpleNamespacesHierarchy() {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop"))
.membership(opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0)))
.membership(
opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0)))
.startAwait();

Cluster dan =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop-2"))
.membership(
opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0)))
opts ->
opts.seedMembers(
rootDevelop.addresses().get(0),
bob.addresses().get(0),
carol.addresses().get(0)))
.startAwait();

Cluster eve =
Expand All @@ -180,7 +188,10 @@ public void testSimpleNamespacesHierarchy() {
.membership(
opts ->
opts.seedMembers(
rootDevelop.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0), dan.addresses().get(0)))
rootDevelop.addresses().get(0),
bob.addresses().get(0),
carol.addresses().get(0),
dan.addresses().get(0)))
.startAwait();

assertThat(
Expand Down Expand Up @@ -213,7 +224,8 @@ public void testIsolatedParentNamespaces() {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/1/c"))
.membership(opts -> opts.seedMembers(parent1.addresses().get(0), bob.addresses().get(0)))
.membership(
opts -> opts.seedMembers(parent1.addresses().get(0), bob.addresses().get(0)))
.startAwait();

Cluster parent2 =
Expand All @@ -229,7 +241,10 @@ public void testIsolatedParentNamespaces() {
.membership(
opts ->
opts.seedMembers(
parent1.addresses().get(0), parent2.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0)))
parent1.addresses().get(0),
parent2.addresses().get(0),
bob.addresses().get(0),
carol.addresses().get(0)))
.startAwait();

//noinspection unused
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ public void testJoinSeedClusterWithNoExistingSeedMember() {
List<Address> seeds = new ArrayList<>();

seeds.add(Address.from("localhost:1234")); // Not existent
seeds.add(Address.from("localhost:5678")); // Not existent
seeds.add(Address.from("localhost:5678")); // Not existent
seeds.addAll(seedNode.addresses());

Cluster otherNode =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,8 @@ private Future<List<FailureDetectorEvent>> listenNextEventFor(
for (final Address member : addresses) {
final CompletableFuture<FailureDetectorEvent> future = new CompletableFuture<>();
fd.listen()
.filter(event -> event.member().addresses().contains(member)).subscribe(future::complete);
.filter(event -> event.member().addresses().contains(member))
.subscribe(future::complete);
resultFuture.add(future);
}

Expand Down
5 changes: 4 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -55,6 +57,7 @@
<module>cluster-testlib</module>
<module>transport-parent</module>
<module>codec-parent</module>
<module>cluster-tests</module>
</modules>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ public List<Address> sender() {
return Collections.emptyList();
}

return Arrays.stream(headerValue.split(","))
.map(Address::from)
.collect(Collectors.toList());
return Arrays.stream(headerValue.split(",")).map(Address::from).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -300,8 +298,8 @@ public Builder correlationId(String correlationId) {
*/
public Builder sender(List<Address> addresses) {
return header(
HEADER_SENDER,
addresses.stream().map(Address::toString).collect(Collectors.joining(",")));
HEADER_SENDER,
addresses.stream().map(Address::toString).collect(Collectors.joining(",")));
}

public Message build() {
Expand Down

0 comments on commit b8e42b4

Please sign in to comment.