-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
228 lines (191 loc) · 8.77 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import random
import json
from json import JSONDecodeError
import discord
from discord.ext import commands, tasks
import datetime
import logging
import os
from rich.logging import RichHandler
from rich import print
from rich.traceback import install
from typing import List
import configuration
from configuration import LOGLEVEL, DATA_FOLDER, USERS_JSON_FILE_PATH, GOODREADS_SERVICE, BOOKWYRM_SERVICE
from classes import Review, BookUser, check_new_reviews, get_stars, read_json_data
from classes import extract_user_from_url, read_json_data, write_to_users_json, format_review_text
from exceptions import UrlNotValid
from rss_helper import RSSHelper
from rss_helper import DATE_FORMAT_INPUT, DATE_FORMAT_OUTPUT
FORMAT = "%(message)s"
logging.basicConfig(level=LOGLEVEL,
format=FORMAT,
datefmt="[%X]",
handlers=[RichHandler(markup=True, rich_tracebacks=True)])
log = logging.getLogger("rich")
# Show Tracebacks if DEBUG
if logging.root.level == logging.DEBUG:
install(show_locals=True)
else:
install(show_locals=False)
def init_file_structure (users_json_path: str):
if not os.path.exists(DATA_FOLDER):
os.makedirs(DATA_FOLDER)
with open(users_json_path, "w") as f:
log.info("Data folder doesn't exists, creating folder and .json file...")
pass # Creates empty file if none exist
elif os.path.exists(DATA_FOLDER) and not os.path.exists(users_json_path):
with open(users_json_path, "w") as f:
log.info("Data folder exists, but .json file doesn't exists, creating...")
pass
init_file_structure(USERS_JSON_FILE_PATH)
# Importing keys
try:
DISCORD_TOKEN = os.environ['DISCORD_TOKEN']
GUILD_ID = os.environ['GUILD_ID']
CHANNEL_ID = int(os.environ['CHANNEL_ID'])
except:
DISCORD_TOKEN = configuration.DISCORD_TOKEN
GUILD_ID = configuration.GUILD_ID
CHANNEL_ID = configuration.CHANNEL_ID
logging.debug(f"Channel ID: {CHANNEL_ID}")
# Importing Users
try:
f = open(USERS_JSON_FILE_PATH)
data = json.load(f)
users: List[BookUser] = [user for user in data["users"]]
log.info(f":books: Starting Discord Bot on ChannelID {CHANNEL_ID} :books:")
except JSONDecodeError:
users: List[BookUser] = []
log.warning("Json file is empty")
log.info(f":books: Starting Discord Bot on ChannelID {CHANNEL_ID} :books:")
# Variables
rsh = RSSHelper()
#reviews: List[Review] = rsh.get_reviews(users)
intents = discord.Intents.default()
intents.message_content = True
# Connecting with Discord
class UpdatesClient(commands.Bot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.msg_sent = False
self.synced = False
async def on_ready(self):
channel = client.get_channel(CHANNEL_ID) # replace with channel ID that you want to send to
if not self.synced: # check if slash commands have been synced
await tree.sync(guild=discord.Object(
id=GUILD_ID)) # guild specific: leave blank if global (global registration can take 1-24 hours)
self.synced = True
log.info(f"Successfully logged and synced in as {client.user}")
await self.timer.start(channel)
# @tasks.loop(seconds=5) # For debug purposes
@tasks.loop(minutes=15)
async def timer(self, channel, force_check=False):
global reviews
rand_debug = random.randrange(0,4)
log.debug(f":stopwatch: Starting timer... Random:{rand_debug} :stopwatch:")
log.debug(f"Is sync? {self.synced}")
current_time = datetime.datetime.now()
#if rand_debug == 1: # Uncomment to test
#if (current_time.minute == 0 or current_time.minute == 15
# or current_time.minute == 30 or current_time.minute == 45
# or force_check):
#if (not self.msg_sent) or force_check:
log.info (f":books: Starting review check... :books:")
try:
data = read_json_data (USERS_JSON_FILE_PATH)
reviews = rsh.get_reviews(data["users"])
new_reviews = check_new_reviews(reviews, data)
for review in new_reviews:
score_star = get_stars(review['score'])
embed = discord.Embed(title=review['title'] + ' ' + score_star,
url=review['url'])
embed.set_author(name=review['username'],
url=review["user_url"], icon_url=review["user_image_url"])
embed.set_thumbnail(url=review['image_url'])
if review['review_text'] == "":
embed.description = f"{review['author']}"
else:
embed.description = format_review_text(review)
log.info(f"Review sent for user: {review['username']}")
await channel.send(embed=embed, mention_author=True)
self.msg_sent = True
reviews = []
log.info (f":books: Review check finished. Sent {len(new_reviews)} new reviews :books:")
except KeyError:
log.warning("Json file is empty")
#reviews = rsh.get_reviews(users)
client = UpdatesClient(command_prefix='/', intents=discord.Intents().all())
channel = client.get_channel(CHANNEL_ID)
tree = client.tree
# Discord Slash Commands
@tree.command(guild=discord.Object(id=GUILD_ID), name='add', description='Add BookUser') # guild specific
async def add_user(interaction: discord.Interaction, user_input_url: str):
global users
global reviews
users_id: List[str] = []
try:
extracted_user = extract_user_from_url(user_input_url)
except UrlNotValid:
log.error(f"URL {user_input_url} not supported!")
await interaction.response.send_message("URL not supported!", ephemeral=True)
return
data = read_json_data()
current_user: BookUser = {
"service" : extracted_user["service"],
"id" : extracted_user["id"],
"user_url" : extracted_user["user_url"],
"last_review_ts" : datetime.datetime.now().strftime(DATE_FORMAT_OUTPUT) # this is server time, maybe could be rss time?
}
if data:
users_id = [user["id"] for user in data["users"]]
if extracted_user["id"] not in users_id:
data["users"].append(current_user)
users.append(current_user)
else: # For first user
data.setdefault("users",[])
data["users"].append(current_user)
users.append(current_user)
write_to_users_json(data)
# reviews = rsh.get_reviews(users) # Need to fix to just
# update reviews with this user reviews
await interaction.response.send_message("¡Añadido!", ephemeral=True)
log.info (f"BookUser {user_input_url} added!")
log.info (f"New user list: {users}")
if not extracted_user:
await interaction.response.send_message("URL not supported!", ephemeral=True)
return
@tree.command(guild=discord.Object(id=GUILD_ID), name='remove', description='Remove User') # guild specific
async def remove_user(interaction: discord.Interaction, user_input_url: str):
global reviews
global users
try:
extracted_user = extract_user_from_url(user_input_url)
except UrlNotValid:
log.error(f"URL {user_input_url} not supported!")
await interaction.response.send_message("URL not supported!", ephemeral=True)
return
data = read_json_data()
for i, user in enumerate(data["users"]):
if user["id"] == extracted_user["id"] and user["service"] == extracted_user["service"]:
del data["users"][i]
write_to_users_json(data)
users = data["users"]
if not extracted_user:
log.error(f"URL {user_input_url} not supported!")
await interaction.response.send_message("URL not supported!", ephemeral=True)
return
await interaction.response.send_message("¡Eliminado!", ephemeral=True)
log.info (f"BookUser {user_input_url} removed!")
@tree.command(guild=discord.Object(id=GUILD_ID), name='review_check', description='Trigger Review Check') # guild specific
async def trigger_review_check(interaction: discord.Interaction):
await interaction.response.send_message("Triggering review check...", ephemeral=True)
await client.timer(client.get_channel(CHANNEL_ID), True)
@tree.command(guild=discord.Object(id=GUILD_ID), name='sync', description='Sync bot (dev)') # guild specific
async def sync_bot(interaction: discord.Interaction):
await tree.sync(guild=discord.Object(
id=GUILD_ID))
await interaction.response.send_message("Bot synced!", ephemeral=True)
log.info (f"Bot synced!")
# Update Reviews
client.run(DISCORD_TOKEN)