Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swift: add core #285

Merged
merged 11 commits into from
Feb 11, 2025
196 changes: 196 additions & 0 deletions swift/ITSClient/Sources/ITSCore/Core.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Software Name : ITSClient
* SPDX-FileCopyrightText: Copyright (c) Orange SA
* SPDX-License-Identifier: MIT
*
* This software is distributed under the MIT license,
* see the "LICENSE.txt" file for more details or https://opensource.org/license/MIT/
*
* Software description: Swift ITS client.
*/

import Foundation

/// An object that manages a MQTT client and a telemetry client.
/// Depending the configuration, MQTT message publishing and reception might be automatically traced.
public actor Core {
private var mqttClient: MQTTClient?
private var telemetryClient: TelemetryClient?
private var continuationsByTopic: [String: AsyncStream<CoreMQTTMessage>.Continuation]
private let spanName = "IoT3 Core MQTT Message"
private let traceParentProperty = "traceparent"

/// Initializes a `Core`.
public init() {
continuationsByTopic = [:]
}

/// Starts the `Core` with a configuration to connect to a MQTT server and initialize the telemetry client.
/// - Parameter coreConfiguration: The configuration used to start the MQTT client and the telemetry client.
/// - Throws: A `CoreError` if the MQTT connection fails.
public func start(coreConfiguration: CoreConfiguration) async throws(CoreError) {
let telemetryClient = coreConfiguration.telemetryClientConfiguration.map {
OpenTelemetryClient(configuration: $0)
}
let mqttClient = MQTTNIOClient(configuration: coreConfiguration.mqttClientConfiguration)

try await start(mqttClient: mqttClient, telemetryClient: telemetryClient)
}

/// Subscribes to a MQTT topic.
/// If the `TelemetryClientConfiguration`is set, a linked span is created.
/// - Parameter topic: The topic to subscribe.
/// - Returns: An async stream to receive the messages of the subscribed topic.
/// - Throws: A `CoreError` if the MQTT subscription fails or the `Core` is not started.
public func subscribe(to topic: String) async throws(CoreError) -> AsyncStream<CoreMQTTMessage> {
guard let mqttClient else {
throw .notStarted
}

do {
try await mqttClient.subscribe(to: topic)
} catch {
throw .mqttError(EquatableError(wrappedError: error))
}

return AsyncStream { continuation in
continuationsByTopic[topic] = continuation
continuation.onTermination = { @Sendable _ in
Task { [weak self] in
try await self?.unsubscribe(from: topic)
}
}
}
}

/// Unsubscribes from a MQTT topic.
/// - Parameter topic: The topic to unsubscribe.
/// - Throws: A `CoreError` if the MQTT unsubscription fails or the `Core` is not started.
public func unsubscribe(from topic: String) async throws(CoreError) {
guard let mqttClient else {
throw .notStarted
}

defer {
let continuation = continuationsByTopic[topic]
continuation?.finish()
continuationsByTopic.removeValue(forKey: topic)
}

do {
try await mqttClient.unsubscribe(from: topic)
} catch {
throw .mqttError(EquatableError(wrappedError: error))
}
}

/// Publishes a MQTT message on a topic.
/// If the `TelemetryClientConfiguration`is set, a span is created.
/// - Parameter message: The message to publish.
/// - Throws: A `CoreError` if the MQTT publishing fails or the `Core` is not started.
public func publish(message: CoreMQTTMessage) async throws(CoreError) {
guard let mqttClient else {
throw .notStarted
}

let spanID = await startSentMessageSpan(message)
var traceParent: String?
if let spanID {
let context = await telemetryClient?.updateContext(withSpanID: spanID)
traceParent = context?[traceParentProperty]
}

do {
let userProperty = traceParent.map { MQTTMessageUserProperty(key: traceParentProperty, value: $0) }
let message = MQTTMessage(payload: message.payload,
topic: message.topic,
userProperty: userProperty)
try await mqttClient.publish(message)
await stopSpan(spanID: spanID)
} catch {
await stopSpan(spanID: spanID, errorMessage: error.localizedDescription)
throw .mqttError(EquatableError(wrappedError: error))
}
}

/// Stops the `Core` disconnecting the MQTT client and stopping the telemetry client.
/// - Throws: A `CoreError` if the MQTT unsubscriptions or disconnection fails.
public func stop() async throws(CoreError) {
for topic in continuationsByTopic.keys {
try await unsubscribe(from: topic)
}

do {
try await mqttClient?.disconnect()
} catch {
throw .mqttError(EquatableError(wrappedError: error))
}
mqttClient = nil

await telemetryClient?.stop()
telemetryClient = nil
}

func start(mqttClient: MQTTClient, telemetryClient: TelemetryClient?) async throws(CoreError) {
guard self.mqttClient == nil && self.telemetryClient == nil else {
return
}

self.mqttClient = mqttClient
await mqttClient.setMessageReceivedHandler(messageReceivedHandler: { message in
Task { [weak self] in
guard let self else { return }

let spanID = await startReceivedMessageSpan(message)
await stopSpan(spanID: spanID)

let topicContinuation = await continuationsByTopic[message.topic]
topicContinuation?.yield(CoreMQTTMessage(payload: message.payload, topic: message.topic))
}
})
self.telemetryClient = telemetryClient

do {
try await self.mqttClient?.connect()
} catch {
throw .mqttError(EquatableError(wrappedError: error))
}

await self.telemetryClient?.start()
}

private func startReceivedMessageSpan(_ message: MQTTMessage) async -> String? {
let traceParent = traceParent(from: message)
let attributes = buildAttributes(payload: message.payload, topic: message.topic)
let context = traceParent.map { [traceParentProperty: $0] } ?? [:]
return await telemetryClient?.startSpan(name: spanName,
type: .consumer,
attributes: attributes,
fromContext: context)
}

private func startSentMessageSpan(_ message: CoreMQTTMessage) async -> String? {
let attributes = buildAttributes(payload: message.payload, topic: message.topic)
return await telemetryClient?.startSpan(name: spanName,
type: .producer,
attributes: attributes)
}

private func traceParent(from message: MQTTMessage) -> String? {
guard let userProperty = message.userProperty else { return nil }

return userProperty.key == traceParentProperty ? userProperty.value : nil
}

private func buildAttributes(payload: Data, topic: String) -> [String: Sendable] {
["iot3.core.mqtt.topic": topic,
"iot3.core.mqtt.payload_size": payload.count,
"iot3.core.sdk_language": "swift"]
}

private func stopSpan(spanID: String?, errorMessage: String? = nil) async {
guard let spanID else { return }

await telemetryClient?.stopSpan(spanID: spanID, errorMessage: errorMessage)
}
}
28 changes: 28 additions & 0 deletions swift/ITSClient/Sources/ITSCore/CoreConfiguration.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Software Name : ITSClient
* SPDX-FileCopyrightText: Copyright (c) Orange SA
* SPDX-License-Identifier: MIT
*
* This software is distributed under the MIT license,
* see the "LICENSE.txt" file for more details or https://opensource.org/license/MIT/
*
* Software description: Swift ITS client.
*/

