decoder.py
# Author: Jacob Dawson
#
# This file does the decoding of an APT recording.
import scipy.io.wavfile as wav
import scipy.signal as signal
import numpy as np
from os import listdir, path, makedirs
from PIL import Image
from PIL.ImageOps import autocontrast


def readFile(filename):
    # this function takes a .wav file and returns its sample rate and data.
    # Because we only need the signal in mono, we only return one channel if
    # the given recording has stereo sound.
print("Reading", filename)
fs, data = wav.read(filename)
if len(data.shape) == 2:
print(f"Data in two channels, using the first one")
data = data[:, 0] # only need 1 channel
return fs, data


def resample(fs, data):
    # this function uses scipy's resample function to bring the signal to the
    # expected 20800 Hz sample rate. Normally this is a downsample; scipy's
    # FFT-based resample handles the band-limiting, so aliasing isn't a concern.
expectedRate = 20800
if fs != expectedRate:
print(f"Resampling from {fs} to {expectedRate}")
data = signal.resample(data, int((data.shape[0] / fs) * expectedRate))
# sample rate is now expected
fs = expectedRate
else:
print(f"No need to resample, recording is already at {fs}")
return fs, data
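

# Illustrative sketch (not called by the pipeline): shows how the length of the
# resampled signal relates to the original. The 11025 Hz input rate is an
# assumption, chosen as a common .wav export rate for SDR recordings.
def _resampleSketch():
    fs_in = 11025
    one_second = np.zeros(fs_in)  # a silent one-second recording
    fs_out, resampled = resample(fs_in, one_second)
    # one second of audio should now contain exactly 20800 samples
    assert fs_out == 20800 and resampled.shape[0] == 20800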


def hilbert(data):
    # This function applies the Hilbert transform to the data and returns the
    # magnitude of the resulting analytic signal, i.e. the amplitude envelope
    # of the AM signal. Scipy has a good visualization of this at the bottom
    # of the page:
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.hilbert.html
analytical_signal = signal.hilbert(data)
amplitude_envelope = np.abs(analytical_signal)
return amplitude_envelope
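

# Illustrative sketch (not called by the pipeline): APT is an AM transmission,
# so the image data lives in the envelope of a 2400 Hz subcarrier. This builds
# a synthetic AM tone and checks that hilbert() recovers the slowly-varying
# envelope; the specific envelope and tolerance are arbitrary choices.
def _hilbertSketch():
    fs = 20800
    t = np.arange(fs) / fs  # one second of time stamps
    envelope = 2.0 + np.sin(2 * np.pi * 2 * t)  # slow, always-positive envelope
    carrier = np.cos(2 * np.pi * 2400 * t)  # APT uses a 2400 Hz subcarrier
    recovered = hilbert(envelope * carrier)
    assert np.allclose(recovered, envelope, atol=1e-3)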


def kernelFilter(fs, data_am):
    # this applies a median filter with kernel size 5, then decimates by
    # keeping one sample out of every block of five (the one at offset 3),
    # dropping the sample rate from 20800 Hz to 4160 Hz.
    data_am = data_am[: ((data_am.size // 5) * 5)]  # make size a multiple of 5
data_am = signal.medfilt(data_am, 5)
data_am = data_am.reshape(len(data_am) // 5, 5)[:, 3]
fs = fs // 5
return fs, data_am
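

# Illustrative sketch (not called by the pipeline): after the median filter and
# the keep-one-in-five decimation, one second of signal at 20800 Hz becomes
# 4160 samples. APT transmits two image lines per second, which is where the
# 2080-pixel line width used later comes from.
def _kernelFilterSketch():
    fs = 20800
    one_second = np.linspace(0.0, 1.0, fs)
    fs_out, filtered = kernelFilter(fs, one_second)
    assert fs_out == 4160 and filtered.shape[0] == 4160
    assert fs_out // 2 == 2080  # samples per APT line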


def toImgValues(data_am):
    # this function normalizes our raw data into the range (0,255), the range
    # of a greyscale image. data_am, both given and returned, is usually a
    # one-dimensional array of shape (x,), but the same scaling works for
    # multi-dimensional arrays too (createFalseColorImg relies on that).
print("Determining pixel values")
# erring on the side of caution and just saving the raw image, so let's
# simply scale everything:
    maxVal = np.amax(data_am)
    minVal = np.amin(data_am)
    data_am = ((data_am - minVal) / (maxVal - minVal)) * 255  # values now span 0..255
# and just to be safe:
data_am = np.clip(data_am, 0, 255)
return data_am
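

# Illustrative sketch (not called by the pipeline): toImgValues is a plain
# min/max rescale, so the smallest input maps to 0 and the largest to 255.
def _toImgValuesSketch():
    scaled = toImgValues(np.array([2.0, 4.0, 6.0]))
    assert np.allclose(scaled, [0.0, 127.5, 255.0])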


def alignSignal(data_am):
    # this function takes the signal and returns data on how the signal ought
    # to be aligned.
    print("Aligning signal for image")
    # https://github.com/zacstewart/apt-decoder has some great code for this;
    # this is a lightly modified copy. The idea is to build a small "ideal"
    # sync pattern and slide it along the real signal, scoring each position by
    # how similar the slice is. Unfortunately that needs some python iteration,
    # but I don't think this is avoidable.
    # we compare this idealized sync A pulse to our real data:
    syncA = [0, 128, 255, 128] * 7 + [0] * 7
# list of maximum correlations found: (index, value)
peaks = [(0, 0)]
# minimum distance between peaks
minDistance = 2000
# need to shift the values down to get meaningful correlation values
signalshifted = [x - 128 for x in data_am]
syncA = [x - 128 for x in syncA]
for i in range(len(data_am) - len(syncA)):
corr = np.dot(syncA, signalshifted[i : i + len(syncA)])
# if previous peak is too far, keep it and add this value to the
# list as a new peak
if i - peaks[-1][0] > minDistance:
peaks.append((i, corr))
# else if this value is bigger than the previous maximum, set this one
elif corr > peaks[-1][1]:
peaks[-1] = (i, corr)
return peaks
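

# Illustrative sketch (not called by the pipeline): embeds one ideal sync A
# pattern in an otherwise flat signal and checks that the correlation search
# reports its position. The 1234 offset and the 3000-sample length are
# arbitrary choices for the example.
def _alignSignalSketch():
    syncA = np.array([0, 128, 255, 128] * 7 + [0] * 7, dtype=float)
    flat = np.full(3000, 128.0)  # mid-grey background correlates to zero
    flat[1234 : 1234 + syncA.size] = syncA
    peaks = alignSignal(flat)
    assert peaks[-1][0] == 1234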


def createGreyscaleImg(data_am, peaks, frame_width):
# given a signal with shape (x,) and some info on how wide the image is
# expected to be and how the signal ought to be aligned, create an aligned
# greyscale image and return it.
print("Creating image from array")
# create image matrix starting each line on the peaks found
matrix = []
for ind, _ in peaks:
row = data_am[ind : ind + frame_width]
if row.size != frame_width:
break
matrix.append(row)
img = np.array(matrix)
return img
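

# Illustrative sketch (not called by the pipeline): rows start at each peak
# index and are frame_width samples long; a row that would run past the end of
# the signal stops the loop. The tiny array and peak list are made up.
def _createGreyscaleImgSketch():
    data = np.arange(12.0)
    img = createGreyscaleImg(data, peaks=[(0, 0), (5, 0), (10, 0)], frame_width=4)
    assert img.shape == (2, 4)  # the row starting at 10 would be short, so it stops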


def saveImg(img, outputFolder, filename, enhanceContrast=False):
    # this function takes an ndarray which may be greyscale or RGB and saves
    # it to a png file.
    # first, convert the image from floats to 8-bit unsigned integers:
    img = img.astype(np.uint8)
if len(img.shape) == 2:
image = Image.fromarray(img, mode="L")
elif len(img.shape) == 3:
image = Image.fromarray(img, mode="RGB")
else:
print("Unknown format, cannot save image")
return
    # convert greyscale up to RGB so every image is saved in the same mode:
if image.mode != "RGB":
image = image.convert("RGB")
if enhanceContrast:
image = autocontrast(image)
    output_path = path.join(outputFolder, path.splitext(path.basename(filename))[0] + ".png")
print(f"Writing image to {output_path}")
image.save(output_path)


def createFalseColorImg(greyscaleImg):
print("Combining channels and creating a false color image")
    chA = greyscaleImg[:, :1040]  # channel A: (usually) visible light
    chB = greyscaleImg[:, 1040:]  # channel B: (usually) infrared
"""
# let's just see what overlaying the two channels does for now. Maybe we
# add them together with some constants multiplying?
stackedImg = np.stack(
(chA * 1.5, (chA * 0.7) + (chB * 0.3), chB * 0.5), axis=-1
)
stackedImg = toImgValues(stackedImg)
return stackedImg
"""
# found this solution here:
    # https://github.com/enigmastrat/apt137
# It's intriguing, but maybe their setup is slightly different, because it
# doesn't quite work. Maybe I just need to tweak the values.
stackedImg = np.stack((chA, chB), axis=-1)
colorizedImg = []
waterThreshold = np.median(chA) * 0.45
vegThreshold = np.median(chA) * 1.0
dirtThreshold = np.median(chA) * 1.3
cloudThreshold = np.median(chB) * 1.45
waterPixels = 0
cloudPixels = 0
vegPixels = 0
dirtPixels = 0
normalPixels = 0
for i in stackedImg:
colorizedLine = []
for colVal, irVal in i:
if colVal < waterThreshold:
# if water:
r = (8 * 256) + colVal * 0.2
g = (20 * 256) + colVal * 1
b = (50 * 256) + colVal * 0.75
waterPixels += 1
elif irVal > cloudThreshold:
# if cloud/ice/snow:
r = (
(irVal + colVal) / 2
) * 1.25 # Average the two for a little better cloud distinction
g = r
b = r
cloudPixels += 1
elif colVal < vegThreshold:
# if vegetation:
r = colVal * 0.9
g = colVal * 1
b = colVal * 0.7
vegPixels += 1
elif colVal <= dirtThreshold:
# if dirt/desert/brown:
r = colVal * 1.1
g = colVal * 0.9
b = colVal * 0.7
dirtPixels += 1
else:
# Everything else, but this was probably captured by the IR channel above
r = colVal
g = colVal
b = colVal
normalPixels += 1
colorizedLine.append([r, g, b])
colorizedImg.append(colorizedLine)
totalPixels = waterPixels + cloudPixels + vegPixels + dirtPixels + normalPixels
print(f"Water: {round(100*(waterPixels/totalPixels), 2)}%")
print(f"Cloud: {round(100*(cloudPixels/totalPixels), 2)}%")
print(f"Vegetation: {round(100*(vegPixels/totalPixels), 2)}%")
print(f"Dirt: {round(100*(dirtPixels/totalPixels), 2)}%")
print(f"Other: {round(100*(normalPixels/totalPixels), 2)}%")
colorizedImg = np.array(colorizedImg)
colorizedImg = toImgValues(colorizedImg)
return colorizedImg


def process(filename, outputFolderRawImgs, outputFalseColorImages):
# the "main" function. Given a filename and an expected output folder, this
# function calls the above functions to turn the given .wav file into an
# image from a weather sat!
# read the given file:
fs, data = readFile(filename)
# resample to 20800
fs, data = resample(fs, data)
# perform hilbert transform
data_am = hilbert(data)
    # median filter and decimate by 5, dropping the sample rate to 4160 Hz
    fs, data_am = kernelFilter(fs, data_am)
    frame_width = fs // 2  # 4160 // 2 = 2080, one APT line per half second
# determine how to align image
peaks = alignSignal(data_am)
# turn signal into a 2D ndarray based on the alignment numbers above
img = createGreyscaleImg(data_am, peaks, frame_width)
# let's save that greyscale to a file, call it rawImages
saveImg(toImgValues(img), outputFolderRawImgs, filename)
falseColorImg = createFalseColorImg(img)
saveImg(falseColorImg, outputFalseColorImages, filename, enhanceContrast=True)


if __name__ == "__main__":
recordings = "recordings"
outputRawImages = "rawImages"
outputFalseColorImages = "falseColorImages"
if not path.isdir(recordings):
makedirs(recordings)
if not path.isdir(outputRawImages):
makedirs(outputRawImages)
if not path.isdir(outputFalseColorImages):
makedirs(outputFalseColorImages)
if len(listdir(recordings)) == 0:
print("Place recoded APT signals in the 'recordings' folder!")
for filename in listdir(recordings):
print("\n")
f = path.join(recordings, filename)
process(f, outputRawImages, outputFalseColorImages)