-
Notifications
You must be signed in to change notification settings - Fork 1
/
graph.py
122 lines (100 loc) · 3.2 KB
/
graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from serial.tools.list_ports import comports
from pyqtgraph.Qt import QtCore
import pyqtgraph as pg
import pandas as pd
import threading
import serial
# change to True to enable scrolling graph
scrolling = True
df = pd.DataFrame()
running = True
class KeyPressWindow(pg.GraphicsLayoutWidget):
sigKeyPress = QtCore.pyqtSignal(object)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def keyPressEvent(self, ev):
self.scene().keyPressEvent(ev)
self.sigKeyPress.emit(ev)
def keyPressed(evt):
global df, p_all, chunk_size, start, end, scrolling
if evt.text() == "r":
df.drop(df.index, inplace=True)
start = 0
end = chunk_size
p_all.setXRange(start, end)
if evt.text() == "s":
scrolling = not scrolling
win = KeyPressWindow(show=True)
win.sigKeyPress.connect(keyPressed)
win.nextRow()
p_all = win.addPlot()
p_all.addLegend()
p_all.disableAutoRange(axis=pg.ViewBox.XAxis)
chunk_size = 10000
p_all.setXRange(0, chunk_size)
def find_port():
for port in comports():
if "V5 User Port" in port.description:
V5port = port.device
print("\n" + "V5 found (" + V5port + ")")
return V5port
print("V5 not found")
return 0
# end find_port
def collect_data(ser):
global df
while running:
line = ser.readline().decode('ascii')
# if data needs to be graphed
if "graph_data" in line:
line = ser.readline().decode('ascii')[6:].strip('\n')
columns = line.split('|')[0].split(',')
data = map(float, line.split('|')[1].split(','))
data = pd.DataFrame([data], columns=columns)
# concat data to dataframe
df = pd.concat([df, data], ignore_index=True)
# print data
#print("\n" + data.to_string(index=False))
# end collect_data
curves = []
start = 0
end = chunk_size
def update_graph():
global df, curves, start, end
if (df.shape[1] - 1 > len(curves)):
p_all.setLabel('bottom', df.columns[0])
for i in range(len(curves), df.shape[1] - 1):
curve = p_all.plot(x=df.iloc[:,0], y=df.iloc[:,i + 1], name=df.columns[i + 1], pen=(i,df.shape[1]))
curves.append(curve)
else:
for i in range(len(curves)):
curves[i].setData(x=df.iloc[:,0], y=df.iloc[:,i + 1])
if (df.shape[0] > 0 and df.iloc[-1,0] > end):
if not scrolling:
end = end * 2
p_all.setXRange(0, end)
else:
end = df.iloc[-1,0]
p_all.setXRange(end - chunk_size, end)
# end update_graph
# timer for updating graph
timer = pg.QtCore.QTimer()
timer.timeout.connect(update_graph)
timer.start(50)
# main
if __name__ == '__main__':
# find V5 port
port = find_port()
if port == 0:
exit()
# open serial port
ser = serial.Serial(port, 115200, timeout=0.1)
print("Starting data collection...\n")
# start data collection thread
collect_thread = threading.Thread(target=collect_data, args=(ser,))
collect_thread.start()
# graph data
pg.exec()
# stop data collection
running = False
collect_thread.join()