-
Notifications
You must be signed in to change notification settings - Fork 5
/
readData_IWR1443.py
179 lines (130 loc) · 4.58 KB
/
readData_IWR1443.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import _thread
import pickle
import serial
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui
import datetime
import os
import time
import warnings
from utils.iwr1443_utils import readAndParseData14xx, parseConfigFile
configFileName = '1443config.cfg'
CLIport = {}
Dataport = {}
"""
# global to hold the average x, y, z of detected points
those are for testing swipe gestures
note that for now, only x and y are used
those values is updated in the function: update
"""
x = []
y = []
z = []
doppler = [] # TODO should the doppler be normalized
# features = []
# ------------------------------------------------------------------
# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):
global CLIport
global Dataport
# Open the serial ports for the configuration and the data ports
# Raspberry pi
# CLIport = serial.Serial('/dev/ttyACM0', 115200)
# Dataport = serial.Serial('/dev/ttyACM1', 921600)
# For WINDOWS, CHANGE those serial port to match your machine's configuration
CLIport = serial.Serial('COM5', 115200)
Dataport = serial.Serial('COM4', 921600)
# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
CLIport.write((i + '\n').encode())
print(i)
time.sleep(0.01)
return CLIport, Dataport
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Funtion to update the data and display in the plot
def update():
dataOk = 0
global detObj
x = []
y = []
z = []
doppler = []
# Read and parse the received data
dataOk, frameNumber, detObj = readAndParseData14xx(Dataport, configParameters)
if dataOk:
# print(detObj)
x = -detObj["x"]
y = detObj["y"]
z = detObj["z"]
doppler = detObj["doppler"] # doppler values for the detected points in m/s
s_original.setData(x, y)
s_processed.setData(z, doppler)
QtGui.QApplication.processEvents()
return dataOk
# ------------------------- MAIN -----------------------------------------
today = datetime.datetime.now()
today = datetime.datetime.now()
root_dn = 'f_data-' + str(today).replace(':', '-').replace(' ', '_')
warnings.simplefilter('ignore', np.RankWarning)
# Configurate the serial port
CLIport, Dataport = serialConfig(configFileName)
# Get the configuration parameters from the configuration file
configParameters = parseConfigFile(configFileName)
# START QtAPPfor the plot
app = QtGui.QApplication([])
# Set the plot
pg.setConfigOption('background', 'w')
win = pg.GraphicsWindow(title="2D scatter plot")
p_original = win.addPlot()
p_original.setXRange(-0.5, 0.5)
p_original.setYRange(0, 1.5)
p_original.setLabel('left', text='Y position (m)')
p_original.setLabel('bottom', text='X position (m)')
s_original = p_original.plot([], [], pen=None, symbol='o')
# set the processed plot
p_processed = win.addPlot()
p_processed.setXRange(-1, 1)
p_processed.setYRange(-1, 1)
p_processed.setLabel('left', text='Z position (m)')
p_processed.setLabel('bottom', text='Doppler (m/s)')
s_processed = p_processed.plot([], [], pen=None, symbol='o')
# Main loop
detObj = {}
frameData = {}
def input_thread(a_list):
input()
interrupt_list.append(True)
interrupt_list = []
_thread.start_new_thread(input_thread, (interrupt_list,))
while True:
try:
# Update the data and check if the data is okay
dataOk = update()
if dataOk:
# Store the current frame into frameData
# frameData[currentIndex] = detObj
frameData[time.time()] = detObj
# features.append([detObj['x'],detObj['y'],detObj['z'],detObj['doppler']])
time.sleep(0.033) # Additional Comment: this is framing frequency Sampling frequency of 30 Hz
if interrupt_list:
raise KeyboardInterrupt()
# Stop the program and close everything if Ctrl + c is pressed
except KeyboardInterrupt:
CLIport.write(('sensorStop\n').encode())
CLIport.close()
Dataport.close()
win.close()
# save radar frame data
is_save = input('do you wish to save the recorded frames? [y/n]')
if is_save == 'y':
os.mkdir(root_dn)
file_path = os.path.join(root_dn, 'f_data.p')
with open(file_path, 'wb') as pickle_file:
pickle.dump(frameData, pickle_file)
else:
print('exit without saving')
break