-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_interface.py
62 lines (46 loc) · 1.96 KB
/
data_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from buienradar.buienradar import (get_data, parse_data)
from buienradar.constants import (CONTENT, RAINCONTENT, SUCCESS)
import logging
# Log buienradar library warnings: configure the root logger so that any
# record of level WARNING or higher is written to data_interface.log.
# getLogger() without a name returns the root logger.
logger = logging.getLogger()
logging.basicConfig(filename='data_interface.log', level=logging.WARNING)
# Time zone used to build and compare the timezone-aware datetimes in this module.
timezone = ZoneInfo("Europe/Amsterdam")
def string_to_datetime(timestamp):
    """Convert an "HH:MM" clock string into a timezone-aware datetime.

    The clock time is placed on the current calendar date in the module's
    ``timezone``. If that moment lies more than five minutes in the past it
    is taken to mean the same clock time on the following day.

    Args:
        timestamp: Text of the form "HH:MM".

    Returns:
        An aware ``datetime`` carrying the module's ``timezone``.

    Raises:
        ValueError: If ``timestamp`` does not consist of exactly two
            ":"-separated integer fields, or the fields are out of range.
    """
    hour_text, minute_text = timestamp.split(":")

    # Take "now" once, in the target time zone. The calendar date must come
    # from the same zone as the comparison below; using the host's local date
    # shifts the result by a day whenever the host's date differs from the
    # date in the target zone.
    now = datetime.now(tz=timezone)

    result = datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=int(hour_text),
        minute=int(minute_text),
        tzinfo=timezone,
    )

    # Five minutes of slack so a slot that has only just passed still counts
    # as today; anything older is treated as tomorrow's slot.
    if result < now - timedelta(minutes=5):
        result += timedelta(days=1)
    return result
def parse_rain_data(raindata):
    """Parse raw rain-forecast text into a list of forecast entries.

    Each non-blank line is expected to look like ``<value>|<HH:MM>``.

    Args:
        raindata: The raw forecast text, one entry per line. ``None`` or an
            empty/blank string yields an empty list.

    Returns:
        A list of dicts, in input order, each with:
            "timestamp": the aware datetime produced by ``string_to_datetime``
                for the line's clock time;
            "mmh": the line's integer value divided by 100.

    Raises:
        ValueError: If a non-blank line does not have exactly two
            "|"-separated fields, or its fields cannot be converted.
    """
    # NOTE(review): "mmh" is simply value / 100; whether that is the correct
    # conversion to mm/h should be confirmed against the data provider's docs.
    if not raindata:
        return []

    # splitlines() copes with "\n" and "\r\n" alike; blank lines are skipped
    # so that empty or whitespace-only input does not fail on unpacking.
    rows = (line.split("|") for line in raindata.splitlines() if line.strip())
    return [
        {"timestamp": string_to_datetime(timestamp), "mmh": int(mmh) / 100}
        for mmh, timestamp in rows
    ]
def fetch_data(latitude, longitude, timeframe):
    """Fetch weather data for a location and extract the values of interest.

    Calls the buienradar library's ``get_data`` for the coordinates, parses
    the response with ``parse_data`` and parses the raw rain forecast with
    ``parse_rain_data``.

    Args:
        latitude: Latitude passed to ``get_data`` and ``parse_data``.
        longitude: Longitude passed to ``get_data`` and ``parse_data``.
        timeframe: Passed through unchanged to ``parse_data``.

    Returns:
        A tuple ``(temperature, windspeed, parsed_rain_data)`` where the first
        two come from the "temperature" and "windspeed" entries of the parsed
        data and the third is the list returned by ``parse_rain_data``.
        ``(None, None, None)`` is returned when fetching fails or when parsing
        yields no data.
    """
    response = get_data(latitude=latitude, longitude=longitude)
    if not response.get(SUCCESS):
        return None, None, None

    data = response[CONTENT]
    raindata = response[RAINCONTENT]

    parsed = parse_data(data, raindata, latitude, longitude, timeframe)
    result = parsed.get("data")
    if not result:
        # A successful fetch can still fail to parse, leaving no data payload;
        # report that with the same failure triple instead of raising on the
        # lookups below.
        return None, None, None

    temperature = result["temperature"]
    windspeed = result["windspeed"]
    parsed_rain_data = parse_rain_data(raindata=raindata)
    return temperature, windspeed, parsed_rain_data
def max_rain(rain_data, timeframe):
    """Return the highest "mmh" value forecast within the coming timeframe.

    Entries are scanned in order and scanning stops at the first entry whose
    "timestamp" lies beyond now + ``timeframe`` minutes, so later entries are
    never inspected.

    Args:
        rain_data: Iterable of dicts with "timestamp" (aware datetime) and
            "mmh" keys, as produced by ``parse_rain_data``.
        timeframe: Look-ahead window in minutes.

    Returns:
        The largest "mmh" seen before the scan stopped, or 0 if none exceeded 0.
    """
    peak = 0
    horizon = datetime.now(tz=timezone) + timedelta(minutes=timeframe)
    for entry in rain_data:
        if entry["timestamp"] > horizon:
            # First entry past the window ends the scan.
            break
        intensity = entry["mmh"]
        if intensity > peak:
            peak = intensity
    return peak