Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for EJBCLIENT-285 and EJBCLIENT-284 #329

Merged
merged 4 commits into from
Oct 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
<version.org.wildfly.common>1.2.0.Final</version.org.wildfly.common>
<version.org.wildfly.naming.client>1.0.4.Final</version.org.wildfly.naming.client>
<version.org.wildfly.discovery>1.0.0.Final</version.org.wildfly.discovery>
<version.org.wildfly.security.elytron>1.1.0.Final</version.org.wildfly.security.elytron>
<version.org.wildfly.security.elytron>1.1.5.Final</version.org.wildfly.security.elytron>
<version.org.wildfly.transaction-client>1.0.2.Final</version.org.wildfly.transaction-client>

<version.org.jboss.bridger>1.4.Final</version.org.jboss.bridger>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/jboss/ejb/_private/Logs.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ public interface Logs extends BasicLogger {
@Message(id = 80, value = "Request not sent")
IllegalStateException requestNotSent();

@Message(id = 81, value = "Failed to instantiate cluster node selector class \"%s\"")
IllegalArgumentException cannotInstantiateClustertNodeSelector(String name, @Cause ReflectiveOperationException e);

// Proxy API errors

@Message(id = 100, value = "Object '%s' is not a valid proxy object")
Expand Down Expand Up @@ -416,6 +419,9 @@ public interface Logs extends BasicLogger {
@Message(id = 509, value = "Unexpected exception processing EJB request")
void unexpectedException(@Cause Throwable t);

@Message(id = 510, value = "Failed to configure SSL context")
IOException failedToConfigureSslContext(@Cause Throwable cause);

// Remote messages; no ID for brevity but should be translated

@Message(value = "No such EJB: %s")
Expand Down
17 changes: 15 additions & 2 deletions src/main/java/org/jboss/ejb/client/AbstractInvocationContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.LinkedHashMap;
import java.util.Map;

import javax.transaction.Transaction;

import org.wildfly.common.Assert;
import org.wildfly.transaction.client.AbstractTransaction;

Expand All @@ -41,6 +39,21 @@ public abstract class AbstractInvocationContext extends Attachable {
private Affinity weakAffinity = Affinity.NONE;
private URI destination;
private Affinity targetAffinity;
private String initialCluster;

/**
* Gets the initial cluster assignment by discovery, if any
*
* @return the initial cluster if assigned
*/
public String getInitialCluster() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to make this public?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the remote package needs access

return initialCluster;
}

void setInitialCluster(String initialCluster) {
this.initialCluster = initialCluster;
}

private Map<String, Object> contextData;
private AbstractTransaction transaction;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

import javax.ejb.NoSuchEJBException;
Expand Down Expand Up @@ -336,31 +338,36 @@ private List<Throwable> doFirstMatchDiscovery(AbstractInvocationContext context,
if (fallbackFilterSpec != null) {
assert context.getLocator().getAffinity() instanceof ClusterAffinity;
Logs.INVOCATION.tracef("Performed first-match discovery, no match, falling back to cluster discovery");
final List<Throwable> problems2 = doClusterDiscovery(context, fallbackFilterSpec);
if (problems2.isEmpty()) {
return problems;
} else if (problems.isEmpty()) {
return problems2;
} else {
final ArrayList<Throwable> problems3 = new ArrayList<>(problems.size() + problems2.size());
problems3.addAll(problems);
problems3.addAll(problems2);
return problems3;
}
return merge(problems, doClusterDiscovery(context, fallbackFilterSpec));
} else {
// no match!
Logs.INVOCATION.tracef("Performed first-match discovery, no match");
}
return problems;
}

private static List<Throwable> merge(List<Throwable> problems, List<Throwable> problems2) {
if (problems2.isEmpty()) {
return problems;
} else if (problems.isEmpty()) {
return problems2;
} else {
final ArrayList<Throwable> problems3 = new ArrayList<>(problems.size() + problems2.size());
problems3.addAll(problems);
problems3.addAll(problems2);
return problems3;
}
}

private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final FilterSpec filterSpec, final EJBLocator<?> locator) {
Logs.INVOCATION.tracef("Performing any discovery(locator = %s, weak affinity = %s, filter spec = %s)", context.getLocator(), context.getWeakAffinity(), filterSpec);
final List<Throwable> problems;
// blacklist
final Set<URI> blacklist = context.getAttachment(BL_KEY);
final Map<URI, String> nodes = new HashMap<>();
final Map<String, URI> uris = new HashMap<>();
final Map<URI, List<String>> clusterAssociations = new HashMap<>();

int nodeless = 0;
try (final ServicesQueue queue = discover(filterSpec)) {
ServiceURL serviceURL;
Expand All @@ -382,7 +389,24 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
nodeless++;
}
}
context.setDestination(location);

// Handle multiple cluster specifications per entry, and also multiple entries with
// cluster specifications that refer to the same URI. Currently multi-membership is
// represented in the latter form, however, handle the first form as well, just in
// case this changes in the future.
final List<AttributeValue> clusters = serviceURL.getAttributeValues(FILTER_ATTR_CLUSTER);
if (clusters != null) {
for (AttributeValue cluster : clusters) {
List<String> list = clusterAssociations.putIfAbsent(location, Collections.singletonList(cluster.toString()));
if (list != null) {
if (!(list instanceof ArrayList)) {
list = new ArrayList<>(list);
clusterAssociations.put(location, list);
}
list.add(cluster.toString());
}
}
}
}
}
problems = queue.getProblems();
Expand All @@ -405,8 +429,7 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
Logs.INVOCATION.tracef("Performed first-match discovery(target affinity(node) = %s, destination = %s)", nodeName, location);
} else if (nodeless == 0) {
// use the deployment node selector
// todo: configure on client context
DeploymentNodeSelector selector = DeploymentNodeSelector.RANDOM;
DeploymentNodeSelector selector = context.getClientContext().getDeploymentNodeSelector();
nodeName = selector.selectNode(nodes.values().toArray(NO_STRINGS), locator.getAppName(), locator.getModuleName(), locator.getDistinctName());
if (nodeName == null) {
throw Logs.INVOCATION.selectorReturnedNull(selector);
Expand All @@ -430,11 +453,31 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
Logs.INVOCATION.tracef("Performed first-match discovery, nodes > 1, URI selector used(target affinity(node) = %s, destination = %s)", nodeName, location);
}

// TODO DeploymentNodeSelector should be enhanced to handle URIs that are members of more than one cluster

// Clients typically do not have an auth policy for nodes which are dynamically discovered
// from cluster topology info. Anytime such a node is selected, we must register the
// associated cluster with the invocation, so that an effective auth config can be
// determined. Randomly pick a cluster if there is more than one.
selectCluster(context, clusterAssociations, location);
context.setDestination(location);
if (nodeName != null) context.setTargetAffinity(new NodeAffinity(nodeName));
return problems;
}

