Skip to content

Commit

Permalink
Add Wild Card Support
Browse files Browse the repository at this point in the history
Closes #79
  • Loading branch information
ChrisPulman committed Sep 12, 2024
1 parent df4cd25 commit f60d74b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Version.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
"version": "2.2.2",
"version": "2.2.3",
"publicReleaseRefSpec": [
"^refs/heads/master$",
"^refs/heads/main$"
Expand Down
13 changes: 11 additions & 2 deletions src/MQTTnet.Rx.Client/MqttdSubscribeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public static IObservable<MqttApplicationMessageReceivedEventArgs> SubscribeToTo
var check = value.Find(x => x.topic == topic);
if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default))
{
disposable.Add(mqttClient.ApplicationMessageReceived().Where(x => x.ApplicationMessage.Topic == topic).Subscribe(observer));
disposable.Add(mqttClient.ApplicationMessageReceived().Where(x => x.DetectCorrectTopicWithOrWithoutWildcard(topic)).Subscribe(observer));
check.count++;
if (check.count == 1)
{
Expand Down Expand Up @@ -326,7 +326,7 @@ public static IObservable<MqttApplicationMessageReceivedEventArgs> SubscribeToTo
var check = value.Find(x => x.topic == topic);
if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default))
{
disposable.Add(mqttClient.ApplicationMessageReceived().Where(x => x.ApplicationMessage.Topic == topic).Subscribe(observer));
disposable.Add(mqttClient.ApplicationMessageReceived().Where(x => x.DetectCorrectTopicWithOrWithoutWildcard(topic)).Subscribe(observer));
check.count++;
if (check.count == 1)
{
Expand Down Expand Up @@ -366,4 +366,13 @@ public static IObservable<MqttApplicationMessageReceivedEventArgs> SubscribeToTo
}
});
}).Retry().Publish().RefCount();

private static bool DetectCorrectTopicWithOrWithoutWildcard(this MqttApplicationMessageReceivedEventArgs message, string topic)
{
var topicToCheck = message.ApplicationMessage.Topic;
var topicParts = topic.Split('+');
return topic == topicToCheck ||
(topic.EndsWith("#") && topicToCheck.StartsWith(topic.Substring(0, topic.Length - 1))) ||
(topicParts.Length == 2 && topicToCheck.StartsWith(topicParts[0]) && (topic.EndsWith("+") || topicToCheck.EndsWith(topicParts[1])));
}
}
14 changes: 7 additions & 7 deletions src/TestSingleClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,29 @@
Console.WriteLine($"{DateTime.Now.Dump()}\t CLIENT: Disconnected with server.")));
}));
var s1 = obsClient1.SubscribeToTopic("FromMilliseconds")
var s1 = obsClient1.SubscribeToTopic("FromMilliseconds/#")
.Subscribe(r => Console.WriteLine($"\tCLIENT S1: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));
var s2 = obsClient1.SubscribeToTopic("FromMilliseconds")
var s2 = obsClient1.SubscribeToTopic("FromMilliseconds/+/abc")
.Subscribe(r => Console.WriteLine($"\tCLIENT S2: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));
var s3 = obsClient1.SubscribeToTopic("FromMilliseconds")
var s3 = obsClient1.SubscribeToTopic("FromMilliseconds/+")
.Subscribe(r => Console.WriteLine($"\tCLIENT S3: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));
var s4 = obsClient2.SubscribeToTopic("FromMilliseconds")
var s4 = obsClient2.SubscribeToTopic("FromMilliseconds/1/abc")
.Subscribe(r => Console.WriteLine($"\tCLIENT S4: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));
var s5 = obsClient1.SubscribeToTopic("FromMilliseconds1")
var s5 = obsClient1.SubscribeToTopic("FromMilliseconds/2/abc")
.Subscribe(r => Console.WriteLine($"\tCLIENT S5: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));
Subject<(string topic, string payload)> message = new();
sub.Disposable.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => message.OnNext(("FromMilliseconds", "{" + $"payload: {i}" + "}"))));
sub.Disposable.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => message.OnNext(("FromMilliseconds/1/abc", "{" + $"payload: {i}" + "}"))));
sub.Disposable.Add(obsClient1.PublishMessage(message).Subscribe());
Subject<(string topic, string payload)> message1 = new();
sub.Disposable.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => message.OnNext(("FromMilliseconds1", "{" + $"payload: {i}" + "}"))));
sub.Disposable.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => message.OnNext(("FromMilliseconds/2/abc", "{" + $"payload: {i}" + "}"))));
sub.Disposable.Add(obsClient1.PublishMessage(message1).Subscribe());
await Task.Delay(3000);
Expand Down

0 comments on commit f60d74b

Please sign in to comment.