Skip to content

Commit

Permalink
Добавил файл миграций, заменил цикл for на while, добавил счетчик поп…
Browse files Browse the repository at this point in the history
…ыток отправки и переместил crud обновлящий статус сообщения
  • Loading branch information
thereareyou123 committed Feb 6, 2025
1 parent 897d759 commit 439ee9e
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,4 @@ dmypy.json

# vscode
.vscode
.DS_Store
64 changes: 64 additions & 0 deletions src/alembic/versions/2025_02_06_1533-b6f239481bb3_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""empty message
Revision ID: b6f239481bb3
Revises: 3b627edb26a8
Create Date: 2025-02-06 15:33:29.308860
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'b6f239481bb3'
down_revision: Union[str, None] = '3b627edb26a8'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('message_status',
sa.Column('message_id', sa.BigInteger(), nullable=True),
sa.Column('user_id', sa.BigInteger(), nullable=True),
sa.Column('status', sa.Enum('pending', 'sent'), nullable=True),
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['message_id'], ['message.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user_tg.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.alter_column('message_group', 'message_id',
existing_type=sa.BIGINT(),
nullable=True)
op.alter_column('message_group', 'group_id',
existing_type=sa.BIGINT(),
nullable=True)
op.alter_column('user_group', 'user_id',
existing_type=sa.BIGINT(),
nullable=True)
op.alter_column('user_group', 'group_id',
existing_type=sa.BIGINT(),
nullable=True)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('user_group', 'group_id',
existing_type=sa.BIGINT(),
nullable=False)
op.alter_column('user_group', 'user_id',
existing_type=sa.BIGINT(),
nullable=False)
op.alter_column('message_group', 'group_id',
existing_type=sa.BIGINT(),
nullable=False)
op.alter_column('message_group', 'message_id',
existing_type=sa.BIGINT(),
nullable=False)
op.drop_table('message_status')
# ### end Alembic commands ###
2 changes: 2 additions & 0 deletions src/app/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Group,
Message,
MessageGroupAssociation,
MessageStatus,
Photo,
UserGroupAssociation,
UserTG,
Expand All @@ -13,6 +14,7 @@
'Group',
'Message',
'MessageGroupAssociation',
'MessageStatus',
'Photo',
'UserGroupAssociation',
'UserTG',
Expand Down
2 changes: 2 additions & 0 deletions src/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Group,
Message,
MessageGroupAssociation,
MessageStatus,
Photo,
UserGroupAssociation,
UserTG,
Expand All @@ -14,4 +15,5 @@
UserTG,
UserGroupAssociation,
MessageGroupAssociation,
MessageStatus
]
4 changes: 2 additions & 2 deletions src/app/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class UserTG(Base):

message_statuses = relationship(
'MessageStatus',
bacl_populates='user',
back_populates='user',
lazy='joined',
)

Expand Down Expand Up @@ -103,7 +103,7 @@ class Message(Base):

message_statuses = relationship(
'MessageStatus',
bacl_populates='message',
back_populates='message',
lazy='joined',
)
# Связь с группами
Expand Down
57 changes: 30 additions & 27 deletions src/bot/ratelimiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
if message.is_send:
return

existing_statuses = crud_message.get_message_statuses(
existing_statuses = await crud_message.get_message_statuses(
session, message_id,
)
users_to_notify = set()
Expand Down Expand Up @@ -138,27 +138,17 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
]
session.add_all(new_statuses)
session.commit()
statuses = crud_message.get_message_statuses(session, message_id)
for user_id, status in statuses.items():
statuses = await crud_message.get_message_statuses(session, message_id)
retry_count = 0

while statuses:
user_id, status = statuses.popitem()
if status.status == 'sent':
continue
try:
await send_message_to_user(context, user_id, message)
status.status = 'sent'
update_data = {
'is_send': True,
'sended_at': datetime.now(),
'update_users': message.update_users,
}
if not message.update_users:
update_data['update_users'] = user_id
# Обновление статуса сообщения через CRUD-функцию
await crud_message.update(
db_obj=message,
pydantic_scheme_obj=MessageUpdate(**update_data),
session=session,
)
session.delete(status)
retry_count = 0
except (BadRequest, Forbidden) as e:
#Если указан неверный tg_id, или пользователь удалил аккаунт,
#или пользователь удалил бота.
Expand All @@ -167,22 +157,35 @@ async def send_message(context: ContextTypes.DEFAULT_TYPE) -> None: # noqa: C90
f'Отправка сообщения отменена из-за ошибки {error_msg}',
)
session.delete(status)
retry_count = 0
except Exception as e:
#Повторная отправка в случае сетевых ошибок.
error_msg = str(e)
logging.error(
f'Ошибка отправки сообщения {error_msg}',
)
await asyncio.sleep(10)
context.job_queue.run_once(
send_message,
when=10,
data={
'message_id': message_id,
'user_id': user_id,
},
name=f'retry_send_mes_{message_id}',
)
retry_count += 1
if retry_count < 3:
statuses[user_id] = status
await asyncio.sleep(10)
else:
session.delete(status)
retry_count = 0
finally:
await session.commit()
update_data = {
'is_send': True,
'sended_at': datetime.now(),
'update_users': message.update_users,
}
if not message.update_users:
update_data['update_users'] = user_id
# Обновление статуса сообщения через CRUD-функцию
await crud_message.update(
db_obj=message,
pydantic_scheme_obj=MessageUpdate(**update_data),
session=session,
)
await session.commit()


Expand Down

0 comments on commit 439ee9e

Please sign in to comment.