# LogManager.py
# Forked from emlid/ReachView.
# ReachView code is placed under the GPL license.
# Written by Egor Fedorov (egor.fedorov@emlid.com)
# Copyright (c) 2015, Emlid Limited
# All rights reserved.
# If you are interested in using ReachView code as a part of a
# closed source project, please contact Emlid Limited (info@emlid.com).
# This file is part of ReachView.
# ReachView is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# ReachView is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with ReachView. If not, see <http://www.gnu.org/licenses/>.
import os
import math
from glob import glob
from log_converter import convbin
class LogManager(object):
    """Keep track of the log files stored in a single directory.

    The manager lists the logs found in ``log_path``, reports their size
    and format, estimates conversion times, removes conversion by-products
    and reads/writes the RINEX version setting.

    Attributes:
        supported_solution_formats: extensions reported as solution logs.
        rinex_version_path: file the RINEX version is read from/written to.
        default_rinex_version: version returned when that file is unreadable.
        log_path: directory that is scanned for logs.
        convbin: ``convbin.Convbin`` instance built from ``rtklib_path``; its
            ``supported_log_formats`` is consulted by ``getLogFormat``.
        log_being_converted: full path of the log currently being converted,
            or an empty string.
        available_logs: list of dicts with the keys ``name``, ``size``,
            ``format`` and ``is_being_converted``, newest first.
    """

    supported_solution_formats = ["llh", "xyz", "enu", "nmea", "erb"]

    # Kept as class attributes so the location/default can be overridden
    # (e.g. on an instance) without editing the methods below.
    rinex_version_path = "/home/reach/.reach/rinex_version"
    default_rinex_version = "3.01"

    def __init__(self, rtklib_path, log_path):
        """Create the manager and immediately scan ``log_path`` for logs."""
        self.log_path = log_path
        self.convbin = convbin.Convbin(rtklib_path)

        self.log_being_converted = ""
        self.available_logs = []

        self.updateAvailableLogs()

    def updateAvailableLogs(self):
        """Rebuild ``self.available_logs`` from the contents of ``log_path``.

        Every entry of the directory except ``*.zip`` files is listed. The
        result is sorted in descending order of ``getLogCompareString``.
        """
        print("Getting a list of available logs")

        logs = []

        for log in glob(self.log_path + "/*"):
            # zip archives are conversion results, not logs of their own
            if log.endswith(".zip"):
                continue

            logs.append({
                "name": os.path.basename(log),
                # size in MB, formatted as a string
                "size": self.getLogSize(log),
                "format": self.getLogFormat(log),
                "is_being_converted": log == self.log_being_converted
            })

        logs.sort(
            key=lambda entry: self.getLogCompareString(entry["name"]),
            reverse=True
        )

        # publish the list only once it is complete and sorted
        self.available_logs = logs

    def getLogCompareString(self, log_name):
        """Return the sort key for a log file name.

        For names shaped like ``<type>_<date>.<ext>`` the key is the date
        followed by the first two characters of the type. Any other name
        (no underscore, or more than one) falls back to the name without
        its extension, so that an unexpected file in the log directory
        cannot break the whole listing.
        """
        name_without_extension = os.path.splitext(log_name)[0]
        parts = name_without_extension.split("_")

        if len(parts) != 2:
            return name_without_extension

        log_type, log_date = parts
        return log_date + log_type[0:2]

    def _getSizeInMegabytes(self, log_path):
        """Return the size of ``log_path`` in MB (1024 * 1024 bytes) as a float."""
        return os.path.getsize(log_path) / (1024 * 1024.0)

    def getLogSize(self, log_path):
        """Return the size of ``log_path`` in MB as a string with two decimals."""
        return "{0:.2f}".format(self._getSizeInMegabytes(log_path))

    def getLogFormat(self, log_path):
        """Return the display format of a log.

        Returns ``"RINEX"`` when a ``.zip`` with the same base name exists
        next to the log, the upper-cased extension when it is one of the
        supported solution or raw log formats, and ``""`` otherwise.
        """
        file_path, extension = os.path.splitext(log_path)
        extension = extension[1:]  # drop the leading dot

        if os.path.isfile(file_path + ".zip"):
            return "RINEX"

        if (extension in self.supported_solution_formats or
                extension in self.convbin.supported_log_formats):
            return extension.upper()

        return ""

    def formTimeString(self, seconds):
        """Form an "x minutes y seconds" string from a number of seconds.

        The value is rounded up to a whole second *before* it is split into
        minutes and seconds, so the seconds part can never read "60". The
        minutes part is omitted when it is zero.
        """
        m, s = divmod(int(math.ceil(seconds)), 60)

        format_string = "{0:.0f} minutes " if m > 0 else ""
        format_string += "{1:.0f} seconds"

        return format_string.format(m, s)

    def calculateConversionTime(self, log_path):
        """Estimate the conversion time of a log from its size and format.

        Returns the estimate as a string holding a whole number; ``"0"``
        for anything that is neither an ``rtcm3`` nor a ``ubx`` log.
        """
        log_size = self._getSizeInMegabytes(log_path)
        conversion_time = 0

        # per-MB factors taken over unchanged from the original code
        if log_path.endswith("rtcm3"):
            conversion_time = 42.0 * log_size
        elif log_path.endswith("ubx"):
            conversion_time = 1.8 * log_size

        return "{:.0f}".format(conversion_time)

    def _removeFile(self, path):
        """Remove ``path``; print the error instead of raising on failure."""
        try:
            os.remove(path)
        except OSError as e:
            # format() instead of "+" so a missing filename cannot raise here
            print("Error: {0} - {1}".format(e.filename, e.strerror))

    def cleanLogFiles(self, log_path):
        """Delete every file sharing the log's base name except the raw log.

        Files ending in ``.zip``, ``.ubx`` or ``.rtcm3`` are kept.
        """
        extensions_not_to_delete = (".zip", ".ubx", ".rtcm3")

        log_without_extension = os.path.splitext(log_path)[0]

        for log_file in glob(log_without_extension + "*"):
            # str.endswith accepts a tuple of suffixes
            if not log_file.endswith(extensions_not_to_delete):
                self._removeFile(log_file)

    def deleteLog(self, log_filename):
        """Delete a log from ``log_path`` together with its ``.zip``, if any.

        Failures (for example a file that does not exist) are printed and
        otherwise ignored.
        """
        log_name, extension = os.path.splitext(log_filename)

        # try to delete raw log
        print("Deleting log " + log_name + extension)
        self._removeFile(self.log_path + "/" + log_name + extension)

        # try to delete the matching archive
        print("Deleting log " + log_name + ".zip")
        self._removeFile(self.log_path + "/" + log_name + ".zip")

    def getRINEXVersion(self):
        """Read the RINEX version from the settings file.

        Returns the first line of ``rinex_version_path`` without its
        trailing newline, or ``default_rinex_version`` when the file cannot
        be opened or read.
        """
        print("Getting RINEX version from system settings")
        version = self.default_rinex_version

        try:
            with open(self.rinex_version_path, "r") as f:
                version = f.readline().rstrip("\n")
        except (IOError, OSError):
            # a tuple is required: "except IOError, OSError" would only
            # catch IOError and bind it to the name OSError
            print("No such file detected, defaulting to 3.01")

        return version

    def setRINEXVersion(self, version):
        """Write ``version`` to the RINEX version settings file."""
        print("Writing new RINEX version to system file")

        with open(self.rinex_version_path, "w") as f:
            f.write(version)