-
Notifications
You must be signed in to change notification settings - Fork 0
/
TI_radar_library.py
347 lines (272 loc) · 12.8 KB
/
TI_radar_library.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
"""
Copyright (c) 2020 Imagimob AB. Distributed under MIT license.
"""
import serial
import time
import struct
import math
import binascii
import codecs
def start_TI_radar(filePath):
serialPort_CFG = serial.Serial( # opening serial CFG_port
port="COM3",
baudrate=115200,
bytesize=8,
timeout=None,
stopbits=serial.STOPBITS_ONE
)
serialPort_CFG.close()
time.sleep(0.1)
serialPort_CFG.open()
commandsList = []
with open(filePath,"r") as configFile:
for line in configFile:
if line[0] != '%':
commandsList.append(line)
for command in commandsList:
serialPort_CFG.write(command.encode()) # sending commands to radar encoded as bytes
time.sleep(0.1)
print("\nTI radar initialized ! \n")
def stop_TI_radar():
serialPort_CFG = serial.Serial( # opening serial CFG_port
port="COM3",
baudrate=115200,
bytesize=8,
timeout=None,
stopbits=serial.STOPBITS_ONE
)
serialPort_CFG.write(b'sensorStop\n')
time.sleep(0.1)
serialPort_CFG.close()
print("\nTI radar stopped ! \n")
def start_data_stream_TI_radar():
serialPort_DATA = serial.Serial( # opening serial DATA_port
port="COM4",
baudrate=921600,
bytesize=8,
timeout=None,
stopbits=serial.STOPBITS_ONE
)
serialPort_DATA.close()
time.sleep(0.1)
serialPort_DATA.open()
print("\nData stream started... \n")
return serialPort_DATA
def getUint32(data):
"""!
This function coverts 4 bytes to a 32-bit unsigned integer.
@param data : 1-demension byte array
@return : 32-bit unsigned integer
"""
return (data[0] +
data[1]*256 +
data[2]*65536 +
data[3]*16777216)
def getUint16(data):
"""!
This function coverts 2 bytes to a 16-bit unsigned integer.
@param data : 1-demension byte array
@return : 16-bit unsigned integer
"""
return (data[0] +
data[1]*256)
def getHex(data):
"""!
This function coverts 4 bytes to a 32-bit unsigned integer in hex.
@param data : 1-demension byte array
@return : 32-bit unsigned integer in hex
"""
return (binascii.hexlify(data[::-1]))
def checkMagicPattern(data):
"""!
This function check if data arrary contains the magic pattern which is the start of one mmw demo output packet.
@param data : 1-demension byte array
@return : 1 if magic pattern is found
0 if magic pattern is not found
"""
found = 0
if (data[0] == 2 and data[1] == 1 and data[2] == 4 and data[3] == 3 and data[4] == 6 and data[5] == 5 and data[6] == 8 and data[7] == 7):
found = 1
return (found)
def parserOnePacket(serialPort_DATA): # function used to parse 1 data packet transmitted to the DATA_port
PI = 3.14159265
headerNumBytes = 40
MW_found = False
while not MW_found:
header_MW = serialPort_DATA.read(8)
if checkMagicPattern(header_MW) == 1:
MW_found = True
else:
print(header_MW)
MW_found = False
serialPort_DATA.reset_input_buffer()
#serialPort_DATA.reset_output_buffer()
break
#if checkMagicPattern(header_MW) == 1:
if MW_found:
header = serialPort_DATA.read(headerNumBytes - 8)
headerStartIndex = -8
totalPacketNumBytes = getUint32(header[headerStartIndex+12:headerStartIndex+16:1])
data = serialPort_DATA.read(totalPacketNumBytes - headerNumBytes)
platform = getHex(header[headerStartIndex+16:headerStartIndex+20:1])
frameNumber = getUint32(header[headerStartIndex+20:headerStartIndex+24:1])
timeCpuCycles = getUint32(header[headerStartIndex+24:headerStartIndex+28:1])
numDetObj = getUint32(header[headerStartIndex+28:headerStartIndex+32:1])
numTlv = getUint32(header[headerStartIndex+32:headerStartIndex+36:1])
subFrameNumber = getUint32(header[headerStartIndex+36:headerStartIndex+40:1])
#if numDetObj <=0:
# print("Negative Object", numDetObj)
#if subFrameNumber > 3:
# print("Sub Frame Error", subFrameNumber)
"""
print("headerStartIndex = %d" % (headerStartIndex))
print("totalPacketNumBytes = %d" % (totalPacketNumBytes))
print("platform = %s" % (platform))
print("frameNumber = %d" % (frameNumber))
print("timeCpuCycles = %d" % (timeCpuCycles))
print("numDetObj = %d" % (numDetObj))
print("numTlv = %d" % (numTlv))
print("subFrameNumber = %d" % (subFrameNumber))
"""
#tlvStart = headerStartIndex + headerNumBytes
tlvStart = 0
tlvType = getUint32(data[tlvStart+0:tlvStart+4:1])
tlvLen = getUint32(data[tlvStart+4:tlvStart+8:1])
offset = 8
"""
print("The 1st TLV")
print(" type %d" % (tlvType))
print(" len %d bytes" % (tlvLen))
"""
#======================================================================================================================================
detectedX_array = []
detectedY_array = []
detectedZ_array = []
detectedV_array = []
detectedRange_array = []
detectedAzimuth_array = []
detectedElevAngle_array = []
detectedSNR_array = []
detectedNoise_array = []
output_array = []
if numDetObj <=0:
detectedX_array.append(0)
detectedY_array.append(0)
detectedZ_array.append(0)
detectedV_array.append(0)
detectedRange_array.append(0)
detectedAzimuth_array.append(0)
detectedElevAngle_array.append(0)
detectedSNR_array.append(0)
detectedNoise_array.append(0)
detectedRange_array.append(0)
detectedAzimuth_array.append(0)
detectedElevAngle_array.append(0)
numDetObj = 1
tlvType = 0
if tlvType == 1 and tlvLen < (totalPacketNumBytes - headerNumBytes):#MMWDEMO_UART_MSG_DETECTED_POINTS
# TLV type 1 contains x, y, z, v values of all detect objects.
# each x, y, z, v are 32-bit float in IEEE 754 single-precision binary floating-point format, so every 16 bytes represent x, y, z, v values of one detect objects.
# for each detect objects, extract/convert float x, y, z, v values and calculate range profile and azimuth
for obj in range(numDetObj):
# convert byte0 to byte3 to float x value
x = struct.unpack('<f', codecs.decode(binascii.hexlify(data[tlvStart + offset:tlvStart + offset+4:1]),'hex'))[0]
# convert byte4 to byte7 to float y value
y = struct.unpack('<f', codecs.decode(binascii.hexlify(data[tlvStart + offset+4:tlvStart + offset+8:1]),'hex'))[0]
# convert byte8 to byte11 to float z value
z = struct.unpack('<f', codecs.decode(binascii.hexlify(data[tlvStart + offset+8:tlvStart + offset+12:1]),'hex'))[0]
# convert byte12 to byte15 to float v value
v = struct.unpack('<f', codecs.decode(binascii.hexlify(data[tlvStart + offset+12:tlvStart + offset+16:1]),'hex'))[0]
# calculate range profile from x, y, z
compDetectedRange = math.sqrt((x * x)+(y * y)+(z * z))
# calculate azimuth from x, y
if y == 0:
if x >= 0:
detectedAzimuth = 90
else:
detectedAzimuth = -90
else:
detectedAzimuth = math.atan(x/y) * 180 / PI
# calculate elevation angle from x, y, z
if x == 0 and y == 0:
if z >= 0:
detectedElevAngle = 90
else:
detectedElevAngle = -90
else:
detectedElevAngle = math.atan(z/math.sqrt((x * x)+(y * y))) * 180 / PI
detectedX_array.append(x)
detectedY_array.append(y)
detectedZ_array.append(z)
detectedV_array.append(v)
detectedRange_array.append(compDetectedRange)
detectedAzimuth_array.append(detectedAzimuth)
detectedElevAngle_array.append(detectedElevAngle)
offset = offset + 16
# end of for obj in range(numDetObj) for 1st TLV
# Process the 2nd TLV
tlvStart = tlvStart + 8 + tlvLen
tlvType = getUint32(data[tlvStart+0:tlvStart+4:1])
tlvLen = getUint32(data[tlvStart+4:tlvStart+8:1])
offset = 8
"""
print("The 2nd TLV")
print(" type %d" % (tlvType))
print(" len %d bytes" % (tlvLen))
"""
if tlvType == 7:
# TLV type 7 contains snr and noise of all detect objects.
# each snr and noise are 16-bit integer represented by 2 bytes, so every 4 bytes represent snr and noise of one detect objects.
# for each detect objects, extract snr and noise
for obj in range(numDetObj):
# byte0 and byte1 represent snr. convert 2 bytes to 16-bit integer
snr = getUint16(data[tlvStart + offset + 0:tlvStart + offset + 2:1])
# byte2 and byte3 represent noise. convert 2 bytes to 16-bit integer
noise = getUint16(data[tlvStart + offset + 2:tlvStart + offset + 4:1])
detectedSNR_array.append(snr)
detectedNoise_array.append(noise)
offset = offset + 4
else:
for obj in range(numDetObj):
detectedSNR_array.append(0)
detectedNoise_array.append(0)
# end of if tlvType == 7
print(" x(m) y(m) z(m) v(m/s) Com0range(m) azimuth(deg) elevAngle(deg) snr(0.1dB) noise(0.1dB)")
for obj in range(numDetObj):
print(" obj%3d: %12f %12f %12f %12f %12f %12f %12d %12d %12d" % (obj, detectedX_array[obj], detectedY_array[obj], detectedZ_array[obj], detectedV_array[obj], detectedRange_array[obj], detectedAzimuth_array[obj], detectedElevAngle_array[obj], detectedSNR_array[obj], detectedNoise_array[obj]))
output_array.append([obj, detectedX_array[obj],\
detectedY_array[obj],\
detectedZ_array[obj],\
detectedV_array[obj],\
detectedRange_array[obj],\
detectedAzimuth_array[obj],\
detectedElevAngle_array[obj],
detectedSNR_array[obj],\
detectedNoise_array[obj]])
#======================================================================================================================================
print("\n")
#print("\n Succeeded !!!!!!!!!!! \n")
#print("End Input Buffer Bytes", serialPort.in_waiting)
#print("End Output Buffer Bytes", serialPort.out_waiting)
#serialPort.reset_input_buffer()
else:
print("\nCORRUPTED FRAME\n")
#time.sleep(5)
time.sleep(0.01) # to avoid double timestamp when corrupted frame occurs (?!?!)
output_array = [[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]
numDetObj = 1
#break
return numDetObj, output_array
#=======================================================================================================================================================================
# Testing functioning of above defined functions
file_path = "xwr68xx.cfg"
start_TI_radar(file_path)
time.sleep(10)
serialPort_DATA = start_data_stream_TI_radar()
print(serialPort_DATA)
for index in range(3):
print(index)
output_TI_radar = parserOnePacket(serialPort_DATA)
print(output_TI_radar)
print("OUTPUT: ", output_TI_radar)
stop_TI_radar()