diff --git a/gupshup_matrix/db/__init__.py b/gupshup_matrix/db/__init__.py index eca2b6a..dfe0e19 100644 --- a/gupshup_matrix/db/__init__.py +++ b/gupshup_matrix/db/__init__.py @@ -4,12 +4,13 @@ from .message import Message from .portal import Portal from .puppet import Puppet +from .reaction import Reaction from .upgrade import upgrade_table from .user import User def init(db: Database) -> None: - for table in (Puppet, Portal, User, Message, GupshupApplication): + for table in (Puppet, Portal, User, Message, GupshupApplication, Reaction): table.db = db diff --git a/gupshup_matrix/db/reaction.py b/gupshup_matrix/db/reaction.py new file mode 100644 index 0000000..cdbe52a --- /dev/null +++ b/gupshup_matrix/db/reaction.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, ClassVar, Optional + +import asyncpg +from attr import dataclass +from mautrix.types import EventID, RoomID, UserID +from mautrix.util.async_db import Database + +fake_db = Database.create("") if TYPE_CHECKING else None + +log: logging.Logger = logging.getLogger("meta.out") + + +@dataclass +class Reaction: + db: ClassVar[Database] = fake_db + + event_mxid: EventID + room_id: RoomID + sender: UserID + gs_message_id: str + reaction: str + created_at: float + + @property + def _values(self): + return ( + self.event_mxid, + self.room_id, + self.sender, + self.gs_message_id, + self.reaction, + self.created_at, + ) + + _columns = "event_mxid, room_id, sender, gs_message_id, reaction, created_at" + + async def insert(self) -> None: + q = f"INSERT INTO reaction ({self._columns}) VALUES ($1, $2, $3, $4, $5, $6)" + await self.db.execute(q, *self._values) + + @classmethod + def _from_row(cls, row: asyncpg.Record) -> Optional["Reaction"]: + return cls(**row) + + @classmethod + async def delete_all(cls, room_id: RoomID) -> None: + await cls.db.execute("DELETE FROM reaction WHERE room_id=$1", room_id) + + @classmethod + async def get_by_gs_message_id( + cls, gs_message_id: str, sender: UserID + ) -> Optional["Reaction"]: + q = f"SELECT {cls._columns} FROM reaction WHERE gs_message_id=$1 AND sender=$2" + row = await cls.db.fetchrow(q, gs_message_id, sender) + if not row: + return None + return cls._from_row(row) + + @classmethod + async def get_by_event_mxid(cls, event_mxid: EventID, room_id: RoomID) -> Optional["Reaction"]: + q = f"SELECT {cls._columns} FROM reaction WHERE event_mxid=$1 AND room_id=$2" + row = await cls.db.fetchrow(q, event_mxid, room_id) + if not row: + return None + return cls._from_row(row) + + @classmethod + async def get_last_reaction(cls, room_id: RoomID) -> "Reaction": + q = f""" + SELECT {cls._columns} + FROM reaction WHERE room_id=$1 ORDER BY created_at DESC LIMIT 1 + """ + row = await cls.db.fetchrow(q, room_id) + if not row: + return None + return cls._from_row(row) + + @classmethod + async def delete_by_event_mxid( + cls, event_mxid: EventID, room_id: RoomID, sender: UserID + ) -> "Reaction": + q = "DELETE FROM reaction WHERE event_mxid=$1 AND room_id=$2 AND sender=$3" + row = await cls.db.fetchrow(q, event_mxid, room_id, sender) + if not row: + return None + return cls._from_row(row) diff --git a/gupshup_matrix/db/upgrade.py b/gupshup_matrix/db/upgrade.py index d27e230..03e36ae 100644 --- a/gupshup_matrix/db/upgrade.py +++ b/gupshup_matrix/db/upgrade.py @@ -62,3 +62,23 @@ async def upgrade_v1(conn: Connection) -> None: @upgrade_table.register(description="Add field encrypted to portal table") async def upgrade_v2(conn: Connection) -> None: await conn.execute("ALTER TABLE portal ADD COLUMN encrypted BOOLEAN DEFAULT false") + + +@upgrade_table.register(description="Add reaction table to store reactions") +async def upgrade_v3(conn: Connection) -> None: + await conn.execute( + """CREATE TABLE reaction ( + event_mxid VARCHAR(255) PRIMARY KEY, + room_id VARCHAR(255) NOT NULL, + sender VARCHAR(255) NOT NULL, + gs_message_id TEXT NOT NULL, + reaction VARCHAR(255), + created_at TIMESTAMP WITH TIME ZONE NOT NULL + )""" + ) + await conn.execute("ALTER TABLE message ADD CONSTRAINT unique_gsid UNIQUE (gsid);") + await conn.execute( + """ALTER TABLE reaction ADD CONSTRAINT FK_message_gsid + FOREIGN KEY (gs_message_id) references message (gsid) + ON DELETE CASCADE""" + ) diff --git a/gupshup_matrix/example-config.yaml b/gupshup_matrix/example-config.yaml index cce06a6..eff0529 100644 --- a/gupshup_matrix/example-config.yaml +++ b/gupshup_matrix/example-config.yaml @@ -124,6 +124,8 @@ gupshup: # Gupsghup base URL base_url: https://api.gupshup.io/sm/api/v1/msg read_url: https://api.gupshup.io/wa/app/{appId}/msg/{msgId}/read + # Gupshup reaction URL + cloud_api_url: https://api.gupshup.io/wa/api/v1/msg # Path prefix for webhook endpoints. Subpaths are /status and /receive. # Note that the webhook must be put behind a reverse proxy with https. webhook_path: /gupshup diff --git a/gupshup_matrix/gupshup/api.py b/gupshup_matrix/gupshup/api.py index b91b1e2..5990586 100644 --- a/gupshup_matrix/gupshup/api.py +++ b/gupshup_matrix/gupshup/api.py @@ -18,6 +18,7 @@ class GupshupClient: def __init__(self, config: Config, loop: asyncio.AbstractEventLoop) -> None: self.base_url = config["gupshup.base_url"] self.read_url = config["gupshup.read_url"] + self.cloud_api_url = config["gupshup.cloud_api_url"] self.app_name = config["gupshup.app_name"] self.sender = config["gupshup.sender"] self.http = ClientSession(loop=loop) @@ -153,3 +154,26 @@ async def send_location( response_data = json.loads(await resp.text()) return {"status": resp.status, "messageId": response_data.get("messageId")} + + async def send_reaction(self, message_id: str, emoji: str, type: str, data: dict): + """ + Send a reaction to whatsapp + + Parameters + ---------- + message_id: str + The message ID of the reaction event + emoji: str + The emoji that was reacted with + type: str + The type of the reaction event + data: dict + The necessary data to send the reaction + """ + headers = data.get("headers") + data.pop("headers") + data["message"] = json.dumps({"msgId": message_id, "type": type, "emoji": emoji}) + + resp = await self.http.post(self.cloud_api_url, data=data, headers=headers) + response_data = json.loads(await resp.text()) + return response_data diff --git a/gupshup_matrix/gupshup/data.py b/gupshup_matrix/gupshup/data.py index 7bf0841..a129302 100644 --- a/gupshup_matrix/gupshup/data.py +++ b/gupshup_matrix/gupshup/data.py @@ -47,6 +47,7 @@ class GupshupMessageData(SerializableAttrs): msg_gsId: str = attr.ib(default=None, metadata={"json": "gsId"}) name: str = attr.ib(default=None, metadata={"json": "name"}) address: str = attr.ib(default=None, metadata={"json": "address"}) + emoji: str = attr.ib(default=None, metadata={"json": "emoji"}) @dataclass diff --git a/gupshup_matrix/gupshup/webhook.py b/gupshup_matrix/gupshup/webhook.py index feb4dec..89677a7 100644 --- a/gupshup_matrix/gupshup/webhook.py +++ b/gupshup_matrix/gupshup/webhook.py @@ -65,7 +65,6 @@ async def receive(self, request: web.Request) -> None: f"Ignoring event because the gs_app [{data.get('app')}] is not registered." ) return web.Response(status=406) - if data.get("type") == GupshupEventType.MESSAGE: return await self.message_event(data) elif data.get("type") == GupshupEventType.MESSAGE_EVENT: @@ -110,7 +109,10 @@ async def message_event(self, data: GupshupMessageEvent) -> web.Response: user: u.User = await u.User.get_by_gs_app(data.app) info = ChatInfo.deserialize(data.__dict__) info.sender = data.payload.sender - await portal.handle_gupshup_message(user, info, data) + if data.payload.type == "reaction": + await portal.handle_gupshup_reaction(user, data) + else: + await portal.handle_gupshup_message(user, info, data) return web.Response(status=204) async def status_event(self, data: GupshupStatusEvent) -> web.Response: diff --git a/gupshup_matrix/matrix.py b/gupshup_matrix/matrix.py index 1f5b356..5a29b0e 100644 --- a/gupshup_matrix/matrix.py +++ b/gupshup_matrix/matrix.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Dict from mautrix.bridge import BaseMatrixHandler, RejectMatrixInvite from mautrix.types import ( @@ -8,6 +8,7 @@ EventID, EventType, ReactionEvent, + ReactionEventContent, RedactionEvent, RoomID, SingleReceiptEventContent, @@ -47,9 +48,7 @@ async def handle_event(self, evt: Event) -> None: await self.handle_redaction(evt.room_id, evt.sender, evt.redacts, evt.event_id) elif evt.type == EventType.REACTION: evt: ReactionEvent - await self.handle_reaction( - evt.room_id, evt.sender, evt.event_id, evt.content, evt.timestamp - ) + await self.handle_reaction(evt.room_id, evt.sender, evt.event_id, evt.content) async def handle_invite( self, room_id: RoomID, user_id: UserID, inviter: u.User, event_id: EventID @@ -98,7 +97,7 @@ async def handle_redaction( if not portal: return - await portal.handle_matrix_redaction(user, event_id, redaction_event_id) + await portal.handle_matrix_redaction(user, event_id) async def allow_message(self, user: u.User) -> bool: return user.relay_whitelisted @@ -111,3 +110,42 @@ async def handle_read_receipt( ) -> None: self.log.debug(f"Got read receipt for {event_id} from {user.mxid}") await portal.handle_matrix_read_receipt(event_id) + + async def handle_reaction( + self, + room_id: RoomID, + user_id: UserID, + event_id: EventID, + content: ReactionEventContent, + ) -> None: + """ + Send a reaction to a user in a room. + + Parameters + ---------- + room_id: RoomID + The room ID of the room where the reaction was sent + user_id: UserID + The user ID of the user who sent the reaction + event_id: EventID + The event ID of the reaction event + content: ReactionEventContent + The content of the reaction event + """ + self.log.debug(f"Received reaction event: {content}") + user: u.User = await u.User.get_by_mxid(user_id) + message_mxid = content.relates_to.event_id + if not user: + return + + if not message_mxid: + return + + portal: po.Portal = await po.Portal.get_by_mxid(room_id) + if not portal: + return + try: + await portal.handle_matrix_reaction(user, message_mxid, event_id, room_id, content) + except ValueError as error: + self.log.error(f"Error trying to send a reaction {error}") + return diff --git a/gupshup_matrix/portal.py b/gupshup_matrix/portal.py index 249e587..de3c286 100644 --- a/gupshup_matrix/portal.py +++ b/gupshup_matrix/portal.py @@ -17,6 +17,7 @@ Format, MessageType, PowerLevelStateEventContent, + ReactionEventContent, RoomID, UserID, ) @@ -35,6 +36,7 @@ from .db import GupshupApplication as DBGupshupApplication from .db import Message as DBMessage from .db import Portal as DBPortal +from .db import Reaction as DBReaction from .formatter import whatsapp_reply_to_matrix, whatsapp_to_matrix from .gupshup import ( GupshupClient, @@ -287,12 +289,23 @@ async def save(self) -> None: async def handle_gupshup_message( self, source: u.User, info: ChatInfo, message: GupshupMessageEvent ) -> None: + """ + Send a message to element and create a room if it doesn't exist. + + Parameters + ---------- + source: User + The user who sent the reaction + info: ChatInfo + The information of the user who sent the message + message: GupshupMessageEvent + The content of the reaction event + """ if not await self.create_matrix_room(source=source, info=info): return mxid = None msgtype = MessageType.TEXT - if message.payload.body.url: resp = await self.az.http_session.get(message.payload.body.url) data = await resp.read() @@ -440,6 +453,63 @@ async def handle_gupshup_status(self, status: GupshupPayload) -> None: await self.main_intent.react(self.mxid, msg.mxid, "\u274c") await self.main_intent.send_notice(self.mxid, None, html=reason_es) + async def handle_gupshup_reaction(self, sender: u.User, message: GupshupMessageEvent): + """ + Send a reaction to element. + + Parameters + ---------- + sender: User + The user who sent the reaction + message: GupshupMessageEvent + The content with the reaction event + """ + if not self.mxid: + return + + data_reaction = message.payload.body + msg_id = data_reaction.msg_gsId if data_reaction.msg_gsId else data_reaction.msg_id + msg: DBMessage = await DBMessage.get_by_gsid(gsid=msg_id) + if msg: + message_with_reaction: DBReaction = await DBReaction.get_by_gs_message_id( + msg.gsid, sender.mxid + ) + + if message_with_reaction: + await DBReaction.delete_by_event_mxid( + message_with_reaction.event_mxid, self.mxid, sender.mxid + ) + has_been_sent = await self.main_intent.redact( + self.mxid, message_with_reaction.event_mxid + ) + if not data_reaction.emoji: + return + + try: + has_been_sent = await self.main_intent.react( + self.mxid, + msg.mxid, + data_reaction.emoji, + ) + except Exception as e: + self.log.exception(f"Error sending reaction: {e}") + await self.main_intent.send_notice(self.mxid, "Error sending reaction") + return + + else: + self.log.error(f"Message id not found, mid: {msg_id}") + await self.main_intent.send_notice(self.mxid, "Error sending reaction") + return + + await DBReaction( + event_mxid=has_been_sent, + room_id=self.mxid, + sender=sender.mxid, + gs_message_id=msg.gsid, + reaction=data_reaction.emoji, + created_at=datetime.now(), + ).insert() + async def handle_matrix_message( self, sender: "u.User", @@ -640,3 +710,111 @@ async def handle_matrix_read_receipt(self, event_id: str) -> None: except ValueError as error: self.log.error(f"Read event error for event_id {event_id}: {error}") return + + async def handle_matrix_reaction( + self, + user: u.User, + message_mxid: str, + event_id: EventID, + room_id: RoomID, + content: ReactionEventContent, + ): + """ + Send a reaction to whatsapp + + Parameters + ---------- + user: User + The user who sent the reaction + message_mxid: str + The message ID of the reaction event + event_id: EventID + The event ID of the reaction event + room_id: RoomID + The room ID of the room where the reaction was sent + content: Dict + The content of the reaction event + """ + message: DBMessage = await DBMessage.get_by_mxid(message_mxid, room_id) + + if not message: + self.log.error(f"Message {message_mxid} not found when handling reaction") + await self.main_intent.send_notice( + self.mxid, "We couldn't find the message to react to" + ) + return + + reaction_value = content.relates_to.key + message_with_reaction = await DBReaction.get_by_gs_message_id(message.gsid, user.mxid) + data = await self.main_data_gs + if message_with_reaction: + await DBReaction.delete_by_event_mxid( + message_with_reaction.event_mxid, self.mxid, user.mxid + ) + await self.main_intent.redact(self.mxid, message_with_reaction.event_mxid) + + try: + await self.gsc.send_reaction( + message_id=message.gsid, + emoji=reaction_value, + type="reaction", + data=data, + ) + except ClientConnectorError as e: + self.log.error(e) + await self.main_intent.send_notice(f"Error sending reaction: {e}") + return + except TypeError as e: + self.log.error(e) + await self.main_intent.send_notice(f"Error sending reaction: {e}") + return + except Exception as e: + self.log.error(f"Error sending reaction: {e}") + await self.main_intent.send_notice(f"Error sending reaction: {e}") + return + + await DBReaction( + event_mxid=event_id, + room_id=self.mxid, + sender=user.mxid, + gs_message_id=message.gsid, + reaction=reaction_value, + created_at=datetime.now(), + ).insert() + + async def handle_matrix_redaction( + self, + user: u.User, + event_id: EventID, + ) -> None: + """ + When a user of Matrix redaction to a message, this function takes it and sends it to Gupshup + + Parameters + ---------- + user : User + The user who sent the redaction + + event_id: + The event_id of the reaction that was redacted + """ + self.log.debug(f"Handling redaction for {event_id}") + data = await self.main_data_gs + message: DBReaction = await DBReaction.get_by_event_mxid(event_id, self.mxid) + + if not message: + self.log.error(f"Message {event_id} not found when handling redaction") + await self.main_intent.send_notice( + self.mxid, "We couldn't find the message when handling redaction" + ) + return + + try: + await self.gsc.send_reaction( + message_id=message.gs_message_id, emoji="", type="reaction", data=data + ) + except Exception as e: + self.log.exception(f"Error sending reaction: {e}") + return + + await DBReaction.delete_by_event_mxid(message.event_mxid, self.mxid, user.mxid)