-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathQRS_util.py
131 lines (111 loc) · 3.61 KB
/
QRS_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
## @author: Kemeng Chen: kemengchen@email.arizona.edu
import numpy as np
import math
from numpy import genfromtxt
import matplotlib.pyplot as plt
def read_ecg(file_name):
    """Load comma-separated numeric data into a NumPy array.

    Args:
        file_name: Source handed straight to ``numpy.genfromtxt`` (for
            example a path to a CSV file).

    Returns:
        The array produced by ``numpy.genfromtxt`` using ``','`` as the
        field delimiter.
    """
    samples = genfromtxt(file_name, delimiter=',')
    return samples
def lgth_transform(ecg, ws):
    """Emphasise samples that stand above both neighbours ``ws`` samples away.

    For every sample the difference to the sample ``ws`` positions earlier
    and to the sample ``ws`` positions later is taken (the signal is
    edge-padded so border samples have neighbours). The smaller of the two
    differences is kept, negative values are clipped to zero, and the
    result is squared.

    Args:
        ecg: 1-D NumPy array holding the signal.
        ws: Non-negative integer distance, in samples, to the neighbours
            being compared.

    Returns:
        A float64 array with the same length as ``ecg``.
    """
    n = ecg.shape[0]
    padded = np.pad(ecg, ws, 'edge')

    # In the padded array, original sample i sits at index i + ws; its two
    # neighbours sit at i and i + 2 * ws.
    centre = padded[ws:ws + n]
    left = centre - padded[:n]
    right = centre - padded[2 * ws:2 * ws + n]

    # np.where(right < left, right, left) is the element-wise equivalent of
    # Python's min(left, right), including how NaN operands are treated.
    diff = np.where(right < left, right, left).astype(np.float64)
    diff[diff < 0] = 0
    return np.multiply(diff, diff)
def integrate(ecg, ws):
    """Smooth a signal with a moving average of ``ws`` samples.

    The input is padded symmetrically by ``ceil(ws / 2)`` samples on each
    side, and output sample ``i`` is the mean of the ``ws`` padded samples
    starting at padded index ``i``.

    Args:
        ecg: 1-D NumPy array holding the signal.
        ws: Window length in samples; must be an integer >= 1.

    Returns:
        A float64 array with the same length as ``ecg``.

    Raises:
        ValueError: If ``ws`` is smaller than 1. A zero-length window would
            otherwise divide by zero and silently yield an all-NaN result.
    """
    if ws < 1:
        raise ValueError("ws must be a positive integer, got %r" % (ws,))

    n = ecg.shape[0]
    padded = np.pad(ecg, math.ceil(ws / 2), mode='symmetric')
    averaged = np.zeros(n)
    for start in range(n):
        averaged[start] = np.sum(padded[start:start + ws]) / ws
    return averaged
def find_peak(data, ws, var_threshold=5):
    """Locate centres of windows that rise strictly to a peak and fall again.

    A window of ``ws`` consecutive samples is slid over ``data``. Windows
    whose variance is below ``var_threshold`` are ignored. For the others,
    with ``half = (ws - 1) // 2``, the samples from the window start up to
    the centre must be strictly increasing and the ``half`` samples after
    the centre must be strictly decreasing. Comparisons involving NaN fail,
    so such windows are not reported.

    Args:
        data: 1-D NumPy array to search.
        ws: Window length in samples; must be an integer >= 1.
        var_threshold: Minimum window variance required for a window to be
            examined. Defaults to 5.

    Returns:
        An integer NumPy array of indices into ``data`` (possibly empty).

    Raises:
        ValueError: If ``ws`` is smaller than 1.
    """
    if ws < 1:
        raise ValueError("ws must be a positive integer, got %r" % (ws,))

    half = (ws - 1) // 2  # offset of the centre sample inside each window
    found = []
    for start in range(data.shape[0] - ws + 1):
        window = data[start:start + ws]
        if np.var(window) < var_threshold:
            continue
        rising = np.all(np.diff(window[:half + 1]) > 0)
        falling = np.all(np.diff(window[half:2 * half + 1]) < 0)
        if rising and falling:
            found.append(start + half)
    # An explicit integer dtype keeps an empty result usable as an index
    # array (np.asarray([]) would default to float64).
    return np.asarray(found, dtype=int)
def find_R_peaks(ecg, peaks, ws):
    """Move each candidate peak to the maximum of the samples preceding it.

    For every candidate index ``p`` the ``2 * ws`` samples immediately
    before it, ``ecg[p - 2 * ws:p]``, are searched for their maximum.
    Candidates are skipped unless ``p - 2 * ws > 0`` and ``p`` is smaller
    than the signal length.

    Args:
        ecg: 1-D NumPy array holding the signal.
        peaks: Iterable of candidate sample indices.
        ws: Half of the search-window length in samples; must be >= 1.

    Returns:
        An integer NumPy array with one index into ``ecg`` per accepted
        candidate (possibly empty).

    Raises:
        ValueError: If ``ws`` is smaller than 1, since the search window
            would be empty.
    """
    if ws < 1:
        raise ValueError("ws must be a positive integer, got %r" % (ws,))

    span = 2 * ws
    n = ecg.shape[0]
    located = []
    for candidate in peaks:
        stop = int(candidate)
        start = stop - span
        if start > 0 and stop < n:
            located.append(start + int(np.argmax(ecg[start:stop])))
    # An explicit integer dtype keeps an empty result usable as an index
    # array (np.asarray([]) would default to float64).
    return np.asarray(located, dtype=int)
