Skip to content

Commit

Permalink
Add DisConnection class
Browse files Browse the repository at this point in the history
And update examples to show how to use it.
  • Loading branch information
leif81 committed Dec 1, 2022
1 parent 5b2ba3e commit bb1ddc0
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 45 deletions.
88 changes: 88 additions & 0 deletions src/main/java/edu/nps/moves/disutil/DisConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package edu.nps.moves.disutil;

import edu.nps.moves.dis.Pdu;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DisConnection implements Runnable {

private final DatagramSocket socket;
private DatagramPacket packet;
private InetAddress addr;
private final PduFactory pduFactory = new PduFactory();
private final BlockingQueue<Pdu> msgQ = new LinkedBlockingQueue<>();
private static final int MAX_PDU_SIZE = 16384;
private static final int MAX_PDU_QUEUE_SIZE = 500;

public DisConnection(String multicastAddress, int port) throws IOException {
init();
socket = new MulticastSocket(port);
addr = InetAddress.getByName(multicastAddress);
((MulticastSocket) socket).joinGroup(addr);
//((MulticastSocket) socket).setLoopbackMode(true); // disable loopback
}

public DisConnection(int port) throws IOException {
init();
socket = new DatagramSocket(port);
}

private void init() {
byte buffer[] = new byte[MAX_PDU_SIZE];
packet = new DatagramPacket(buffer, buffer.length);
}

public void terminate() {
socket.disconnect();
try {
if (socket instanceof MulticastSocket) {
((MulticastSocket) socket).leaveGroup(socket.getInetAddress());
}
} catch (IOException ex) {
Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex);
}
}

protected void handleMessage(Pdu pdu) {
synchronized (msgQ) {
msgQ.add(pdu);
if (msgQ.size() > MAX_PDU_QUEUE_SIZE) {
Logger.getLogger(getClass().getName()).warning("Pdu buffer overflow, clearing.");
msgQ.clear();
}
}
}

public synchronized Pdu getNext() throws InterruptedException {
return msgQ.take();
}

public void send(Pdu pdu) throws IOException {
socket.send(new DatagramPacket(pdu.marshal(), pdu.getLength(), addr, socket.getLocalPort()));
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
socket.receive(packet);
Pdu pdu = pduFactory.createPdu(packet.getData());
if (pdu != null) {
handleMessage(pdu);
}
} catch (IOException e) {
Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, e);
break;
}
}
terminate();
}
}

69 changes: 33 additions & 36 deletions src/main/java/edu/nps/moves/examples/EspduReceiver.java
Original file line number Diff line number Diff line change
@@ -1,49 +1,46 @@
package edu.nps.moves.examples;