/// A structure to configure the core.
public struct CoreConfiguration: Sendable {
let mqttClientConfiguration: MQTTClientConfiguration
let telemetryClientConfiguration: TelemetryClientConfiguration?

/// Initializes a `CoreConfiguration`.
/// - Parameters:
/// - mqttClientConfiguration: The MQTT client configuration.
/// - telemetryClientConfiguration: The telemetry client configuration. Can be nil to opt-out telemetry.
public init(
mqttClientConfiguration: MQTTClientConfiguration,
telemetryClientConfiguration: TelemetryClientConfiguration? = nil
) {
self.mqttClientConfiguration = mqttClientConfiguration
self.telemetryClientConfiguration = telemetryClientConfiguration
}
}
20 changes: 20 additions & 0 deletions swift/ITSClient/Sources/ITSCore/CoreError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Software Name : ITSClient
* SPDX-FileCopyrightText: Copyright (c) Orange SA
* SPDX-License-Identifier: MIT
*
* This software is distributed under the MIT license,
* see the "LICENSE.txt" file for more details or https://opensource.org/license/MIT/
*
* Software description: Swift ITS client.
*/

import Foundation

/// The errors thrown by the core.
public enum CoreError: Error, Equatable {
/// The core must be started before performing this action.
case notStarted
/// A MQTT error occured.
case mqttError(EquatableError)
}
29 changes: 29 additions & 0 deletions swift/ITSClient/Sources/ITSCore/CoreMQTTMessage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Software Name : ITSClient
* SPDX-FileCopyrightText: Copyright (c) Orange SA
* SPDX-License-Identifier: MIT
*
* This software is distributed under the MIT license,
* see the "LICENSE.txt" file for more details or https://opensource.org/license/MIT/
*
* Software description: Swift ITS client.
*/

import Foundation