def find_S_point(ecg, R_peaks):
    """Walk right from each R peak to the end of its strictly falling slope.

    Starting at each peak index, the position is advanced while the current
    sample is strictly greater than the next one. The walk stops at the
    first sample that is not greater than its successor, or at the last
    sample of the signal.

    Args:
        ecg: 1-D NumPy array holding the signal.
        R_peaks: Iterable of R-peak sample indices.

    Returns:
        An integer NumPy array of indices into ``ecg``. Peaks that have no
        sample to their right are skipped, so the result can be shorter
        than ``R_peaks``.
    """
    last = ecg.shape[0] - 1
    located = []
    for peak in R_peaks:
        pos = int(peak)
        if pos >= last:
            # Nothing to the right of this peak; skip it but keep going.
            continue
        # The bound is checked before ecg[pos + 1] is read so a slope that
        # falls all the way to the final sample cannot index past the end.
        while pos < last and ecg[pos] > ecg[pos + 1]:
            pos += 1
        located.append(pos)
    # An explicit integer dtype keeps an empty result usable as an index
    # array (np.asarray([]) would default to float64).
    return np.asarray(located, dtype=int)
def find_Q_point(ecg, R_peaks):
    """Walk left from each R peak to the start of its strictly rising slope.

    Starting at each peak index, the position is moved backwards while the
    current sample is strictly greater than the previous one. The walk
    stops at the first sample that is not greater than its predecessor, or
    at index 0.

    Args:
        ecg: 1-D NumPy array holding the signal.
        R_peaks: Iterable of R-peak sample indices.

    Returns:
        An integer NumPy array of indices into ``ecg``. Peaks located at
        index 0 are skipped, so the result can be shorter than ``R_peaks``.
    """
    located = []
    for peak in R_peaks:
        pos = int(peak)
        if pos < 1:
            # Nothing to the left of this peak; skip it but keep going.
            continue
        # Stop at index 0: reading ecg[pos - 1] there would wrap around to
        # the last sample and could yield a bogus index of -1.
        while pos > 0 and ecg[pos] > ecg[pos - 1]:
            pos -= 1
        located.append(pos)
    # An explicit integer dtype keeps an empty result usable as an index
    # array (np.asarray([]) would default to float64).
    return np.asarray(located, dtype=int)
def EKG_QRS_detect(ecg, ecg_plot, fs, QS, plot=False):
    """Detect R peaks (and optionally Q and S points) in an ECG signal.

    Pipeline: remove the mean, apply ``lgth_transform``, smooth the result
    with four successive ``integrate`` passes, pick candidate peaks with
    ``find_peak`` and refine them on the mean-removed signal with
    ``find_R_peaks``. All window lengths are derived from ``fs``.

    Args:
        ecg: 1-D NumPy array used for detection.
        ecg_plot: Array drawn when ``plot`` is true; it is indexed with the
            detected sample positions.
        fs: Sampling frequency in samples per second; must be positive.
        QS: If true, S and Q points are also located with ``find_S_point``
            and ``find_Q_point``.
        plot: If true, show a matplotlib figure of ``ecg_plot`` with the
            detected points marked.

    Returns:
        Tuple ``(R_peaks, S_point, Q_point)`` of integer index arrays.
        ``S_point`` and ``Q_point`` are ``None`` when ``QS`` is false.

    Raises:
        ValueError: If ``fs`` is not positive.
    """
    if fs <= 0:
        raise ValueError("fs must be positive, got %r" % (fs,))

    def window(divisor):
        # int(fs / divisor) truncates to 0 at low sampling rates, which
        # makes the moving average divide by zero and leaves the R-peak
        # search window empty; never go below one sample.
        return max(1, int(fs / divisor))

    sig_lgth = ecg.shape[0]
    ecg = ecg - np.mean(ecg)

    ecg_lgth_transform = lgth_transform(ecg, window(20))

    # The first smoothing pass is additionally scaled by its window length.
    ws = window(8)
    ecg_integrate = integrate(ecg_lgth_transform, ws) / ws
    for divisor in (6, 36, 72):
        ecg_integrate = integrate(ecg_integrate, window(divisor))

    peaks = find_peak(ecg_integrate, window(10))

    # Cast to an integer dtype so that empty detection results can still be
    # used as index arrays below and by callers.
    R_peaks = np.asarray(find_R_peaks(ecg, peaks, window(40)), dtype=int)
    if QS:
        S_point = np.asarray(find_S_point(ecg, R_peaks), dtype=int)
        Q_point = np.asarray(find_Q_point(ecg, R_peaks), dtype=int)
    else:
        S_point = None
        Q_point = None

    if plot:
        index = np.arange(sig_lgth) / fs
        fig, ax = plt.subplots()
        ax.plot(index, ecg_plot, 'b', label='EKG')
        ax.plot(R_peaks / fs, ecg_plot[R_peaks], 'ro', label='R peaks')
        if QS:
            ax.plot(S_point / fs, ecg_plot[S_point], 'go', label='S')
            ax.plot(Q_point / fs, ecg_plot[Q_point], 'yo', label='Q')
        ax.set_xlim([0, sig_lgth / fs])
        ax.set_xlabel('Time [sec]')
        ax.legend()
        plt.show()

    return R_peaks, S_point, Q_point