Skip to content

Commit

Permalink
Multicast ipv6 support for branch 2.6.x (#3430)
Browse files Browse the repository at this point in the history
*  Multicast demo fails with message "Can't assign requested address
* remove useless code
* Fix multicast registry ut
  • Loading branch information
chickenlj authored and ralf0131 committed Mar 8, 2019
1 parent 00a8edc commit 8683da7
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import com.alibaba.dubbo.common.logger.LoggerFactory;

import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -284,4 +287,34 @@ public static String toURL(String protocol, String host, int port, String path)
return sb.toString();
}

public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException {
setInterface(multicastSocket, multicastAddress instanceof Inet6Address);
multicastSocket.setLoopbackMode(false);
multicastSocket.joinGroup(multicastAddress);
}

public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException {
boolean interfaceSet = false;
Enumeration interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface i = (NetworkInterface) interfaces.nextElement();
Enumeration addresses = i.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = (InetAddress) addresses.nextElement();
if (preferIpv6 && address instanceof Inet6Address) {
multicastSocket.setInterface(address);
interfaceSet = true;
break;
} else if (!preferIpv6 && address instanceof Inet4Address) {
multicastSocket.setInterface(address);
interfaceSet = true;
break;
}
}
if (interfaceSet) {
break;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
Expand Down Expand Up @@ -58,11 +59,11 @@ public class MulticastRegistry extends FailbackRegistry {

private static final int DEFAULT_MULTICAST_PORT = 1234;

private final InetAddress mutilcastAddress;
private final InetAddress multicastAddress;

private final MulticastSocket mutilcastSocket;
private final MulticastSocket multicastSocket;

private final int mutilcastPort;
private final int multicastPort;

private final ConcurrentMap<URL, Set<URL>> received = new ConcurrentHashMap<URL, Set<URL>>();

Expand All @@ -79,23 +80,21 @@ public MulticastRegistry(URL url) {
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
if (!isMulticastAddress(url.getHost())) {
throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
}
try {
mutilcastAddress = InetAddress.getByName(url.getHost());
mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
mutilcastSocket = new MulticastSocket(mutilcastPort);
mutilcastSocket.setLoopbackMode(false);
mutilcastSocket.joinGroup(mutilcastAddress);
multicastAddress = InetAddress.getByName(url.getHost());
checkMulticastAddress(multicastAddress);

multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
multicastSocket = new MulticastSocket(multicastPort);
NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
byte[] buf = new byte[2048];
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while (!mutilcastSocket.isClosed()) {
while (!multicastSocket.isClosed()) {
try {
mutilcastSocket.receive(recv);
multicastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
Expand All @@ -104,7 +103,7 @@ public void run() {
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte) 0);
} catch (Throwable e) {
if (!mutilcastSocket.isClosed()) {
if (!multicastSocket.isClosed()) {
logger.error(e.getMessage(), e);
}
}
Expand Down Expand Up @@ -133,6 +132,19 @@ public void run() {
}
}

private void checkMulticastAddress(InetAddress multicastAddress) {
if (!multicastAddress.isMulticastAddress()) {
String message = "Invalid multicast address " + multicastAddress;
if (!(multicastAddress instanceof Inet4Address)) {
throw new IllegalArgumentException(message + ", " +
"ipv4 multicast address scope: 224.0.0.0 - 239.255.255.255.");
} else {
throw new IllegalArgumentException(message + ", " + "ipv6 multicast address must start with ff, " +
"for example: ff01::1");
}
}
}

private static boolean isMulticastAddress(String ip) {
int i = ip.indexOf('.');
if (i > 0) {
Expand Down Expand Up @@ -233,25 +245,25 @@ private void receive(String msg, InetSocketAddress remoteAddress) {

private void broadcast(String msg) {
if (logger.isInfoEnabled()) {
logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort);
logger.info("Send broadcast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort);
mutilcastSocket.send(hi);
DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

private void unicast(String msg, String host) {
if (logger.isInfoEnabled()) {
logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort);
logger.info("Send unicast message: " + msg + " to " + host + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort);
mutilcastSocket.send(hi);
DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), multicastPort);
multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
Expand Down Expand Up @@ -293,7 +305,7 @@ protected void doUnsubscribe(URL url, NotifyListener listener) {
@Override
public boolean isAvailable() {
try {
return mutilcastSocket != null;
return multicastSocket != null;
} catch (Throwable t) {
return false;
}
Expand All @@ -310,8 +322,8 @@ public void destroy() {
logger.warn(t.getMessage(), t);
}
try {
mutilcastSocket.leaveGroup(mutilcastAddress);
mutilcastSocket.close();
multicastSocket.leaveGroup(multicastAddress);
multicastSocket.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
Expand Down Expand Up @@ -434,7 +446,7 @@ public List<URL> lookup(URL url) {
}

public MulticastSocket getMutilcastSocket() {
return mutilcastSocket;
return multicastSocket;
}

public Map<URL, Set<URL>> getReceived() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;

public class MulticastRegistryTest {

Expand All @@ -52,7 +53,7 @@ public void setUp() throws Exception {
registry.register(serviceUrl);
}

@Test(expected = IllegalArgumentException.class)
@Test(expected = IllegalStateException.class)
public void testUrlError() {
URL errorUrl = URL.valueOf("multicast://mullticast/");
new MulticastRegistry(errorUrl);
Expand Down Expand Up @@ -124,4 +125,43 @@ public void testDefaultPort() {
}
}

@Test
public void testMulticastAddress() {
InetAddress multicastAddress = null;
MulticastSocket multicastSocket = null;
try {
// ipv4 multicast address
multicastAddress = InetAddress.getByName("224.55.66.77");
multicastSocket = new MulticastSocket(2345);
multicastSocket.setLoopbackMode(false);
NetUtils.setInterface(multicastSocket, false);
multicastSocket.joinGroup(multicastAddress);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
if (multicastSocket != null) {
multicastSocket.close();
}
}

// multicast ipv6 address,
try {
multicastAddress = InetAddress.getByName("ff01::1");

multicastSocket = new MulticastSocket();
multicastSocket.setLoopbackMode(false);
NetUtils.setInterface(multicastSocket, true);
multicastSocket.joinGroup(multicastAddress);
} catch (Throwable t) {
t.printStackTrace();
} finally {
if (multicastSocket != null) {
multicastSocket.close();
}
}

}


}

0 comments on commit 8683da7

Please sign in to comment.