-
Notifications
You must be signed in to change notification settings - Fork 0
/
efield.py
183 lines (143 loc) · 5.2 KB
/
efield.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
Module to deal with the messy task of detecting lightning and coming up with a
range for it.
Used by spinningCan.py, spinningCanTest.py, spinningCanReplay.py, and
analyzeRecodring.py to make sure they are all on the same page.
"""
import numpy
# Version string of this module.
__version__ = "0.2"
# Public names exported by a star-import of this module.
__all__ = ['ElectricField',]
class ElectricField(object):
    """
    Class to store and deal with the electric field and its changes.  This
    class keeps a moving set of electric field measurements in order to:
      1) Keep track of the current field and its derivative,
      2) Determine if the field is high enough to warrant a warning, and
      3) Determine if lightning has been detected.

    The most recent `nKeep` time/field pairs are retained in the parallel
    lists `times` and `field` (oldest first, newest last).

    All query methods are safe to call before any data has been appended:
    they report "no field, no change" instead of raising.
    """

    def __init__(self, highField=5.0, veryHighField=7.0, minFieldChange=0.04, nKeep=20):
        """
        Create an empty buffer.

        Parameters:
          * highField      - threshold in kV/m for a high field
          * veryHighField  - threshold in kV/m for a very high field
          * minFieldChange - minimum value of deriv() (in magnitude) that
                             counts as lightning
          * nKeep          - number of samples to retain; deriv() needs at
                             least 7 samples, so smaller values disable
                             lightning detection
        """
        # Field measurement values
        self.highField = float(highField)
        self.veryHighField = float(veryHighField)

        # Lightning detection control values
        self.minFieldChange = float(minFieldChange)

        # Field retention control
        self.nKeep = int(nKeep)

        # Internal data structures to store the time/field pairs.
        self.times = []
        self.field = []

    def _prune(self):
        """
        Trim both internal lists down to the most recent `nKeep` entries.
        """
        self.times = self.times[-self.nKeep:]
        self.field = self.field[-self.nKeep:]

    def updateConfig(self, config):
        """
        Update the current configuration using a nested dictionary of values.
        Values looked for are:
          * config['efield']['average_time'] - number of seconds to keep and
            average the electric field
          * config['efield']['high_field'] - field value in kV/m for a high
            field
          * config['efield']['very_high_field'] - field value in kV/m for a
            very high field
          * config['lightning']['min_efield_change'] - minimum field change
            over ~0.3 s to count as lightning.

        All dictionary keys are lower-case and case sensitive.  A missing
        key raises KeyError; a value that cannot be converted to a float
        raises ValueError or TypeError.
        """
        # Data retention: 20 samples are kept per second of averaging time,
        # with a floor of 7 because deriv() needs at least 7 samples.
        self.nKeep = max(int(round(20.0*float(config['efield']['average_time']))), 7)

        # Field control
        self.highField = float(config['efield']['high_field'])
        self.veryHighField = float(config['efield']['very_high_field'])

        # Lightning control
        self.minFieldChange = float(config['lightning']['min_efield_change'])

        # Drop anything that no longer fits in the (possibly smaller) buffer.
        self._prune()

    def append(self, time, data):
        """
        Append a new time stamp/electric field value (in kV/m) pair to the
        instance.  Returns True on success and False if the pair could not
        be stored.
        """
        try:
            self.times.append(time)
            self.field.append(data)
            self._prune()
        except Exception:
            # Only ordinary errors are reported as a failure; interpreter
            # exits and keyboard interrupts are allowed to propagate.
            return False
        return True

    def mean(self):
        """
        Determine the current mean of the electric field and return the value
        in kV/m.  Returns 0.0 if no data has been stored yet.
        """
        if not self.field:
            return 0.0
        return sum(self.field) / float(len(self.field))

    def __smooth(self, i):
        """
        Perform a simple backwards boxcar smoothing of the data with a window
        of three at the specified location, i.e. average sample `i` with up
        to two samples before it.  Fewer samples are averaged when `i` is
        near the start of the buffer.  Raises IndexError if `i` is outside
        the buffer.
        """
        count = len(self.field)
        # Convert negative (from-the-end) indices to absolute positions so
        # the window can be clamped at the start of the buffer.
        pos = i + count if i < 0 else i
        if pos < 0 or pos >= count:
            raise IndexError("list index out of range")

        window = self.field[max(0, pos - 2):pos + 1]
        return sum(window) / float(len(window))

    def deriv(self):
        """
        Compute and return the field change over a ~0.3 s window (6 samples):
        the smoothed newest sample minus the smoothed sample six steps
        earlier.  Returns 0.0 when fewer than 7 samples are stored.
        """
        if len(self.field) > 6:
            return self.__smooth(-1) - self.__smooth(-7)
        return 0.0

    def isHigh(self):
        """
        Examine the last field value and determine if we are in a `high field`
        condition or not.  Returns False if no data has been stored yet.
        """
        if not self.field:
            return False
        return abs(self.field[-1]) > self.highField

    def isVeryHigh(self):
        """
        Examine the last field value and determine if we are in a `very high
        field` condition or not.  Returns False if no data has been stored
        yet.
        """
        if not self.field:
            return False
        return abs(self.field[-1]) > self.veryHighField

    def isLightning(self):
        """
        Examine the current field list and determine if we have what looks
        like lightning occurring, i.e. the magnitude of deriv() exceeds
        `minFieldChange`.
        """
        return abs(self.deriv()) > self.minFieldChange

    def getLightningDistance(self, miles=False):
        """
        Assuming the lightning is responsible for the field change, estimate
        the distance of the lightning in km (or miles if the `miles` keyword
        is set to True).

        The estimate scales as the inverse cube root of the field change,
        normalized so that a change of 10 corresponds to 5 km.  When there is
        no field change at all (including when fewer than 7 samples are
        stored) the distance is reported as infinity.
        """
        change = abs(self.deriv())
        if change == 0.0:
            # No change means no detectable lightning; avoid dividing by zero.
            dist = float('inf')
        else:
            dist = (10.0/change)**(1/3.) * 5

        if miles:
            # Kilometers to statute miles.
            return dist*0.621371192
        return dist