diff --git a/src/main/kotlin/online/danielstefani/paddy/cron/ScheduledPingPong.kt b/src/main/kotlin/online/danielstefani/paddy/cron/ScheduledPingPong.kt index 249dc9f..cd04e16 100644 --- a/src/main/kotlin/online/danielstefani/paddy/cron/ScheduledPingPong.kt +++ b/src/main/kotlin/online/danielstefani/paddy/cron/ScheduledPingPong.kt @@ -11,7 +11,7 @@ class ScheduledPingPong( // @Scheduled(every = "15s") fun pingDevicesPeriodically() { - rxMqttClient.publish("device-reads", "{\"message\":\"Ping!\"}") - ?.subscribe() +// rxMqttClient.publish("device-reads", "{\"message\":\"Ping!\"}") +// ?.subscribe() } } \ No newline at end of file diff --git a/src/main/kotlin/online/danielstefani/paddy/daemon/DaemonService.kt b/src/main/kotlin/online/danielstefani/paddy/daemon/DaemonService.kt index 8f9f028..dc7cd02 100644 --- a/src/main/kotlin/online/danielstefani/paddy/daemon/DaemonService.kt +++ b/src/main/kotlin/online/danielstefani/paddy/daemon/DaemonService.kt @@ -1,5 +1,6 @@ package online.danielstefani.paddy.daemon +import com.hivemq.client.mqtt.datatypes.MqttQos import io.smallrye.mutiny.Uni import jakarta.enterprise.context.ApplicationScoped import online.danielstefani.paddy.daemon.dto.CreateDaemonResponse @@ -14,7 +15,7 @@ import org.eclipse.microprofile.rest.client.inject.RestClient class DaemonService( private val daemonRepository: DaemonRepository, private val userRepository: UserRepository, - private val rxMqttClient: RxMqttClient, + private val mqtt: RxMqttClient, @RestClient private val paddyAuth: JwtAuthClient ) { fun getDaemon(daemonId: String): Daemon? { @@ -47,10 +48,10 @@ class DaemonService( fun toggleDaemon(username: String, daemonId: String): Boolean { val user = userRepository.get(username) - daemonRepository.updateUserDaemon(user!!, daemonId) + val daemon = daemonRepository.updateUserDaemon(user!!, daemonId) { it.on = !it.on } ?: return false - rxMqttClient.publish("toggle", daemonId) + mqtt.publish(daemonId, "toggle", if (daemon.on) "1" else "0", MqttQos.EXACTLY_ONCE) ?.subscribe() return true diff --git a/src/main/kotlin/online/danielstefani/paddy/mqtt/MqttController.kt b/src/main/kotlin/online/danielstefani/paddy/mqtt/MqttController.kt index 493aa95..cd632ff 100644 --- a/src/main/kotlin/online/danielstefani/paddy/mqtt/MqttController.kt +++ b/src/main/kotlin/online/danielstefani/paddy/mqtt/MqttController.kt @@ -14,7 +14,7 @@ class MqttController( fun ping(daemonId: String, body: String?) { val on = daemonService.getDaemon(daemonId)?.on ?: return - mqtt.publish(daemonId, if (on) "1" else "0", MqttQos.EXACTLY_ONCE) + mqtt.publish(daemonId, "toggle", if (on) "1" else "0", MqttQos.EXACTLY_ONCE) ?.subscribe() } diff --git a/src/main/kotlin/online/danielstefani/paddy/mqtt/RxMqttClient.kt b/src/main/kotlin/online/danielstefani/paddy/mqtt/RxMqttClient.kt index 828a6d6..44b7720 100644 --- a/src/main/kotlin/online/danielstefani/paddy/mqtt/RxMqttClient.kt +++ b/src/main/kotlin/online/danielstefani/paddy/mqtt/RxMqttClient.kt @@ -55,6 +55,7 @@ class RxMqttClient( fun publish( daemonId: String, + action: String, message: String = "", qos: MqttQos = MqttQos.AT_MOST_ONCE, ): Flowable? { @@ -162,7 +163,7 @@ class RxMqttClient( } .doOnNext { // If we didn't send the message - to prevent loops - if (it.topic.toString() != mqttConfig.deviceReadTopic()) { + if (it.topic.toString().startsWith(mqttConfig.deviceReadTopic())) { mqttRouter.route(it) } }