Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enh external hosts #387

Draft
wants to merge 36 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
884c42e
WIP
artem-v Aug 21, 2023
b757da6
Make compile (WIP)
Aug 24, 2023
2a94fc5
Fix comments, Improve error handler for updateMembership
Aug 30, 2023
2409325
Enabled logging :set back old versions for log4j and slf4j
artem-v Aug 30, 2023
d8f1621
Merge branch 'master' into enh_external_hosts
artem-v Aug 31, 2023
c38a23a
WIP
artem-v Aug 31, 2023
95ea168
WIP
artem-v Aug 31, 2023
76d1576
Done with TransportWrapper
artem-v Aug 31, 2023
b8e42b4
Added cluster-tests
artem-v Aug 31, 2023
7aafb99
Fixed style
artem-v Aug 31, 2023
ef7a7a9
WIP
artem-v Aug 31, 2023
5ded407
WIP
artem-v Aug 31, 2023
b0f3155
WIP
artem-v Aug 31, 2023
78aae14
Set back mockito and junit to old versions
artem-v Aug 31, 2023
78798e0
Added tests to TransportWrapperTest
artem-v Aug 31, 2023
9950b1c
WIP
artem-v Aug 31, 2023
1b0adc0
WIP
artem-v Aug 31, 2023
c06a65f
WIP
artem-v Aug 31, 2023
90e2d8d
WIP
artem-v Aug 31, 2023
fca28ce
Enhance transport tests
artem-v Aug 31, 2023
491f3eb
Enhance transport tests
artem-v Aug 31, 2023
9ed5ba2
Enhance transport tests
artem-v Aug 31, 2023
16fe20e
Cosmetic changes
artem-v Aug 31, 2023
837d09e
WIP
artem-v Sep 1, 2023
93cf26a
Reverted TransportWrapper to its simplest version, fixed tests
artem-v Sep 1, 2023
ad9db19
WIP
artem-v Sep 1, 2023
0f16819
WIP
artem-v Sep 1, 2023
43c0ea7
Done with TransportWrapperTest.java
artem-v Sep 1, 2023
249bd76
Fixed unit test JacksonSmileMessageCodecTest
artem-v Sep 1, 2023
212808f
Fixed unit test JacksonMessageCodecTest
artem-v Sep 1, 2023
22498a7
Enhanced cluster-wrapper
artem-v Sep 1, 2023
838b36f
Added `exposeAddress` TransportConfig setting
artem-v Sep 2, 2023
c540c88
Merge branch 'master' into enh_external_hosts
artem-v Sep 2, 2023
77e8552
Added `exposeAddress` TransportConfig setting
artem-v Sep 2, 2023
ccbe0ed
Refactored io.scalecube.cluster.membership.MembershipProtocolImpl.send
artem-v Sep 2, 2023
28156bb
Minor update
artem-v Sep 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Mono;

Expand All @@ -14,7 +15,7 @@ public interface Cluster {
*
* @return cluster address
*/
Address address();
List<Address> addresses();

/**
* Spreads given message between cluster members using gossiping protocol.
Expand Down
67 changes: 27 additions & 40 deletions cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.cluster.transport.api.TransportConfig;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -35,8 +37,7 @@ public final class ClusterConfig implements Cloneable {

private String memberId;
private String memberAlias;
private String externalHost;
private Integer externalPort;
private List<String> externalHosts;

private TransportConfig transportConfig = TransportConfig.defaultConfig();
private FailureDetectorConfig failureDetectorConfig = FailureDetectorConfig.defaultConfig();
Expand Down Expand Up @@ -136,27 +137,39 @@ public ClusterConfig metadataCodec(MetadataCodec metadataCodec) {
}

/**
* Returns externalHost. {@code externalHost} is a config property for container environments,
* it's being set for advertising to scalecube cluster some connectable hostname which maps to
* Returns externalHosts. {@code externalHosts} is a config property for container environments,
* it's being set for advertising to scalecube cluster some connectable hostnames which maps to
* scalecube transport's hostname on which scalecube transport is listening.
*
* @return external host
* @return external hosts
*/
public String externalHost() {
return externalHost;
public List<String> externalHosts() {
return externalHosts;
}

/**
* Setter for externalHost. {@code externalHost} is a config property for container environments,
* it's being set for advertising to scalecube cluster some connectable hostname which maps to
* scalecube transport's hostname on which scalecube transport is listening.
* Setter for externalHosts. {@code externalHosts} is a config property for container
* environments, it's being set for advertising to scalecube cluster some connectable hostnames
* which maps to scalecube transport's hostname on which scalecube transport is listening.
*
* @param externalHosts external hosts
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig externalHosts(String... externalHosts) {
return externalHosts(Arrays.asList(externalHosts));
}

/**
* Setter for externalHosts. {@code externalHosts} is a config property for container
* environments, it's being set for advertising to scalecube cluster some connectable hostnames
* which maps to scalecube transport's hostname on which scalecube transport is listening.
*
* @param externalHost external host
* @param externalHosts external hosts
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig externalHost(String externalHost) {
public ClusterConfig externalHosts(List<String> externalHosts) {
ClusterConfig c = clone();
c.externalHost = externalHost;
c.externalHosts = externalHosts;
return c;
}

Expand Down Expand Up @@ -205,31 +218,6 @@ public ClusterConfig memberAlias(String memberAlias) {
return c;
}

/**
* Returns externalPort. {@code externalPort} is a config property for container environments,
* it's being set for advertising to scalecube cluster a port which mapped to scalecube
* transport's listening port.
*
* @return external port
*/
public Integer externalPort() {
return externalPort;
}

/**
* Setter for externalPort. {@code externalPort} is a config property for container environments,
* it's being set for advertising to scalecube cluster a port which mapped to scalecube
* transport's listening port.
*
* @param externalPort external port
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig externalPort(Integer externalPort) {
ClusterConfig c = clone();
c.externalPort = externalPort;
return c;
}

/**
* Applies {@link TransportConfig} settings.
*
Expand Down Expand Up @@ -316,8 +304,7 @@ public String toString() {
.add("metadataCodec=" + metadataCodec)
.add("memberId='" + memberId + "'")
.add("memberAlias='" + memberAlias + "'")
.add("externalHost='" + externalHost + "'")
.add("externalPort=" + externalPort)
.add("externalHosts=" + externalHosts)
.add("transportConfig=" + transportConfig)
.add("failureDetectorConfig=" + failureDetectorConfig)
.add("gossipConfig=" + gossipConfig)
Expand Down
60 changes: 41 additions & 19 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
Expand All @@ -20,24 +23,36 @@ public final class Member implements Externalizable {

private String id;
private String alias;
private Address address;
private List<Address> addresses;
private String namespace;

public Member() {}

/**
* Constructor.
*
* @param id member id; not null
* @param id member id
* @param alias member alias (optional)
* @param address member address; not null
* @param namespace namespace; not null
* @param addresses member addresses
* @param namespace namespace
*/
public Member(String id, String alias, List<Address> addresses, String namespace) {
this.id = Objects.requireNonNull(id, "id");
this.alias = alias;
this.addresses = Objects.requireNonNull(addresses, "addresses");
this.namespace = Objects.requireNonNull(namespace, "namespace");
}

/**
* Constructor.
*
* @param id member id
* @param alias member alias (optional)
* @param address member address
* @param namespace namespace
*/
public Member(String id, String alias, Address address, String namespace) {
this.id = Objects.requireNonNull(id, "member id");
this.alias = alias; // optional
this.address = Objects.requireNonNull(address, "member address");
this.namespace = Objects.requireNonNull(namespace, "member namespace");
this(id, alias, Collections.singletonList(address), namespace);
}

/**
Expand Down Expand Up @@ -70,14 +85,14 @@ public String namespace() {
}

/**
* Returns cluster member address, an address on which this cluster member listens connections
* from other cluster members.
* Returns cluster member addresses, those are addresses on which this cluster member listens
* connections from other cluster members.
*
* @see io.scalecube.cluster.transport.api.TransportConfig#port(int)
* @return member address
*/
public Address address() {
return address;
public List<Address> addresses() {
return addresses;
}

@Override
Expand All @@ -90,13 +105,13 @@ public boolean equals(Object that) {
}
Member member = (Member) that;
return Objects.equals(id, member.id)
&& Objects.equals(address, member.address)
&& Objects.equals(addresses, member.addresses)
&& Objects.equals(namespace, member.namespace);
}

@Override
public int hashCode() {
return Objects.hash(id, address, namespace);
return Objects.hash(id, addresses, namespace);
}

@Override
Expand All @@ -110,7 +125,10 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(alias);
}
// address
out.writeUTF(address.toString());
out.writeInt(addresses.size());
for (Address address : addresses) {
out.writeUTF(address.toString());
}
// namespace
out.writeUTF(namespace);
}
Expand All @@ -124,8 +142,12 @@ public void readExternal(ObjectInput in) throws IOException {
if (aliasNotNull) {
alias = in.readUTF();
}
// address
address = Address.from(in.readUTF());
// addresses
final int addressesSize = in.readInt();
addresses = new ArrayList<>(addressesSize);
for (int i = 0; i < addressesSize; i++) {
addresses.add(Address.from(in.readUTF()));
}
// namespace
this.namespace = in.readUTF();
}
Expand All @@ -143,9 +165,9 @@ private static String stringifyId(String id) {
public String toString() {
StringJoiner stringJoiner = new StringJoiner(":");
if (alias == null) {
return stringJoiner.add(namespace).add(stringifyId(id) + "@" + address).toString();
return stringJoiner.add(namespace).add(stringifyId(id)).toString();
} else {
return stringJoiner.add(namespace).add(alias).add(stringifyId(id) + "@" + address).toString();
return stringJoiner.add(namespace).add(alias).add(stringifyId(id)).toString();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.scalecube.cluster.utils;

import io.scalecube.cluster.Member;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -213,15 +215,44 @@ public InboundSettings inboundSettings(Address destination) {
return inboundSettings.getOrDefault(destination, defaultInboundSettings);
}

/**
* Returns network inbound settings applied to the given destination.
*
* @param member target member
* @return network inbound settings
*/
public InboundSettings inboundSettings(Member member) {
final List<Address> destinations = member.addresses();

if (destinations.isEmpty()) {
return defaultInboundSettings;
}

for (Address destination : destinations) {
InboundSettings inboundSettings = this.inboundSettings.get(destination);

if (inboundSettings != null) {
return inboundSettings;
}
}

return defaultInboundSettings;
}

/**
* Setter for network emulator inbound settings for specific destination.
*
* @param shallPass shallPass inbound flag
*/
public void inboundSettings(Address destination, boolean shallPass) {
public void inboundSettings(Member member, boolean shallPass) {
final List<Address> destinations = member.addresses();
InboundSettings settings = new InboundSettings(shallPass);
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);

destinations.forEach(
destination -> {
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.scalecube.cluster.utils;

import io.scalecube.cluster.Member;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
Expand Down Expand Up @@ -49,7 +50,7 @@ public boolean isStopped() {
public Mono<Void> send(Address address, Message message) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(message))
Mono.just(Message.with(message).build())
.flatMap(msg -> networkEmulator.tryFailOutbound(msg, address))
.flatMap(msg -> networkEmulator.tryDelayOutbound(msg, address))
.flatMap(msg -> transport.send(address, msg)));
Expand All @@ -59,7 +60,7 @@ public Mono<Void> send(Address address, Message message) {
public Mono<Message> requestResponse(Address address, Message request) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(request))
Mono.just(Message.with(request).build())
.flatMap(msg -> networkEmulator.tryFailOutbound(msg, address))
.flatMap(msg -> networkEmulator.tryDelayOutbound(msg, address))
.flatMap(
Expand All @@ -69,7 +70,9 @@ public Mono<Message> requestResponse(Address address, Message request) {
.flatMap(
message -> {
boolean shallPass =
networkEmulator.inboundSettings(message.sender()).shallPass();
networkEmulator
.inboundSettings((Member) message.sender())
.shallPass();
return shallPass ? Mono.just(message) : Mono.never();
})));
}
Expand All @@ -78,11 +81,7 @@ public Mono<Message> requestResponse(Address address, Message request) {
public Flux<Message> listen() {
return transport
.listen()
.filter(message -> networkEmulator.inboundSettings(message.sender()).shallPass())
.filter(message -> networkEmulator.inboundSettings((Member) message.sender()).shallPass())
.onBackpressureBuffer();
}

private Message enhanceWithSender(Message message) {
return Message.with(message).sender(transport.address()).build();
}
}
Loading
Loading