Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update client event service based on PanTera updates #3839

Merged
merged 7 commits into from
Jun 14, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,27 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import jakarta.annotation.PostConstruct;
import java.io.IOException;
import java.io.Serial;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.ClientAbortException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import software.uncharted.terarium.hmiserver.configuration.Config;
import software.uncharted.terarium.hmiserver.models.ClientEvent;
import software.uncharted.terarium.hmiserver.models.ClientEventType;
import software.uncharted.terarium.hmiserver.models.User;
Expand All @@ -38,19 +35,15 @@
public class ClientEventService {
private final ObjectMapper mapper;
private final RabbitTemplate rabbitTemplate;
private final RabbitAdmin rabbitAdmin;
private final Config config;

@Value("${terarium.client-user-event-queue}")
private String CLIENT_USER_EVENT_QUEUE;
private static final String CLIENT_USER_EVENT_EXCHANGE = "client-user-event-exchange";
private static final String CLIENT_ALL_USERS_EVENT_EXCHANGE = "client-all-users-event-exchange";

@Value("${terarium.client-all-user-event-queue}")
private String CLIENT_ALL_USERS_EVENT_QUEUE;

final Map<String, List<SseEmitter>> userIdToEmitter = new ConcurrentHashMap<>();

Queue allUsersQueue;
Queue userQueue;
/**
* Map of user id to the emitters for that user. Users can have multiple emitters if they have multiple tabs open or
* multiple devices connected
*/
final Map<String, java.util.Queue<SseEmitter>> userIdToEmitters = new ConcurrentHashMap<>();

@Data
@Accessors(chain = true)
Expand All @@ -63,30 +56,45 @@ public static class UserClientEvent<T> implements Serializable {
private ClientEvent<T> event;
}

@PostConstruct
void init() {
allUsersQueue = new Queue(CLIENT_ALL_USERS_EVENT_QUEUE, config.getDurableQueues(), false, false);
rabbitAdmin.declareQueue(allUsersQueue);

userQueue = new Queue(CLIENT_USER_EVENT_QUEUE, config.getDurableQueues(), false, false);
rabbitAdmin.declareQueue(userQueue);
}

/**
* Connects a user to the SSE service
*
* @param user the user to connect
* @return the emitter to send messages to the user
*/
public SseEmitter connect(final User user) {
final java.util.Queue<SseEmitter> emitters =
userIdToEmitters.getOrDefault(user.getId(), new ConcurrentLinkedQueue<>());
final SseEmitter emitter = new SseEmitter();
if (!userIdToEmitter.containsKey(user.getId())) {
userIdToEmitter.put(user.getId(), new ArrayList<>());
}
userIdToEmitter.get(user.getId()).add(emitter);
emitter.onError(e -> {
emitter.complete();
emitters.remove(emitter);
if (emitters.isEmpty()) {
userIdToEmitters.remove(user.getId());
}
});
emitters.add(emitter);
userIdToEmitters.put(user.getId(), emitters);
return emitter;
}

/**
* Disconnects a user from the sse service. Completes all emitters for the user and removes them
*
* @param user the user to disconnect
*/
public void disconnect(final User user) {
final java.util.Queue<SseEmitter> userEmitters = userIdToEmitters.get(user.getId());
if (userEmitters != null) {
try {
userEmitters.forEach(SseEmitter::complete);
} catch (final Exception e) {
log.error("Error disconnecting user", e);
}
userIdToEmitters.remove(user.getId());
}
}

/**
* Sends a message to all users
*
Expand All @@ -96,7 +104,7 @@ public SseEmitter connect(final User user) {
public <T> void sendToAllUsers(final ClientEvent<T> event) {
try {
final String jsonStr = mapper.writeValueAsString(event);
rabbitTemplate.convertAndSend(allUsersQueue.getName(), jsonStr);
rabbitTemplate.convertAndSend(CLIENT_ALL_USERS_EVENT_EXCHANGE, "", jsonStr);
} catch (final IOException e) {
log.error("Error sending all users message", e);
}
Expand All @@ -113,7 +121,7 @@ public <T> void sendToUser(final ClientEvent<T> event, final String userId) {
try {
final String jsonStr = mapper.writeValueAsString(
new UserClientEvent<T>().setEvent(event).setUserId(userId));
rabbitTemplate.convertAndSend(userQueue.getName(), jsonStr);
rabbitTemplate.convertAndSend(CLIENT_USER_EVENT_EXCHANGE, "", jsonStr);
} catch (final JsonProcessingException e) {
log.error("Error sending all users message", e);
}
Expand All @@ -125,43 +133,37 @@ public <T> void sendToUser(final ClientEvent<T> event, final String userId) {
* @param message the message to send
* @param channel the channel to send the message on
*/
// TODO: use anonymous queues, currently this wont behave correctly with
// multiple hmi-server instances. Issue #2679
@RabbitListener(queues = "${terarium.client-all-user-event-queue}", concurrency = "1")
@RabbitListener(
bindings =
@QueueBinding(
value =
@Queue(
value =
"#{T(java.util.UUID).randomUUID().toString().concat('-all-users-event-queue')}",
durable = "false",
autoDelete = "true"),
exchange = @Exchange(value = CLIENT_ALL_USERS_EVENT_EXCHANGE, type = "fanout")))
void onSendToAllUsersEvent(final Message message, final Channel channel) {
final JsonNode messageJson = decodeMessage(message, JsonNode.class);
if (messageJson == null) {
return;
}
synchronized (userIdToEmitter) {
// Send the message to each user connected and remove disconnected users
userIdToEmitter.forEach((userId, emitterList) -> {
send(messageJson, emitterList, userId);
});
}
}

private void send(final Object message, final List<SseEmitter> emitterList, final String userId) {
final List<SseEmitter> emittersToRemove = new ArrayList<>();
emitterList.forEach((emitter) -> {
try {
emitter.send(message);
} catch (final IllegalStateException | ClientAbortException e) {
log.warn("Error sending all users message to user {}. User likely disconnected", userId);
emittersToRemove.add(emitter);
} catch (final IOException e) {
log.error("Error sending all users message to user {}", userId, e);
// Send the message to each user connected and remove disconnected users
userIdToEmitters.forEach((userId, emitters) -> {
final Set<SseEmitter> emittersToRemove = ConcurrentHashMap.newKeySet();
for (final SseEmitter emitter : emitters) {
try {
emitter.send(messageJson);
} catch (final Exception e) {
log.warn("Error sending all users message to user {}. User likely disconnected", userId);
emittersToRemove.add(emitter);
}
}
emitters.removeAll(emittersToRemove);
});

synchronized (userIdToEmitter) {
emittersToRemove.forEach((emitter) -> {
userIdToEmitter.get(userId).remove(emitter);
if (userIdToEmitter.get(userId).size() == 0) {
userIdToEmitter.remove(userId);
}
});
}
// Remove users with no emitters
userIdToEmitters.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}

/**
Expand All @@ -171,24 +173,39 @@ private void send(final Object message, final List<SseEmitter> emitterList, fina
* @param channel the channel to send the message on
* @throws IOException if there was an error sending the message
*/
// TODO: use anonymous queues, currently this wont behave correctly with
// multiple hmi-server instances. Issue #2679
@RabbitListener(
queues = {"${terarium.client-user-event-queue}"},
concurrency = "1")
bindings =
@QueueBinding(
value =
@Queue(
value =
"#{T(java.util.UUID).randomUUID().toString().concat('-user-event-queue')}",
durable = "false",
autoDelete = "true"),
exchange = @Exchange(value = CLIENT_USER_EVENT_EXCHANGE, type = "fanout")))
void onSendToUserEvent(final Message message, final Channel channel) throws IOException {
final JsonNode messageJson = decodeMessage(message, JsonNode.class);
if (messageJson == null) {
return;
}

final String userId = messageJson.at("/userId").asText();
synchronized (userIdToEmitter) {
final List<SseEmitter> emitterList = userIdToEmitter.get(userId);
if (emitterList != null) {
send(messageJson.at("/event"), emitterList, userId);
final java.util.Queue<SseEmitter> emitters = userIdToEmitters.get(userId);
if (emitters != null) {
final Set<SseEmitter> emittersToRemove = ConcurrentHashMap.newKeySet();
for (final SseEmitter emitter : emitters) {
try {
emitter.send(messageJson.at("/event"));
} catch (final Exception e) {
log.warn("Error sending user message to user {}. User likely disconnected", userId);
emittersToRemove.add(emitter);
}
}
emitters.removeAll(emittersToRemove);
}

// Remove users with no emitters
userIdToEmitters.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}

/**
Expand Down Expand Up @@ -225,7 +242,7 @@ public static <T> T decodeMessage(final Message message, final Class<T> clazz) {
* Heartbeat to ensure that the clients are subscribed to the SSE service. If the client does not receive a
* heartbeat within the configured interval, it will attempt to reconnect.
*/
@Scheduled(fixedDelayString = "${terarium.clientConfig.sseHeartbeatIntervalMillis}")
@Scheduled(fixedDelay = 5000L)
public void sendHeartbeat() {
final ClientEvent<Void> event = ClientEvent.<Void>builder()
.type(ClientEventType.HEARTBEAT)
Expand Down
2 changes: 0 additions & 2 deletions packages/server/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ spicedb.target=spicedb.dev.terarium.ai:443
spring.rabbitmq.addresses=amqp://rabbitmq.dev.terarium.ai:5672
spring.rabbitmq.username=${terarium.mq-username}
spring.rabbitmq.password=${terarium.mq-password}
terarium.client-user-event-queue=clientUserEventQueue
terarium.client-all-user-event-queue=clientAllUsersEventQueue
terarium.sciml-queue=scimlQueue
terarium.simulation-status=simulation-status
terarium.simulation.sciml-broadcast-exchange=sciml-broadcast-exchange
Expand Down
Loading