/// The representation of a message received or sent by the core.
public struct CoreMQTTMessage: Sendable {
/// The message payload.
public let payload: Data
/// The topic on which the message is received or sent.
public let topic: String

/// Initializes a `CoreMQTTMessage`
/// - Parameters:
/// - payload: The message payload.
/// - topic: The topic on which the message is received or sent.
public init(payload: Data, topic: String) {
self.payload = payload
self.topic = topic
}
}
32 changes: 32 additions & 0 deletions swift/ITSClient/Sources/ITSCore/EquatableError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Software Name : ITSClient
* SPDX-FileCopyrightText: Copyright (c) Orange SA
* SPDX-License-Identifier: MIT
*
* This software is distributed under the MIT license,
* see the "LICENSE.txt" file for more details or https://opensource.org/license/MIT/
*
* Software description: Swift ITS client.
*/

/// A structure to manage equatable errors for underlying errors.
public struct EquatableError: Error, Equatable {
/// The wrapped error.
public let wrappedError: any Error & Equatable
private let equalsClosure: (@Sendable (any Error & Equatable) -> Bool)

/// The localized description.
public var localizedDescription: String {
return wrappedError.localizedDescription
}

init<T: Error & Equatable>(wrappedError: T) {
self.wrappedError = wrappedError
// To avoid generic on struct, do the test in the closure and store it
equalsClosure = { $0 as? T == wrappedError }
}

public static func == (lhs: Self, rhs: Self) -> Bool {
return lhs.equalsClosure(rhs.wrappedError)
}
}
7 changes: 4 additions & 3 deletions swift/ITSClient/Sources/ITSCore/MQTT/MQTTClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import Foundation

protocol MQTTClient: Actor {
var isConnected: Bool { get }

init(configuration: MQTTClientConfiguration, messageReceivedHandler: (@Sendable @escaping (MQTTMessage) -> Void))

init(configuration: MQTTClientConfiguration)
func setMessageReceivedHandler(messageReceivedHandler: (@escaping @Sendable (MQTTMessage) -> Void))
func connect() async throws(MQTTClientError)
func subscribe(to topic: String) async throws(MQTTClientError)
func unsubscribe(from topic: String) async throws(MQTTClientError)
func disconnect() async throws(MQTTClientError)
func publish(_ message: MQTTMessage) async throws(MQTTClientError)
}

16 changes: 13 additions & 3 deletions swift/ITSClient/Sources/ITSCore/MQTT/MQTTClientConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,26 @@
* Software description: Swift ITS client.
*/

struct MQTTClientConfiguration {
/// A structure to configure a MQTT client.
public struct MQTTClientConfiguration: Sendable {
let host: String
let port: Int
let clientIdentifier: String
let userName: String?
let password: String?
let useSSL: Bool
let useWebSockets: Bool

init(

/// Initializes a `MQTTClientConfiguration`.
/// - Parameters:
/// - host: The MQTT server host.
/// - port: The MQTT server port.
/// - clientIdentifier: The MQTT client identifier.
/// - userName: The MQTT user name if authentication is enabled on the server.
/// - password: The MQTT password if authentication is enabled on the server.
/// - useSSL: `true` to use an encrypted connection to the server.
/// - useWebSockets: `true` to use a websocket connection to the server.
public init(
host: String,
port: Int,
clientIdentifier: String,
Expand Down
20 changes: 20 additions & 0 deletions swift/ITSClient/Sources/ITSCore/MQTT/MQTTClientError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ enum MQTTClientError: Error {
case connectionFailed
case clientNotConnected
case subscriptionFailed
case unsubscriptionFailed
case disconnectionFailed
case sendPayloadFailed
}

extension MQTTClientError {
var localizedDescription: String {
switch self {
case .connectionFailed:
return "The connection to the server has failed."
case .clientNotConnected:
return "Unable to perform the operation because the client is not connected."
case .subscriptionFailed:
return "The subscription has failed."
case .unsubscriptionFailed:
return "The unsubscription has failed."
case .disconnectionFailed:
return "The disconnection from the server has failed."
case .sendPayloadFailed:
return "The MQTT message can't be sent."
}
}
}
8 changes: 8 additions & 0 deletions swift/ITSClient/Sources/ITSCore/MQTT/MQTTMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,12 @@ import Foundation
struct MQTTMessage {
let payload: Data
let topic: String
let userProperty: MQTTMessageUserProperty?
}

struct MQTTMessageUserProperty {
let key: String
let value: String
}

extension MQTTMessageUserProperty: Equatable {}
Loading
Loading