Kumquatt is a Kotlin MQTT client library that wraps the Eclipse Paho Java client. Kumquatt takes advantage of Kotlin's coroutines and concise syntax to greatly simplify Paho's usage. Take the following subscription operation in Paho:
val client = MqttAsyncClient("tcp://test.mosquitto.org", MqttAsyncClient.generateClientId(), MemoryPersistence())
val messageListener = IMqttMessageListener { topic, message ->
    println("$topic: ${message.payload.toString(StandardCharsets.UTF_8)}")
}
val statusListener = object : IMqttActionListener {
    val QOS_EXACTLY_ONCE = 2
    override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {}
    override fun onSuccess(asyncActionToken: IMqttToken) {
        client.subscribe("kumquatt", QOS_EXACTLY_ONCE, messageListener)
    }
}
client.connect(null, statusListener)
Compare this to the exact same operation in Kumquatt:
Kumquatt.withHost(host = "test.mosquitto.org").connect {
    subscribe(topic = "kumquatt", qos = KumquattQos.EXACTLY_ONCE) { message ->
        println("${message.topic}: ${message.text}")
    }
}
Kumquatt replaces Paho's callback-heavy syntax with a concise DSL. It also improves upon subscribing by exposing a cold data stream for receiving messages.
Kumquatt exposes two functions that can be used to connect to a broker: withHost and withUri. withUri takes a URI in the format scheme://host or scheme://host:port. Currently, the Paho client only supports two schemes: TCP and SSL. withHost is essentially a wrapper around withUri, so the URI can be built using the host, port, and scheme parameters. As a result, the following are identical:
Kumquatt.withHost(host = "test.mosquitto.org", port = 1883)
Kumquatt.withUri(brokerUri = "tcp://test.mosquitto.org:1883")
Note that withHost uses TCP by default.
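To make the relationship between the two functions concrete, here is a minimal sketch of how a broker URI in the scheme://host:port format could be assembled from separate parameters. The helper name buildBrokerUri is hypothetical and not part of Kumquatt, and the default port of 1883 (the conventional unencrypted MQTT port) is an assumption.

```kotlin
// Hypothetical helper, NOT part of Kumquatt: assembles a broker URI in the
// scheme://host:port format from separate parameters.
// Assumed defaults: scheme "tcp" and port 1883.
fun buildBrokerUri(host: String, port: Int = 1883, scheme: String = "tcp"): String {
    require(host.isNotBlank()) { "host must not be blank" }
    require(port in 1..65535) { "port out of range: $port" }
    return "$scheme://$host:$port"
}

fun main() {
    // Same URI string as the withUri call shown above.
    println(buildBrokerUri(host = "test.mosquitto.org", port = 1883))
    // An SSL broker on a different port.
    println(buildBrokerUri(host = "test.mosquitto.org", port = 8883, scheme = "ssl"))
}
```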
Both functions take an MqttConnectOptions that can be used to set additional settings, such as usernames, passwords, clean sessions, and automatic reconnecting. The defaultOptions variable in the companion of Kumquatt contains a set of default settings. These include the Paho defaults in addition to automatic reconnecting and a 10s connection timeout.
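As an illustration, an MqttConnectOptions instance covering the settings named above might be built as follows. This uses only Paho's own class; the credentials are placeholders, and how the options object is handed to withHost or withUri (the parameter name) is not shown because it is not stated here.

```kotlin
import org.eclipse.paho.client.mqttv3.MqttConnectOptions

// Illustrative values only; requires the Eclipse Paho Java client on the classpath.
val options = MqttConnectOptions().apply {
    userName = "demo-user"                      // placeholder credentials
    password = "demo-password".toCharArray()    // Paho expects a CharArray
    isCleanSession = true                       // start without stored session state
    isAutomaticReconnect = true                 // reconnect if the connection drops
    connectionTimeout = 10                      // in seconds
}
```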
A persistence strategy can also be provided. This defaults to MemoryPersistence, but can alternatively be set to MqttDefaultFilePersistence. The persistence strategy is used for storing messages that still need to be acknowledged, so using the file variant allows those messages to survive a client shutdown.
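For reference, both strategies are plain Paho classes and can be constructed directly. The directory below is a placeholder; passing the result to Kumquatt is left out because the parameter name is not given here.

```kotlin
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

// In-memory: unacknowledged messages are lost when the process exits.
val inMemory: MqttClientPersistence = MemoryPersistence()

// File-backed: unacknowledged messages are stored under the given directory
// (placeholder path) and can be picked up again after a restart.
val onDisk: MqttClientPersistence = MqttDefaultFilePersistence("/tmp/kumquatt-demo")
```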
Connecting a client to the broker is done with the connect method:
client.connect()
The onError and onSuccess lambdas can be used to handle failed and successful connections. The onSuccess lambda is especially useful because it provides the client as a receiver, so code that uses the client only executes if the connection succeeds:
client.connect {
    subscribe(...) {
        ...
    }
    publish(...)
}
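The receiver-based callback described above is a general Kotlin pattern (a lambda with receiver). The sketch below shows the idea with a hypothetical DemoClient class; it is a stand-in for illustration and not Kumquatt's implementation, and its reachable flag simply simulates whether the connection succeeds.

```kotlin
// Hypothetical stand-in for a client; NOT Kumquatt's real implementation.
class DemoClient(private val reachable: Boolean) {
    val log = mutableListOf<String>()

    fun publish(topic: String, text: String) {
        log += "publish $topic: $text"
    }

    // onSuccess is a lambda with receiver: inside it, `this` is the client,
    // so publish(...) can be called without a qualifier.
    fun connect(
        onError: (Throwable) -> Unit = {},
        onSuccess: DemoClient.() -> Unit = {}
    ) {
        if (reachable) {
            this.onSuccess()
        } else {
            onError(IllegalStateException("broker unreachable"))
        }
    }
}

fun main() {
    val up = DemoClient(reachable = true)
    up.connect { publish("kumquatt", "hello") }   // trailing lambda is onSuccess
    println(up.log)

    val down = DemoClient(reachable = false)
    down.connect(onError = { println("failed: ${it.message}") }) {
        publish("kumquatt", "never sent")         // skipped: connection failed
    }
    println(down.log)
}
```

Because the success block only runs when the connection is established, no separate "is connected" check is needed inside it.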
The client can be disconnected from the server gracefully (with quiesce time) or forcefully:
client.disconnect() //defaults to a 30s timeout
client.disconnect(timeout = 5_000L)
client.forceDisconnect()
Paho only allows a client to be closed once all of its connections have been closed. For that reason, it is recommended that the client be closed using the disconnectAndClose method:
client.disconnectAndClose()
This method attempts a graceful disconnect (disconnect) and falls back to a forceful disconnect (forceDisconnect) if that fails. It closes the client regardless of which disconnect method succeeded.
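The fallback behaviour described above can be sketched with nested try blocks. The Disconnectable interface and FlakyConnection class are hypothetical and only borrow the method names from the text; this is not Kumquatt's actual code. Closing in a finally block, so that close runs even if both disconnect attempts throw, is an assumption of this sketch.

```kotlin
// Hypothetical interface for illustration; not Kumquatt's actual API surface.
interface Disconnectable {
    fun disconnect()        // graceful, may throw
    fun forceDisconnect()   // forceful
    fun close()
}

fun Disconnectable.disconnectAndCloseSketch() {
    try {
        try {
            disconnect()
        } catch (e: Exception) {
            forceDisconnect()   // fall back when the graceful attempt fails
        }
    } finally {
        close()                 // assumption: always release resources
    }
}

// Fake connection that records the calls made on it.
class FlakyConnection(private val gracefulFails: Boolean) : Disconnectable {
    val calls = mutableListOf<String>()
    override fun disconnect() {
        calls += "disconnect"
        if (gracefulFails) throw IllegalStateException("quiesce timed out")
    }
    override fun forceDisconnect() { calls += "forceDisconnect" }
    override fun close() { calls += "close" }
}

fun main() {
    println(FlakyConnection(gracefulFails = false).also { it.disconnectAndCloseSketch() }.calls)
    println(FlakyConnection(gracefulFails = true).also { it.disconnectAndCloseSketch() }.calls)
}
```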
If the connections are already closed, then the following can be used: