Skip to content

Commit

Permalink
Add support for Qingping/ClearGrass advertisements
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdklip committed Dec 13, 2021
1 parent b0a0fb4 commit 72ee337
Showing 1 changed file with 81 additions and 72 deletions.
153 changes: 81 additions & 72 deletions LYWSD03MMC.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,82 +597,54 @@ def MQTTOnDisconnect(client, userdata,rc):
try:
prev_data = None

def le_advertise_packet_handler(mac, adv_type, data, rssi):
global lastBLEPaketReceived
if args.watchdogtimer:
lastBLEPaketReceived = time.time()
lastBLEPaketReceived = time.time()
data_str = raw_packet_to_str(data)
def decode_data_atc(mac, adv_type, data_str, rssi, measurement):
preeamble = "161a18"
paketStart = data_str.find(preeamble)
offset = paketStart + len(preeamble)
atcData_str = data_str[offset:offset+26] #if shorter will just be shorter then 13 Bytes
atcData_str = data_str[offset:] #if shorter will just be shorter then 13 Bytes
customFormat_str = data_str[offset:offset+29]
ATCPaketMAC = atcData_str[0:12].upper()
macStr = mac.replace(":","").upper()
atcIdentifier = data_str[(offset-4):offset].upper()

# if (atcIdentifier == "1A18" ) and mac == "A4:C1:38:92:E3:BD" : #debug
# print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
# print("raw:",data_str)

strippedData_str = data_str[offset:offset+26] #if shorter will just be shorter then 13 Bytes
strippedData_str = data_str[offset:] #if shorter will just be shorter then 13 Bytes
macStr = mac.replace(":","").upper()
dataIdentifier = data_str[(offset-4):offset].upper()

batteryVoltage=None
if(atcIdentifier == "1A18" ) and not args.onlydevicelist or (atcIdentifier == "1A18" and mac in sensors) and (len(atcData_str) == 26 or len(atcData_str) == 16 or len(atcData_str) == 22): #only Data from ATC devices
global measurements
measurement = Measurement(0,0,0,0,0,0,0,0)
if len(atcData_str) == 30: #custom format, next-to-last ist adv number
advNumber = atcData_str[-4:-2]
else:
advNumber = atcData_str[-2:] #last data in paket is adv number

# FIXME - This checks for lengths of 26, 16 and 22 while further along it checks for a length of 30 for custom format. It will never match that because the length of 30 is not included in this check...
if(dataIdentifier == "1A18") and not args.onlydevicelist or (dataIdentifier == "1A18" and mac in sensors) and (len(strippedData_str) == 26 or len(strippedData_str) == 16 or len(strippedData_str) == 22): #only Data from ATC devices
if len(strippedData_str) == 30: #custom format, next-to-last ist adv number
advNumber = strippedData_str[-4:-2]
else:
advNumber = strippedData_str[-2:] #last data in paket is adv number
if macStr in advCounter:
lastAdvNumber = advCounter[macStr]
else:
lastAdvNumber = None
if lastAdvNumber == None or lastAdvNumber != advNumber:

if len(atcData_str) == 26: #ATC1441 Format
#print("atc14441") #debug
if len(strippedData_str) == 26: #ATC1441 Format
print("BLE packet - ATC1441: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
advCounter[macStr] = advNumber
print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
#print("AdvNumber: ", advNumber)
#temp = data_str[22:26].encode('utf-8')
#temperature = int.from_bytes(bytearray.fromhex(data_str[22:26]),byteorder='big') / 10.
#temperature = int(data_str[22:26],16) / 10.
temperature = int.from_bytes(bytearray.fromhex(atcData_str[12:16]),byteorder='big',signed=True) / 10.
# print("Temperature: ", temperature)
humidity = int(atcData_str[16:18], 16)
# print("Humidity: ", humidity)
batteryVoltage = int(atcData_str[20:24], 16) / 1000
# print ("Battery voltage:", batteryVoltage,"V")
# print ("RSSI:", rssi, "dBm")

#if args.battery:
batteryPercent = int(atcData_str[18:20], 16)
#print ("Battery:", batteryPercent,"%")

elif len(atcData_str) == 30: #custom format
#print("custom:", atcData_str)
print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
temperature = int.from_bytes(bytearray.fromhex(atcData_str[12:16]),byteorder='little',signed=True) / 100.
humidity = int.from_bytes(bytearray.fromhex(atcData_str[16:20]),byteorder='little',signed=False) / 100.
batteryVoltage = int.from_bytes(bytearray.fromhex(atcData_str[20:24]),byteorder='little',signed=False) / 1000.
batteryPercent = int.from_bytes(bytearray.fromhex(atcData_str[24:26]),byteorder='little',signed=False)



elif len(atcData_str) == 22 or len(atcData_str) == 16: #encrypted: length 22/11 Bytes on custom format, 16/8 Bytes on ATC1441 Format
#print("enc") # debug
#if macStr in encryptedPacketStore:
temperature = int.from_bytes(bytearray.fromhex(strippedData_str[12:16]),byteorder='big',signed=True) / 10.
humidity = int(strippedData_str[16:18], 16)
batteryVoltage = int(strippedData_str[20:24], 16) / 1000
batteryPercent = int(strippedData_str[18:20], 16)

elif len(strippedData_str) == 30: #custom format
print("BLE packet - Custom: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
advCounter[macStr] = advNumber
temperature = int.from_bytes(bytearray.fromhex(strippedData_str[12:16]),byteorder='little',signed=True) / 100.
humidity = int.from_bytes(bytearray.fromhex(strippedData_str[16:20]),byteorder='little',signed=False) / 100.
batteryVoltage = int.from_bytes(bytearray.fromhex(strippedData_str[20:24]),byteorder='little',signed=False) / 1000.
batteryPercent = int.from_bytes(bytearray.fromhex(strippedData_str[24:26]),byteorder='little',signed=False)

elif len(strippedData_str) == 22 or len(strippedData_str) == 16: #encrypted: length 22/11 Bytes on custom format, 16/8 Bytes on ATC1441 Format
if macStr in advCounter:
lastData = advCounter[macStr]
else:
lastData = None

if lastData == None or lastData != atcData_str:
print("Encrypted BLE packet: %s %02x %s %d, length: %d" % (mac, adv_type, data_str, rssi, len(atcData_str)/2))
if lastData == None or lastData != strippedData_str:
print("BLE packet - Encrypted: %s %02x %s %d, length: %d" % (mac, adv_type, data_str, rssi, len(strippedData_str)/2))
advCounter[macStr] = strippedData_str
if mac in sensors and "key" in sensors[mac]:
bindkey = bytes.fromhex(sensors[mac]["key"])
macReversed=""
Expand All @@ -682,11 +654,11 @@ def le_advertise_packet_handler(mac, adv_type, data, rssi):
#print("New encrypted format, MAC:" , macStr, "Reversed: ", macReversed)
lengthHex=data_str[offset-8:offset-6]
#lengthHex="0b"
ret = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + atcData_str))
ret = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + strippedData_str))
if ret == None: #Error decrypting
print("\n")
return
#temperature, humidity, batteryPercent = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + atcData_str))
#temperature, humidity, batteryPercent = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + strippedData_str))
temperature, humidity, batteryPercent = ret
else:
print("Warning: No key provided for sensor:", mac,"\n")
Expand All @@ -697,27 +669,64 @@ def le_advertise_packet_handler(mac, adv_type, data, rssi):
else: #Packet is just repeated
return

if args.influxdb == 1:
measurement.timestamp = int((time.time() // 10) * 10)
else:
measurement.timestamp = int(time.time())

if args.round:
temperature=round(temperature,1)
humidity=round(humidity,1)

measurement.battery = batteryPercent
measurement.humidity = humidity
measurement.temperature = temperature
measurement.voltage = batteryVoltage if batteryVoltage != None else 0
measurement.rssi = rssi
return measurement

def decode_data_qingping(mac, adv_type, data_str, rssi, measurement):
preeamble = "cdfd88"
paketStart = data_str.find(preeamble)
offset = paketStart + len(preeamble)
strippedData_str = data_str[offset:offset+32]
macStr = mac.replace(":","").upper()
dataIdentifier = data_str[(offset-2):offset].upper()

print("Temperature: ", temperature)
print("Humidity: ", humidity)
if batteryVoltage != None:
print ("Battery voltage:", batteryVoltage,"V")
if(dataIdentifier == "88") and not args.onlydevicelist or (dataIdentifier == "88" and mac in sensors) and len(strippedData_str) == 32:
print("BLE packet - Qingping: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
advCounter[macStr] = strippedData_str
temperature = int.from_bytes(bytearray.fromhex(strippedData_str[18:22]),byteorder='little',signed=True) / 10.
humidity = int.from_bytes(bytearray.fromhex(strippedData_str[22:26]),byteorder='little',signed=True) / 10.
batteryPercent = int(strippedData_str[30:32], 16)

if args.round:
temperature=round(temperature,1)
humidity=round(humidity,1)

measurement.battery = batteryPercent
measurement.humidity = humidity
measurement.temperature = temperature
measurement.rssi = rssi
return measurement

def le_advertise_packet_handler(mac, adv_type, data, rssi):
global lastBLEPaketReceived
if args.watchdogtimer:
lastBLEPaketReceived = time.time()
lastBLEPaketReceived = time.time()
data_str = raw_packet_to_str(data)

global measurements
measurement = Measurement(0,0,0,0,0,0,0,0)
measurement = (
decode_data_atc(mac, adv_type, data_str, rssi, measurement)
or
decode_data_qingping(mac, adv_type, data_str, rssi, measurement)
)

if measurement:
print("Temperature: ", measurement.temperature)
print("Humidity: ", measurement.humidity)
if measurement.voltage != None:
print ("Battery voltage:", measurement.voltage,"V")
print ("RSSI:", rssi, "dBm")
print ("Battery:", batteryPercent,"%")
print ("Battery:", measurement.battery,"%")

currentMQTTTopic = MQTTTopic
if mac in sensors:
Expand Down Expand Up @@ -748,7 +757,7 @@ def le_advertise_packet_handler(mac, adv_type, data, rssi):
#MQTTClient.publish(currentMQTTTopic,jsonString,1)

#print("Length:", len(measurements))
print("")
print("")

if args.watchdogtimer:
keepingLEScanRunningThread = threading.Thread(target=keepingLEScanRunning)
Expand Down

0 comments on commit 72ee337

Please sign in to comment.