Skip to content

Commit

Permalink
feat: 推送取消稿件消息
Browse files Browse the repository at this point in the history
  • Loading branch information
RockChinQ committed May 12, 2024
1 parent d1fb180 commit bbc42c4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
19 changes: 19 additions & 0 deletions campux/imbot/mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ async def send_new_post_notify(
message.MessageSegment.image(image)
)

asyncio.create_task(self.ap.imbot.send_group_message(
self.ap.config.campux_review_qq_group_id,
msg
))

async def send_post_cancel_notify(
self,
post_id: int
):
post = await self.ap.cpx_api.get_post_info(post_id)

logger.info(f"稿件已取消:{post}")

if self.ap.config.campux_qq_group_review:

msg = [
message.MessageSegment.text(f"稿件已取消: #{post.id}"),
]

asyncio.create_task(self.ap.imbot.send_group_message(
self.ap.config.campux_review_qq_group_id,
msg
Expand Down
29 changes: 21 additions & 8 deletions campux/mq/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@

from ..core import app

streams_name = {
'campux_redis_publish_post_stream': 'campux_publish_post',
'campux_redis_new_post_stream': 'campux_new_post',
'campux_redis_post_cancel_stream': 'campux_post_cancel',
}


class RedisStreamMQ:

Expand Down Expand Up @@ -36,10 +42,7 @@ async def initialize(self):
# 创建xgroup
# 检查是否存在同名group

streams_to_check = [
self.ap.config.campux_redis_publish_post_stream,
self.ap.config.campux_redis_new_post_stream
]
streams_to_check = streams_name.values()

for stream in streams_to_check:
group_info = await self.redis_client.xinfo_groups(stream)
Expand All @@ -60,8 +63,9 @@ async def initialize(self):
async def routine_loop():
await asyncio.sleep(10)
while True:
await self.process_stream(self.ap.config.campux_redis_publish_post_stream, self.check_publish_post)
await self.process_stream(self.ap.config.campux_redis_new_post_stream, self.check_new_post)
await self.process_stream(streams_name['campux_redis_post_cancel_stream'], self.check_post_cancel)
await self.process_stream(streams_name['campux_redis_publish_post_stream'], self.check_publish_post)
await self.process_stream(streams_name['campux_redis_new_post_stream'], self.check_new_post)
await asyncio.sleep(10)

asyncio.create_task(routine_loop())
Expand Down Expand Up @@ -127,7 +131,7 @@ async def check_publish_post(self, message: tuple):
if await self.ap.social.can_operate():
asyncio.create_task(self.ap.social.publish_post(post_id))
# 确认消息
await self.redis_client.xack(self.ap.config.campux_redis_publish_post_stream, self.get_instance_identity(), message[0])
await self.redis_client.xack(streams_name['campux_redis_publish_post_stream'], self.get_instance_identity(), message[0])
else:
now = time.time()

Expand All @@ -147,11 +151,20 @@ async def check_new_post(self, message: tuple):

await self.ap.imbot.send_new_post_notify(post_id)

await self.redis_client.xack(self.ap.config.campux_redis_new_post_stream, self.get_instance_identity(), message[0])
await self.redis_client.xack(streams_name['campux_redis_new_post_stream'], self.get_instance_identity(), message[0])

async def mark_post_published(self, post_id):
await self.redis_client.hset(
f"publish_post_status:{post_id}",
self.get_instance_identity(),
1
)

async def check_post_cancel(self, message: tuple):
logger.info(f"处理消息:{message}")

post_id = int(message[1][b'post_id'].decode('utf-8'))

await self.ap.imbot.send_post_cancel_notify(post_id)

await self.redis_client.xack(streams_name['campux_redis_post_cancel_stream'], self.get_instance_identity(), message[0])

0 comments on commit bbc42c4

Please sign in to comment.