Skip to content

Commit

Permalink
fix real_luzern_ch
Browse files Browse the repository at this point in the history
  • Loading branch information
5ila5 committed Sep 2, 2024
1 parent 04bc61d commit 1f94614
Showing 1 changed file with 74 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import datetime
from waste_collection_schedule import Collection

import requests
from bs4 import BeautifulSoup
from waste_collection_schedule import Collection
from waste_collection_schedule.service.ICS import ICS

TITLE = "Real Luzern"
DESCRIPTION = "Source script for Real Luzern, Switzerland"
Expand All @@ -11,7 +10,7 @@
TEST_CASES = {
"Luzern - Heimatweg": {"municipality_id": 13, "street_id": 766},
"Luzern - Pliatusblick": {"municipality_id": 13, "street_id": 936},
"Emmen": {"municipality_id": 6}
"Emmen": {"municipality_id": 6},
}

ICON_MAP = {
Expand All @@ -23,62 +22,90 @@
# "Altmetall": "mdi:engine",
}

GERMAN_MONTH_STRING_TO_INT = {
"Januar": 1,
"Februar": 2,
"März": 3,
"April": 4,
"Mai": 5,
"Juni": 6,
"Juli": 7,
"August": 8,
"September": 9,
"Oktober": 10,
"November": 11,
"Dezember": 12,
}

OLD_WASTE_NAME = (
{ # New fetcher uses different names this should prevent breaking changes
"Kehrichtsammlung": "Kehricht",
"Grüngutsammlung": "Grüngut",
"Papiersammlung": "Papier",
"Kartonsammlung": "Karton",
"Alteisen/Metallsammlung": "Altmetall",
}
)

API_URL = "https://www.real-luzern.ch/abfall/sammeldienst/abfallkalender/"


class Source:
def __init__(self, municipality_id, street_id=None):
def __init__(self, municipality_id: str | int, street_id: str | int | None = None):
self._municipality_id = municipality_id
self._street_id = street_id
self._ics = ICS()
self._ics_params: dict[str, str] | None = None

def fetch(self):
# make request
uri = f"{API_URL}?gemId={self._municipality_id}&strId={self._street_id}"
def fetch_ics_params(self) -> None:
uri = f"{API_URL}?gemid={self._municipality_id}&strid={self._street_id}"
r = requests.get(uri)
r.raise_for_status()
soup = BeautifulSoup(r.text, features="html.parser")
data_div = soup.select_one("div.abfk-calendar.abfk-colors")
if not data_div:
raise ValueError("No data found")

tour_id = data_div.get("data-tourid", None)
if not tour_id:
raise ValueError("No tour id found")
categories = data_div.get("data-categories", None)
if not categories:
raise ValueError("No categories found")

self._ics_params = {
"abfk_calendar_download": "1",
"calendar[tourId]": str(tour_id),
"calendar[gemeinde]": "Luzern",
"calendar[format]": "ics",
**{
f"calendar[{category}]": category
for category in str(categories).split(",")
},
}

def fetch(self) -> list[Collection]:
fresh = False
if not self._ics_params:
fresh = True
self.fetch_ics_params()

try:
entries = self.get_collections()
if not entries:
raise ValueError("No entries found")
return entries
except Exception as e:
if fresh:
raise e
self.fetch_ics_params()
return self.get_collections()

def get_collections(self) -> list[Collection]:
if not self._ics_params:
raise ValueError("No ics params found")

r = requests.get(API_URL, params=self._ics_params)
r.raise_for_status()
data = self._ics.convert(r.text)

# extract entries
entries = []
waste_type_containers = soup.find(
"div", class_='tab-content').findChildren('div', recursive=False)
for waste_type_container in waste_type_containers:
waste_type = waste_type_container.get('id')
years = [int(y.text) for y in waste_type_container.findChildren("h3", recursive=False)]

dates_per_year = [c.text.replace(". ", ".").split(
) for c in waste_type_container.find_all("div", class_='cols')]

for (year, dates) in zip(years, dates_per_year):
for (day, month) in [split_german_date(date) for date in dates]:
entries.append(
Collection(
date=datetime.date(year, month, day),
t=waste_type,
icon=ICON_MAP.get(waste_type),
)
)

return entries

for date, waste_type in data:
# New fetcher uses different names this should prevent breaking changes
waste_type = OLD_WASTE_NAME.get(waste_type, waste_type)
entries.append(
Collection(
date=date,
t=waste_type,
icon=ICON_MAP.get(waste_type),
)
)

def split_german_date(date_string):
day, month_string = date_string.strip().split(".")
month = GERMAN_MONTH_STRING_TO_INT[month_string]
return (int(day), month)
return entries

0 comments on commit 1f94614

Please sign in to comment.