private void selectCluster(AbstractInvocationContext context, Map<URI, List<String>> clusterAssociations, URI location) {
List<String> associations = clusterAssociations.get(location);
String cluster = null;
if (associations != null) {
cluster = (associations.size() == 1) ? associations.get(0) :
associations.get(ThreadLocalRandom.current().nextInt(associations.size()));

}
if (cluster != null) {
context.setInitialCluster(cluster);
}
}

private List<Throwable> doClusterDiscovery(AbstractInvocationContext context, final FilterSpec filterSpec) {
Logs.INVOCATION.tracef("Performing cluster discovery(locator = %s, weak affinity = %s, filter spec = %s)", context.getLocator(), context.getWeakAffinity(), filterSpec);
Map<String, URI> nodes = new HashMap<>();
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/jboss/ejb/client/EJBClientContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ ClusterNodeSelector getClusterNodeSelector() {
return clusterNodeSelector;
}

DeploymentNodeSelector getDeploymentNodeSelector() {
return deploymentNodeSelector;
}

static final class ClassInterceptor {
private final String className;
private final EJBClientInterceptorInformation interceptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.jboss.ejb._private.Logs;
import org.jboss.ejb.client.ClusterNodeSelector;
import org.jboss.ejb.client.DeploymentNodeSelector;
import org.jboss.ejb.client.EJBClientConnection;
import org.jboss.ejb.client.EJBClientContext;
Expand Down Expand Up @@ -72,6 +74,22 @@ public static void configure(final EJBClientContext.Builder builder) {
builder.setDeploymentNodeSelector(deploymentNodeSelector);
}

Map<String, JBossEJBProperties.ClusterConfiguration> clusters = properties.getClusterConfigurations();
if (clusters != null) {
for (JBossEJBProperties.ClusterConfiguration cluster : clusters.values()) {
ExceptionSupplier<ClusterNodeSelector, ReflectiveOperationException> selectorSupplier = cluster.getClusterNodeSelectorSupplier();
if (selectorSupplier != null) {
try {
builder.setClusterNodeSelector(selectorSupplier.get());
} catch (ReflectiveOperationException e) {
throw Logs.MAIN.cannotInstantiateClustertNodeSelector(cluster.getClusterNodeSelectorClassName(), e);
}
// We only support one selector currently
break;
}
}
}

if (properties.getInvocationTimeout() != -1L) {
builder.setInvocationTimeout(properties.getInvocationTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.jboss.ejb.protocol.remote;

import java.net.URI;
import java.util.List;

/**
Expand All @@ -29,7 +30,7 @@ interface DiscoveredNodeRegistry {

List<NodeInformation> getAllNodeInformation();

void addNode(String clusterName, String nodeName);
void addNode(String clusterName, String nodeName, URI registeredBy);

void removeNode(String clusterName, String nodeName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private void processMessage(final MessageInputStream message) {
int memberCount = StreamUtils.readPackedSignedInt32(message);
for (int j = 0; j < memberCount; j ++) {
final String nodeName = message.readUTF();
discoveredNodeRegistry.addNode(clusterName, nodeName);
discoveredNodeRegistry.addNode(clusterName, nodeName, channel.getConnection().getPeerURI());
final NodeInformation nodeInformation = discoveredNodeRegistry.getNodeInformation(nodeName);
Logs.INVOCATION.debugf("Received CLUSTER_TOPOLOGY(%x) message, registering cluster %s to node %s", msg, clusterName, nodeName);

Expand Down
17 changes: 14 additions & 3 deletions src/main/java/org/jboss/ejb/protocol/remote/RemoteEJBReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

import javax.ejb.CreateException;

import org.jboss.ejb.client.AbstractInvocationContext;
import org.jboss.ejb.client.Affinity;
import org.jboss.ejb.client.AttachmentKey;
import org.jboss.ejb.client.ClusterAffinity;
import org.jboss.ejb.client.EJBReceiver;
import org.jboss.ejb.client.EJBReceiverContext;
import org.jboss.ejb.client.EJBReceiverInvocationContext;
Expand Down Expand Up @@ -122,7 +125,7 @@ EJBClientChannel getClientChannel(final Connection connection) throws IOExceptio

protected void processInvocation(final EJBReceiverInvocationContext receiverContext) throws Exception {
final AuthenticationContext authenticationContext = receiverContext.getAuthenticationContext();
final IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(receiverContext.getClientInvocationContext().getDestination(), authenticationContext);
final IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(receiverContext.getClientInvocationContext(), receiverContext.getClientInvocationContext().getDestination(), authenticationContext);
// this actually causes the invocation to move forward
futureConnection.addNotifier(notifier, receiverContext);
}
Expand All @@ -140,7 +143,7 @@ protected SessionID createSession(final EJBReceiverSessionCreationContext contex
final StatelessEJBLocator<?> statelessLocator = context.getClientInvocationContext().getLocator().asStateless();
final AuthenticationContext authenticationContext = context.getAuthenticationContext();
try {
IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(context.getClientInvocationContext().getDestination(), authenticationContext);
IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(context.getClientInvocationContext(), context.getClientInvocationContext().getDestination(), authenticationContext);
final ConnectionPeerIdentity identity = futureConnection.getInterruptibly();
final EJBClientChannel ejbClientChannel = getClientChannel(identity.getConnection());
final StatefulEJBLocator<?> result = ejbClientChannel.openSession(statelessLocator, identity, context.getClientInvocationContext());
Expand Down Expand Up @@ -170,7 +173,15 @@ protected boolean isConnected(final URI uri) {
}
}

private IoFuture<ConnectionPeerIdentity> getConnection(final URI target, @NotNull AuthenticationContext authenticationContext) throws Exception {
private IoFuture<ConnectionPeerIdentity> getConnection(final AbstractInvocationContext context, final URI target, @NotNull AuthenticationContext authenticationContext) throws Exception {
Affinity affinity = context.getLocator().getAffinity();
String cluster = (affinity instanceof ClusterAffinity) ? ((ClusterAffinity) affinity).getClusterName() : context.getInitialCluster();

if (cluster != null) {
return doPrivileged((PrivilegedAction<IoFuture<ConnectionPeerIdentity>>) () ->
discoveredNodeRegistry.getConnectedIdentityUsingClusterEffective(Endpoint.getCurrent(), target, "ejb", "jboss", authenticationContext, cluster));
}

return doPrivileged((PrivilegedAction<IoFuture<ConnectionPeerIdentity>>) () -> Endpoint.getCurrent().getConnectedIdentity(target, "ejb", "jboss", authenticationContext));
}
}
Loading