Skip to content

Commit

Permalink
examples: Add dup filtering to mqtt_relay
Browse files Browse the repository at this point in the history
Keep information about the previous value sent.  If it's been 8
seconds, or new value is different (ignorning keys like snr and
frequency), then send it.  Otherwise, just don't.  This causes bursts
of e.g. 4 transmissions to result in one MQTT message, on the theory
that the 4 transmissions are not actually 4 messags, but a strategy to
transmit one message more reliably.

Define a new configuration option to enable duplicate filtering, and
default it to True.
  • Loading branch information
gdt committed Aug 2, 2024
1 parent 3821498 commit 107516e
Showing 1 changed file with 96 additions and 1 deletion.
97 changes: 96 additions & 1 deletion examples/rtl_433_mqtt_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@
from __future__ import print_function
from __future__ import with_statement

import socket
import json
import socket
import time

import paho.mqtt.client as mqtt

# \todo Make a config variable?
debug = True


# The config class represents a config object. The constructor takes
# an optional pathname, and will switch on the suffix (.yaml for now)
# and read a dictionary.
Expand All @@ -42,6 +48,7 @@ class rtlconfig(object):
'MQTT_PASSWORD': None,
'MQTT_TLS': False,
'MQTT_PREFIX': "sensor/rtl_433",
'MQTT_DEDUP': True,
'MQTT_INDIVIDUAL_TOPICS': True,
'MQTT_JSON_TOPIC': True,
}
Expand All @@ -68,9 +75,87 @@ def __init__(self, f=None):
def __getitem__(self, k):
return self.c[k]

# A dedup class object supports deduping a stream of reports by
# answering if a report is interesting relative to the history.
# While more complicated deduping is allowed by the interface, for now
# it is very simple, keeping track of only the previous interesting object.
# For now, we more or less require that all reports have the same keys.
# \todo Consider a cache with several entries.
class dedup(object):

def __init__(self, debug = False):
self.debug = debug

# Make this long enough to skip repeats, but allow messages
# every 10s to come through.
self.duration = 5
# Exclude reception metadata (time and RF).
self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg')
# Initialize storage for what was last sent.
(self.last_report, self.last_now) = (None, None)

def send_store(self, report, n):
(self.last_report, self.last_now) = (report, n)
return True

# Return True if j1 and j2 are the same, except for boring_keys.
def equiv(self, j1, j2):
for (k, v) in j1.items():
# If in boring, we don't care.
if k not in self.boring_keys:
# If in j1 and not j2, they are different.
if k not in j2:
if self.debug:
print("%s in j1 and not j2" % (k))
return False
if j1[k] != j2[k]:
if self.debug:
print("%s differs j1=%s and j2=%s" % (k, j1[k], j2[k]))
return False
# If the lengths are different, they must be different.
if len(j1) != len(j2):
if self.debug:
print("len(j1) %d != len(j2) %d" % (len(j1), len(j2)))
return False

# If we get here, then the lengths are the same, and all
# non-boring keys in j1 exist in j2, and have the same value.
# It could be that j2 is missing a boring key and also has a
# new non-boring key, but boring keys in particular should not
# be variable.
return True

# report is a python dictionary
def is_interesting(self, report):
n = time.time()

# If previous interesting is empty (or troubled), accept this
# one.
if self.last_report is None or self.last_now is None:
if self.debug:
print("interesting: no previous")
return self.send_store(report, n)

# If previous one was too long ago, accept this one.
if n - self.last_now > self.duration:
if self.debug:
print("interesting: time")
return self.send_store(report, n)

if not self.equiv(self.last_report, report):
if self.debug:
print("interesting: different")
return self.send_store(report, n)

return False

# Create a config object, defaults modified by the config file if present.
c = rtlconfig("rtl_433_mqtt_relay.yaml")

# Create a dedup object for later use, even if it's configure off.
d = dedup(debug)


def mqtt_connect(client, userdata, flags, rc):
"""Handle MQTT connection callback."""
print("MQTT connected: " + mqtt.connack_string(rc))
Expand Down Expand Up @@ -100,6 +185,16 @@ def sanitize(text):
def publish_sensor_to_mqtt(mqttc, data, line):
"""Publish rtl_433 sensor data to MQTT."""

if c['MQTT_DEDUP']:
# If this data is not novel relative to recent data, just skip it.
# Otherwise, send it via MQTT.
if not d.is_interesting(data):
if debug:
print("not interesting: %s" % (line))
return
if debug:
print("INTERESTING: %s" % (line))

# Construct a topic from the information that identifies which
# device this frame is from.
# NB: id is only used if channel is not present.
Expand Down

0 comments on commit 107516e

Please sign in to comment.