Skip to content

Commit

Permalink
Merge pull request #4 from flightonary/feature/remove_synchronized
Browse files Browse the repository at this point in the history
remove synchronized mutex
  • Loading branch information
flightonary committed Dec 16, 2014
2 parents f9cd7a5 + 257b220 commit 240ee0f
Showing 1 changed file with 52 additions and 62 deletions.
114 changes: 52 additions & 62 deletions Moscapsule/Moscapsule.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public enum ReasonCode: Int {
}
}

public enum Mosq_Return: Int {
public enum MosqResult: Int {
case MOSQ_CONN_PENDING = -1
case MOSQ_SUCCESS = 0
case MOSQ_NOMEM = 1
Expand Down Expand Up @@ -211,12 +211,12 @@ public class MQTTConfig {
public var mqttClientCert: MQTTClientCert?
public var mqttTlsOpts: MQTTTlsOpts?

public var onConnectCallback: ((returnCode: ReturnCode) -> Void)?
public var onDisconnectCallback: ((reasonCode: ReasonCode) -> Void)!
public var onPublishCallback: ((messageId: Int) -> Void)!
public var onMessageCallback: ((mqttMessage: MQTTMessage) -> Void)!
public var onSubscribeCallback: ((messageId: Int, grantedQos: Array<Int32>) -> Void)!
public var onUnsubscribeCallback: ((messageId: Int) -> Void)!
public var onConnectCallback: ((returnCode: ReturnCode) -> ())?
public var onDisconnectCallback: ((reasonCode: ReasonCode) -> ())!
public var onPublishCallback: ((messageId: Int) -> ())!
public var onMessageCallback: ((mqttMessage: MQTTMessage) -> ())!
public var onSubscribeCallback: ((messageId: Int, grantedQos: Array<Int32>) -> ())!
public var onUnsubscribeCallback: ((messageId: Int) -> ())!

public init(clientId: String, host: String, port: Int32, keepAlive: Int32) {
// MQTT client ID is restricted to 23 characters in the MQTT v3.1 spec
Expand All @@ -235,12 +235,12 @@ public class MQTTConfig {
public final class __MosquittoContext {
public var mosquittoHandler: COpaquePointer = COpaquePointer.null()
public var isConnected: Bool = false
public var onConnectCallback: ((returnCode: Int) -> Void)!
public var onDisconnectCallback: ((reasonCode: Int) -> Void)!
public var onPublishCallback: ((messageId: Int) -> Void)!
public var onMessageCallback: ((message: UnsafePointer<mosquitto_message>) -> Void)!
public var onSubscribeCallback: ((messageId: Int, qosCount: Int, grantedQos: UnsafePointer<Int32>) -> Void)!
public var onUnsubscribeCallback: ((messageId: Int) -> Void)!
public var onConnectCallback: ((returnCode: Int) -> ())!
public var onDisconnectCallback: ((reasonCode: Int) -> ())!
public var onPublishCallback: ((messageId: Int) -> ())!
public var onMessageCallback: ((message: UnsafePointer<mosquitto_message>) -> ())!
public var onSubscribeCallback: ((messageId: Int, qosCount: Int, grantedQos: UnsafePointer<Int32>) -> ())!
public var onUnsubscribeCallback: ((messageId: Int) -> ())!
public var keyfile_passwd: String = ""
internal init(){}
}
Expand Down Expand Up @@ -324,27 +324,27 @@ public final class MQTT {
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
mosquitto_connect(mosquittoContext.mosquittoHandler, host.cCharArray, port, keepAlive)
mosquitto_loop_start(mosquittoContext.mosquittoHandler)
mqttClient.operationQueue.suspended = false
mqttClient.serialQueue.suspended = false
}

return mqttClient
}

private class func onConnectAdapter(callback: ((ReturnCode) -> Void)!) -> ((returnCode: Int) -> Void)! {
private class func onConnectAdapter(callback: ((ReturnCode) -> ())!) -> ((returnCode: Int) -> ())! {
return callback == nil ? nil : { (rawReturnCode: Int) in
let returnCode = rawReturnCode < ReturnCode.Unknown.rawValue ? ReturnCode(rawValue: rawReturnCode) : ReturnCode.Unknown
callback(returnCode!)
}
}

private class func onDisconnectAdapter(callback: ((ReasonCode) -> Void)!) -> ((reasonCode: Int) -> Void)! {
private class func onDisconnectAdapter(callback: ((ReasonCode) -> ())!) -> ((reasonCode: Int) -> ())! {
return callback == nil ? nil : { (rawReasonCode: Int) in
let reasonCode = rawReasonCode < ReasonCode.Unexpected.rawValue ? ReasonCode(rawValue: rawReasonCode) : ReasonCode.Unexpected
callback(reasonCode!)
}
}

private class func onMessageAdapter(callback: ((MQTTMessage) -> Void)!) -> ((UnsafePointer<mosquitto_message>) -> Void)! {
private class func onMessageAdapter(callback: ((MQTTMessage) -> ())!) -> ((UnsafePointer<mosquitto_message>) -> ())! {
return callback == nil ? nil : { (rawMessage: UnsafePointer<mosquitto_message>) in
let message = rawMessage.memory
let topic = String.fromCString(message.topic)!
Expand All @@ -354,7 +354,7 @@ public final class MQTT {
}
}

private class func onSubscribeAdapter(callback: ((Int, Array<Int32>) -> Void)!) -> ((Int, Int, UnsafePointer<Int32>) -> Void)! {
private class func onSubscribeAdapter(callback: ((Int, Array<Int32>) -> ())!) -> ((Int, Int, UnsafePointer<Int32>) -> ())! {
return callback == nil ? nil : { (messageId: Int, qosCount: Int, grantedQos: UnsafePointer<Int32>) in
var grantedQosList = [Int32](count: qosCount, repeatedValue: Qos.At_Least_Once)
Array(0..<qosCount).reduce(grantedQos) { (qosPointer, index) in
Expand All @@ -368,87 +368,77 @@ public final class MQTT {

public final class MQTTClient {
private let mosquittoContext: __MosquittoContext
internal let operationQueue: NSOperationQueue
public private(set) var isFinished: Bool
public private(set) var isFinished: Bool = false
internal let serialQueue: NSOperationQueue = {
let queue = NSOperationQueue()
queue.name = "MQTT Client Operation Queue"
queue.maxConcurrentOperationCount = 1
queue.suspended = true
return queue
}()
public var isConnected: Bool {
return mosquittoContext.isConnected
}

internal init(mosquittoContext: __MosquittoContext) {
self.mosquittoContext = mosquittoContext
self.operationQueue = NSOperationQueue()
self.isFinished = false

self.operationQueue.name = "MQTT Client Operation Queue"
self.operationQueue.suspended = true
}

deinit {
disconnect()
}

public func publish(payload: NSData, topic: String, qos: Int32, retain: Bool, requestCompletion: ((Mosq_Return, Int) -> Void)? = nil) {
synchronized { mosquittoContext, operationQueue in
operationQueue.addOperationWithBlock {
public func publish(payload: NSData, topic: String, qos: Int32, retain: Bool, requestCompletion: ((MosqResult, Int) -> ())? = nil) {
serialQueue.addOperationWithBlock {
if (!self.isFinished) {
var messageId: Int32 = 0
let mosqReturn = mosquitto_publish(mosquittoContext.mosquittoHandler, &messageId, topic.cCharArray,
Int32(payload.length), payload.bytes, qos, retain)
requestCompletion?(Mosq_Return(rawValue: Int(mosqReturn))!, Int(messageId))
let mosqReturn = mosquitto_publish(self.mosquittoContext.mosquittoHandler, &messageId, topic.cCharArray,
Int32(payload.length), payload.bytes, qos, retain)
requestCompletion?(MosqResult(rawValue: Int(mosqReturn))!, Int(messageId))
}
}
}

public func publishString(payload: String, topic: String, qos: Int32, retain: Bool, requestCompletion: ((Mosq_Return, Int) -> Void)? = nil) {
public func publishString(payload: String, topic: String, qos: Int32, retain: Bool, requestCompletion: ((MosqResult, Int) -> ())? = nil) {
if let payloadData = (payload as NSString).dataUsingEncoding(NSUTF8StringEncoding) {
self.publish(payloadData, topic: topic, qos: qos, retain: retain, requestCompletion)
publish(payloadData, topic: topic, qos: qos, retain: retain, requestCompletion)
}
}

public func subscribe(topic: String, qos: Int32, requestCompletion: ((Mosq_Return, Int) -> Void)? = nil) {
synchronized { mosquittoContext, operationQueue in
operationQueue.addOperationWithBlock {
public func subscribe(topic: String, qos: Int32, requestCompletion: ((MosqResult, Int) -> ())? = nil) {
serialQueue.addOperationWithBlock {
if (!self.isFinished) {
var messageId: Int32 = 0
let mosqReturn = mosquitto_subscribe(mosquittoContext.mosquittoHandler, &messageId, topic.cCharArray, qos)
requestCompletion?(Mosq_Return(rawValue: Int(mosqReturn))!, Int(messageId))
let mosqReturn = mosquitto_subscribe(self.mosquittoContext.mosquittoHandler, &messageId, topic.cCharArray, qos)
requestCompletion?(MosqResult(rawValue: Int(mosqReturn))!, Int(messageId))
}
}
}

public func unsubscribe(topic: String, requestCompletion: ((Mosq_Return, Int) -> Void)? = nil) {
synchronized { mosquittoContext, operationQueue in
operationQueue.addOperationWithBlock {
public func unsubscribe(topic: String, requestCompletion: ((MosqResult, Int) -> ())? = nil) {
serialQueue.addOperationWithBlock {
if (!self.isFinished) {
var messageId: Int32 = 0
let mosqReturn = mosquitto_unsubscribe(mosquittoContext.mosquittoHandler, &messageId, topic.cCharArray)
requestCompletion?(Mosq_Return(rawValue: Int(mosqReturn))!, Int(messageId))
let mosqReturn = mosquitto_unsubscribe(self.mosquittoContext.mosquittoHandler, &messageId, topic.cCharArray)
requestCompletion?(MosqResult(rawValue: Int(mosqReturn))!, Int(messageId))
}
}
}

public func disconnect() {
synchronized { mosquittoContext, operationQueue in
self.isFinished = true
operationQueue.addOperationWithBlock {
mosquitto_disconnect(mosquittoContext.mosquittoHandler)
return
}
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
operationQueue.waitUntilAllOperationsAreFinished()
mosquitto_loop_stop(mosquittoContext.mosquittoHandler, false)
mosquitto_context_cleanup(mosquittoContext)
if (!isFinished) {
isFinished = true
let context = mosquittoContext
serialQueue.addOperationWithBlock {
mosquitto_disconnect(context.mosquittoHandler)
mosquitto_loop_stop(context.mosquittoHandler, false)
mosquitto_context_cleanup(context)
}
}
}

public func awaitRequestCompletion() {
operationQueue.waitUntilAllOperationsAreFinished()
}

private func synchronized(operation: (__MosquittoContext, NSOperationQueue) -> Void) {
objc_sync_enter(self)
if (!isFinished) {
operation(self.mosquittoContext, self.operationQueue)
}
objc_sync_exit(self)
serialQueue.waitUntilAllOperationsAreFinished()
}

public var socket: Int32? {
Expand Down

0 comments on commit 240ee0f

Please sign in to comment.