Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSE and mq improvements #2178

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions packages/client/hmi-client/src/services/ClientEventService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ let lastHeartbeat = new Date().valueOf();
*/
let backoffMs = 1000;

/**
* Whether we are currently reconnecting to the SSE endpoint
*/
let reconnecting = false;

/**
* An error that can be retried
*/
Expand Down Expand Up @@ -82,10 +87,14 @@ export async function init(): Promise<void> {
* and reconnects if not
*/
setInterval(async () => {
const config = await getConfiguration();
const heartbeatIntervalMillis = config?.sseHeartbeatIntervalMillis ?? 10000;
if (new Date().valueOf() - lastHeartbeat > heartbeatIntervalMillis) {
await init();
if (!reconnecting) {
const config = await getConfiguration();
const heartbeatIntervalMillis = config?.sseHeartbeatIntervalMillis ?? 10000;
if (new Date().valueOf() - lastHeartbeat > heartbeatIntervalMillis) {
reconnecting = true;
await init();
reconnecting = false;
}
}
}, 1000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class Config {
/**
* If queues should be declared durable. IF running Rabbit inside docker, this should be false
*/
Boolean durableQueues = true;
Boolean durableQueues = false;

@Data
@Accessors(chain = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
Expand All @@ -44,7 +44,8 @@ public class ClientEventService{
private static final String CLIENT_USER_EVENT_QUEUE = "clientUserEventQueue";
private static final String CLIENT_ALL_USERS_EVENT_QUEUE = "clientAllUsersEventQueue";

final Map<String, SseEmitter> userIdToEmitter = new HashMap<>();
final Map<String, SseEmitter> userIdToEmitter = new ConcurrentHashMap<>();

Queue allUsersQueue;
Queue userQueue;

Expand Down Expand Up @@ -118,7 +119,6 @@ public <T> void sendToUser(final ClientEvent<T> event, final String userId) {
*/
@RabbitListener(
queues = {CLIENT_ALL_USERS_EVENT_QUEUE},
exclusive = true,
concurrency = "1")
void onSendToAllUsersEvent(final Message message, final Channel channel) {
final JsonNode messageJson = decodeMessage(message);
Expand Down Expand Up @@ -152,7 +152,6 @@ void onSendToAllUsersEvent(final Message message, final Channel channel) {
*/
@RabbitListener(
queues = {CLIENT_USER_EVENT_QUEUE},
exclusive = true,
concurrency = "1")
void onSendToUserEvent(final Message message, final Channel channel) throws IOException {
final JsonNode messageJson = decodeMessage(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package software.uncharted.terarium.hmiserver.service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
Expand All @@ -12,7 +11,6 @@
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import software.uncharted.terarium.hmiserver.configuration.Config;
import software.uncharted.terarium.hmiserver.models.ClientEvent;
import software.uncharted.terarium.hmiserver.models.ClientEventType;
Expand All @@ -21,99 +19,100 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
@RequiredArgsConstructor
public class SimulationEventService {


private final ObjectMapper mapper;
private final ClientEventService clientEventService;
private final Config config;
private final RabbitAdmin rabbitAdmin;

private final Map<String, Set<String>> simulationIdToUserIds = new HashMap<>();

private final static String SCIML_QUEUE = "sciml-queue";
private final static String PYCIEMSS_QUEUE = "simulation-status";


Queue scimlQueue;
Queue pyciemssQueue;


@PostConstruct
void init() {
scimlQueue = new Queue(SCIML_QUEUE, config.getDurableQueues());
rabbitAdmin.declareQueue(scimlQueue);

pyciemssQueue = new Queue(PYCIEMSS_QUEUE, config.getDurableQueues());
rabbitAdmin.declareQueue(pyciemssQueue);

}
public void subscribe(List<String> simulationIds, User user){
for(String simulationId : simulationIds) {
if(!simulationIdToUserIds.containsKey(simulationId)) {
simulationIdToUserIds.put(simulationId, new HashSet<>());
}
simulationIdToUserIds.get(simulationId).add(user.getId());
}

}

public void unsubscribe(List<String> simulationIds, User user){
for(String simulationId : simulationIds)
simulationIdToUserIds.get(simulationId).remove(user.getId());
}


/**
* Lisens for messages to send to a user and if we have the SSE connection, send it
* @param message the message to send
* @param channel the channel to send the message on
* @throws IOException if there was an error sending the message
*/
@RabbitListener(
queues = {SCIML_QUEUE},
exclusive = true,
concurrency = "1")
private void onScimlSendToUserEvent(final Message message, final Channel channel) throws IOException {

final ScimlStatusUpdate update = decodeMessage(message, ScimlStatusUpdate.class);
ClientEvent<ScimlStatusUpdate> status = ClientEvent.<ScimlStatusUpdate>builder().type(ClientEventType.SIMULATION_SCIML).data(update).build();

simulationIdToUserIds.get(update.getId()).forEach(userId -> {
clientEventService.sendToUser(status, userId);
});

}

/**
* Lisens for messages to send to a user and if we have the SSE connection, send it
* @param message the message to send
* @param channel the channel to send the message on
* @throws IOException if there was an error sending the message
*/
@RabbitListener(
queues = {PYCIEMSS_QUEUE},
exclusive = true,
concurrency = "1")
private void onPyciemssSendToUserEvent(final Message message, final Channel channel) throws IOException {
//SimulationIntermediateResultsCiemss
final JsonNode messageJson = decodeMessage(message, JsonNode.class);
ClientEvent<Object> status = ClientEvent.builder().type(ClientEventType.SIMULATION_PYCIEMSS).data(messageJson).build();
clientEventService.sendToAllUsers(status);
}

private <T>T decodeMessage(final Message message, Class<T> clazz) {

try {
return mapper.readValue(message.getBody(), clazz);
} catch (IOException e) {
log.error("Error decoding message", e);
return null;
}
}
private final ObjectMapper mapper;
private final ClientEventService clientEventService;
private final Config config;
private final RabbitAdmin rabbitAdmin;

private final Map<String, Set<String>> simulationIdToUserIds = new ConcurrentHashMap<>();

private final static String SCIML_QUEUE = "sciml-queue";
private final static String PYCIEMSS_QUEUE = "simulation-status";


Queue scimlQueue;
Queue pyciemssQueue;


@PostConstruct
void init() {
scimlQueue = new Queue(SCIML_QUEUE, config.getDurableQueues());
rabbitAdmin.declareQueue(scimlQueue);

pyciemssQueue = new Queue(PYCIEMSS_QUEUE, config.getDurableQueues());
rabbitAdmin.declareQueue(pyciemssQueue);

}

public void subscribe(List<String> simulationIds, User user) {
for (String simulationId : simulationIds) {
if (!simulationIdToUserIds.containsKey(simulationId)) {
simulationIdToUserIds.put(simulationId, new HashSet<>());
}
simulationIdToUserIds.get(simulationId).add(user.getId());
}

}

public void unsubscribe(List<String> simulationIds, User user) {
for (String simulationId : simulationIds)
simulationIdToUserIds.get(simulationId).remove(user.getId());
}


/**
* Lisens for messages to send to a user and if we have the SSE connection, send it
*
* @param message the message to send
* @param channel the channel to send the message on
* @throws IOException if there was an error sending the message
*/
@RabbitListener(
queues = {SCIML_QUEUE},
concurrency = "1")
private void onScimlSendToUserEvent(final Message message, final Channel channel) throws IOException {

final ScimlStatusUpdate update = decodeMessage(message, ScimlStatusUpdate.class);
ClientEvent<ScimlStatusUpdate> status = ClientEvent.<ScimlStatusUpdate>builder().type(ClientEventType.SIMULATION_SCIML).data(update).build();

simulationIdToUserIds.get(update.getId()).forEach(userId -> {
clientEventService.sendToUser(status, userId);
});

}

/**
* Lisens for messages to send to a user and if we have the SSE connection, send it
*
* @param message the message to send
* @param channel the channel to send the message on
* @throws IOException if there was an error sending the message
*/
@RabbitListener(
queues = {PYCIEMSS_QUEUE},
concurrency = "1")
private void onPyciemssSendToUserEvent(final Message message, final Channel channel) throws IOException {
//SimulationIntermediateResultsCiemss
final JsonNode messageJson = decodeMessage(message, JsonNode.class);
ClientEvent<Object> status = ClientEvent.builder().type(ClientEventType.SIMULATION_PYCIEMSS).data(messageJson).build();
clientEventService.sendToAllUsers(status);
}

private <T> T decodeMessage(final Message message, Class<T> clazz) {

try {
return mapper.readValue(message.getBody(), clazz);
} catch (IOException e) {
log.error("Error decoding message", e);
return null;
}
}

}
2 changes: 0 additions & 2 deletions packages/server/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ spring.jackson.default-property-inclusion=NON_NULL
spring.servlet.multipart.max-file-size=100MB
spring.servlet.multipart.max-request-size=100MB
logging.level.org.springframework.web.servlet.mvc.support.DefaultHandlerExceptionResolver=ERROR
logging.level.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer=ERROR
logging.level.org.springframework.amqp.rabbit.connection.CachingConnectionFactory=ERROR

########################################################################################################################
# Microservice configurations
Expand Down
Loading