-
Notifications
You must be signed in to change notification settings - Fork 0
/
filerecord.py
116 lines (90 loc) · 3.62 KB
/
filerecord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from itertools import accumulate
from pypactQC.util.file import content_as_str
from pypactQC.util.lines import line_indices
from pypactQC.output.tags import TIME_STEP_HEADER, IRRAD_TIME_TAG, COOLING_TIME_TAG
import pypactQC.util.propertyfinder as pf
class FileRecord:
    """
    Base record that caches file content and then runs two hooks.

    Subclasses customise behaviour by overriding ``_setup`` and ``_process``;
    both are no-ops here.
    """

    def __init__(self, filename, asstring=''):
        """
        Cache the content, then call ``_setup`` followed by ``_process``.

        :param filename: passed to ``content_as_str`` to load the content,
            but only when ``asstring`` is empty (or otherwise falsy)
        :param asstring: pre-loaded content; when truthy it is cached as-is
            and ``filename`` is not read
        """
        # `or` falls back to reading the file only for falsy `asstring`.
        self.cachedlines = asstring or content_as_str(filename)
        self._setup()
        self._process()

    def _setup(self):
        """Hook called right after the content is cached; no-op by default."""

    def _process(self):
        """Hook called after ``_setup``; no-op by default."""
class InventoryFileRecord(FileRecord):
    """
    File record split into timesteps at every ``TIME_STEP_HEADER`` line.

    After construction the instance exposes:

    * ``lineindices`` - result of ``line_indices`` for ``TIME_STEP_HEADER``
    * ``timesteps`` - list of ``(interval, lines)`` tuples, one per header
    * ``irradiation_times`` / ``cooling_times`` - running (cumulative) sums
      of the values parsed for each timestep, in file order
    * ``times`` - per-timestep value: the irradiation time, or the cooling
      time when the irradiation time equals ``0.0``
    """

    def _setup(self):
        """Locate the timestep headers and reset the per-timestep lists."""
        self.lineindices = line_indices(self.cachedlines, TIME_STEP_HEADER)
        self.timesteps = []
        self.irradiation_times = []
        self.cooling_times = []
        self.times = []

    def __len__(self):
        """Return the number of timestep headers found in the content."""
        return len(self.lineindices)

    def __getitem__(self, interval):
        """
        Get the timestep lines for a given interval

        :param interval: interval number as parsed from the timestep header
        :return: the cached lines of that timestep when exactly one timestep
            carries this interval number, otherwise an empty string
        """
        matches = [lines for number, lines in self.timesteps
                   if number == interval]
        if len(matches) == 1:
            return matches[0]
        return ''

    def cumulirradiationtime(self, interval):
        """
        Get the cumulative irradiation time at a given interval

        :param interval: interval number as parsed from the timestep header
        :return: the cumulative irradiation time, or 0.0 when the interval
            is not matched by exactly one timestep
        """
        return self._getcumultime(interval, self.irradiation_times)

    def cumulcoolingtime(self, interval):
        """
        Get the cumulative cooling time at a given interval

        :param interval: interval number as parsed from the timestep header
        :return: the cumulative cooling time, or 0.0 when the interval
            is not matched by exactly one timestep
        """
        return self._getcumultime(interval, self.cooling_times)

    @staticmethod
    def _first_value(block, starttag, endtag):
        """Run ``pf.first`` on one timestep block with the shared settings."""
        return pf.first(datadump=block,
                        headertag=TIME_STEP_HEADER,
                        starttag=starttag,
                        endtag=endtag,
                        ignores=[],
                        asstring=False)

    def _process(self):
        """
        Parse every timestep and fill the per-timestep lists.

        Each timestep spans from its header line up to (not including) the
        next header; the final timestep runs to the end of the content.
        """
        last = len(self.lineindices) - 1
        for position, start in enumerate(self.lineindices):
            # An open-ended slice (None) keeps the final line of the content.
            # The previous end index of -1 silently dropped it from the last
            # timestep.
            stop = self.lineindices[position + 1] if position < last else None
            block = self.cachedlines[start:stop]

            interval = int(self._first_value(block, TIME_STEP_HEADER, ''))
            # NOTE(review): values are presumably in seconds given the 'SECS'
            # end tag - confirm against pf.first.
            irrad_time = self._first_value(block, IRRAD_TIME_TAG, 'SECS')
            cool_time = self._first_value(block, COOLING_TIME_TAG, 'SECS')

            self.timesteps.append((interval, block))
            self.irradiation_times.append(irrad_time)
            self.cooling_times.append(cool_time)
            # A step with zero irradiation time is recorded with its cooling
            # time instead.
            self.times.append(cool_time if irrad_time == 0.0 else irrad_time)

        # turn them into cumulative values
        self.irradiation_times = list(accumulate(self.irradiation_times))
        self.cooling_times = list(accumulate(self.cooling_times))
        assert len(self.irradiation_times) == len(self.cooling_times)

    def _getcumultime(self, interval, listoftuples):
        """
        Look up the value stored for ``interval`` in a per-timestep list.

        :param interval: interval number as parsed from the timestep header
        :param listoftuples: list aligned with ``self.timesteps`` (here the
            cumulative irradiation or cooling times)
        :return: the entry at the matching timestep's position, or 0.0 when
            the interval is not matched by exactly one timestep
        """
        positions = [index for index, step in enumerate(self.timesteps)
                     if step[0] == interval]
        if len(positions) == 1:
            return listoftuples[positions[0]]
        return 0.0