Skip to content

Commit

Permalink
cereal integration final
Browse files Browse the repository at this point in the history
  • Loading branch information
MankaranSingh committed Aug 21, 2022
1 parent ba5a0b1 commit d7a1ae5
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 37 deletions.
34 changes: 0 additions & 34 deletions messaging/java/messaging/Protocols.java

This file was deleted.

39 changes: 39 additions & 0 deletions messaging/java/messaging/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package messaging;

import java.util.HashMap;
import java.util.Map;

public class Utils {
public static String SHARED_MEMORY = "SHARED_MEMORY";
public static String INTER_PROCESS = "INTER_PROCESS";
public static String TCP = "TCP";
public static final String defaultZMQProtocol = TCP;
public static String defaultZMQAdderess = "127.0.0.1";

public static Map<String, String> interfaces = new HashMap<String, String>() {{
put(SHARED_MEMORY, "inproc://");
put(INTER_PROCESS, "ipc://@");
put(TCP, "tcp://");
}};

public static String getZMQAddress(){
String MESSAGING_ADDRESS_OVERRIDE = System.getenv("ZMQ_MESSAGING_ADDRESS");
return MESSAGING_ADDRESS_OVERRIDE != null ? MESSAGING_ADDRESS_OVERRIDE : defaultZMQAdderess;
}

public static String getZMQProtocol(){
String MESSAGING_PROTOCOL_OVERRIDE = System.getenv("ZMQ_MESSAGING_PROTOCOL");
return MESSAGING_PROTOCOL_OVERRIDE != null ? interfaces.get(MESSAGING_PROTOCOL_OVERRIDE) : interfaces.get(defaultZMQProtocol);
}

public static String getSocketPath(String endpoint){
return getZMQProtocol() + getZMQAddress() + ":" + endpoint;
}

public static String getSHMDir(){
if (System.getProperty("os.name").startsWith("Windows"))
return System.getProperty("java.io.tmpdir");
else
return "@"; // use abstract sockets on unix.
}
}
3 changes: 1 addition & 2 deletions messaging/java/messaging/ZMQPubHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public boolean createPublisher(String topic){
ZMQ.Socket pub;
int port = portMap.services.get(topic).port;
pub = context.socket(ZMQ.PUB);
pub.bind(Protocols.getSocketPath(Integer.toString(port)));
System.out.println(port);
pub.bind(Utils.getSocketPath(Integer.toString(port)));
this.sockets.put(topic, pub);
logger.info("Publisher created: {}", topic);
return true;
Expand Down
2 changes: 1 addition & 1 deletion messaging/java/messaging/ZMQSubHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public boolean createSubscriber(String topic){
port = ZMQSubHandler.portMap.services.get(topic).port;
socket = context.socket(ZMQ.SUB);
socket.setConflate(ZMQSubHandler.portMap.services.get(topic).keepLast);
socket.connect(Protocols.getSocketPath(Integer.toString(port)));
socket.connect(Utils.getSocketPath(Integer.toString(port)));
socket.subscribe("".getBytes());
poller.register(socket, ZMQ.Poller.POLLIN);
this.sockets.put(topic, socket);
Expand Down

0 comments on commit d7a1ae5

Please sign in to comment.