-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
130 lines (105 loc) · 4.24 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import sys, csv, time
import numpy as np
import pandas as pd
import sounddevice as sd
import googlemaps
from key import GMAPS_KEY
from iploc.loc_tools import ipinfo
from resources.mls_handler import MLS
# breaker exception
class Breaker(Exception):
    """Signal used to leave the data-collection loop.

    The main script raises it when a ``KeyboardInterrupt`` is caught while a
    tower is being processed, and catches it around the collection loop to
    stop cleanly.
    """
# format printed output for sound
def print_sound(indata, outdata, frames, time, status):
    """Stream callback: draw a level bar for one input block and record it.

    The level is the norm of ``indata`` (``np.linalg.norm``) multiplied by
    10. When its integer part is below 100, the value is printed followed by
    a bar of ``|`` characters, one per unit. Every level, printed or not, is
    appended to the module-level ``noise_data`` list, which must exist before
    the callback runs.

    ``outdata``, ``frames``, ``time`` and ``status`` are accepted but unused.
    """
    level = np.linalg.norm(indata) * 10
    bar_length = int(level)
    # Levels of 100 and above are recorded but not drawn.
    if bar_length < 100:
        print(" %.20f" % float(level), " ", "|" * bar_length)
    noise_data.append(level)
def tower_yielder(dataset, limit=20):
    """Yield at most the first ``limit`` entries of ``dataset``, in order.

    Args:
        dataset: Iterable of tower records.
        limit: Maximum number of entries to yield. Defaults to 20.

    Yields:
        The entries of ``dataset``, unchanged.
    """
    # Count positions instead of calling dataset.index(): index() is a linear
    # scan per item and reports the first *equal* entry, so duplicate records
    # would never reach the cut-off. range() comes first in zip() so no extra
    # entry is pulled from dataset once the limit is reached.
    for _, data in zip(range(limit), dataset):
        yield data
def closest_towers(dataset, limit=10):
    """Build cell dictionaries for at most the first ``limit`` towers.

    Args:
        dataset: Iterable of indexable tower records. Positions 1, 2, 3 and 4
            of each record are read.
        limit: Maximum number of records to convert. Defaults to 10.

    Returns:
        A list of dicts with the keys ``cellId`` (record[4]),
        ``locationAreaCode`` (record[3]), ``mobileCountryCode`` (record[1])
        and ``mobileNetworkCode`` (record[2]).
    """
    # Position counting replaces dataset.index(tower), which is quadratic and
    # never reaches the cut-off when the dataset holds equal records.
    return [
        {
            'cellId': tower[4],
            'locationAreaCode': tower[3],
            'mobileCountryCode': tower[1],
            'mobileNetworkCode': tower[2],
        }
        for _, tower in zip(range(limit), dataset)
    ]
def _ensure_csv_header(path, header):
    """Write ``header`` as the first row of ``path`` if the file is missing or has no rows."""
    try:
        with open(path, 'r', newline='') as csv_file:
            has_rows = next(csv.reader(csv_file), None) is not None
    except FileNotFoundError:
        has_rows = False
    if not has_rows:
        with open(path, 'w', newline='') as csv_file:
            csv.writer(csv_file).writerow(header)


if __name__ == '__main__':
    # get execution start time
    start_time = time.time()
    print("Python {0:s} {1:d}bit on {2:s}\n".format(" ".join(item.strip() for item in sys.version.split("\n")), 64 if sys.maxsize > 0x100000000 else 32, sys.platform))

    # write the header row if the output file is missing or empty
    _ensure_csv_header(
        'avg.csv',
        ["noise index", "latitude", "longitude", "accuracy", "timeframe", "cell Id"],
    )

    # generate datasets
    print("Running MLS checks")
    mls = MLS()
    print("Getting IP information")
    ip = ipinfo()
    mcc_dataset = mls.get_mcc(ip.mcc)
    print("Sorting dataset")
    sorted_dataset = mls.sort_data(mcc_dataset, ip.initial_coords)
    gmaps = googlemaps.Client(key=GMAPS_KEY)

    # start collecting data; each pass appends one row per tower until Ctrl-C
    while True:
        # Towers rejected by the API are removed below; without this guard an
        # empty dataset would make the loop spin forever doing nothing.
        if len(sorted_dataset) == 0:
            print("No towers left to sample")
            break
        try:
            with open("avg.csv", "a", newline='') as avg_file:
                avg_writer = csv.writer(avg_file)
                # Iterate over a snapshot: removing from sorted_dataset while
                # it is being iterated would skip the following tower.
                for tower in list(tower_yielder(sorted_dataset)):
                    print(tower)
                    cell = {
                        'cellId': tower[4],
                        'locationAreaCode': tower[3],
                        'mobileCountryCode': tower[1],
                        'mobileNetworkCode': tower[2],
                    }
                    try:
                        # module-level list that print_sound appends to
                        noise_data = []
                        # record audio levels for 1000 ms
                        with sd.Stream(callback=print_sound):
                            sd.sleep(1000)
                        # no callback fired, so there is nothing to average
                        if not noise_data:
                            continue
                        # get gps coordinates
                        # NOTE(review): arguments are positional and `cell` is a
                        # single dict; confirm this matches the parameter order
                        # and types expected by googlemaps' geolocate().
                        coords = gmaps.geolocate(tower[1], tower[2], tower[0], tower[8], False, cell)
                        lat = coords['location']['lat']
                        lng = coords['location']['lng']
                        acr = coords['accuracy']
                        # get the average noise index
                        noise_avg = str(sum(float(nd) for nd in noise_data) / len(noise_data))
                        # write to dataset
                        avg_writer.writerow([noise_avg, lat, lng, acr, time.time(), cell['cellId']])
                    except googlemaps.exceptions.ApiError:
                        # the API rejected this tower; do not retry it on later passes
                        sorted_dataset.remove(tower)
                    except IndexError:
                        # record is missing an expected field; skip it for this pass
                        pass
                    except KeyboardInterrupt:
                        raise Breaker
        except Breaker:
            break

    # print total execution time
    print("--- %s seconds ---" % (time.time() - start_time))