Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Qingping/Cleargrass advertisements and fix support for custom format #105

Merged
merged 12 commits into from
Jan 2, 2022
192 changes: 101 additions & 91 deletions LYWSD03MMC.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#-u to unbuffer output. Otherwise when calling with nohup or redirecting output things are printed very lately or would even mixup

print("---------------------------------------------")
print("MiTemperature2 / ATC Thermometer version 4.0")
print("MiTemperature2 / ATC Thermometer version 5.0")
print("---------------------------------------------")

readme="""
Expand Down Expand Up @@ -45,8 +45,8 @@ class Measurement:

def __eq__(self, other): #rssi may be different, so exclude it from comparison
if self.temperature == other.temperature and self.humidity == other.humidity and self.calibratedHumidity == other.calibratedHumidity and self.battery == other.battery and self.sensorname == other.sensorname:
#in atc mode also exclude voltage as it changes often due to frequent measurements
return True if args.atc else (self.voltage == other.voltage)
#in passive mode also exclude voltage as it changes often due to frequent measurements
return True if args.passive else (self.voltage == other.voltage)
else:
return False

Expand All @@ -72,7 +72,7 @@ def myMQTTPublish(topic,jsonMessage):


def signal_handler(sig, frame):
if args.atc:
if args.passive:
disable_le_scan(sock)
os._exit(0)

Expand Down Expand Up @@ -321,15 +321,15 @@ def MQTTOnDisconnect(client, userdata,rc):
# Main loop --------
parser=argparse.ArgumentParser(allow_abbrev=False,epilog=readme)
parser.add_argument("--device","-d", help="Set the device MAC-Address in format AA:BB:CC:DD:EE:FF",metavar='AA:BB:CC:DD:EE:FF')
parser.add_argument("--battery","-b", help="Get estimated battery level, in ATC-Mode: Get battery level from device", metavar='', type=int, nargs='?', const=1)
parser.add_argument("--battery","-b", help="Get estimated battery level, in passive mode: Get battery level from device", metavar='', type=int, nargs='?', const=1)
parser.add_argument("--count","-c", help="Read/Receive N measurements and then exit script", metavar='N', type=int)
parser.add_argument("--interface","-i", help="Specifiy the interface number to use, e.g. 1 for hci1", metavar='N', type=int, default=0)
parser.add_argument("--unreachable-count","-urc", help="Exit after N unsuccessful connection tries", metavar='N', type=int, default=0)
parser.add_argument("--mqttconfigfile","-mcf", help="specify a configurationfile for MQTT-Broker")


rounding = parser.add_argument_group("Rounding and debouncing")
rounding.add_argument("--round","-r", help="Round temperature to one decimal place (and in ATC mode humidity to whole numbers)",action='store_true')
rounding.add_argument("--round","-r", help="Round temperature to one decimal place (and in passive mode humidity to whole numbers)",action='store_true')
rounding.add_argument("--debounce","-deb", help="Enable this option to get more stable temperature values, requires -r option",action='store_true')

offsetgroup = parser.add_argument_group("Offset calibration mode")
Expand All @@ -349,12 +349,12 @@ def MQTTOnDisconnect(client, userdata,rc):
callbackgroup.add_argument("--skipidentical","-skip", help="N consecutive identical measurements won't be reported to callbackfunction",metavar='N', type=int, default=0)
callbackgroup.add_argument("--influxdb","-infl", help="Optimize for writing data to influxdb,1 timestamp optimization, 2 integer optimization",metavar='N', type=int, default=0)

atcgroup = parser.add_argument_group("ATC mode related arguments")
atcgroup.add_argument("--atc","-a", help="Read the data of devices with custom ATC firmware flashed, use --battery to get battery level additionaly in percent",action='store_true')
atcgroup.add_argument("--watchdogtimer","-wdt",metavar='X', type=int, help="Re-enable scanning after not receiving any BLE packet after X seconds")
atcgroup.add_argument("--devicelistfile","-df",help="Specify a device list file giving further details to devices")
atcgroup.add_argument("--onlydevicelist","-odl", help="Only read devices which are in the device list file",action='store_true')
atcgroup.add_argument("--rssi","-rs", help="Report RSSI via callback",action='store_true')
passivegroup = parser.add_argument_group("Passive mode related arguments")
passivegroup.add_argument("--passive","-p","--atc","-a", help="Read the data of devices based on BLE advertisements, use --battery to get battery level additionaly in percent",action='store_true')
passivegroup.add_argument("--watchdogtimer","-wdt",metavar='X', type=int, help="Re-enable scanning after not receiving any BLE packet after X seconds")
passivegroup.add_argument("--devicelistfile","-df",help="Specify a device list file giving further details to devices")
passivegroup.add_argument("--onlydevicelist","-odl", help="Only read devices which are in the device list file",action='store_true')
passivegroup.add_argument("--rssi","-rs", help="Report RSSI via callback",action='store_true')


args=parser.parse_args()
Expand Down Expand Up @@ -427,7 +427,7 @@ def MQTTOnDisconnect(client, userdata,rc):
else:
print("Please specify device MAC-Address in format AA:BB:CC:DD:EE:FF")
os._exit(1)
elif not args.atc:
elif not args.passive:
parser.print_help()
os._exit(1)

Expand Down Expand Up @@ -536,13 +536,13 @@ def MQTTOnDisconnect(client, userdata,rc):
print ("Waiting...")
# Perhaps do something else here

elif args.atc:
print("Script started in ATC Mode")
print("----------------------------")
elif args.passive:
print("Script started in passive mode")
print("------------------------------")
print("In this mode all devices within reach are read out, unless a devicelistfile and --onlydevicelist is specified.")
print("Also --name Argument is ignored, if you require names, please use --devicelistfile.")
print("In this mode debouncing is not available. Rounding option will round humidity and temperature to one decimal place.")
print("ATC mode usually requires root rights. If you want to use it with normal user rights, \nplease execute \"sudo setcap cap_net_raw,cap_net_admin+eip $(eval readlink -f `which python3`)\"")
print("Passive mode usually requires root rights. If you want to use it with normal user rights, \nplease execute \"sudo setcap cap_net_raw,cap_net_admin+eip $(eval readlink -f `which python3`)\"")
print("You have to redo this step if you upgrade your python version.")
print("----------------------------")

Expand Down Expand Up @@ -597,82 +597,54 @@ def MQTTOnDisconnect(client, userdata,rc):
try:
prev_data = None

def le_advertise_packet_handler(mac, adv_type, data, rssi):
global lastBLEPaketReceived
if args.watchdogtimer:
lastBLEPaketReceived = time.time()
lastBLEPaketReceived = time.time()
data_str = raw_packet_to_str(data)
def decode_data_atc(mac, adv_type, data_str, rssi, measurement):
preeamble = "161a18"
paketStart = data_str.find(preeamble)
offset = paketStart + len(preeamble)
atcData_str = data_str[offset:offset+26] #if shorter will just be shorter then 13 Bytes
atcData_str = data_str[offset:] #if shorter will just be shorter then 13 Bytes
customFormat_str = data_str[offset:offset+29]
ATCPaketMAC = atcData_str[0:12].upper()
macStr = mac.replace(":","").upper()
atcIdentifier = data_str[(offset-4):offset].upper()

# if (atcIdentifier == "1A18" ) and mac == "A4:C1:38:92:E3:BD" : #debug
# print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
# print("raw:",data_str)

strippedData_str = data_str[offset:offset+26] #if shorter will just be shorter then 13 Bytes
strippedData_str = data_str[offset:] #if shorter will just be shorter then 13 Bytes
macStr = mac.replace(":","").upper()
dataIdentifier = data_str[(offset-4):offset].upper()

batteryVoltage=None
if(atcIdentifier == "1A18" ) and not args.onlydevicelist or (atcIdentifier == "1A18" and mac in sensors) and (len(atcData_str) == 26 or len(atcData_str) == 16 or len(atcData_str) == 22): #only Data from ATC devices
global measurements
measurement = Measurement(0,0,0,0,0,0,0,0)
if len(atcData_str) == 30: #custom format, next-to-last ist adv number
advNumber = atcData_str[-4:-2]
else:
advNumber = atcData_str[-2:] #last data in paket is adv number

if(dataIdentifier == "1A18") and not args.onlydevicelist or (dataIdentifier == "1A18" and mac in sensors) and (len(strippedData_str) in (16, 22, 26, 30)): #only Data from ATC devices
if len(strippedData_str) == 30: #custom format, next-to-last ist adv number
advNumber = strippedData_str[-4:-2]
else:
advNumber = strippedData_str[-2:] #last data in paket is adv number
if macStr in advCounter:
lastAdvNumber = advCounter[macStr]
else:
lastAdvNumber = None
if lastAdvNumber == None or lastAdvNumber != advNumber:

if len(atcData_str) == 26: #ATC1441 Format
#print("atc14441") #debug
if len(strippedData_str) == 26: #ATC1441 Format
print("BLE packet - ATC1441: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
advCounter[macStr] = advNumber
print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
#print("AdvNumber: ", advNumber)
#temp = data_str[22:26].encode('utf-8')
#temperature = int.from_bytes(bytearray.fromhex(data_str[22:26]),byteorder='big') / 10.
#temperature = int(data_str[22:26],16) / 10.
temperature = int.from_bytes(bytearray.fromhex(atcData_str[12:16]),byteorder='big',signed=True) / 10.
# print("Temperature: ", temperature)
humidity = int(atcData_str[16:18], 16)
# print("Humidity: ", humidity)
batteryVoltage = int(atcData_str[20:24], 16) / 1000
# print ("Battery voltage:", batteryVoltage,"V")
# print ("RSSI:", rssi, "dBm")

#if args.battery:
batteryPercent = int(atcData_str[18:20], 16)
#print ("Battery:", batteryPercent,"%")

elif len(atcData_str) == 30: #custom format
#print("custom:", atcData_str)
print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
temperature = int.from_bytes(bytearray.fromhex(atcData_str[12:16]),byteorder='little',signed=True) / 100.
humidity = int.from_bytes(bytearray.fromhex(atcData_str[16:20]),byteorder='little',signed=False) / 100.
batteryVoltage = int.from_bytes(bytearray.fromhex(atcData_str[20:24]),byteorder='little',signed=False) / 1000.
batteryPercent = int.from_bytes(bytearray.fromhex(atcData_str[24:26]),byteorder='little',signed=False)



elif len(atcData_str) == 22 or len(atcData_str) == 16: #encrypted: length 22/11 Bytes on custom format, 16/8 Bytes on ATC1441 Format
#print("enc") # debug
#if macStr in encryptedPacketStore:
#temperature = int(data_str[12:16],16) / 10. # this method fails for negative temperatures
temperature = int.from_bytes(bytearray.fromhex(strippedData_str[12:16]),byteorder='big',signed=True) / 10.
humidity = int(strippedData_str[16:18], 16)
batteryVoltage = int(strippedData_str[20:24], 16) / 1000
batteryPercent = int(strippedData_str[18:20], 16)

elif len(strippedData_str) == 30: #Custom format
print("BLE packet - Custom: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
advCounter[macStr] = advNumber
temperature = int.from_bytes(bytearray.fromhex(strippedData_str[12:16]),byteorder='little',signed=True) / 100.
humidity = int.from_bytes(bytearray.fromhex(strippedData_str[16:20]),byteorder='little',signed=False) / 100.
batteryVoltage = int.from_bytes(bytearray.fromhex(strippedData_str[20:24]),byteorder='little',signed=False) / 1000.
batteryPercent = int.from_bytes(bytearray.fromhex(strippedData_str[24:26]),byteorder='little',signed=False)

elif len(strippedData_str) == 22 or len(strippedData_str) == 16: #encrypted: length 22/11 Bytes on custom format, 16/8 Bytes on ATC1441 Format
if macStr in advCounter:
lastData = advCounter[macStr]
else:
lastData = None

if lastData == None or lastData != atcData_str:
print("Encrypted BLE packet: %s %02x %s %d, length: %d" % (mac, adv_type, data_str, rssi, len(atcData_str)/2))
if lastData == None or lastData != strippedData_str:
print("BLE packet - Encrypted: %s %02x %s %d, length: %d" % (mac, adv_type, data_str, rssi, len(strippedData_str)/2))
advCounter[macStr] = strippedData_str
if mac in sensors and "key" in sensors[mac]:
bindkey = bytes.fromhex(sensors[mac]["key"])
macReversed=""
Expand All @@ -682,11 +654,11 @@ def le_advertise_packet_handler(mac, adv_type, data, rssi):
#print("New encrypted format, MAC:" , macStr, "Reversed: ", macReversed)
lengthHex=data_str[offset-8:offset-6]
#lengthHex="0b"
ret = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + atcData_str))
ret = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + strippedData_str))
if ret == None: #Error decrypting
print("\n")
return
#temperature, humidity, batteryPercent = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + atcData_str))
#temperature, humidity, batteryPercent = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + strippedData_str))
temperature, humidity, batteryPercent = ret
else:
print("Warning: No key provided for sensor:", mac,"\n")
Expand All @@ -697,27 +669,65 @@ def le_advertise_packet_handler(mac, adv_type, data, rssi):
else: #Packet is just repeated
return

measurement.battery = batteryPercent
measurement.humidity = humidity
measurement.temperature = temperature
measurement.voltage = batteryVoltage if batteryVoltage != None else 0
measurement.rssi = rssi
return measurement

# Tested with Qingping CGG1 and CGDK2
def decode_data_qingping(mac, adv_type, data_str, rssi, measurement):
preeamble = "cdfd88"
paketStart = data_str.find(preeamble)
offset = paketStart + len(preeamble)
strippedData_str = data_str[offset:offset+32]
macStr = mac.replace(":","").upper()
dataIdentifier = data_str[(offset-2):offset].upper()

if(dataIdentifier == "88") and not args.onlydevicelist or (dataIdentifier == "88" and mac in sensors) and len(strippedData_str) == 32:
print("BLE packet - Qingping: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
temperature = int.from_bytes(bytearray.fromhex(strippedData_str[18:22]),byteorder='little',signed=True) / 10.
humidity = int.from_bytes(bytearray.fromhex(strippedData_str[22:26]),byteorder='little',signed=True) / 10.
batteryPercent = int(strippedData_str[30:32], 16)

measurement.battery = batteryPercent
measurement.humidity = humidity
measurement.temperature = temperature
measurement.rssi = rssi
return measurement

def le_advertise_packet_handler(mac, adv_type, data, rssi):
global lastBLEPaketReceived
if args.watchdogtimer:
lastBLEPaketReceived = time.time()
lastBLEPaketReceived = time.time()
data_str = raw_packet_to_str(data)

global measurements
measurement = Measurement(0,0,0,0,0,0,0,0)
measurement = (
decode_data_atc(mac, adv_type, data_str, rssi, measurement)
or
decode_data_qingping(mac, adv_type, data_str, rssi, measurement)
)

if measurement:
if args.influxdb == 1:
measurement.timestamp = int((time.time() // 10) * 10)
else:
measurement.timestamp = int(time.time())

if args.round:
temperature=round(temperature,1)
humidity=round(humidity,1)

measurement.battery = batteryPercent
measurement.humidity = humidity
measurement.temperature = temperature
measurement.voltage = batteryVoltage if batteryVoltage != None else 0
measurement.rssi = rssi
measurement.temperature=round(measurement.temperature,1)
measurement.humidity=round(measurement.humidity,1)

print("Temperature: ", temperature)
print("Humidity: ", humidity)
if batteryVoltage != None:
print ("Battery voltage:", batteryVoltage,"V")
print("Temperature: ", measurement.temperature)
print("Humidity: ", measurement.humidity)
if measurement.voltage != None:
print ("Battery voltage:", measurement.voltage,"V")
print ("RSSI:", rssi, "dBm")
print ("Battery:", batteryPercent,"%")
print ("Battery:", measurement.battery,"%")

currentMQTTTopic = MQTTTopic
if mac in sensors:
Expand Down Expand Up @@ -748,7 +758,7 @@ def le_advertise_packet_handler(mac, adv_type, data, rssi):
#MQTTClient.publish(currentMQTTTopic,jsonString,1)

#print("Length:", len(measurements))
print("")
print("")

if args.watchdogtimer:
keepingLEScanRunningThread = threading.Thread(target=keepingLEScanRunning)
Expand Down
Loading