From c2990051c8f5ce17bd640b7fb297bfc01f144b7d Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Sun, 22 Nov 2020 12:02:37 +0900 Subject: [PATCH] Add Socket Mode implementation (aiohttp, websockets, websocket-client) --- .../samples/socket_mode/aiohttp_example.py | 46 +++++ .../socket_mode/bolt_adapter/aiohttp.py | 82 ++++++++ .../socket_mode/bolt_adapter/aiohttp_async.py | 82 ++++++++ .../bolt_adapter/websocket_client.py | 82 ++++++++ .../socket_mode/bolt_adapter/websockets.py | 82 ++++++++ .../bolt_adapter/websockets_async.py | 82 ++++++++ .../socket_mode/bolt_aiohttp_async_example.py | 49 +++++ .../socket_mode/bolt_aiohttp_example.py | 50 +++++ .../bolt_oauth_aiohttp_async_example.py | 62 ++++++ .../socket_mode/bolt_oauth_aiohttp_example.py | 68 ++++++ .../samples/socket_mode/bolt_oauth_example.py | 57 ++++++ .../bolt_websocket_client_example.py | 45 ++++ .../bolt_websockets_async_example.py | 49 +++++ .../socket_mode/bolt_websockets_example.py | 50 +++++ .../socket_mode/websocket_client_example.py | 39 ++++ .../samples/socket_mode/websockets_example.py | 46 +++++ setup.py | 3 + slack_sdk/socket_mode/__init__.py | 0 slack_sdk/socket_mode/aiohttp/__init__.py | 193 ++++++++++++++++++ slack_sdk/socket_mode/async_client.py | 131 ++++++++++++ slack_sdk/socket_mode/async_listeners.py | 22 ++ slack_sdk/socket_mode/client.py | 128 ++++++++++++ slack_sdk/socket_mode/listeners.py | 20 ++ slack_sdk/socket_mode/request.py | 57 ++++++ slack_sdk/socket_mode/response.py | 30 +++ .../socket_mode/websocket_client/__init__.py | 192 +++++++++++++++++ slack_sdk/socket_mode/websockets/__init__.py | 158 ++++++++++++++ slack_sdk/web/async_client.py | 9 + slack_sdk/web/client.py | 5 + slack_sdk/web/legacy_client.py | 7 + 30 files changed, 1926 insertions(+) create mode 100644 integration_tests/samples/socket_mode/aiohttp_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_adapter/aiohttp.py create mode 100644 integration_tests/samples/socket_mode/bolt_adapter/aiohttp_async.py create mode 100644 integration_tests/samples/socket_mode/bolt_adapter/websocket_client.py create mode 100644 integration_tests/samples/socket_mode/bolt_adapter/websockets.py create mode 100644 integration_tests/samples/socket_mode/bolt_adapter/websockets_async.py create mode 100644 integration_tests/samples/socket_mode/bolt_aiohttp_async_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_aiohttp_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_oauth_aiohttp_async_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_oauth_aiohttp_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_oauth_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_websocket_client_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_websockets_async_example.py create mode 100644 integration_tests/samples/socket_mode/bolt_websockets_example.py create mode 100644 integration_tests/samples/socket_mode/websocket_client_example.py create mode 100644 integration_tests/samples/socket_mode/websockets_example.py create mode 100644 slack_sdk/socket_mode/__init__.py create mode 100644 slack_sdk/socket_mode/aiohttp/__init__.py create mode 100644 slack_sdk/socket_mode/async_client.py create mode 100644 slack_sdk/socket_mode/async_listeners.py create mode 100644 slack_sdk/socket_mode/client.py create mode 100644 slack_sdk/socket_mode/listeners.py create mode 100644 slack_sdk/socket_mode/request.py create mode 100644 slack_sdk/socket_mode/response.py create mode 100644 slack_sdk/socket_mode/websocket_client/__init__.py create mode 100644 slack_sdk/socket_mode/websockets/__init__.py diff --git a/integration_tests/samples/socket_mode/aiohttp_example.py b/integration_tests/samples/socket_mode/aiohttp_example.py new file mode 100644 index 000000000..51ebba395 --- /dev/null +++ b/integration_tests/samples/socket_mode/aiohttp_example.py @@ -0,0 +1,46 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import asyncio +import os +from slack_sdk.web.async_client import AsyncWebClient +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.aiohttp import SocketModeClient + + +async def main(): + + client = SocketModeClient( + app_token=os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN"), + web_client=AsyncWebClient( + token=os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN") + ), + ) + + async def process(client: SocketModeClient, req: SocketModeRequest): + if req.type == "events_api": + response = SocketModeResponse(envelope_id=req.envelope_id) + await client.send_socket_mode_response(response) + + await client.web_client.reactions_add( + name="eyes", + channel=req.payload["event"]["channel"], + timestamp=req.payload["event"]["ts"], + ) + + client.socket_mode_request_listeners.append(process) + await client.connect() + await asyncio.sleep(float("inf")) + + +asyncio.run(main()) diff --git a/integration_tests/samples/socket_mode/bolt_adapter/aiohttp.py b/integration_tests/samples/socket_mode/bolt_adapter/aiohttp.py new file mode 100644 index 000000000..7ab1a4c2b --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_adapter/aiohttp.py @@ -0,0 +1,82 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../../..") +# ------------------ + +import logging +import os +import asyncio +from typing import Optional +from time import time +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.aiohttp import SocketModeClient + +from slack_bolt import App +from slack_bolt.request import BoltRequest +from slack_bolt.response import BoltResponse + + +class SocketModeAdapter: + app: App + + def __init__(self, app: App): + self.app = app + + async def listener(self, client: SocketModeClient, req: SocketModeRequest): + start = time() + bolt_req: BoltRequest = BoltRequest(mode="socket_mode", body=req.payload) + bolt_resp: BoltResponse = self.app.dispatch(bolt_req) + if bolt_resp.status == 200: + if bolt_resp.body is None or len(bolt_resp.body) == 0: + await client.send_socket_mode_response( + SocketModeResponse(envelope_id=req.envelope_id) + ) + elif bolt_resp.body.startswith("{"): + await client.send_socket_mode_response( + SocketModeResponse( + envelope_id=req.envelope_id, payload=bolt_resp.body, + ) + ) + if client.logger.level <= logging.DEBUG: + spent_time = int((time() - start) * 1000) + client.logger.debug(f"Response time: {spent_time} milliseconds") + else: + client.logger.info( + f"Unsuccessful Bolt execution result (status: {bolt_resp.status}, body: {bolt_resp.body})" + ) + + +class SocketModeApp: + app: App + app_token: str + client: SocketModeClient + + def __init__( + self, app: App, app_token: Optional[str] = None, + ): + self.app = app + self.app_token = app_token or os.environ["SLACK_APP_TOKEN"] + self.client = SocketModeClient(app_token=self.app_token) + listener = SocketModeAdapter(self.app).listener + self.client.socket_mode_request_listeners.append(listener) + + async def connect_async(self): + await self.client.connect() + + async def disconnect_async(self): + await self.client.disconnect() + + async def close_async(self): + await self.client.close() + + async def start_async(self): + await self.connect_async() + if self.app.logger.level > logging.INFO: + print("⚡️ Bolt app is running!") + else: + self.app.logger.info("⚡️ Bolt app is running!") + await asyncio.sleep(float("inf")) diff --git a/integration_tests/samples/socket_mode/bolt_adapter/aiohttp_async.py b/integration_tests/samples/socket_mode/bolt_adapter/aiohttp_async.py new file mode 100644 index 000000000..6f96bc608 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_adapter/aiohttp_async.py @@ -0,0 +1,82 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../../..") +# ------------------ + +import asyncio +import logging +import os +from typing import Optional +from time import time +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.aiohttp import SocketModeClient + +from slack_bolt.app.async_app import AsyncApp +from slack_bolt.request.async_request import AsyncBoltRequest +from slack_bolt.response import BoltResponse + + +class AsyncSocketModeAdapter: + app: AsyncApp + + def __init__(self, app: AsyncApp): + self.app = app + + async def listener(self, client: SocketModeClient, req: SocketModeRequest): + start = time() + bolt_req: AsyncBoltRequest = AsyncBoltRequest(mode="socket_mode", body=req.payload) + bolt_resp: BoltResponse = await self.app.async_dispatch(bolt_req) + if bolt_resp.status == 200: + if bolt_resp.body is None or len(bolt_resp.body) == 0: + await client.send_socket_mode_response( + SocketModeResponse(envelope_id=req.envelope_id) + ) + elif bolt_resp.body.startswith("{"): + await client.send_socket_mode_response( + SocketModeResponse( + envelope_id=req.envelope_id, payload=bolt_resp.body, + ) + ) + if client.logger.level <= logging.DEBUG: + spent_time = int((time() - start) * 1000) + client.logger.debug(f"Response time: {spent_time} milliseconds") + else: + client.logger.info( + f"Unsuccessful Bolt execution result (status: {bolt_resp.status}, body: {bolt_resp.body})" + ) + + +class AsyncSocketModeApp: + app: AsyncApp + app_token: str + client: SocketModeClient + + def __init__( + self, app: AsyncApp, app_token: Optional[str] = None, + ): + self.app = app + self.app_token = app_token or os.environ["SLACK_APP_TOKEN"] + self.client = SocketModeClient(app_token=self.app_token) + listener = AsyncSocketModeAdapter(self.app).listener + self.client.socket_mode_request_listeners.append(listener) + + async def connect_async(self): + await self.client.connect() + + async def disconnect_async(self): + await self.client.disconnect() + + async def close_async(self): + await self.client.close() + + async def start_async(self): + await self.connect_async() + if self.app.logger.level > logging.INFO: + print("⚡️ Bolt app is running!") + else: + self.app.logger.info("⚡️ Bolt app is running!") + await asyncio.sleep(float("inf")) diff --git a/integration_tests/samples/socket_mode/bolt_adapter/websocket_client.py b/integration_tests/samples/socket_mode/bolt_adapter/websocket_client.py new file mode 100644 index 000000000..4cd39375f --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_adapter/websocket_client.py @@ -0,0 +1,82 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../../..") +# ------------------ + +import logging +import os +from threading import Event +from typing import Optional +from time import time +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.websocket_client import SocketModeClient + +from slack_bolt.app import App +from slack_bolt.request import BoltRequest +from slack_bolt.response import BoltResponse + + +class SocketModeAdapter: + app: App + + def __init__(self, app: App): + self.app = app + + def listener(self, client: SocketModeClient, req: SocketModeRequest): + start = time() + bolt_req: BoltRequest = BoltRequest(mode="socket_mode", body=req.payload) + bolt_resp: BoltResponse = self.app.dispatch(bolt_req) + if bolt_resp.status == 200: + if bolt_resp.body is None or len(bolt_resp.body) == 0: + client.send_socket_mode_response( + SocketModeResponse(envelope_id=req.envelope_id) + ) + elif bolt_resp.body.startswith("{"): + client.send_socket_mode_response( + SocketModeResponse( + envelope_id=req.envelope_id, payload=bolt_resp.body, + ) + ) + if client.logger.level <= logging.DEBUG: + spent_time = int((time() - start) * 1000) + client.logger.debug(f"Response time: {spent_time} milliseconds") + else: + client.logger.info( + f"Unsuccessful Bolt execution result (status: {bolt_resp.status}, body: {bolt_resp.body})" + ) + + +class SocketModeApp: + app: App + app_token: str + client: SocketModeClient + + def __init__( + self, app: App, app_token: Optional[str] = None, + ): + self.app = app + self.app_token = app_token or os.environ["SLACK_APP_TOKEN"] + self.client = SocketModeClient(app_token=self.app_token) + listener = SocketModeAdapter(self.app).listener + self.client.socket_mode_request_listeners.append(listener) + + def connect(self): + self.client.connect() + + def disconnect(self): + self.client.disconnect() + + def close(self): + self.client.close() + + def start(self): + self.connect() + if self.app.logger.level > logging.INFO: + print("⚡️ Bolt app is running!") + else: + self.app.logger.info("⚡️ Bolt app is running!") + Event().wait() diff --git a/integration_tests/samples/socket_mode/bolt_adapter/websockets.py b/integration_tests/samples/socket_mode/bolt_adapter/websockets.py new file mode 100644 index 000000000..83c13fd28 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_adapter/websockets.py @@ -0,0 +1,82 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../../..") +# ------------------ + +import logging +import os +import asyncio +from typing import Optional +from time import time +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.websockets import SocketModeClient + +from slack_bolt import App +from slack_bolt.request import BoltRequest +from slack_bolt.response import BoltResponse + + +class SocketModeAdapter: + app: App + + def __init__(self, app: App): + self.app = app + + async def listener(self, client: SocketModeClient, req: SocketModeRequest): + start = time() + bolt_req: BoltRequest = BoltRequest(mode="socket_mode", body=req.payload) + bolt_resp: BoltResponse = self.app.dispatch(bolt_req) + if bolt_resp.status == 200: + if bolt_resp.body is None or len(bolt_resp.body) == 0: + await client.send_socket_mode_response( + SocketModeResponse(envelope_id=req.envelope_id) + ) + elif bolt_resp.body.startswith("{"): + await client.send_socket_mode_response( + SocketModeResponse( + envelope_id=req.envelope_id, payload=bolt_resp.body, + ) + ) + if client.logger.level <= logging.DEBUG: + spent_time = int((time() - start) * 1000) + client.logger.debug(f"Response time: {spent_time} milliseconds") + else: + client.logger.info( + f"Unsuccessful Bolt execution result (status: {bolt_resp.status}, body: {bolt_resp.body})" + ) + + +class SocketModeApp: + app: App + app_token: str + client: SocketModeClient + + def __init__( + self, app: App, app_token: Optional[str] = None, + ): + self.app = app + self.app_token = app_token or os.environ["SLACK_APP_TOKEN"] + self.client = SocketModeClient(app_token=self.app_token) + listener = SocketModeAdapter(self.app).listener + self.client.socket_mode_request_listeners.append(listener) + + async def connect_async(self): + await self.client.connect() + + async def disconnect_async(self): + await self.client.disconnect() + + async def close_async(self): + await self.client.close() + + async def start_async(self): + await self.connect_async() + if self.app.logger.level > logging.INFO: + print("⚡️ Bolt app is running!") + else: + self.app.logger.info("⚡️ Bolt app is running!") + await asyncio.sleep(float("inf")) diff --git a/integration_tests/samples/socket_mode/bolt_adapter/websockets_async.py b/integration_tests/samples/socket_mode/bolt_adapter/websockets_async.py new file mode 100644 index 000000000..6cde52cb8 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_adapter/websockets_async.py @@ -0,0 +1,82 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../../..") +# ------------------ + +import asyncio +import logging +import os +from typing import Optional +from time import time +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.websockets import SocketModeClient + +from slack_bolt.app.async_app import AsyncApp +from slack_bolt.request.async_request import AsyncBoltRequest +from slack_bolt.response import BoltResponse + + +class AsyncSocketModeAdapter: + app: AsyncApp + + def __init__(self, app: AsyncApp): + self.app = app + + async def listener(self, client: SocketModeClient, req: SocketModeRequest): + start = time() + bolt_req: AsyncBoltRequest = AsyncBoltRequest(mode="socket_mode", body=req.payload) + bolt_resp: BoltResponse = await self.app.async_dispatch(bolt_req) + if bolt_resp.status == 200: + if bolt_resp.body is None or len(bolt_resp.body) == 0: + await client.send_socket_mode_response( + SocketModeResponse(envelope_id=req.envelope_id) + ) + elif bolt_resp.body.startswith("{"): + await client.send_socket_mode_response( + SocketModeResponse( + envelope_id=req.envelope_id, payload=bolt_resp.body, + ) + ) + if client.logger.level <= logging.DEBUG: + spent_time = int((time() - start) * 1000) + client.logger.debug(f"Response time: {spent_time} milliseconds") + else: + client.logger.info( + f"Unsuccessful Bolt execution result (status: {bolt_resp.status}, body: {bolt_resp.body})" + ) + + +class AsyncSocketModeApp: + app: AsyncApp + app_token: str + client: SocketModeClient + + def __init__( + self, app: AsyncApp, app_token: Optional[str] = None, + ): + self.app = app + self.app_token = app_token or os.environ["SLACK_APP_TOKEN"] + self.client = SocketModeClient(app_token=self.app_token) + listener = AsyncSocketModeAdapter(self.app).listener + self.client.socket_mode_request_listeners.append(listener) + + async def connect_async(self): + await self.client.connect() + + async def disconnect_async(self): + await self.client.disconnect() + + async def close_async(self): + await self.client.close() + + async def start_async(self): + await self.connect_async() + if self.app.logger.level > logging.INFO: + print("⚡️ Bolt app is running!") + else: + self.app.logger.info("⚡️ Bolt app is running!") + await asyncio.sleep(float("inf")) diff --git a/integration_tests/samples/socket_mode/bolt_aiohttp_async_example.py b/integration_tests/samples/socket_mode/bolt_aiohttp_async_example.py new file mode 100644 index 000000000..50edab56f --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_aiohttp_async_example.py @@ -0,0 +1,49 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os +from slack_bolt.app.async_app import AsyncApp +from slack_bolt.context.async_context import AsyncBoltContext + +bot_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN") +app = AsyncApp(signing_secret="will-be-removed-soon", token=bot_token) + + +@app.event("app_mention") +async def mention(context: AsyncBoltContext): + await context.say(":wave: Hi there!") + + +@app.event("message") +async def message(context: AsyncBoltContext, event: dict): + await context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +async def main(): + from bolt_adapter.aiohttp_async import AsyncSocketModeApp + + app_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN") + await AsyncSocketModeApp(app, app_token).start_async() + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) + + # export SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN= + # export SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/bolt_aiohttp_example.py b/integration_tests/samples/socket_mode/bolt_aiohttp_example.py new file mode 100644 index 000000000..966ad9141 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_aiohttp_example.py @@ -0,0 +1,50 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os + +from slack_bolt.app import App +from slack_bolt.context import BoltContext + +bot_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN") +app = App(signing_secret="will-be-removed-soon", token=bot_token) + + +@app.event("app_mention") +def mention(context: BoltContext): + context.say(":wave: Hi there!") + + +@app.event("message") +def message(context: BoltContext, event: dict): + context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +async def main(): + from bolt_adapter.aiohttp import SocketModeApp + + app_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN") + await SocketModeApp(app, app_token).start_async() + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) + + # export SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN= + # export SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/bolt_oauth_aiohttp_async_example.py b/integration_tests/samples/socket_mode/bolt_oauth_aiohttp_async_example.py new file mode 100644 index 000000000..da6816b3b --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_oauth_aiohttp_async_example.py @@ -0,0 +1,62 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os + +from slack_bolt.app.async_app import AsyncApp +from slack_bolt.context.async_context import AsyncBoltContext +from slack_bolt.oauth.async_oauth_settings import AsyncOAuthSettings + +app = AsyncApp( + signing_secret=os.environ["SLACK_SIGNING_SECRET"], + oauth_settings=AsyncOAuthSettings( + client_id=os.environ["SLACK_CLIENT_ID"], + client_secret=os.environ["SLACK_CLIENT_SECRET"], + scopes=os.environ["SLACK_SCOPES"].split(","), + ) +) + + +@app.event("app_mention") +async def mention(context: AsyncBoltContext): + await context.say(":wave: Hi there!") + + +@app.event("message") +async def message(context: AsyncBoltContext, event: dict): + await context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +if __name__ == "__main__": + + import asyncio + from asyncio import Future + + async def socket_mode_runner(): + from bolt_adapter.aiohttp_async import AsyncSocketModeApp + app_token = os.environ.get("SLACK_APP_TOKEN") + await AsyncSocketModeApp(app, app_token).connect_async() + await asyncio.sleep(float("inf")) + + _: Future = asyncio.ensure_future(socket_mode_runner()) + app.start() + + # export SLACK_APP_TOKEN= + # export SLACK_SIGNING_SECRET= + # export SLACK_CLIENT_ID= + # export SLACK_CLIENT_SECRET= + # export SLACK_SCOPES= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/bolt_oauth_aiohttp_example.py b/integration_tests/samples/socket_mode/bolt_oauth_aiohttp_example.py new file mode 100644 index 000000000..8636b4d47 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_oauth_aiohttp_example.py @@ -0,0 +1,68 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os + +from slack_bolt.app import App +from slack_bolt.context import BoltContext +from slack_bolt.oauth.oauth_settings import OAuthSettings + +app = App( + signing_secret=os.environ["SLACK_SIGNING_SECRET"], + oauth_settings=OAuthSettings( + client_id=os.environ["SLACK_CLIENT_ID"], + client_secret=os.environ["SLACK_CLIENT_SECRET"], + scopes=os.environ["SLACK_SCOPES"].split(","), + ) +) + + +@app.event("app_mention") +def mention(context: BoltContext): + context.say(":wave: Hi there!") + + +@app.event("message") +def message(context: BoltContext, event: dict): + context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +if __name__ == "__main__": + + def run_socket_mode_app(): + import asyncio + from bolt_adapter.aiohttp import SocketModeApp + + async def socket_mode_app(): + app_token = os.environ.get("SLACK_APP_TOKEN") + await SocketModeApp(app, app_token).connect_async() + await asyncio.sleep(float("inf")) + + asyncio.run(socket_mode_app()) + + from concurrent.futures.thread import ThreadPoolExecutor + + socket_mode_thread = ThreadPoolExecutor(1) + socket_mode_thread.submit(run_socket_mode_app) + + app.start() + + # export SLACK_APP_TOKEN= + # export SLACK_SIGNING_SECRET= + # export SLACK_CLIENT_ID= + # export SLACK_CLIENT_SECRET= + # export SLACK_SCOPES= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/bolt_oauth_example.py b/integration_tests/samples/socket_mode/bolt_oauth_example.py new file mode 100644 index 000000000..db7cbfe44 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_oauth_example.py @@ -0,0 +1,57 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os + +from slack_bolt.app import App +from slack_bolt.context import BoltContext +from slack_bolt.oauth.oauth_settings import OAuthSettings + +app = App( + signing_secret=os.environ["SLACK_SIGNING_SECRET"], + oauth_settings=OAuthSettings( + client_id=os.environ["SLACK_CLIENT_ID"], + client_secret=os.environ["SLACK_CLIENT_SECRET"], + scopes=os.environ["SLACK_SCOPES"].split(","), + ) +) + + +@app.event("app_mention") +def mention(context: BoltContext): + context.say(":wave: Hi there!") + + +@app.event("message") +def message(context: BoltContext, event: dict): + context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +if __name__ == "__main__": + + from bolt_adapter.websocket_client import SocketModeApp + + app_token = os.environ.get("SLACK_APP_TOKEN") + SocketModeApp(app, app_token).connect() + + app.start() + + # export SLACK_APP_TOKEN= + # export SLACK_SIGNING_SECRET= + # export SLACK_CLIENT_ID= + # export SLACK_CLIENT_SECRET= + # export SLACK_SCOPES= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/bolt_websocket_client_example.py b/integration_tests/samples/socket_mode/bolt_websocket_client_example.py new file mode 100644 index 000000000..40f855161 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_websocket_client_example.py @@ -0,0 +1,45 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os + +from slack_bolt.app import App +from slack_bolt.context import BoltContext + +bot_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN") +app = App(signing_secret="will-be-removed-soon", token=bot_token) + + +@app.event("app_mention") +def mention(context: BoltContext): + context.say(":wave: Hi there!") + + +@app.event("message") +def message(context: BoltContext, event: dict): + context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +if __name__ == "__main__": + + from bolt_adapter.websocket_client import SocketModeApp + + app_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN") + SocketModeApp(app, app_token).start() + + # export SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN= + # export SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/bolt_websockets_async_example.py b/integration_tests/samples/socket_mode/bolt_websockets_async_example.py new file mode 100644 index 000000000..1f4b47790 --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_websockets_async_example.py @@ -0,0 +1,49 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os +from slack_bolt.app.async_app import AsyncApp +from slack_bolt.context.async_context import AsyncBoltContext + +bot_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN") +app = AsyncApp(signing_secret="will-be-removed-soon", token=bot_token) + + +@app.event("app_mention") +async def mention(context: AsyncBoltContext): + await context.say(":wave: Hi there!") + + +@app.event("message") +async def message(context: AsyncBoltContext, event: dict): + await context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +async def main(): + from bolt_adapter.websockets_async import AsyncSocketModeApp + + app_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN") + await AsyncSocketModeApp(app, app_token).start_async() + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) + + # export SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN= + # export SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/bolt_websockets_example.py b/integration_tests/samples/socket_mode/bolt_websockets_example.py new file mode 100644 index 000000000..0ea47be2d --- /dev/null +++ b/integration_tests/samples/socket_mode/bolt_websockets_example.py @@ -0,0 +1,50 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os + +from slack_bolt.app import App +from slack_bolt.context import BoltContext + +bot_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN") +app = App(signing_secret="will-be-removed-soon", token=bot_token) + + +@app.event("app_mention") +def mention(context: BoltContext): + context.say(":wave: Hi there!") + + +@app.event("message") +def message(context: BoltContext, event: dict): + context.client.reactions_add( + channel=event["channel"], timestamp=event["ts"], name="eyes", + ) + + +async def main(): + from bolt_adapter.websockets import SocketModeApp + + app_token = os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN") + await SocketModeApp(app, app_token).start_async() + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) + + # export SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN= + # export SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN= + # pip install .[optional] + # pip install slack_bolt + # python integration_tests/samples/socket_mode/{this file name}.py diff --git a/integration_tests/samples/socket_mode/websocket_client_example.py b/integration_tests/samples/socket_mode/websocket_client_example.py new file mode 100644 index 000000000..6bf3d9f8a --- /dev/null +++ b/integration_tests/samples/socket_mode/websocket_client_example.py @@ -0,0 +1,39 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import os +from threading import Event +from slack_sdk.web import WebClient +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.websocket_client import SocketModeClient + +client = SocketModeClient( + app_token=os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN"), + web_client=WebClient(token=os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN")), +) + +if __name__ == "__main__": + + def process(client: SocketModeClient, req: SocketModeRequest): + if req.type == "events_api": + response = SocketModeResponse(envelope_id=req.envelope_id) + client.send_socket_mode_response(response) + client.web_client.reactions_add( + name="eyes", + channel=req.payload["event"]["channel"], + timestamp=req.payload["event"]["ts"], + ) + + client.socket_mode_request_listeners.append(process) + client.connect() + Event().wait() diff --git a/integration_tests/samples/socket_mode/websockets_example.py b/integration_tests/samples/socket_mode/websockets_example.py new file mode 100644 index 000000000..b6934464c --- /dev/null +++ b/integration_tests/samples/socket_mode/websockets_example.py @@ -0,0 +1,46 @@ +# ------------------ +# Only for running this script here +import sys +from os.path import dirname + +sys.path.insert(1, f"{dirname(__file__)}/../../..") +# ------------------ + +import logging + +logging.basicConfig(level=logging.DEBUG) + +import asyncio +import os +from slack_sdk.web.async_client import AsyncWebClient +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.websockets import SocketModeClient + + +async def main(): + + client = SocketModeClient( + app_token=os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN"), + web_client=AsyncWebClient( + token=os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN") + ), + ) + + async def process(client: SocketModeClient, req: SocketModeRequest): + if req.type == "events_api": + response = SocketModeResponse(envelope_id=req.envelope_id) + await client.send_socket_mode_response(response) + + await client.web_client.reactions_add( + name="eyes", + channel=req.payload["event"]["channel"], + timestamp=req.payload["event"]["ts"], + ) + + client.socket_mode_request_listeners.append(process) + await client.connect() + await asyncio.sleep(float("inf")) + + +asyncio.run(main()) diff --git a/setup.py b/setup.py index eb94561d4..7458ae899 100644 --- a/setup.py +++ b/setup.py @@ -300,6 +300,9 @@ def run(self): "boto3<=2", # InstallationStore/OAuthStateStore "SQLAlchemy>=1,<2", + # Socket Mode + "websockets>=8,<9", + "websocket-client>=0.57<1", ], }, setup_requires=pytest_runner, diff --git a/slack_sdk/socket_mode/__init__.py b/slack_sdk/socket_mode/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/slack_sdk/socket_mode/aiohttp/__init__.py b/slack_sdk/socket_mode/aiohttp/__init__.py new file mode 100644 index 000000000..fec1f265b --- /dev/null +++ b/slack_sdk/socket_mode/aiohttp/__init__.py @@ -0,0 +1,193 @@ +import asyncio +import logging +from asyncio import Future +from asyncio import Queue +from logging import Logger +from typing import Union, Optional, List, Callable, Awaitable + +import aiohttp +from aiohttp import ClientWebSocketResponse, WSMessage, WSMsgType, ClientConnectionError + +from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient +from slack_sdk.socket_mode.async_listeners import ( + AsyncWebSocketMessageListener, + AsyncSocketModeRequestListener, +) +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.web.async_client import AsyncWebClient + + +class SocketModeClient(AsyncBaseSocketModeClient): + logger: Logger + web_client: AsyncWebClient + app_token: str + wss_uri: Optional[str] + auto_reconnect_enabled: bool + web_socket_message_queue: Queue + web_socket_message_listeners: List[ + Union[ + AsyncWebSocketMessageListener, + Callable[ + ["AsyncBaseSocketModeClient", dict, Optional[str]], Awaitable[None] + ], + ] + ] + socket_mode_request_listeners: List[ + Union[ + AsyncSocketModeRequestListener, + Callable[["AsyncBaseSocketModeClient", SocketModeRequest], Awaitable[None]], + ] + ] + + web_socket_message_receiver: Optional[Future] + web_socket_message_processor: Future + + proxy: Optional[str] + ping_interval: float + current_session: Optional[ClientWebSocketResponse] + current_session_monitor: Optional[Future] + + on_message_listeners: List[Callable[[WSMessage], None]] + on_error_listeners: List[Callable[[WSMessage], None]] + on_close_listeners: List[Callable[[WSMessage], None]] + + def __init__( + self, + app_token: str, + logger: Optional[Logger] = None, + web_client: Optional[AsyncWebClient] = None, + proxy: Optional[str] = None, + auto_reconnect_enabled: bool = True, + ping_interval: float = 10, + on_message_listeners: Optional[List[Callable[[WSMessage], None]]] = None, + on_error_listeners: Optional[List[Callable[[WSMessage], None]]] = None, + on_close_listeners: Optional[List[Callable[[WSMessage], None]]] = None, + ): + self.app_token = app_token + self.logger = logger or logging.getLogger(__name__) + self.web_client = web_client or AsyncWebClient() + self.proxy = proxy + self.auto_reconnect_enabled = auto_reconnect_enabled + self.ping_interval = ping_interval + + self.wss_uri = None + self.web_socket_message_queue = Queue() + self.web_socket_message_listeners = [] + self.socket_mode_request_listeners = [] + self.current_session = None + self.current_session_monitor = None + + self.on_message_listeners = on_message_listeners or [] + self.on_error_listeners = on_error_listeners or [] + self.on_close_listeners = on_close_listeners or [] + + self.web_socket_message_receiver = None + self.web_socket_message_processor = asyncio.ensure_future( + self.process_web_socket_messages() + ) + + async def monitor_current_session(self) -> None: + while True: + await asyncio.sleep(self.ping_interval) + try: + if self.auto_reconnect_enabled and ( + self.current_session is None or self.current_session.closed + ): + self.logger.info( + "The session seems to be already closed. Going to reconnect..." + ) + await self.connect_to_new_endpoint() + except Exception as e: + self.logger.error( + "Failed to check the current session or reconnect to the server " + f"(error: {type(e).__name__}, message: {e})" + ) + + async def receive_web_socket_messages(self) -> None: + consecutive_error_count = 0 + while True: + try: + message: WSMessage = await self.current_session.receive() + if self.logger.level <= logging.DEBUG: + type = WSMsgType(message.type) + message_type = type.name if type is not None else message.type + message_data = message.data + if isinstance(message_data, bytes): + message_data = message_data.decode("utf-8") + self.logger.debug( + f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})" + ) + if message is not None: + if message.type == WSMsgType.TEXT: + message_data = message.data + await self.enqueue_web_socket_message(message_data) + for listener in self.on_message_listeners: + listener(message) + elif message.type == WSMsgType.CLOSE: + if self.auto_reconnect_enabled: + self.logger.info( + "Received CLOSE event. Going to reconnect..." + ) + self.connect_to_new_endpoint() + for listener in self.on_close_listeners: + listener(message) + elif message.type == WSMsgType.ERROR: + for listener in self.on_error_listeners: + listener(message) + elif message.type == WSMsgType.CLOSED: + await asyncio.sleep(self.ping_interval) + continue + consecutive_error_count = 0 + except Exception as e: + consecutive_error_count += 1 + self.logger.error( + f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + ) + if isinstance(e, ClientConnectionError): + await asyncio.sleep(self.ping_interval) + else: + await asyncio.sleep(consecutive_error_count) + + async def connect(self): + old_session = None if self.current_session is None else self.current_session + cs = aiohttp.ClientSession() + if self.wss_uri is None: + self.wss_uri = await self.issue_new_wss_url() + self.current_session = await cs.ws_connect( + self.wss_uri, heartbeat=self.ping_interval, proxy=self.proxy, + ) + self.logger.info("A new session has been established") + + old_current_session_monitor = self.current_session_monitor + self.current_session_monitor = asyncio.ensure_future( + self.monitor_current_session() + ) + + old_web_socket_message_receiver = self.web_socket_message_receiver + self.web_socket_message_receiver = asyncio.ensure_future( + self.receive_web_socket_messages() + ) + + if old_session is not None: + await old_session.close() + if old_current_session_monitor is not None: + old_current_session_monitor.cancel() + if old_web_socket_message_receiver is not None: + old_web_socket_message_receiver.cancel() + self.logger.info("The old session has been abandoned") + + async def disconnect(self): + await self.current_session.close() + + async def send_web_socket_message(self, message: str): + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"Sending a message: {message}") + await self.current_session.send_str(message) + + async def close(self): + self.disconnect() + self.web_socket_message_processor.cancel() + if self.current_session_monitor is not None: + self.current_session_monitor.cancel() + if self.web_socket_message_receiver is not None: + self.web_socket_message_receiver.cancel() diff --git a/slack_sdk/socket_mode/async_client.py b/slack_sdk/socket_mode/async_client.py new file mode 100644 index 000000000..5f188ec4d --- /dev/null +++ b/slack_sdk/socket_mode/async_client.py @@ -0,0 +1,131 @@ +import asyncio +import json +import logging +from asyncio import Queue +from asyncio.futures import Future +from logging import Logger +from typing import Dict, Union, Any, Optional, List, Callable, Awaitable + +from slack_sdk.errors import SlackApiError +from slack_sdk.socket_mode.async_listeners import ( + AsyncWebSocketMessageListener, + AsyncSocketModeRequestListener, +) +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.web.async_client import AsyncWebClient + + +class AsyncBaseSocketModeClient: + logger: Logger + web_client: AsyncWebClient + app_token: str + wss_uri: str + auto_reconnect_enabled: bool + web_socket_message_queue: Queue + web_socket_message_listeners: List[ + Union[ + AsyncWebSocketMessageListener, + Callable[ + ["AsyncBaseSocketModeClient", dict, Optional[str]], Awaitable[None] + ], + ] + ] + socket_mode_request_listeners: List[ + Union[ + AsyncSocketModeRequestListener, + Callable[["AsyncBaseSocketModeClient", SocketModeRequest], Awaitable[None]], + ] + ] + + async def issue_new_wss_url(self) -> str: + try: + response = await self.web_client.apps_connections_open( + app_token=self.app_token + ) + return response["url"] + except SlackApiError as e: + self.logger.error(f"Failed to retrieve Socket Mode URL: {e}") + raise e + + async def connect(self): + raise NotImplementedError() + + async def disconnect(self): + raise NotImplementedError() + + async def connect_to_new_endpoint(self): + self.wss_uri = await self.issue_new_wss_url() + await self.connect() + + async def close(self): + self.disconnect() + + async def send_web_socket_message(self, message: str): + raise NotImplementedError() + + async def send_socket_mode_response( + self, response: Union[Dict[str, Any], SocketModeResponse] + ): + if isinstance(response, SocketModeResponse): + await self.send_web_socket_message(json.dumps(response.to_dict())) + else: + await self.send_web_socket_message(json.dumps(response)) + + async def enqueue_web_socket_message(self, message: str): + await self.web_socket_message_queue.put(message) + if self.logger.level <= logging.DEBUG: + queue_size = self.web_socket_message_queue.qsize() + self.logger.debug( + f"A new message enqueued (current queue size: {queue_size})" + ) + + async def process_web_socket_messages(self): + while True: + try: + await self.process_web_socket_message() + except Exception as e: + self.logger.exception(f"Failed to process a message: {e}") + + async def process_web_socket_message(self): + raw_message = await self.web_socket_message_queue.get() + if raw_message is not None and raw_message.startswith("{"): + message: dict = json.loads(raw_message) + _: Future[None] = asyncio.ensure_future( + self.run_web_socket_message_listeners(message, raw_message) + ) + + async def run_web_socket_message_listeners( + self, message: dict, raw_message: str + ) -> None: + type, envelope_id = message.get("type"), message.get("envelope_id") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Message processing started (type: {type}, envelope_id: {envelope_id})" + ) + try: + if message.get("type") == "disconnect": + await self.connect_to_new_endpoint() + return + + for listener in self.web_socket_message_listeners: + try: + await listener(self, message, raw_message) + except Exception as e: + self.logger.exception(f"Failed to run a message listener: {e}") + + if len(self.socket_mode_request_listeners) > 0: + request = SocketModeRequest.from_dict(message) + if request is not None: + for listener in self.socket_mode_request_listeners: + try: + await listener(self, request) + except Exception as e: + self.logger.exception(f"Failed to run a request listener: {e}") + except Exception as e: + self.logger.exception(f"Failed to run message listeners: {e}") + finally: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Message processing completed (type: {type}, envelope_id: {envelope_id})" + ) diff --git a/slack_sdk/socket_mode/async_listeners.py b/slack_sdk/socket_mode/async_listeners.py new file mode 100644 index 000000000..bce647564 --- /dev/null +++ b/slack_sdk/socket_mode/async_listeners.py @@ -0,0 +1,22 @@ +from typing import Optional, Callable + +from slack_sdk.socket_mode.request import SocketModeRequest + + +class AsyncWebSocketMessageListener(Callable): + async def __call__( + self, + receiver: "AsyncSocketModeClient", # noqa: F821 + message: dict, + raw_message: Optional[str] = None, + ): + raise NotImplementedError() + + +class AsyncSocketModeRequestListener(Callable): + async def __call__( + self, + receiver: "AsyncSocketModeClient", # noqa: F821 + request: SocketModeRequest, + ): + raise NotImplementedError() diff --git a/slack_sdk/socket_mode/client.py b/slack_sdk/socket_mode/client.py new file mode 100644 index 000000000..1c11668c9 --- /dev/null +++ b/slack_sdk/socket_mode/client.py @@ -0,0 +1,128 @@ +import json +import logging +from queue import Queue +from concurrent.futures.thread import ThreadPoolExecutor +from logging import Logger +from typing import Dict, Union, Any, Optional, List, Callable + +from slack_sdk.errors import SlackApiError +from slack_sdk.socket_mode.listeners import ( + WebSocketMessageListener, + SocketModeRequestListener, +) +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.web import WebClient + + +class BaseSocketModeClient: + logger: Logger + web_client: WebClient + app_token: str + wss_uri: str + web_socket_message_queue: Queue + web_socket_message_listeners: List[ + Union[ + WebSocketMessageListener, + Callable[["BaseSocketModeClient", dict, Optional[str]], None], + ] + ] + socket_mode_request_listeners: List[ + Union[ + SocketModeRequestListener, + Callable[["BaseSocketModeClient", SocketModeRequest], None], + ] + ] + + web_socket_message_processor: ThreadPoolExecutor + web_socket_message_workers: ThreadPoolExecutor + + def issue_new_wss_url(self) -> str: + try: + response = self.web_client.apps_connections_open(app_token=self.app_token) + return response["url"] + except SlackApiError as e: + self.logger.error(f"Failed to retrieve Socket Mode URL: {e}") + raise e + + def connect(self) -> None: + raise NotImplementedError() + + def disconnect(self) -> None: + raise NotImplementedError() + + def connect_to_new_endpoint(self): + self.wss_uri = self.issue_new_wss_url() + self.connect() + + def close(self) -> None: + self.disconnect() + + def send_web_socket_message(self, message: str) -> None: + raise NotImplementedError() + + def send_socket_mode_response( + self, response: Union[Dict[str, Any], SocketModeResponse] + ) -> None: + if isinstance(response, SocketModeResponse): + self.send_web_socket_message(json.dumps(response.to_dict())) + else: + self.send_web_socket_message(json.dumps(response)) + + def enqueue_web_socket_message(self, message: str): + self.web_socket_message_queue.put(message) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new message enqueued (current queue size: {self.web_socket_message_queue.qsize()})" + ) + + def process_web_socket_message(self): + raw_message = self.web_socket_message_queue.get() + print(raw_message) + + def _run_web_socket_message_listeners(): + if raw_message is not None and raw_message.startswith("{"): + message: dict = json.loads(raw_message) + self.run_web_socket_message_listeners(message, raw_message) + + self.web_socket_message_workers.submit(_run_web_socket_message_listeners) + + def run_web_socket_message_listeners(self, message: dict, raw_message: str) -> None: + type, envelope_id = message.get("type"), message.get("envelope_id") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Message processing started (type: {type}, envelope_id: {envelope_id})" + ) + try: + if message.get("type") == "disconnect": + self.connect_to_new_endpoint() + return + + for listener in self.web_socket_message_listeners: + try: + listener(self, message, raw_message) + except Exception as e: + self.logger.exception(f"Failed to run a message listener: {e}") + + if len(self.socket_mode_request_listeners) > 0: + request = SocketModeRequest.from_dict(message) + if request is not None: + for listener in self.socket_mode_request_listeners: + try: + listener(self, request) + except Exception as e: + self.logger.exception(f"Failed to run a request listener: {e}") + except Exception as e: + self.logger.exception(f"Failed to run message listeners: {e}") + finally: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Message processing completed (type: {type}, envelope_id: {envelope_id})" + ) + + def process_web_socket_messages(self) -> None: + while True: + try: + self.process_web_socket_message() + except Exception as e: + self.logger.exception(f"Failed to process a message: {e}") diff --git a/slack_sdk/socket_mode/listeners.py b/slack_sdk/socket_mode/listeners.py new file mode 100644 index 000000000..46c89fe7f --- /dev/null +++ b/slack_sdk/socket_mode/listeners.py @@ -0,0 +1,20 @@ +from typing import Optional + +from slack_sdk.socket_mode.request import SocketModeRequest + + +class WebSocketMessageListener: + def __call__( + self, + receiver: "SocketModeClient", # noqa: F821 + message: dict, + raw_message: Optional[str] = None, + ): + raise NotImplementedError() + + +class SocketModeRequestListener: + def __call__( + self, receiver: "SocketModeClient", request: SocketModeRequest # noqa: F821 + ): + raise NotImplementedError() diff --git a/slack_sdk/socket_mode/request.py b/slack_sdk/socket_mode/request.py new file mode 100644 index 000000000..5cac2e375 --- /dev/null +++ b/slack_sdk/socket_mode/request.py @@ -0,0 +1,57 @@ +from typing import Union, Optional + +from slack_sdk.models import JsonObject + + +class SocketModeRequest: + type: str + envelope_id: str + payload: dict + accepts_response_payload: bool + retry_attempt: Optional[int] # events_api + retry_reason: Optional[str] # events_api + + def __init__( + self, + type: str, + envelope_id: str, + payload: Union[dict, JsonObject, str], + accepts_response_payload: Optional[bool] = None, + retry_attempt: Optional[int] = None, + retry_reason: Optional[str] = None, + ): + self.type = type + self.envelope_id = envelope_id + + if isinstance(payload, JsonObject): + self.payload = payload.to_dict() + elif isinstance(payload, dict): + self.payload = payload + elif isinstance(payload, str): + self.payload = {"text": payload} + else: + raise ValueError(f"Unsupported payload data type ({type(payload)})") + + self.accepts_response_payload = accepts_response_payload or False + self.retry_attempt = retry_attempt + self.retry_reason = retry_reason + + @classmethod + def from_dict(cls, message: dict) -> Optional["SocketModeRequest"]: + if all(k in message for k in ("type", "envelope_id", "payload")): + return SocketModeRequest( + type=message.get("type"), + envelope_id=message.get("envelope_id"), + payload=message.get("payload"), + accepts_response_payload=message.get("accepts_response_payload") + or False, + retry_attempt=message.get("retry_attempt"), + retry_reason=message.get("retry_reason"), + ) + return None + + def to_dict(self) -> dict: # skipcq: PYL-W0221 + d = {"envelope_id": self.envelope_id} + if self.payload is not None: + d["payload"] = self.payload + return d diff --git a/slack_sdk/socket_mode/response.py b/slack_sdk/socket_mode/response.py new file mode 100644 index 000000000..f748a6c8e --- /dev/null +++ b/slack_sdk/socket_mode/response.py @@ -0,0 +1,30 @@ +from typing import Union, Optional + +from slack_sdk.models import JsonObject + + +class SocketModeResponse: + envelope_id: str + payload: dict + + def __init__( + self, envelope_id: str, payload: Optional[Union[dict, JsonObject, str]] = None + ): + self.envelope_id = envelope_id + + if payload is None: + self.payload = None + elif isinstance(payload, JsonObject): + self.payload = payload.to_dict() + elif isinstance(payload, dict): + self.payload = payload + elif isinstance(payload, str): + self.payload = {"text": payload} + else: + raise ValueError(f"Unsupported payload data type ({type(payload)})") + + def to_dict(self) -> dict: # skipcq: PYL-W0221 + d = {"envelope_id": self.envelope_id} + if self.payload is not None: + d["payload"] = self.payload + return d diff --git a/slack_sdk/socket_mode/websocket_client/__init__.py b/slack_sdk/socket_mode/websocket_client/__init__.py new file mode 100644 index 000000000..297ff1428 --- /dev/null +++ b/slack_sdk/socket_mode/websocket_client/__init__.py @@ -0,0 +1,192 @@ +import logging +import time +from concurrent.futures.thread import ThreadPoolExecutor +from logging import Logger +from queue import Queue +from typing import Union, Optional, List, Callable, Tuple + +import websocket +from websocket import WebSocketApp + +from slack_sdk.socket_mode.client import BaseSocketModeClient +from slack_sdk.socket_mode.listeners import ( + WebSocketMessageListener, + SocketModeRequestListener, +) +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.web import WebClient + + +class SocketModeClient(BaseSocketModeClient): + logger: Logger + web_client: WebClient + app_token: str + wss_uri: Optional[str] + web_socket_message_queue: Queue + web_socket_message_listeners: List[ + Union[ + WebSocketMessageListener, + Callable[["BaseSocketModeClient", dict, Optional[str]], None], + ] + ] + socket_mode_request_listeners: List[ + Union[ + SocketModeRequestListener, + Callable[["BaseSocketModeClient", SocketModeRequest], None], + ] + ] + + current_app_executor: ThreadPoolExecutor + current_app_monitor: ThreadPoolExecutor + web_socket_message_processor: ThreadPoolExecutor + web_socket_message_workers: ThreadPoolExecutor + + current_session: Optional[WebSocketApp] + + on_open_listeners: List[Callable[[WebSocketApp], None]] + on_message_listeners: List[Callable[[WebSocketApp, str], None]] + on_error_listeners: List[Callable[[WebSocketApp, Exception], None]] + on_close_listeners: List[Callable[[WebSocketApp], None]] + + def __init__( + self, + app_token: str, + logger: Optional[Logger] = None, + web_client: Optional[WebClient] = None, + auto_reconnect_enabled: bool = True, + ping_interval: float = 10, + concurrency: int = 10, + trace_enabled: bool = False, + http_proxy_host: Optional[str] = None, + http_proxy_port: Optional[int] = None, + http_proxy_auth: Optional[Tuple[str, str]] = None, + proxy_type: Optional[str] = None, + on_open_listeners: Optional[List[Callable[[WebSocketApp], None]]] = None, + on_message_listeners: Optional[ + List[Callable[[WebSocketApp, str], None]] + ] = None, + on_error_listeners: Optional[ + List[Callable[[WebSocketApp, Exception], None]] + ] = None, + on_close_listeners: Optional[List[Callable[[WebSocketApp], None]]] = None, + ): + self.app_token = app_token + self.logger = logger or logging.getLogger(__name__) + self.web_client = web_client or WebClient() + self.auto_reconnect_enabled = auto_reconnect_enabled + self.ping_interval = ping_interval + self.wss_uri = self.issue_new_wss_url() + self.web_socket_message_queue = Queue() + self.web_socket_message_listeners = [] + self.socket_mode_request_listeners = [] + + self.current_session = None + self.current_app_monitor = ThreadPoolExecutor(1) + self.current_app_executor = ThreadPoolExecutor(1) + + self.web_socket_message_processor = ThreadPoolExecutor(1) + self.web_socket_message_processor.submit(self.process_web_socket_messages) + self.web_socket_message_workers = ThreadPoolExecutor(max_workers=concurrency) + + # NOTE: only global settings is provided by the library + websocket.enableTrace(trace_enabled) + + self.http_proxy_host = http_proxy_host + self.http_proxy_port = http_proxy_port + self.http_proxy_auth = http_proxy_auth + self.proxy_type = proxy_type + + self.on_open_listeners = on_open_listeners or [] + self.on_message_listeners = on_message_listeners or [] + self.on_error_listeners = on_error_listeners or [] + self.on_close_listeners = on_close_listeners or [] + + def connect(self) -> None: + def on_open(ws: WebSocketApp): + if self.logger.level <= logging.DEBUG: + self.logger.debug("on_open invoked") + for listener in self.on_open_listeners: + listener(ws) + + def on_message(ws: WebSocketApp, message: str): + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"on_message invoked: (message: {message})") + self.enqueue_web_socket_message(message) + for listener in self.on_message_listeners: + listener(ws, message) + + def on_error(ws: WebSocketApp, error: Exception): + self.logger.error( + f"on_error invoked (error: {type(error).__name__}, message: {error})" + ) + for listener in self.on_error_listeners: + listener(ws, error) + + def on_close(ws: WebSocketApp): + if self.logger.level <= logging.DEBUG: + self.logger.debug("on_close invoked") + if self.auto_reconnect_enabled: + self.logger.info("Received CLOSE event. Going to reconnect...") + self.connect_to_new_endpoint() + for listener in self.on_close_listeners: + listener(ws) + + old_session: Optional[ + WebSocketApp + ] = None if self.current_session is None else self.current_session + self.current_session = websocket.WebSocketApp( + self.wss_uri, + on_open=on_open, + on_message=on_message, + on_error=on_error, + on_close=on_close, + ) + + def run_current_session(): + self.current_session.run_forever( + ping_interval=self.ping_interval, + http_proxy_host=self.http_proxy_host, + http_proxy_port=self.http_proxy_port, + http_proxy_auth=self.http_proxy_auth, + proxy_type=self.proxy_type, + ) + + def monitor_current_session(): + while True: + time.sleep(self.ping_interval) + try: + if self.auto_reconnect_enabled and ( + self.current_session is None + or self.current_session.sock is None + ): + self.logger.info( + "The session seems to be already closed. Going to reconnect..." + ) + self.connect_to_new_endpoint() + except Exception as e: + self.logger.error( + "Failed to check the current session or reconnect to the server " + f"(error: {type(e).__name__}, message: {e})" + ) + + self.current_app_executor.submit(run_current_session) + self.current_app_monitor.submit(monitor_current_session) + + self.logger.info("A new session has been established") + if old_session is not None: + old_session.close() + + def disconnect(self) -> None: + self.current_session.close() + + def send_web_socket_message(self, message: str) -> None: + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"Sending a message: {message}") + self.current_session.send(message) + + def close(self): + self.current_session.close() + self.current_app_monitor.shutdown() + self.current_app_executor.shutdown() + self.web_socket_message_processor.shutdown() + self.web_socket_message_workers.shutdown() diff --git a/slack_sdk/socket_mode/websockets/__init__.py b/slack_sdk/socket_mode/websockets/__init__.py new file mode 100644 index 000000000..0150a392d --- /dev/null +++ b/slack_sdk/socket_mode/websockets/__init__.py @@ -0,0 +1,158 @@ +import asyncio +import logging +from asyncio import Future +from logging import Logger +from asyncio import Queue +from typing import Union, Optional, List, Callable, Awaitable + +import websockets +from websockets.client import WebSocketClientProtocol + +from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient +from slack_sdk.socket_mode.async_listeners import ( + AsyncWebSocketMessageListener, + AsyncSocketModeRequestListener, +) +from slack_sdk.socket_mode.request import SocketModeRequest +from slack_sdk.web.async_client import AsyncWebClient + + +class SocketModeClient(AsyncBaseSocketModeClient): + logger: Logger + web_client: AsyncWebClient + app_token: str + wss_uri: Optional[str] + auto_reconnect_enabled: bool + web_socket_message_queue: Queue + web_socket_message_listeners: List[ + Union[ + AsyncWebSocketMessageListener, + Callable[ + ["AsyncBaseSocketModeClient", dict, Optional[str]], Awaitable[None] + ], + ] + ] + socket_mode_request_listeners: List[ + Union[ + AsyncSocketModeRequestListener, + Callable[["AsyncBaseSocketModeClient", SocketModeRequest], Awaitable[None]], + ] + ] + + web_socket_message_receiver: Optional[Future] + web_socket_message_processor: Future + + ping_interval: float + current_session: Optional[WebSocketClientProtocol] + current_session_monitor: Optional[Future] + + def __init__( + self, + app_token: str, + logger: Optional[Logger] = None, + web_client: Optional[AsyncWebClient] = None, + auto_reconnect_enabled: bool = True, + ping_interval: float = 10, + ): + self.app_token = app_token + self.logger = logger or logging.getLogger(__name__) + self.web_client = web_client or AsyncWebClient() + self.auto_reconnect_enabled = auto_reconnect_enabled + self.ping_interval = ping_interval + self.wss_uri = None + self.web_socket_message_queue = Queue() + self.web_socket_message_listeners = [] + self.socket_mode_request_listeners = [] + self.current_session = None + self.current_session_monitor = None + + self.web_socket_message_receiver = None + self.web_socket_message_processor = asyncio.ensure_future( + self.process_web_socket_messages() + ) + + async def monitor_current_session(self) -> None: + while True: + await asyncio.sleep(self.ping_interval) + try: + if self.auto_reconnect_enabled and ( + self.current_session is None or self.current_session.closed + ): + self.logger.info( + "The session seems to be already closed. Going to reconnect..." + ) + await self.connect_to_new_endpoint() + except Exception as e: + self.logger.error( + "Failed to check the current session or reconnect to the server " + f"(error: {type(e).__name__}, message: {e})" + ) + + async def receive_web_socket_messages(self) -> None: + consecutive_error_count = 0 + while True: + try: + message = await self.current_session.recv() + if message is not None: + if isinstance(message, bytes): + message = message.decode("utf-8") + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"Received message: {message}") + await self.enqueue_web_socket_message(message) + consecutive_error_count = 0 + except Exception as e: + consecutive_error_count += 1 + self.logger.error( + f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + ) + if isinstance(e, websockets.ConnectionClosedError): + await asyncio.sleep(self.ping_interval) + else: + await asyncio.sleep(consecutive_error_count) + + async def connect(self): + if self.wss_uri is None: + self.wss_uri = await self.issue_new_wss_url() + old_session: Optional[ + WebSocketClientProtocol + ] = None if self.current_session is None else self.current_session + # NOTE: websockets does not support proxy settings + self.current_session = await websockets.connect( + uri=self.wss_uri, ping_interval=self.ping_interval, + ) + self.logger.info("A new session has been established") + + old_current_session_monitor = self.current_session_monitor + self.current_session_monitor = asyncio.ensure_future( + self.monitor_current_session() + ) + + old_web_socket_message_receiver = self.web_socket_message_receiver + self.web_socket_message_receiver = asyncio.ensure_future( + self.receive_web_socket_messages() + ) + + if old_session is not None: + await old_session.close() + if old_current_session_monitor is not None: + old_current_session_monitor.cancel() + if old_web_socket_message_receiver is not None: + old_web_socket_message_receiver.cancel() + self.logger.info("The old session has been abandoned") + + async def disconnect(self): + if self.current_session is not None: + await self.current_session.close() + + async def send_web_socket_message(self, message: str): + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"Sending a message: {message}") + await self.current_session.send(message) + + async def close(self): + self.disconnect() + self.web_socket_message_processor.cancel() + if self.current_session_monitor is not None: + self.current_session_monitor.cancel() + if self.web_socket_message_receiver is not None: + self.web_socket_message_receiver.cancel() diff --git a/slack_sdk/web/async_client.py b/slack_sdk/web/async_client.py index f49822688..fd6ee97a0 100644 --- a/slack_sdk/web/async_client.py +++ b/slack_sdk/web/async_client.py @@ -762,6 +762,15 @@ async def api_test(self, **kwargs) -> AsyncSlackResponse: """Checks API calling code.""" return await self.api_call("api.test", json=kwargs) + async def apps_connections_open( + self, *, app_token: str, **kwargs + ) -> AsyncSlackResponse: + """Get a new WSS URL for Socket Mode""" + kwargs.update({"token": app_token}) + return await self.api_call( + "apps.connections.open", http_verb="POST", params=kwargs + ) + async def apps_event_authorizations_list( self, event_context: str, **kwargs ) -> AsyncSlackResponse: diff --git a/slack_sdk/web/client.py b/slack_sdk/web/client.py index 4f0ae981d..25795e324 100644 --- a/slack_sdk/web/client.py +++ b/slack_sdk/web/client.py @@ -725,6 +725,11 @@ def api_test(self, **kwargs) -> SlackResponse: """Checks API calling code.""" return self.api_call("api.test", json=kwargs) + def apps_connections_open(self, *, app_token: str, **kwargs) -> SlackResponse: + """Get a new WSS URL for Socket Mode""" + kwargs.update({"token": app_token}) + return self.api_call("apps.connections.open", http_verb="POST", params=kwargs) + def apps_event_authorizations_list( self, event_context: str, **kwargs ) -> SlackResponse: diff --git a/slack_sdk/web/legacy_client.py b/slack_sdk/web/legacy_client.py index 5aec73c29..d44e48985 100644 --- a/slack_sdk/web/legacy_client.py +++ b/slack_sdk/web/legacy_client.py @@ -754,6 +754,13 @@ def api_test(self, **kwargs) -> Union[Future, SlackResponse]: """Checks API calling code.""" return self.api_call("api.test", json=kwargs) + def apps_connections_open( + self, *, app_token: str, **kwargs + ) -> Union[Future, SlackResponse]: + """Get a new WSS URL for Socket Mode""" + kwargs.update({"token": app_token}) + return self.api_call("apps.connections.open", http_verb="POST", params=kwargs) + def apps_event_authorizations_list( self, event_context: str, **kwargs ) -> Union[Future, SlackResponse]: