From b141afab15be44b14a6f251ac97335d29d1b70c1 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Wed, 13 Sep 2023 16:08:37 +0100 Subject: [PATCH 1/9] set up async bot --- slack_bot/run.py | 18 +++++++++++------- slack_bot/slack_bot/bot/bot.py | 25 ++++++++++++++----------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/slack_bot/run.py b/slack_bot/run.py index b8b6ef7c..c0b5c6df 100755 --- a/slack_bot/run.py +++ b/slack_bot/run.py @@ -1,12 +1,12 @@ import argparse +import asyncio import logging import os import pathlib import sys -import threading -from slack_sdk.socket_mode import SocketModeClient -from slack_sdk.web import WebClient +from slack_sdk.socket_mode.aiohttp import SocketModeClient +from slack_sdk.web.async_client import AsyncWebClient from slack_bot import MODELS, Bot @@ -17,7 +17,7 @@ DEFAULT_HF_MODEL = "StabilityAI/stablelm-tuned-alpha-3b" -if __name__ == "__main__": +async def main(): # Parse command line arguments parser = argparse.ArgumentParser() parser.add_argument( @@ -197,14 +197,18 @@ # This app-level token will be used only for establishing a connection app_token=os.environ.get("SLACK_APP_TOKEN"), # You will be using this WebClient for performing Web API calls in listeners - web_client=WebClient(token=os.environ.get("SLACK_BOT_TOKEN")), + web_client=AsyncWebClient(token=os.environ.get("SLACK_BOT_TOKEN")), ) # Add a new listener to receive messages from Slack client.socket_mode_request_listeners.append(slack_bot) # Establish a WebSocket connection to the Socket Mode servers - client.connect() + await client.connect() # Listen for events logging.info("Listening for requests...") - threading.Event().wait() + await asyncio.sleep(float("inf")) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/slack_bot/slack_bot/bot/bot.py b/slack_bot/slack_bot/bot/bot.py index 4f7772bc..40a2ef6d 100644 --- a/slack_bot/slack_bot/bot/bot.py +++ b/slack_bot/slack_bot/bot/bot.py @@ -1,18 +1,19 @@ +import asyncio import logging -from slack_sdk.socket_mode import SocketModeClient -from slack_sdk.socket_mode.listeners import SocketModeRequestListener +from slack_sdk.socket_mode.aiohttp import SocketModeClient +from slack_sdk.socket_mode.async_listeners import AsyncSocketModeRequestListener from slack_sdk.socket_mode.request import SocketModeRequest from slack_sdk.socket_mode.response import SocketModeResponse from ..models.base import ResponseModel -class Bot(SocketModeRequestListener): +class Bot(AsyncSocketModeRequestListener): def __init__(self, model: ResponseModel) -> None: self.model = model - def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: + async def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: if req.type != "events_api": logging.info(f"Received unexpected request of type '{req.type}'") return None @@ -20,7 +21,7 @@ def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: # Acknowledge the request logging.info(f"Received an events_api request") response = SocketModeResponse(envelope_id=req.envelope_id) - client.send_socket_mode_response(response) + await client.send_socket_mode_response(response) try: # Extract event from payload @@ -48,12 +49,12 @@ def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: # If this is a direct message to REGinald... if event_type == "message" and event_subtype is None: - self.react(client, event["channel"], event["ts"]) + await self.react(client, event["channel"], event["ts"]) model_response = self.model.direct_message(message, user_id) # If @REGinald is mentioned in a channel elif event_type == "app_mention": - self.react(client, event["channel"], event["ts"]) + await self.react(client, event["channel"], event["ts"]) model_response = self.model.channel_mention(message, user_id) # Otherwise @@ -61,10 +62,10 @@ def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: logging.info(f"Received unexpected event of type '{event['type']}'.") return None - # Add an emoji and a reply as required + # Add a reply as required if model_response and model_response.message: logging.info(f"Posting reply {model_response.message}.") - client.web_client.chat_postMessage( + await client.web_client.chat_postMessage( channel=event["channel"], text=f"<@{user_id}>, you asked me: '{message}'.\n{model_response.message}", ) @@ -80,11 +81,13 @@ def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: ) raise - def react(self, client: SocketModeClient, channel: str, timestamp: str) -> None: + async def react( + self, client: SocketModeClient, channel: str, timestamp: str + ) -> None: """Emoji react to the input message""" if self.model.emoji: logging.info(f"Reacting with emoji {self.model.emoji}.") - client.web_client.reactions_add( + await client.web_client.reactions_add( name=self.model.emoji, channel=channel, timestamp=timestamp, From 941cc4133f3a71dcbce7f67cd90fb690540c1876 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Thu, 14 Sep 2023 10:59:38 +0100 Subject: [PATCH 2/9] make sure hello model works --- slack_bot/run.py | 15 +++++++++------ slack_bot/slack_bot/models/hello.py | 4 ++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/slack_bot/run.py b/slack_bot/run.py index c0b5c6df..83487c9d 100755 --- a/slack_bot/run.py +++ b/slack_bot/run.py @@ -175,12 +175,15 @@ async def main(): else: model_args = {} - response_model = model( - force_new_index=force_new_index, - data_dir=data_dir, - which_index=which_index, - **model_args, - ) + if model_name == "hello": + response_model = model() + else: + response_model = model( + force_new_index=force_new_index, + data_dir=data_dir, + which_index=which_index, + **model_args, + ) # Initialise Bot with response model logging.info(f"Initalising bot with model: {response_model}") diff --git a/slack_bot/slack_bot/models/hello.py b/slack_bot/slack_bot/models/hello.py index 9c9e54fa..85e91fef 100644 --- a/slack_bot/slack_bot/models/hello.py +++ b/slack_bot/slack_bot/models/hello.py @@ -1,3 +1,5 @@ +import time + from .base import MessageResponse, ResponseModel @@ -6,7 +8,9 @@ def __init__(self): super().__init__(emoji="wave") def direct_message(self, message: str, user_id: str) -> MessageResponse: + time.sleep(5) return MessageResponse("Let's discuss this in a channel!") def channel_mention(self, message: str, user_id: str) -> MessageResponse: + time.sleep(5) return MessageResponse(f"Hello <@{user_id}>") From babd4bca436324dffc6ffd1c56df2a5e772ce948 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Thu, 14 Sep 2023 16:06:35 +0100 Subject: [PATCH 3/9] ignore hidden messages --- slack_bot/slack_bot/bot/bot.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/slack_bot/slack_bot/bot/bot.py b/slack_bot/slack_bot/bot/bot.py index 40a2ef6d..0787e98f 100644 --- a/slack_bot/slack_bot/bot/bot.py +++ b/slack_bot/slack_bot/bot/bot.py @@ -26,11 +26,13 @@ async def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> No try: # Extract event from payload event = req.payload["event"] - sender_is_bot = "bot_id" in event # Ignore messages from bots - if sender_is_bot: - logging.info(f"Ignoring an event triggered by a bot.") + if event.get("bot_id") is not None: + logging.info("Ignoring an event triggered by a bot.") + return None + if event.get("hidden") is not None: + logging.info("Ignoring hidden message.") return None # Extract user and message information From 47cce58f852713c0668d4067a4fc3f96898b7658 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Fri, 15 Sep 2023 09:51:21 +0100 Subject: [PATCH 4/9] add queue --- slack_bot/slack_bot/bot/bot.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/slack_bot/slack_bot/bot/bot.py b/slack_bot/slack_bot/bot/bot.py index 488ae3f6..5060febd 100644 --- a/slack_bot/slack_bot/bot/bot.py +++ b/slack_bot/slack_bot/bot/bot.py @@ -12,8 +12,34 @@ class Bot(AsyncSocketModeRequestListener): def __init__(self, model: ResponseModel) -> None: self.model = model + self.queue = asyncio.Queue(maxsize=3) async def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: + self.queue.put_nowait(self._process_request(client, req)) + print(f"There are currently {self.queue.qsize()} items in the queue.") + + # Create three worker tasks to process the queue concurrently. + tasks = [] + for i in range(3): + task = asyncio.create_task(self.worker(self.queue)) + tasks.append(task) + + await self.queue.join() + + for task in tasks: + task.cancel() + + @staticmethod + async def worker(queue): + while True: + coro = await queue.get() + await coro + # Notify the queue that the "work item" has been processed. + queue.task_done() + + async def _process_request( + self, client: SocketModeClient, req: SocketModeRequest + ) -> None: if req.type != "events_api": logging.info(f"Received unexpected request of type '{req.type}'") return None From 461ec7e212ec391aa873409387679d5a0a65bbd0 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Fri, 15 Sep 2023 11:42:58 +0100 Subject: [PATCH 5/9] fix f"strings --- slack_bot/slack_bot/bot/bot.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/slack_bot/slack_bot/bot/bot.py b/slack_bot/slack_bot/bot/bot.py index 5060febd..6aed4687 100644 --- a/slack_bot/slack_bot/bot/bot.py +++ b/slack_bot/slack_bot/bot/bot.py @@ -45,7 +45,7 @@ async def _process_request( return None # Acknowledge the request - logging.info(f"Received an events_api request") + logging.info("Received an events_api request") response = SocketModeResponse(envelope_id=req.envelope_id) await client.send_socket_mode_response(response) @@ -93,7 +93,7 @@ async def _process_request( text=f"<@{user_id}>, you asked me: '{message}'.\n{model_response.message}", ) else: - logging.info(f"No reply was generated.") + logging.info("No reply was generated.") except KeyError as exc: logging.warning(f"Attempted to access key that does not exist.\n{str(exc)}") @@ -116,4 +116,4 @@ async def react( timestamp=timestamp, ) else: - logging.info(f"No emoji defined for this model.") + logging.info("No emoji defined for this model.") From f87487165efe06a62661fae337339245827abc3a Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Fri, 15 Sep 2023 14:20:25 +0100 Subject: [PATCH 6/9] change print to logging.info --- slack_bot/slack_bot/bot/bot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slack_bot/slack_bot/bot/bot.py b/slack_bot/slack_bot/bot/bot.py index 6aed4687..6d337821 100644 --- a/slack_bot/slack_bot/bot/bot.py +++ b/slack_bot/slack_bot/bot/bot.py @@ -16,7 +16,7 @@ def __init__(self, model: ResponseModel) -> None: async def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: self.queue.put_nowait(self._process_request(client, req)) - print(f"There are currently {self.queue.qsize()} items in the queue.") + logging.info(f"There are currently {self.queue.qsize()} items in the queue.") # Create three worker tasks to process the queue concurrently. tasks = [] From f71537049b2d27430cbddd6b231b623b52cb0f45 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Fri, 15 Sep 2023 14:20:44 +0100 Subject: [PATCH 7/9] increase queue size limit --- slack_bot/slack_bot/bot/bot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slack_bot/slack_bot/bot/bot.py b/slack_bot/slack_bot/bot/bot.py index 6d337821..b612e842 100644 --- a/slack_bot/slack_bot/bot/bot.py +++ b/slack_bot/slack_bot/bot/bot.py @@ -12,7 +12,7 @@ class Bot(AsyncSocketModeRequestListener): def __init__(self, model: ResponseModel) -> None: self.model = model - self.queue = asyncio.Queue(maxsize=3) + self.queue = asyncio.Queue(maxsize=10) async def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> None: self.queue.put_nowait(self._process_request(client, req)) From 2f9e209598dac010108f45180ab14d88b1e13ac9 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Fri, 15 Sep 2023 14:20:54 +0100 Subject: [PATCH 8/9] stop waiting for tasks to finish --- slack_bot/slack_bot/bot/bot.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/slack_bot/slack_bot/bot/bot.py b/slack_bot/slack_bot/bot/bot.py index b612e842..f1aedde9 100644 --- a/slack_bot/slack_bot/bot/bot.py +++ b/slack_bot/slack_bot/bot/bot.py @@ -24,10 +24,10 @@ async def __call__(self, client: SocketModeClient, req: SocketModeRequest) -> No task = asyncio.create_task(self.worker(self.queue)) tasks.append(task) - await self.queue.join() + # await self.queue.join() - for task in tasks: - task.cancel() + # for task in tasks: + # task.cancel() @staticmethod async def worker(queue): From bfb9ea95c8707c4aed6a284a753ba80bf7253eb4 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Fri, 15 Sep 2023 16:07:16 +0100 Subject: [PATCH 9/9] fix stale connection error --- slack_bot/run.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/slack_bot/run.py b/slack_bot/run.py index ace47656..4475e5de 100755 --- a/slack_bot/run.py +++ b/slack_bot/run.py @@ -214,12 +214,14 @@ async def main(): logging.error("SLACK_APP_TOKEN is not set") sys.exit(1) - # Initialize SocketModeClient with an app-level token + WebClient + # Initialize SocketModeClient with an app-level token + AsyncWebClient client = SocketModeClient( # This app-level token will be used only for establishing a connection app_token=os.environ.get("SLACK_APP_TOKEN"), - # You will be using this WebClient for performing Web API calls in listeners + # You will be using this AsyncWebClient for performing Web API calls in listeners web_client=AsyncWebClient(token=os.environ.get("SLACK_BOT_TOKEN")), + # To ensure connection doesn't go stale - we can adjust as needed. + ping_interval=60, ) # Add a new listener to receive messages from Slack