-
Notifications
You must be signed in to change notification settings - Fork 0
/
booker.py
217 lines (195 loc) · 7.72 KB
/
booker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import time
import asyncio
import logging
import scraper as scraper
import controller as controller
from config import get_value
from aiogram import Bot
from selenium import webdriver
# Logger shared by every function in this module; getLogger() is called
# without a name, so this is the root logger.
LOGGER = logging.getLogger()
def wait_code(_id, attempts=120, poll_interval=1):
    """Wait for the SMS code submitted for a given chat.

    Polls "codes.txt", whose lines have the form "<chat id>%<code>",
    re-reading the file once per attempt and sleeping ``poll_interval``
    seconds after every unsuccessful read. With the defaults this waits
    for about 2 minutes. The first matching line is removed from the
    file before the code is returned.

    Args:
        _id (str): telegram chat id
        attempts (int): maximum number of times the file is read
        poll_interval (float): seconds to sleep between two reads

    Returns:
        dict: dict with _id and code (code has no trailing line break)
        None: code not found
    """
    for _ in range(attempts):
        # read file
        with open("codes.txt", "r") as f:
            codes = f.readlines()

        # check a code with matching _id
        for raw_code in codes:
            # readlines() keeps the line terminator: strip it so that it
            # does not end up inside the returned code
            code_data = raw_code.rstrip("\r\n").split("%")

            # skip malformed lines (no separator) and other chats' codes
            if len(code_data) < 2 or code_data[0] != _id:
                continue

            # remove the consumed line from the file
            codes.remove(raw_code)
            with open("codes.txt", "w") as f:
                f.writelines(codes)

            return {"_id": code_data[0], "code": code_data[1]}

        time.sleep(poll_interval)

    return None
def find_appointment(driver, wanted_date, wanted_place):
    """Look for the wanted appointment in the page list and click it.

    Every element with class "text-wrap" is treated as one listed
    appointment. Its third <span> gives the place (upper-cased) and the
    date string is built as "<last 10 chars of span 0> <last 5 chars of
    span 1>:00". The comparison itself is delegated to
    scraper.are_equal.

    Args:
        driver: webdriver
        wanted_date (string): wanted appointment date
        wanted_place (string): wanted appointment place

    Returns:
        True: a matching appointment was found and clicked
        False: no listed appointment matched
    """
    for candidate in driver.find_elements_by_class_name("text-wrap"):
        spans = candidate.find_elements_by_tag_name("span")

        listed_place = spans[2].text.upper()
        day_part = spans[0].text[-10:]
        hour_part = spans[1].text[-5:]

        listed = {"date": f"{day_part} {hour_part}:00", "place": listed_place}
        wanted = {"date": wanted_date, "place": wanted_place}

        # not the one we are after: keep scanning
        if not scraper.are_equal(listed, wanted, True):
            continue

        candidate.click()
        return True

    return False
def check(driver, appointment):
    """Verify the wanted appointment is still listed and start booking it.

    The list is searched with the "Distanza" filter first and, only if
    nothing matches, again with the "Data" filter. Once the appointment
    has been clicked, the two confirmation buttons are pressed through
    JavaScript.

    Args:
        driver: webdriver
        appointment (dict): wanted appointment ("date" and "place" keys)

    Returns:
        True: appointment found and booking process started
        False: appointment not found, or the buttons could not be clicked
    """
    # try each ordering in turn, stopping at the first one that shows
    # the appointment
    for filter_name in ("Distanza", "Data"):
        scraper.switch_filter(driver, filter_name)
        if find_appointment(driver, appointment["date"], appointment["place"]):
            break
    else:
        # neither ordering listed the appointment
        return False

    try:
        # start booking process
        driver.execute_script("document.getElementsByClassName('btn btn-primary btn-icon')[0].click()")
        driver.execute_script("document.getElementsByClassName('btn btn-primary btn-icon mr-4 btn-large')[0].click()")
    except Exception:
        # appointment is not available anymore
        return False

    return True
def book(driver, _id):
    """Complete the booking by submitting the SMS code.

    Waits for the code via wait_code, types it into the "otpCode" field,
    presses the confirmation button and then looks for the
    "bookingDetailsSection" element to decide whether the booking went
    through.

    Args:
        driver: webdriver
        _id (str): telegram chat id

    Returns:
        0 (int): booking done
        -1 (int): code not found
        -2 (int): wrong or expired code or appointment not available anymore
    """
    sms = wait_code(_id)
    if sms is None:
        # no code arrived in time
        return -1

    # fill code field and proceed
    scraper.wait_until_present(driver, el_id="otpCode")
    otp_field = driver.find_element_by_id("otpCode")
    otp_field.send_keys(sms["code"])
    driver.execute_script("document.getElementsByClassName('btn btn-primary btn-icon')[0].click()")

    try:
        # the details section only shows up after a successful booking
        scraper.wait_until_present(driver, el_id="bookingDetailsSection", duration=5)
    except Exception:
        # booking failed due to wrong/expired code or appointment not available
        return -2

    return 0
def _run_async(coro):
    """Run a coroutine to completion on the current asyncio event loop."""
    return asyncio.get_event_loop().run_until_complete(coro)


