-
Notifications
You must be signed in to change notification settings - Fork 5
/
interface_IWR1443_palmpad_experiment.py
273 lines (209 loc) · 7.56 KB
/
interface_IWR1443_palmpad_experiment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
import _thread
import pickle
from threading import Thread, Event
import serial
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui
import datetime
import os
import time
import warnings
from classes.model_wrapper import NeuralNetwork
from utils.data_utils import preprocess_frame
from utils.iwr1443_utils import readAndParseData14xx, parseConfigFile
isPredict = False
configFileName = '1443config.cfg'
CLIport = {}
Dataport = {}
"""
# global to hold the average x, y, z of detected points
those are for testing swipe gestures
note that for now, only x and y are used
those values is updated in the function: update
"""
x = []
y = []
z = []
doppler = []
# ------------------------------------------------------------------
# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):
global CLIport
global Dataport
# Open the serial ports for the configuration and the data ports
# Raspberry pi
# CLIport = serial.Serial('/dev/ttyACM0', 115200)
# Dataport = serial.Serial('/dev/ttyACM1', 921600)
# For WINDOWS, CHANGE those serial port to match your machine's configuration
CLIport = serial.Serial('COM5', 115200)
Dataport = serial.Serial('COM4', 921600)
# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
CLIport.write((i + '\n').encode())
print(i)
time.sleep(0.01)
return CLIport, Dataport
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Funtion to update the data and display in the plot
def update():
dataOk = 0
global detObj
x = []
y = []
z = []
doppler = []
# Read and parse the received data
dataOk, frameNumber, detObj = readAndParseData14xx(Dataport, configParameters)
if dataOk:
# print(detObj)
x = -detObj["x"]
y = detObj["y"]
z = detObj["z"]
doppler = detObj["doppler"] # doppler values for the detected points in m/s
s_original.setData(x, y)
s_processed.setData(z, doppler)
QtGui.QApplication.processEvents()
return dataOk
# ------------------------- MAIN -----------------------------------------
today = datetime.datetime.now()
today = datetime.datetime.now()
root_dn = 'data/f_data-' + str(today).replace(':', '-').replace(' ', '_')
warnings.simplefilter('ignore', np.RankWarning)
# Configurate the serial port
CLIport, Dataport = serialConfig(configFileName)
# Get the configuration parameters from the configuration file
configParameters = parseConfigFile(configFileName)
# START QtAPPfor the plot
app = QtGui.QApplication([])
# Set the plot
pg.setConfigOption('background', 'w')
win = pg.GraphicsWindow(title="2D scatter plot")
p_original = win.addPlot()
p_original.setXRange(-0.5, 0.5)
p_original.setYRange(0, 1.5)
p_original.setLabel('left', text='Y position (m)')
p_original.setLabel('bottom', text='X position (m)')
s_original = p_original.plot([], [], pen=None, symbol='o')
# set the processed plot
p_processed = win.addPlot()
p_processed.setXRange(-1, 1)
p_processed.setYRange(-1, 1)
p_processed.setLabel('left', text='Z position (m)')
p_processed.setLabel('bottom', text='Doppler (m/s)')
s_processed = p_processed.plot([], [], pen=None, symbol='o')
# Main loop
detObj = {}
frameData = {}
preprocessed_frameArray = []
frameArray = []
# reading RNN model
if isPredict:
# regressive_classifier = NeuralNetwork()
# regressive_classifier.load(file_name='trained_models/radar_model/072319_02/regressive_classifier.h5')
onNotOn_ann_classifier = NeuralNetwork()
onNotOn_ann_classifier.load(file_name='F:/config_detection/models/onNotOn_ANN/classifier_080919_2.h5')
onNotOn_encoder = pickle.load(open('F:/config_detection/models/onNotOn_ANN/encoder_080919_2', 'rb'))
rnn_timestep = 100
num_padding = 50
def input_thread(a_list):
input()
interrupt_list.append(True)
class prediction_thread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event
def run(self):
while not self.stopped.wait(0.5):
prediction_funct_onNotOn_ANN()
def prediction_funct_onNotOn_ANN():
if len(preprocessed_frameArray) > 0:
p = onNotOn_ann_classifier.predict(np.asarray([preprocessed_frameArray[len(preprocessed_frameArray) - 1]]))
p = onNotOn_encoder.inverse_transform(p)
p = p[0][0]
if p == 1:
print('No cluster')
elif p == 2:
print('Thumb is setting')
elif p == 3:
print('Thumb is raising')
elif p == 4:
print('Thumb is rubbing')
elif p == 5:
print('Thumb is rubbing in air')
# def prediction_func_onNoton_RNN():
# if (len(frameData)) < rnn_timestep:
# print('Warming up')
# else:
# x = []
#
# pred_data = list(frameData.items())
# pred_data = pred_data[len(pred_data) - rnn_timestep:]
#
# for entry in pred_data:
# data = entry[1]
# data = np.asarray([data['x'], data['y'], data['z'], data['doppler']]).transpose()
# if data.shape[0] > num_padding:
# raise Exception('Insufficient Padding')
# data = np.pad(data, ((0, num_padding - data.shape[0]), (0, 0)), 'constant', constant_values=0)
# data = data.reshape((400,)) # flatten
#
# x.append(data) # add one additional dimension
#
# x = np.asarray(x)
# x = np.expand_dims(x, axis=0)
#
# if x.shape != (1, 100, 400):
# print('Prediction: BAD Input Shape')
# else:
# prediction_result = regressive_classifier.predict(x)
# # print('Prediction: ' + str(prediction_result[0][0]), end=' ')
# print('Prediction: ', end=' ')
#
# if prediction_result[0][0] > 0.5:
# print('Thumb is ON Pointing Finger')
# else:
# print('Thumb is NOT ON Pointing Finger')
input("Press Enter to start!...")
# create the interrupt thread
interrupt_list = []
_thread.start_new_thread(input_thread, (interrupt_list,))
# start the prediction thread
main_stop_event = Event()
if isPredict:
thread = prediction_thread(main_stop_event)
thread.start()
start_time = time.time()
while True:
try:
# Update the data and check if the data is okay
dataOk = update()
if dataOk:
# Store the current frame into frameData
frameRow = np.asarray([detObj['x'], detObj['y'], detObj['z'], detObj['doppler']]).transpose()
frameArray.append(time.time(), frameRow)
preprocessed_frameArray.append(preprocess_frame(frameRow))
time.sleep(0.033) # This is framing frequency Sampling frequency of 30 Hz
if interrupt_list:
raise KeyboardInterrupt()
# Stop the program and close everything if Ctrl + c is pressed
except KeyboardInterrupt:
CLIport.write(('sensorStop\n').encode())
CLIport.close()
Dataport.close()
win.close()
# stop prediction thread
main_stop_event.set()
# save radar frame data
is_save = input('do you wish to save the recorded frames? [y/n]')
if is_save == 'y':
os.mkdir(root_dn)
file_path = os.path.join(root_dn, 'f_data.p')
with open(file_path, 'wb') as pickle_file:
pickle.dump(frameData, pickle_file)
else:
print('exit without saving')
break