-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
mattermost.py
229 lines (189 loc) · 7.56 KB
/
mattermost.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import json
import logging
import requests
from requests import Response
from sanic import Blueprint, response
from sanic.request import Request
from typing import Text, Dict, Any, List, Callable, Awaitable, Optional
from rasa.core.channels.channel import UserMessage, OutputChannel, InputChannel
from sanic.response import HTTPResponse
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class MattermostBot(OutputChannel):
    """A Mattermost communication channel.

    Bot responses are delivered by POSTing JSON payloads to the ``/posts``
    endpoint below ``url``, authenticated with a bearer token.

    NOTE: all requests are made with the blocking ``requests`` library and
    without a timeout, even though the ``send_*`` methods are coroutines.
    """

    @classmethod
    def name(cls) -> Text:
        """Return the name of this channel."""
        return "mattermost"

    @classmethod
    def token_from_login(cls, url: Text, user: Text, password: Text) -> Optional[Text]:
        """Retrieve access token for mattermost user.

        Args:
            url: Base URL that ``/users/login`` is appended to.
            user: Value sent as ``login_id`` in the login request.
            password: Password sent in the login request.

        Returns:
            The value of the ``Token`` response header if the server answers
            with status 200, otherwise ``None`` (the failure is logged).
        """
        data = {"login_id": user, "password": password}
        r = requests.post(url + "/users/login", data=json.dumps(data))
        if r.status_code == 200:
            return r.headers["Token"]

        logger.error(f"Failed to login mattermost user {user}. Response: {r}")
        return None

    def __init__(
        self, url: Text, token: Text, bot_channel: Text, webhook_url: Optional[Text]
    ) -> None:
        """Create the output channel.

        Args:
            url: Base URL that ``/posts`` is appended to.
            token: Token sent as ``Authorization: Bearer <token>``.
            bot_channel: Id of the channel every message is posted to.
            webhook_url: URL placed into button integrations so that button
                clicks are sent back to it.
        """
        self.url = url
        self.token = token
        self.bot_channel = bot_channel
        self.webhook_url = webhook_url

        super().__init__()

    def _post_message_to_channel(self, channel_id: Text, message: Text) -> Response:
        """Post a plain text ``message`` to the channel ``channel_id``."""
        return self._post_data_to_channel(
            {"channel_id": channel_id, "message": message}
        )

    def _post_data_to_channel(self, data: Dict[Text, Any]) -> Response:
        """Send a message to a mattermost channel.

        Args:
            data: Post payload; it is serialized to JSON and sent as the
                request body.

        Returns:
            The HTTP response. A response outside the 2xx range is logged as
            an error but not raised.
        """
        headers = {"Authorization": "Bearer " + self.token}
        r = requests.post(self.url + "/posts", headers=headers, data=json.dumps(data))
        # Mattermost answers a successful post creation with 201 Created, so
        # the whole 2xx range counts as success. Checking for 200 only would
        # report every delivered message as a failure.
        if not 200 <= r.status_code < 300:
            logger.error(
                f"Failed to send message to mattermost channel "
                f"{data.get('channel_id')}. Response: {r}"
            )
        return r

    async def send_text_message(
        self, recipient_id: Text, text: Text, **kwargs: Any
    ) -> None:
        """Post ``text`` to the bot channel, one post per paragraph.

        The stripped text is split on blank lines (``"\\n\\n"``) and each part
        is posted separately. ``recipient_id`` is not used; the target is
        always ``self.bot_channel``.
        """
        for message_part in text.strip().split("\n\n"):
            self._post_message_to_channel(self.bot_channel, message_part)

    async def send_custom_json(
        self, recipient_id: Text, json_message: Dict[Text, Any], **kwargs: Any
    ) -> None:
        """Post a caller-supplied payload to Mattermost.

        ``channel_id`` defaults to the bot channel and ``message`` to an empty
        string when the payload does not contain these keys.
        """
        # Build a new dict instead of calling ``setdefault`` on the argument,
        # so the caller's ``json_message`` is left unmodified.
        payload = {"channel_id": self.bot_channel, "message": "", **json_message}
        self._post_data_to_channel(payload)

    async def send_image_url(
        self, recipient_id: Text, image: Text, **kwargs: Any
    ) -> None:
        """Sends an image.

        The image URL is sent as the ``image_url`` of a single attachment in
        the post's ``props``.
        """
        self._post_data_to_channel(
            {
                "channel_id": self.bot_channel,
                "props": {"attachments": [{"image_url": image}]},
            }
        )

    async def send_text_with_buttons(
        self,
        recipient_id: Text,
        text: Text,
        buttons: List[Dict[Text, Any]],
        **kwargs: Any,
    ) -> None:
        """Sends buttons to the output.

        Args:
            recipient_id: Not used; the target is always ``self.bot_channel``.
            text: Message text posted together with the buttons.
            buttons: Button definitions; each must provide a ``title`` and a
                ``payload`` key.
        """
        # buttons are a list of objects: [(option_name, payload)]
        # See https://docs.mattermost.com/developer/interactive-messages.html#message-buttons # noqa: E501
        actions = [
            {
                "name": button["title"],
                "integration": {
                    # A click is sent to the webhook with the payload as context.
                    "url": self.webhook_url,
                    "context": {"action": button["payload"]},
                },
            }
            for button in buttons
        ]

        props = {"attachments": [{"actions": actions}]}

        self._post_data_to_channel(
            {"channel_id": self.bot_channel, "message": text, "props": props}
        )
