Skip to content

Commit

Permalink
Fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
teamofroman committed Feb 8, 2025
1 parent bb7a55c commit fd92de3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
4 changes: 3 additions & 1 deletion src/alembic/versions/2025_02_06_1533-b6f239481bb3_.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def upgrade() -> None:
op.create_table('message_status',
sa.Column('message_id', sa.BigInteger(), nullable=True),
sa.Column('user_id', sa.BigInteger(), nullable=True),
sa.Column('status', sa.Enum('pending', 'sent'), nullable=True),
sa.Column('status', sa.Enum('pending', 'sent', name='message_status_enum'), nullable=True),
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
Expand Down Expand Up @@ -61,4 +61,6 @@ def downgrade() -> None:
existing_type=sa.BIGINT(),
nullable=False)
op.drop_table('message_status')

op.execute('DROP TYPE message_status_enum')
# ### end Alembic commands ###
63 changes: 30 additions & 33 deletions src/bot/ratelimiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
from datetime import datetime, timedelta
from typing import Any

from telegram import InputMediaPhoto
from telegram.constants import ParseMode
from telegram.error import BadRequest, Forbidden
from telegram.ext import Application, ContextTypes

from app.core.db import get_async_session_context
from app.crud.message import crud_message
from app.crud.user_tg import crud_user
from app.models.models import MessageStatus
from app.schemas.message import MessageUpdate
from telegram import InputMediaPhoto
from telegram.constants import ParseMode
from telegram.error import BadRequest, Forbidden
from telegram.ext import Application, ContextTypes


async def load_unsent_messages(context: ContextTypes.DEFAULT_TYPE) -> None:
Expand All @@ -34,15 +33,15 @@ async def load_unsent_messages(context: ContextTypes.DEFAULT_TYPE) -> None:
send_message,
when=send_time,
data={
'message_id': message.id,
'user_id': message.create_user, # Передаём ID пользователя
"message_id": message.id,
"user_id": message.create_user, # Передаём ID пользователя
},
name=f'send_mes_{message.id}',
name=f"send_mes_{message.id}",
job_kwargs={
'misfire_grace_time': None,
"misfire_grace_time": None,
},
)
logging.info(f'Запланирована задача {job.name} на {job.next_t}')
logging.info(f"Запланирована задача {job.name} на {job.next_t}")


async def send_message_to_user(
Expand All @@ -68,7 +67,7 @@ async def send_message_to_user(
else:
media = [
InputMediaPhoto(
media=open(message.photos[i].filename, 'rb'), # noqa: ASYNC230
media=open(message.photos[i].filename, "rb"), # noqa: ASYNC230
)
for i in range(min(10, len(message.photos)))
]
Expand All @@ -83,8 +82,8 @@ async def send_message_to_user(

async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C901
"""Отправка сообщения пользователям."""
message_id = context.job.data['message_id']
user_id = context.job.data['user_id'] # Извлекаем ID пользователя
message_id = context.job.data["message_id"]
user_id = context.job.data["user_id"] # Извлекаем ID пользователя

async with get_async_session_context() as session:
message = await crud_message.get_obj_by_id(
Expand All @@ -99,7 +98,8 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
return

existing_statuses = await crud_message.get_message_statuses(
session, message_id,
session,
message_id,
)
users_to_notify = set()
groups = message.groups
Expand All @@ -109,17 +109,14 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
# Если пользователь состоит оновременно в нескольких группах,
# то в сете он все равно окажется в единственном экземпляре.
users_to_notify = {
user.tg_id
for group in groups
for user in group.users
if user.is_active
user.tg_id for group in groups for user in group.users if user.is_active
}
else:
# Если групп нет, получаем всех активных пользователей,
# исключая администратора и заблокированных
active_users = await crud_user.get_all_by_attributes(
filters={
'is_admin': False,
"is_admin": False,
},
session=session,
)
Expand All @@ -132,12 +129,12 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
MessageStatus(
message_id=message.id,
user_id=user_id,
status='pending',
status="pending",
)
for user_id in users_to_notify - existing_statuses.keys()
]
session.add_all(new_statuses)
session.commit()
await session.commit()
statuses = await crud_message.get_message_statuses(session, message_id)
retry_counts = {user_id: 0 for user_id in statuses.keys()}

Expand All @@ -147,19 +144,19 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
await send_message_to_user(context, user_id, message)
retry_counts.pop(user_id, None)
except (BadRequest, Forbidden) as e:
#Если указан неверный tg_id, или пользователь удалил аккаунт,
#или пользователь удалил бота.
# Если указан неверный tg_id, или пользователь удалил аккаунт,
# или пользователь удалил бота.
error_msg = str(e)
logging.error(
f'Отправка сообщения отменена из-за ошибки {error_msg}',
f"Отправка сообщения отменена из-за ошибки {error_msg}",
)
session.delete(status)
retry_counts.pop(user_id, None)
except Exception as e:
#Повторная отправка в случае сетевых ошибок.
# Повторная отправка в случае сетевых ошибок.
error_msg = str(e)
logging.error(
f'Ошибка отправки сообщения {error_msg}',
f"Ошибка отправки сообщения {error_msg}",
)
retry_counts[user_id] += 1
if retry_counts[user_id] < 3:
Expand All @@ -171,12 +168,12 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
finally:
await session.commit()
update_data = {
'is_send': True,
'sended_at': datetime.now(),
'update_users': message.update_users,
"is_send": True,
"sended_at": datetime.now(),
"update_users": message.update_users,
}
if not message.update_users:
update_data['update_users'] = user_id
update_data["update_users"] = user_id
# Обновление статуса сообщения через CRUD-функцию
await crud_message.update(
db_obj=message,
Expand All @@ -188,13 +185,13 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90

async def ptb_post_init(app: Application) -> None:
"""Функция для первоначальной инициализации приложения."""
logging.info('Запуск функции post_init.')
logging.info("Запуск функции post_init.")
app.job_queue.run_once(
load_unsent_messages,
when=5,
data=None,
name='load_unsent_messages',
name="load_unsent_messages",
job_kwargs={
'misfire_grace_time': None,
"misfire_grace_time": None,
},
)

0 comments on commit fd92de3

Please sign in to comment.