Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/last will delivery #182

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ private void requestActorStop() {

long delay = state.isClientIdGenerated() ? TimeUnit.SECONDS.toMillis(actorConfiguration.getWaitBeforeGeneratedActorStopSeconds())
: TimeUnit.SECONDS.toMillis(actorConfiguration.getWaitBeforeNamedActorStopSeconds());
log.debug("[{}][{}][{}] Scheduling actor stop command with {} delay", state.getClientId(), state.getCurrentSessionId(),
state.getCurrentSessionState(), delay);
systemContext.scheduleMsgWithDelay(ctx, stopActorCommandMsg, delay);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

@Configuration
public class ActorsConfiguration {

@Value("${actors.system.throughput:5}")
private int actorThroughput;
@Value("${actors.system.max-actor-init-attempts:10}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.common.util.ThingsBoardExecutors;
import org.thingsboard.mqtt.broker.common.util.ThingsBoardThreadFactory;
Expand All @@ -34,6 +33,7 @@
import org.thingsboard.mqtt.broker.service.mqtt.retain.RetainedMsgProcessor;
import org.thingsboard.mqtt.broker.service.processing.MsgDispatcherService;
import org.thingsboard.mqtt.broker.service.stats.StatsManager;
import org.thingsboard.mqtt.broker.util.MqttPropertiesUtil;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -75,11 +75,11 @@ public void destroy() {
public void saveLastWillMsg(SessionInfo sessionInfo, PublishMsg publishMsg) {
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Saving last will msg, topic - [{}]",
sessionInfo.getClientInfo().getClientId(), sessionInfo.getSessionId(), publishMsg.getTopicName());
sessionInfo.getClientId(), sessionInfo.getSessionId(), publishMsg.getTopicName());
}
lastWillMessages.compute(sessionInfo.getSessionId(), (sessionId, lastWillMsg) -> {
if (lastWillMsg != null) {
log.error("[{}][{}] Last-will message has been saved already!", sessionInfo.getClientInfo().getClientId(), sessionId);
log.error("[{}][{}] Last-will message has been saved already!", sessionInfo.getClientId(), sessionId);
}
return new MsgWithSessionInfo(publishMsg, sessionInfo);
});
Expand All @@ -91,21 +91,21 @@ public void removeAndExecuteLastWillIfNeeded(UUID sessionId, boolean sendMsg,
MsgWithSessionInfo lastWillMsgWithSessionInfo = lastWillMessages.get(sessionId);
if (lastWillMsgWithSessionInfo == null) {
if (log.isTraceEnabled()) {
log.trace("[{}] No last will msg.", sessionId);
log.trace("[{}] No last will msg found", sessionId);
}
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Removing last will msg, sendMsg - {}", sessionId, sendMsg);
log.debug("[{}][{}] Removing last will msg, sendMsg - {}", lastWillMsgWithSessionInfo.getClientId(), sessionId, sendMsg);
}
lastWillMessages.remove(sessionId);
if (sendMsg) {
int willDelay = getWillDelay(lastWillMsgWithSessionInfo, sessionExpiryInterval);
if (!newSessionCleanStart && willDelay > 0) {
return;
}
scheduleLastWill(lastWillMsgWithSessionInfo, sessionId, willDelay);
scheduleLastWill(lastWillMsgWithSessionInfo, willDelay);
}
}

Expand All @@ -117,18 +117,18 @@ public void cancelLastWillDelayIfScheduled(String clientId) {
}
}

void scheduleLastWill(MsgWithSessionInfo lastWillMsgWithSessionInfo, UUID sessionId, int willDelay) {
ScheduledFuture<?> futureTask = scheduler.schedule(() -> processLastWill(lastWillMsgWithSessionInfo, sessionId), willDelay, TimeUnit.SECONDS);
void scheduleLastWill(MsgWithSessionInfo lastWillMsgWithSessionInfo, int willDelay) {
if (scheduler == null || scheduler.isShutdown()) return;
log.debug("[{}][{}] Schedule last will with delay {}", lastWillMsgWithSessionInfo.getClientId(), lastWillMsgWithSessionInfo.getTopicName(), willDelay);
ScheduledFuture<?> futureTask = scheduler.schedule(() -> processLastWill(lastWillMsgWithSessionInfo), willDelay, TimeUnit.SECONDS);
delayedLastWillFuturesMap.put(getClientId(lastWillMsgWithSessionInfo), futureTask);
}

private int getWillDelay(MsgWithSessionInfo lastWillMsgWithSessionInfo, int sessionExpiryIntervalFromDisconnect) {
SessionInfo sessionInfo = lastWillMsgWithSessionInfo.getSessionInfo();
int sessionExpiryInterval = getSessionExpiryInterval(sessionExpiryIntervalFromDisconnect, sessionInfo);

MqttProperties properties = lastWillMsgWithSessionInfo.getPublishMsg().getProperties();
MqttProperties.IntegerProperty willDelayProperty =
(MqttProperties.IntegerProperty) properties.getProperty(BrokerConstants.WILL_DELAY_INTERVAL_PROP_ID);
MqttProperties.IntegerProperty willDelayProperty = MqttPropertiesUtil.getWillDelayProperty(lastWillMsgWithSessionInfo.getProperties());
if (willDelayProperty != null) {
if (!sessionInfo.isCleanStart() && sessionExpiryInterval == 0) {
return willDelayProperty.value();
Expand All @@ -142,40 +142,53 @@ private int getSessionExpiryInterval(int sessionExpiryIntervalFromDisconnect, Se
return sessionExpiryIntervalFromDisconnect == -1 ? sessionInfo.safeGetSessionExpiryInterval() : sessionExpiryIntervalFromDisconnect;
}

private void processLastWill(MsgWithSessionInfo lastWillMsgWithSessionInfo, UUID sessionId) {
private void processLastWill(MsgWithSessionInfo lastWillMsgWithSessionInfo) {
PublishMsg publishMsg = lastWillMsgWithSessionInfo.getPublishMsg();
if (publishMsg.isRetained()) {
publishMsg = retainedMsgProcessor.process(publishMsg);
}
persistPublishMsg(lastWillMsgWithSessionInfo.getSessionInfo(), publishMsg, sessionId);
persistPublishMsg(lastWillMsgWithSessionInfo.getSessionInfo(), publishMsg);
delayedLastWillFuturesMap.remove(getClientId(lastWillMsgWithSessionInfo));
}

private String getClientId(MsgWithSessionInfo lastWillMsgWithSessionInfo) {
return lastWillMsgWithSessionInfo.getSessionInfo().getClientInfo().getClientId();
return lastWillMsgWithSessionInfo.getClientId();
}

void persistPublishMsg(SessionInfo sessionInfo, PublishMsg publishMsg, UUID sessionId) {
void persistPublishMsg(SessionInfo sessionInfo, PublishMsg publishMsg) {
msgDispatcherService.persistPublishMsg(sessionInfo, publishMsg,
new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (log.isTraceEnabled()) {
log.trace("[{}] Successfully acknowledged last will msg.", sessionId);
log.trace("[{}][{}] Successfully acknowledged last will msg.", sessionInfo.getClientId(), sessionInfo.getSessionId());
}
}

@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to acknowledge last will msg.", sessionId, t);
log.warn("[{}][{}] Failed to acknowledge last will msg.", sessionInfo.getClientId(), sessionInfo.getSessionId(), t);
}
});
}

@AllArgsConstructor
@Data
public static class MsgWithSessionInfo {

private final PublishMsg publishMsg;
private final SessionInfo sessionInfo;

private String getClientId() {
return sessionInfo.getClientId();
}

private String getTopicName() {
return publishMsg.getTopicName();
}

private MqttProperties getProperties() {
return publishMsg.getProperties();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package org.thingsboard.mqtt.broker.util;

import io.netty.handler.codec.mqtt.MqttProperties;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.DevicePublishMsg;
import org.thingsboard.mqtt.broker.common.data.mqtt.MsgExpiryResult;
import org.thingsboard.mqtt.broker.common.data.util.BytesUtil;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.queue.TbQueueMsgHeaders;
import org.thingsboard.mqtt.broker.queue.common.DefaultTbQueueMsgHeaders;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;
Expand Down Expand Up @@ -119,6 +119,10 @@ public static Integer getPayloadFormatIndicatorValue(MqttProperties mqttProperti
return payloadFormatProperty == null ? null : payloadFormatProperty.value();
}

public static MqttProperties.IntegerProperty getWillDelayProperty(MqttProperties mqttProperties) {
return (MqttProperties.IntegerProperty) mqttProperties.getProperty(BrokerConstants.WILL_DELAY_INTERVAL_PROP_ID);
}

public static MqttProperties.StringProperty getContentTypeProperty(MqttProperties mqttProperties) {
return (MqttProperties.StringProperty) mqttProperties.getProperty(BrokerConstants.CONTENT_TYPE_PROP_ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;
import org.thingsboard.mqtt.broker.service.mqtt.retain.RetainedMsgProcessor;
import org.thingsboard.mqtt.broker.service.processing.MsgDispatcherService;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void setUp() {

ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
lastWillService.setScheduler(scheduledExecutorService);
doNothing().when(lastWillService).scheduleLastWill(any(), any(), anyInt());
doNothing().when(lastWillService).scheduleLastWill(any(), anyInt());
}

@Test
Expand Down Expand Up @@ -96,7 +96,7 @@ public void testLastWillMsgNotSent() {
}

private void verifyPersistPublishMsg(VerificationMode mode) {
verify(lastWillService, mode).scheduleLastWill(any(), any(), anyInt());
verify(lastWillService, mode).scheduleLastWill(any(), anyInt());
}

private void saveLastWillMsg() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,12 @@ public Set<TbActorId> getAllActorIds() {
return new HashSet<>(actors.keySet());
}

@Override
public void destroy() {
log.info("Stopping actor system.");
log.info("Stopping actor system dispatchers...");
dispatchers.values().forEach(d -> ThingsBoardExecutors.shutdownAndAwaitTermination(d.getExecutor(), d.getDispatcherId()));
if (scheduler != null) {
log.info("Stopping actor system scheduler...");
ThingsBoardExecutors.shutdownAndAwaitTermination(scheduler, "Actor system scheduler");
}
actors.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ public interface TbActorSystem {
List<TbActorId> filterChildren(TbActorId parent, Predicate<TbActorId> childFilter);

Set<TbActorId> getAllActorIds();

void destroy();
}