An extension to the MQTTnet project that turns subscriptions into observables and publishes messages from an observable stream.
Create the client using MQTTnet.MqttFactory together with MQTTnet.Extensions.External.RxMQTT.Client.MqttFactoryExtensions:
var client = new MqttFactory().CreateRxMqttClient();
Start the client with the managed client options:
await client.StartAsync(options).ConfigureAwait(false);
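The `options` value passed to `StartAsync` is not defined above. A minimal sketch of how it might be built follows, assuming the managed client builders from MQTTnet.Extensions.ManagedClient and the MQTTnet 3.x namespace layout (in MQTTnet 4.x the client options builder lives in `MQTTnet.Client` instead). The client id `RxClientTest` and the broker address `localhost` are hypothetical placeholders, not values taken from this project.

```csharp
using System;
using MQTTnet.Client.Options;            // assumption: MQTTnet 3.x; use MQTTnet.Client for 4.x
using MQTTnet.Extensions.ManagedClient;

// Build the managed client options: reconnect behaviour plus the
// underlying client options (client id and broker endpoint).
var options = new ManagedMqttClientOptionsBuilder()
    .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))   // wait 5 s between reconnect attempts
    .WithClientOptions(new MqttClientOptionsBuilder()
        .WithClientId("RxClientTest")                  // hypothetical client id
        .WithTcpServer("localhost")                    // hypothetical broker address
        .Build())
    .Build();
```

The managed client handles reconnecting and resubscribing on its own, which is why the options wrap ordinary client options rather than replace them.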
Get an IObservable<MqttApplicationMessageReceivedEventArgs> by connecting to the rx client, then use the extension methods to process the messages:
var subscription = client
    .Connect("RxClientTest/#")
    .SelectPayload()
    .Subscribe(Console.WriteLine);
End the subscription by disposing it:
subscription.Dispose();
Create an observable sequence of MqttApplicationMessages and publish them via the rx client:
Observable.Interval(TimeSpan.FromMilliseconds(1000))
    .Select(i => new MqttApplicationMessageBuilder()
        .WithTopic("RxClientTest")
        .WithPayload("Time: " + DateTime.Now.ToLongTimeString())
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
        .WithRetainFlag()
        .Build())
    .PublishOn(client)
    .Subscribe();
Use the MQTT client's publish method.