Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 31, 2023
1 parent d8f1621 commit c38a23a
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import io.scalecube.utils.ServiceLoaderUtil;
import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
package io.scalecube.cluster.transport.api;
package io.scalecube.cluster;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;

public class TransportWrapper {

public static Mono<Message> requestResponse(
Transport transport, List<Address> addresses, Message request) {
return requestResponse(transport, addresses, 0, request);
private final Transport transport;

private final Map<Member, Address> connections = new ConcurrentHashMap<>();

public TransportWrapper(Transport transport) {
this.transport = transport;
}

private static Mono<Message> requestResponse(
Transport transport, List<Address> addresses, int currentIndex, Message request) {
public Mono<Message> requestResponse(Member member, Message request) {
// return requestResponse(transport, addresses, 0, request);
connections.computeIfAbsent(member, m -> requestResponse0(m, request));
}

private Address requestResponse0(Member member, Message request) {
return null;
}

private Mono<Message> requestResponse(
List<Address> addresses, int currentIndex, Message request) {
if (currentIndex >= addresses.size()) {
return Mono.error(new RuntimeException("All addresses have been tried and failed"));
}

return transport
.requestResponse(addresses.get(currentIndex), request)
.onErrorResume(th -> requestResponse(transport, addresses, currentIndex + 1, request));
.onErrorResume(th -> requestResponse(addresses, currentIndex + 1, request));
}

public static Mono<Void> send(Transport transport, List<Address> addresses, Message request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.scalecube.cluster.Member;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.net.Address;
import java.nio.ByteBuffer;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import javax.sql.rowset.spi.TransactionalWriter;

/** Base test class. */
public class BaseTest {

Expand Down

0 comments on commit c38a23a

Please sign in to comment.