def start_booker():
    """Start booker loop.

    Creates the Telegram bot and a headless Firefox driver, empties
    "appointments.txt" and "codes.txt", then loops forever:

    * reads the pending booking requests from "appointments.txt"
      (one "<chat id>%<date and place>" line per request);
    * for each request: removes it from the file, logs the user in,
      checks the appointment is still available, asks the user for the
      SMS code, performs the booking and notifies the result;
    * sleeps 30 seconds between requests and 60 seconds between cycles.

    Any exception raised while processing is logged; the user being
    processed (if any) gets is_booking reset, cookies are cleared and the
    loop pauses 5 minutes (plus 60 minutes if the page reports
    "Sessione scaduta"). The driver is quit if the loop is ever left.
    """
    LOGGER.info("Starting booker.")

    # bot instance
    bot = Bot(get_value("token"))

    # webdriver options
    options = webdriver.firefox.options.Options()
    options.headless = True

    # webdriver
    driver = webdriver.Firefox(options=options)

    # files initialization (both files are truncated on start-up)
    with open("appointments.txt", "w"):
        pass
    with open("codes.txt", "w"):
        pass

    try:
        # loop
        while True:
            # user currently being processed; the error handler relies on
            # this never being unbound or left over from an older request
            user = None
            try:
                # read booking requests
                with open("appointments.txt", "r") as f:
                    bookings = f.readlines()

                # iterate over a snapshot: `bookings` is shrunk inside the
                # loop, and removing from the list being iterated would
                # skip every other request
                for raw_booking in list(bookings):
                    user = None
                    LOGGER.info("New booking request")

                    # remove booking request from the file
                    bookings.remove(raw_booking)
                    with open("appointments.txt", "w") as f:
                        f.writelines(bookings)

                    # parse data
                    booking_data = raw_booking.split("%")
                    if len(booking_data) < 2:
                        # malformed line: drop it instead of stalling the
                        # whole loop in the error handler
                        LOGGER.warning("Skipping malformed booking request: %r", raw_booking)
                        continue
                    # NOTE(review): "place" keeps whatever follows column 12,
                    # including the line terminator read from the file;
                    # presumably scraper.are_equal tolerates it - confirm.
                    booking = {"_id": booking_data[0], "date": booking_data[1][:11], "place": booking_data[1][12:]}

                    # get user
                    user = controller.get_user(booking["_id"])

                    # perform login
                    scraper.login(driver, user["health_card"], user["fiscal_code"])
                    is_vaccinated = scraper.find(driver, user["region"], user["country"], user["postal_code"], user["phone"], user["date"])

                    # if user has already booked an appointment or he disabled notifications
                    if is_vaccinated:
                        # set is_booking to False and is_vaccinated to True and send a notification
                        _run_async(controller.change_booking_state(user["_id"], False))
                        _run_async(controller.update_status(user["_id"], is_vaccinated))
                        _run_async(bot.send_message(
                            user["_id"], "Ho notato che hai già effettuato una prenotazione perciò non controllerò le date per te.\n\nSe dovessi annullare la prenotazione e volessi essere notificato ancora digita /reset\n\nSe vuoi cancellare i tuoi dati digita /cancella"))

                        # clear cookies like the normal path does, so the
                        # next login does not start from this user's session
                        driver.delete_all_cookies()

                        # next booking request
                        time.sleep(30)
                        continue

                    still_available = check(driver, booking)

                    # if wanted appointment is still available
                    if still_available:
                        # send SMS code request notification
                        _run_async(bot.send_message(
                            user["_id"], "Invia il comando /codice per procedere con l'inserimento del codice."))

                        # perform booking
                        result = book(driver, booking["_id"])

                        # booking failed -> if result = -2
                        result_message = "Il codice che hai fornito non è corretto o è scaduto, oppure l'appuntamento non è più disponibile."
                        # booking done
                        if result == 0:
                            LOGGER.info("Booking request was successful.")
                            result_message = "Prenotazione effettuata con successo. Riceverai un SMS di conferma."
                        # booking failed
                        elif result == -1:
                            result_message = "Non hai fornito un codice in tempo (attesa massima 2 minuti)."

                        # send a notification with the booking result
                        _run_async(bot.send_message(user["_id"], result_message))
                    else:
                        _run_async(bot.send_message(user["_id"], "L'appuntamento che volevi prenotare non è più disponibile."))

                    # set is_booking to False
                    _run_async(controller.change_booking_state(user["_id"], False))

                    # clear cookies and wait 30 seconds for next
                    driver.delete_all_cookies()
                    time.sleep(30)

                # loop cycle done, wait 1 minute for next cycle
                time.sleep(60)
            except Exception as e:
                LOGGER.exception(e)

                # set is_booking to False, but only when a user was actually
                # loaded for the failing request; kept separate from the
                # browser clean-up so a failure here cannot skip it
                if user is not None:
                    try:
                        _run_async(controller.change_booking_state(user["_id"], False))
                    except Exception as reset_error:
                        LOGGER.exception(reset_error)

                try:
                    # clear cookies
                    driver.delete_all_cookies()

                    # check IP ban
                    if "Sessione scaduta" in driver.page_source:
                        LOGGER.info("IP banned")
                        # wait 60 minutes but I think it's useless, need to change IP
                        time.sleep(60 * 60)
                except Exception as cleanup_error:
                    LOGGER.exception(cleanup_error)

                # wait 5 minutes
                time.sleep(60 * 5)
    finally:
        # the loop only ends on KeyboardInterrupt/SystemExit: do not leave
        # the Firefox process behind
        driver.quit()