import java.net.*;
import edu.nps.moves.disutil.*;
import edu.nps.moves.dis.*;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Receives PDUs from the network in IEEE format.
*
* @author DMcG
* @version $Id:$
* Example code that receives ESPDUs from the network in IEEE format.
*/
public class EspduReceiver {

public static final int MAX_PDU_SIZE = 16384;

public static final int DIS_PORT = 3000;

public static void main(String args[]) throws IOException {

MulticastSocket socket = new MulticastSocket(DIS_PORT);
InetAddress address = InetAddress.getByName(EspduSender.DEFAULT_MULTICAST_GROUP);
socket.joinGroup(address);

DatagramPacket packet;
PduFactory pduFactory = new PduFactory();

// Loop infinitely, receiving datagrams
while (true) {
byte buffer[] = new byte[MAX_PDU_SIZE];
packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);

Pdu aPdu = pduFactory.createPdu(packet.getData());

System.out.println("Received PDU of type: " + aPdu.getClass().getName());
if (aPdu instanceof EntityStatePdu) {
EntityID eid = ((EntityStatePdu) aPdu).getEntityID();
Vector3Double position = ((EntityStatePdu) aPdu).getEntityLocation();
System.out.println(" Site,App,Id:[" + eid.getSite() + ", " + eid.getApplication() + ", " + eid.getEntity() + "] ");
System.out.println(" Location in DIS coordinates: [" + position.getX() + ", " + position.getY() + ", " + position.getZ() + "]");

final double[] latlon = CoordinateConversions.xyzToLatLonDegrees(position.toArray());
System.out.println(" Location in Latitude Longitude Elevation: [" + latlon[0] + ", " + latlon[1] + ", " + latlon[2] + "]");
public static void main(String args[]) throws IOException, InterruptedException {

DisConnection con = new DisConnection(EspduSender.DEFAULT_MULTICAST_GROUP, EspduSender.DIS_PORT);
new Thread(con).start(); // In this thread we receive pdu's from the network and put them into a queue

// In this thread we take pdu's off the queue and process them.
new Runnable() {

@Override
public void run() {
while (!Thread.interrupted()) {
try {
Pdu pdu = con.getNext();
if (pdu != null) {
System.out.println("Received PDU of type: " + pdu.getClass().getName());
if (pdu instanceof EntityStatePdu) {
EntityID eid = ((EntityStatePdu) pdu).getEntityID();
Vector3Double position = ((EntityStatePdu) pdu).getEntityLocation();
System.out.println(" Site,App,Id:[" + eid.getSite() + ", " + eid.getApplication() + ", " + eid.getEntity() + "] ");
System.out.println(" Location in DIS coordinates: [" + position.getX() + ", " + position.getY() + ", " + position.getZ() + "]");

final double[] latlon = CoordinateConversions.xyzToLatLonDegrees(position.toArray());
System.out.println(" Location in Latitude Longitude Elevation: [" + latlon[0] + ", " + latlon[1] + ", " + latlon[2] + "]");
}
}
} catch (InterruptedException ex) {
Logger.getLogger(DisConnection.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
}.run();
}
}
14 changes: 5 additions & 9 deletions src/main/java/edu/nps/moves/examples/EspduSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import edu.nps.moves.dis.*;
import edu.nps.moves.disutil.CoordinateConversions;
import edu.nps.moves.disutil.DisConnection;
import edu.nps.moves.disutil.DisTime;
import java.util.concurrent.TimeUnit;

Expand All @@ -19,16 +20,13 @@ public class EspduSender {

public static final String DEFAULT_MULTICAST_GROUP = "239.1.2.3";

public static final int DIS_DESTINATION_PORT = 3000;
public static final int DIS_PORT = 3000;

public static final int DIS_HEARTBEAT_SECS = 10;

public static void main(String args[]) throws UnknownHostException, IOException, RuntimeException, InterruptedException {

InetAddress destinationIp = InetAddress.getByName(DEFAULT_MULTICAST_GROUP);

MulticastSocket socket = new MulticastSocket(DIS_DESTINATION_PORT);
socket.joinGroup(destinationIp);
DisConnection con = new DisConnection(DEFAULT_MULTICAST_GROUP, DIS_PORT);

EntityStatePdu espdu = new EntityStatePdu();

Expand Down Expand Up @@ -67,7 +65,7 @@ public static void main(String args[]) throws UnknownHostException, IOException,
double lon = -121.877000;

// Loop through sending N ESPDUs
System.out.println("Sending " + NUMBER_TO_SEND + " ESPDU packets to " + destinationIp.toString() + ". One packet every " + DIS_HEARTBEAT_SECS + " seconds.");
System.out.println("Sending " + NUMBER_TO_SEND + " ESPDU packets to " + DEFAULT_MULTICAST_GROUP + ". One packet every " + DIS_HEARTBEAT_SECS + " seconds.");
for (int idx = 0; idx < NUMBER_TO_SEND; idx++) {
// DIS time is a pain in the ass. DIS time units are 2^31-1 units per
// hour, and time is set to DIS time units from the top of the hour.
Expand Down Expand Up @@ -109,9 +107,7 @@ public static void main(String args[]) throws UnknownHostException, IOException,
// You can set other ESPDU values here, such as the velocity, acceleration,
// and so on.


DatagramPacket packet = new DatagramPacket(espdu.marshal(), espdu.getLength(), destinationIp, socket.getLocalPort());
socket.send(packet);
con.send(espdu);

location = espdu.getEntityLocation();

Expand Down

0 comments on commit bb1ddc0

Please sign in to comment.