class MattermostInput(InputChannel):
    """Mattermost input channel implementation."""

    @classmethod
    def name(cls) -> Text:
        """Return the name of this channel."""
        return "mattermost"

    @classmethod
    def from_credentials(cls, credentials: Optional[Dict[Text, Any]]) -> InputChannel:
        """Build the channel from a credentials mapping.

        Reads the ``token``, ``url`` and ``webhook_url`` keys; a missing key
        is passed on as ``None``. ``None`` credentials are reported through
        ``raise_missing_credentials_exception``.
        """
        if credentials is None:
            cls.raise_missing_credentials_exception()

        bot_token = credentials.get("token")
        api_url = credentials.get("url")
        callback_url = credentials.get("webhook_url")
        return cls(api_url, bot_token, callback_url)

    def __init__(self, url: Text, token: Text, webhook_url: Text) -> None:
        """Create a Mattermost input channel.

        Needs a couple of settings to properly authenticate and validate
        messages.

        Args:
            url: Your Mattermost team url including /v4 example
                https://mysite.example.com/api/v4
            token: Your mattermost bot token
            webhook_url: The mattermost callback url as specified
                in the outgoing webhooks in mattermost example
                https://mysite.example.com/webhooks/mattermost/webhook
        """
        self.url = url
        self.token = token
        self.webhook_url = webhook_url

    async def message_with_trigger_word(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        output: Dict[Text, Any],
        metadata: Optional[Dict],
    ) -> None:
        """Handle a text message that was sent with a trigger word.

        Everything up to and including the first space of ``output["text"]``
        is dropped; a text without any space is forwarded unchanged.
        """
        # The first word is the @botmention used as trigger; cut it off.
        text = output["text"]
        _, separator, remainder = text.partition(" ")
        message = remainder if separator else text

        await self._handle_message(
            message, output["user_id"], output["channel_id"], metadata, on_new_message
        )

    async def action_from_button(
        self,
        on_new_message: Callable[[UserMessage], Awaitable[None]],
        output: Dict[Text, Any],
        metadata: Optional[Dict],
    ) -> None:
        """Handle a button click by forwarding its ``action`` as the message."""
        # The action stored in the button's context is what gets forwarded.
        context = output["context"]
        action = context["action"]

        await self._handle_message(
            action, output["user_id"], output["channel_id"], metadata, on_new_message
        )

    async def _handle_message(
        self,
        message: Text,
        sender_id: Text,
        bot_channel: Text,
        metadata: Optional[Dict],
        on_new_message: Callable[[UserMessage], Awaitable[None]],
    ) -> None:
        """Wrap ``message`` in a ``UserMessage`` and pass it to ``on_new_message``.

        Replies go to a ``MattermostBot`` bound to ``bot_channel``. Any
        exception raised while doing so is logged and swallowed.
        """
        try:
            reply_channel = MattermostBot(
                self.url, self.token, bot_channel, self.webhook_url
            )
            await on_new_message(
                UserMessage(
                    message,
                    reply_channel,
                    sender_id,
                    input_channel=self.name(),
                    metadata=metadata,
                )
            )
        except Exception as e:
            logger.error(f"Exception when trying to handle message.{e}")
            logger.debug(e, exc_info=True)

    def blueprint(
        self, on_new_message: Callable[[UserMessage], Awaitable[None]]
    ) -> Blueprint:
        """Create the blueprint with a health check and the webhook route.

        ``GET /`` returns a JSON status. ``POST /webhook`` dispatches payloads
        with a ``trigger_word`` key to ``message_with_trigger_word`` and, if
        that key is absent, payloads with a ``context`` key to
        ``action_from_button``.
        """
        mattermost_webhook = Blueprint("mattermost_webhook", __name__)

        @mattermost_webhook.route("/", methods=["GET"])
        async def health(_: Request) -> HTTPResponse:
            return response.json({"status": "ok"})

        @mattermost_webhook.route("/webhook", methods=["POST"])
        async def webhook(request: Request) -> HTTPResponse:
            payload = request.json
            if not payload:
                return response.text("")

            metadata = self.get_metadata(request)

            if "trigger_word" in payload:
                # normal message sent with a trigger word
                handler = self.message_with_trigger_word
            elif "context" in payload:
                # context action coming from a button
                handler = self.action_from_button
            else:
                handler = None

            if handler is not None:
                await handler(on_new_message, payload, metadata)

            return response.text("success")

        return mattermost_webhook