Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swift: add core #285

Merged
merged 11 commits into from
Feb 11, 2025
3 changes: 2 additions & 1 deletion swift/ITSClient/Sources/ITSCore/MQTT/MQTTClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@

import Foundation

protocol MQTTClient: Actor {
protocol MQTTClient: Sendable {
var isConnected: Bool { get }

init(configuration: MQTTClientConfiguration, messageReceivedHandler: (@Sendable @escaping (MQTTMessage) -> Void))
func connect() async throws(MQTTClientError)
func subscribe(to topic: String) async throws(MQTTClientError)
func unsubscribe(from topic: String) async throws(MQTTClientError)
func disconnect() async throws(MQTTClientError)
func publish(_ message: MQTTMessage) async throws(MQTTClientError)
}
Expand Down
1 change: 1 addition & 0 deletions swift/ITSClient/Sources/ITSCore/MQTT/MQTTClientError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ enum MQTTClientError: Error {
case connectionFailed
case clientNotConnected
case subscriptionFailed
case unsubscriptionFailed
case disconnectionFailed
case sendPayloadFailed
}
19 changes: 13 additions & 6 deletions swift/ITSClient/Sources/ITSCore/MQTT/MQTTNIOClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import Foundation
import MQTTNIO
import NIOCore

actor MQTTNIOClient: MQTTClient {
final class MQTTNIOClient: MQTTClient {
private let client: MQTTNIO.MQTTClient
private let listenerName = "MQTTNIOClientListener"
private var subscribedTopics = [String]()

var isConnected: Bool {
client.isActive()
Expand Down Expand Up @@ -72,19 +71,27 @@ actor MQTTNIOClient: MQTTClient {
do {
_ = try await client.v5.subscribe(to: [MQTTSubscribeInfoV5(topicFilter: topic,
qos: .atLeastOnce)])
subscribedTopics.append(topic)
} catch {
throw .subscriptionFailed
}
}

func unsubscribe(from topic: String) async throws(MQTTClientError) {
guard isConnected else {
throw .clientNotConnected
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this guard ensures that we can only unsubscribe (but there is the same guard on the subscribe) when the MQTT client is actually connected.

This has too implications, both not very pleasant:

  1. the user of the SDK may not be aware that the MQTT client got disconnected, so they will get an exception; and even if they checked whether the MQTT client was connected before unsubscribing, this is both tedious and not race-free;
  2. the MQTT client does not automatically re-subscribe on its own, and the user of the SDK has to track subscriptions on their own; since they have no way of knowing if/when the MQTT client gets dis/connected, they have no way to know if/when they have to re-subscribe to their topics of interest.

What would be a better solution is to actually keep the list of subscriptions as a "private member" in the MQTT client object, and allow the user of the SDK to manipulate that list at will (by way of calling subscribe() and unsibscribe()). Then when the MQTT client gets connected, it requests those subscriptions to the broker.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first point the library does the same : it launches an exception if not connected so it would be the same result without the guard for all the operations.
I plan to manage the disconnection later because there are two main options:

  1. just trigger a disconnection event for the client. The client must stop publishing messages or update subscriptions and handle a new connection/subscriptions depending its network status (network is on -> reconnect) or a retry mechanism if the disconnection is not related to the mobile network.
  2. Handle this internally with a reconnection mechanism that can connect and resume the subscriptions. In this case, we need to choose the behavior when a message is sent when reconnecting (skip, save to send later ?). This solution is easy to use but opaque and can be related to the use case of the client regarding the behaviors when reconnecting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option retained : 2 for a future PR.
Update subscriptions list and skip published message when reconnecting.


do {
_ = try await client.v5.unsubscribe(from: [topic])
} catch {
throw .unsubscriptionFailed
}
}

func disconnect() async throws(MQTTClientError) {
guard isConnected else { return }

do {
if !subscribedTopics.isEmpty {
_ = try await client.v5.unsubscribe(from: subscribedTopics)
}
try await client.v5.disconnect()
} catch {
throw .disconnectionFailed
Expand Down
43 changes: 35 additions & 8 deletions swift/ITSClient/Tests/ITSCoreTests/MQTTNIOClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct MQTTNIOClientTests {
try await mqttClient.connect()

// Then
#expect(await mqttClient.isConnected)
#expect(mqttClient.isConnected)
}

@Test("MQTT anonymous connection with SSL should succeed")
Expand All @@ -44,7 +44,7 @@ struct MQTTNIOClientTests {
try await mqttClient.connect()

// Then
#expect(await mqttClient.isConnected)
#expect(mqttClient.isConnected)
}

@Test("MQTT authenticated connection should succeed")
Expand All @@ -62,7 +62,7 @@ struct MQTTNIOClientTests {
try await mqttClient.connect()

// Then
#expect(await mqttClient.isConnected)
#expect(mqttClient.isConnected)
}

@Test("MQTT websocket connection should succeed")
Expand All @@ -79,7 +79,7 @@ struct MQTTNIOClientTests {
try await mqttClient.connect()

// Then
#expect(await mqttClient.isConnected)
#expect(mqttClient.isConnected)
}

@Test("MQTT message should be received if published on a subscribed topic")
Expand Down Expand Up @@ -110,6 +110,33 @@ struct MQTTNIOClientTests {
}
}

@Test("MQTT message should not be received if published on a unsubscribed topic")
func mqtt_message_should_not_be_received_if_published_on_a_unsubscribed_topic() async throws {
// Given
let mqttClientConfiguration = MQTTClientConfiguration(host: "test.mosquitto.org",
port: 1883,
clientIdentifier: clientIdentifier,
useSSL: false)
let topic = "its-test-topic"
let payload = "payload"

try await confirmation(expectedCount: 0) { confirmation in
let mqttClient = MQTTNIOClient(configuration: mqttClientConfiguration) { message in
confirmation()
}

// When
try await mqttClient.connect()
try await mqttClient.subscribe(to: topic)
try await mqttClient.unsubscribe(from: topic)
try await mqttClient.publish(MQTTMessage(payload: payload.data(using: .utf8)!,
topic: topic,
userProperty: nil))
// Wait the message
try await Task.sleep(for: .seconds(0.5))
}
}

@Test("MQTT disconnect after connection should succeed")
func mqtt_disconnect_after_connection_should_succeed() async throws {
// Given
Expand All @@ -121,11 +148,11 @@ struct MQTTNIOClientTests {

// When
try await mqttClient.connect()
#expect(await mqttClient.isConnected)
#expect(mqttClient.isConnected)
try await mqttClient.disconnect()

// Then
#expect(await !mqttClient.isConnected)
#expect(!mqttClient.isConnected)
}

@Test("MQTT disconnect after connection and subscription should succeed")
Expand All @@ -139,12 +166,12 @@ struct MQTTNIOClientTests {

// When
try await mqttClient.connect()
#expect(await mqttClient.isConnected)
#expect(mqttClient.isConnected)
try await mqttClient.subscribe(to: "test")
try await mqttClient.disconnect()

// Then
#expect(await !mqttClient.isConnected)
#expect(!mqttClient.isConnected)
}

@Test("MQTT subscription without connection should fail")
Expand Down