-
Notifications
You must be signed in to change notification settings - Fork 3
/
fixation_detector.py
169 lines (145 loc) · 9.14 KB
/
fixation_detector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import copy
import math
import numpy as np
from arff import xrange
import functions
from arff_helper import ArffHelper
def FixationDetector(param, gaze_points, inplace=False):
"""
Identify and label fixation intervals as 'FIX' and some others as 'NOISE'.
Fixation identification includes the following steps:
- First, all inter-saccadic intervals with a dispersion of less than
a certain spread threshold (@param["PREFILTERING_INTERVAL_SPREAD_THRESHOLD_DEGREES"] are marked as fixations.
- Then, a temporal window (@param["SLIDING_WINDOW_WIDTH_MILLISEC"]ms) is shifted across the
remaining data and a non-fixation onset (offset) is marked every
time speed rises above (fell below) threshold (@param["SPEED_THRESHOLD_DEGREES_PER_SEC"].
- There are two ways for speed calculation: spread and speed.
-'speed': speed from start point to end point is larger than
threshold.
-'spread': maximum moving speed of either x or y is larger than
threshold.
Data with speed below threshold are labeled as 'FIX'.
- Finally, non-fixation episodes longer than @param["MINIMAL_SP_DURATION_MILLISEC"]are kept as 'UNKNOWN',
the shorter ones are labeled as 'NOISE' (these are fairly dynamic episodes that however should not be SP).
:param gaze_points: arff object with saccades detected (and intersaccadic intervals labelled)
:param inplace: whether to replace the data inside @gaze_points or create a new structure
:return: arff object with data labeled as 'FIX' and 'NOISE'. Some 'UNKNOWN' labels are kept for the next stage.
"""
if not inplace:
gaze_points = copy.deepcopy(gaze_points)
# add a global index column (to keep track of where we are even if working within an intersaccadic interval)
gaze_points = ArffHelper.add_column(gaze_points, name='global_index', dtype='INTEGER', default_value=-1)
gaze_points['data']['global_index'] = np.arange(gaze_points['data'].shape[0])
# I. First step of fixation removal: rough prefiltering
speed_thd = param["SPEED_THRESHOLD_DEGREES_PER_SEC"]
prefiltering_spread_thd = param["PREFILTERING_INTERVAL_SPREAD_THRESHOLD_DEGREES"]
# record intersaccadic interval indices of those intervals that are not labelled as FIX by the prefiltering
unknown_interval_index = []
unknown_interval_masks = []
for i in xrange(max(gaze_points['data']['INTERSACC_INTERVAL_INDEX']) + 1):
mask = gaze_points['data']['INTERSACC_INTERVAL_INDEX'] == i
intersacc_interval = gaze_points['data'][mask]
if len(intersacc_interval) == 0:
continue
dispersion = [max(intersacc_interval['x']) - min(intersacc_interval['x']),
max(intersacc_interval['y']) - min(intersacc_interval['y'])]
if any(thd >= prefiltering_spread_thd for thd in dispersion):
unknown_interval_index.append(i) # keep unknown
unknown_interval_masks.append(mask.copy()) # cache the indexing
else:
gaze_points['data']['EYE_MOVEMENT_TYPE'][mask] = 'FIX'
# II. Second step of fixation removal: finer prefiltering
#
for i, interval_mask in zip(unknown_interval_index, unknown_interval_masks):
# We record the borders of the non-FIX episodes to validate their duration. If the non-FIX episode is very
# short, we mark it as NOISE (not enough duration for a candidate for smooth pursuit)
onset_timestamp = None
onset_index = None
intersacc_interval = gaze_points['data'][interval_mask]
intersacc_interval = functions.get_xy_moving_average(intersacc_interval,
param["NORMALIZATION_SLIDING_WINDOW_SIZE_SAMPLES"],
inplace=False)
# for intervals shorter than @param["INTERSACCADIC_INTERVAL_DURATION_THRESHOLD_MILLISEC"]
# cannot do further filtering. The label remains 'UNKNOWN'
if intersacc_interval['time'][-1] - intersacc_interval['time'][0] < \
param["INTERSACCADIC_INTERVAL_DURATION_THRESHOLD_MILLISEC"]:
continue
# for intervals that longer than param["SLIDING_WINDOW_WIDTH_MILLISEC"] do further pre-filtering.
# Label data as 'FIX' or 'NOISE', or keep 'UNKNOWN'
else:
# window is shifted by 1 sample every time
for index, item in enumerate(intersacc_interval):
x_start = item['x']
y_start = item['y']
shift_window_interval = intersacc_interval[
(intersacc_interval['time'] >= item['time']) *
(intersacc_interval['time'] <= item['time'] + param["SLIDING_WINDOW_WIDTH_MILLISEC"])
]
# if distance between current data and the end of interval is shorter than
# param["SLIDING_WINDOW_WIDTH_MILLISEC"](i.e. if the end of the window matches the end of the
# intersaccadic interval), we keep the previous label if it was FIX, otherwise keep UNKNOWN
if shift_window_interval['time'][-1] == intersacc_interval['time'][-1]:
if intersacc_interval['EYE_MOVEMENT_TYPE'][index - 1] == 'FIX':
gaze_points['data']['EYE_MOVEMENT_TYPE'][
(gaze_points['data']['time'] == item['time'])] = 'FIX'
# we do not keep track of the non-fixation interval anymore since it will be all fixation
# until the end of the intersaccadic interval
onset_timestamp = None
onset_index = None
else:
# new non-fixation interval is starting
onset_timestamp = item['time']
onset_index = item['global_index']
# if distance between current data and the end of interval is larger than window size, continue
# with the process
else:
# get window duration in seconds
period = (shift_window_interval['time'][-1] - shift_window_interval['time'][0]) * 1e-6
# is the fixation criterion satisfied?
fixation_flag = True
if param["SLIDING_WINDOW_CRITERION"] == 'speed':
# if the current speed is larger than speed threshold --
# mark as onset(UNKNOWN, NOISE). else -- mark as offset(FIX)
x_end = shift_window_interval['x'][-1]
y_end = shift_window_interval['y'][-1]
if math.sqrt((x_start - x_end) ** 2 + (y_start - y_end) ** 2) >= speed_thd * period:
# will not be a fixation
fixation_flag = False
else: # spread
# if either x_max - x_min or y_max - y_min is larger than speed threshold * time --
# mark as onset. else -- mark as offset
x_max = max(shift_window_interval['x'])
x_min = min(shift_window_interval['x'])
y_max = max(shift_window_interval['y'])
y_min = min(shift_window_interval['y'])
if max(x_max - x_min, y_max - y_min) >= speed_thd * period:
# will not be a fixation
fixation_flag = False
if fixation_flag:
gaze_points['data']['EYE_MOVEMENT_TYPE'][item['global_index']] = 'FIX'
# either a fixation start or the whole interval end
if fixation_flag or index == len(intersacc_interval) - 1:
# if we had a non-fixation interval going on before, check it's duration
if onset_index is not None:
# onset episode larger than 50ms: UNKNOWN. else: NOISE
if item['time'] - onset_timestamp < param["MIN_SP_DURATION_MILLISEC"]:
offset_timestamp = item['time'] - 1
offset_index = item['global_index'] - 1
# if this is not the beginning of fixation,
# the last item also should be labelled as NOISE
if not fixation_flag:
offset_timestamp += 1
offset_index += 1
gaze_points['data'][onset_index:(offset_index + 1)]['EYE_MOVEMENT_TYPE'] = 'NOISE'
# episode is finished
onset_timestamp = None
onset_index = None
else:
# if new non-fixation interval started
if onset_timestamp is None:
onset_timestamp = item['time']
onset_index = item['global_index']
# otherwise it just continues, don't have to do anything
# can now remove the global_index column
gaze_points = ArffHelper.remove_column(gaze_points, 'global_index')
return gaze_points