Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding queuing system #89

Merged
merged 11 commits into from
Sep 15, 2023
Merged

Adding queuing system #89

merged 11 commits into from
Sep 15, 2023

Conversation

rwood-97
Copy link
Contributor

This PR:

  • Converts to using AsyncWebClient, aiohttp SocketModeClient and AsyncSocketModeRequestListener
  • Sets up queuing system so that multiple messages in a row no longer cause error
  • Adds ping_interval argument to SocketModeClient to resolve the timeout/stale connection error

Behaviour now if you send two messages in quick succession is:

  • Message 1 is emoji'd and response is generated
  • Message 2 just sits
  • Once Message 1 response is done generating, message response is posted
  • Message 2 is emoji'd and response is generated
  • Once Message 2 response is done gnerating, message response is posted

i.e. Slack:
Rosie: Hi
Rosie: How are you?
Reginald: Hi
Reginald: I'm great thanks for asking!

Chat history:
Rosie: Hi
Regianld: Hi
Rosie: How are you?
Reginald: I'm great thanks for asking!

@rwood-97
Copy link
Contributor Author

Fixes #83

@rchan26
Copy link
Collaborator

rchan26 commented Sep 15, 2023

Seemingly getting an issue where if I send two messages in quick succession, it ignores the second:

Screenshot 2023-09-15 at 17 49 20
2023-09-15 17:44:34 [    INFO] There are currently 1 items in the queue.
2023-09-15 17:44:34 [    INFO] Received an events_api request
2023-09-15 17:44:34 [    INFO] Processing message 'hello' from user 'U05BM5KRRK9'.
2023-09-15 17:44:34 [    INFO] Reacting with emoji llama.
Batches: 100%|█████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 15.48it/s]
Llama.generate: prefix-match hit

llama_print_timings:        load time = 18572.72 ms
llama_print_timings:      sample time =    22.06 ms /    31 runs   (    0.71 ms per token,  1405.19 tokens per second)
llama_print_timings: prompt eval time = 68055.50 ms /  3676 tokens (   18.51 ms per token,    54.01 tokens per second)
llama_print_timings:        eval time =  3492.64 ms /    30 runs   (  116.42 ms per token,     8.59 tokens per second)
llama_print_timings:       total time = 71616.97 ms
2023-09-15 17:45:46 [ WARNING] Was expecting a backend response with a regular expression but couldn't find a match.
2023-09-15 17:45:46 [    INFO] Posting reply   Hello! It's nice to meet you. Is there something I can help you with or would you like to chat about something in particular?


I read the following documents to compose this answer:
https://alan-turing-institute.github.io/REG-handbook/docs/regular_events/drop-in_sessions/ (similarity: 0.252)

https://alan-turing-institute.github.io/REG-handbook/docs/onboarding/buddy_system/ (similarity: 0.242)

https://alan-turing-institute.github.io/REG-handbook/docs/onboarding/new_joiners/first_few_days/ (similarity: 0.239).
2023-09-15 17:45:46 [ WARNING] Failed to send a ping message (s_1632077409): Cannot write to closing transport
2023-09-15 17:45:46 [   ERROR] Task exception was never retrieved
future: <Task finished name='Task-51' coro=<WebSocketWriter.ping() done, defined at /Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py:672> exception=ConnectionResetError('Cannot write to closing transport')>
Traceback (most recent call last):
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 676, in ping
    await self._send_frame(message, WSMsgType.PING)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 646, in _send_frame
    self._write(header + mask + message)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 663, in _write
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
2023-09-15 17:45:46 [    INFO] There are currently 1 items in the queue.
2023-09-15 17:45:46 [    INFO] Received an events_api request
2023-09-15 17:45:46 [   ERROR] Task exception was never retrieved
future: <Task finished name='Task-11' coro=<Bot.worker() done, defined at /Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py:32> exception=ConnectionResetError('Cannot write to closing transport')>
Traceback (most recent call last):
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/socket_mode/aiohttp/__init__.py", line 399, in send_message
    await self.current_session.send_str(message)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/client_ws.py", line 151, in send_str
    await self._writer.send(data, binary=False, compress=compress)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 690, in send
    await self._send_frame(message, WSMsgType.TEXT, compress)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 646, in _send_frame
    self._write(header + mask + message)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 663, in _write
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 36, in worker
    await coro
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 50, in _process_request
    await client.send_socket_mode_response(response)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/socket_mode/async_client.py", line 96, in send_socket_mode_response
    await self.send_message(json.dumps(response.to_dict()))
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/socket_mode/aiohttp/__init__.py", line 413, in send_message
    await self.current_session.send_str(message)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/client_ws.py", line 151, in send_str
    await self._writer.send(data, binary=False, compress=compress)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 690, in send
    await self._send_frame(message, WSMsgType.TEXT, compress)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 646, in _send_frame
    self._write(header + mask + message)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 663, in _write
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
2023-09-15 17:46:46 [ WARNING] Failed to send a ping message (s_1632077409): Cannot write to closing transport
2023-09-15 17:46:47 [   ERROR] Task exception was never retrieved
future: <Task finished name='Task-57' coro=<WebSocketWriter.ping() done, defined at /Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py:672> exception=ConnectionResetError('Cannot write to closing transport')>
Traceback (most recent call last):
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 676, in ping
    await self._send_frame(message, WSMsgType.PING)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 646, in _send_frame
    self._write(header + mask + message)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 663, in _write
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
2023-09-15 17:47:46 [    INFO] The session (s_1632077409) seems to be already closed. Reconnecting...
2023-09-15 17:47:46 [    INFO] A new session (s_385456357) has been established
2023-09-15 17:47:46 [    INFO] The old session (s_1632077409) has been abandoned

A new session does spawn though and awaits a new message, but the second message I sent is lost

2023-09-15 17:48:33 [    INFO] There are currently 1 items in the queue.
2023-09-15 17:48:33 [    INFO] There are currently 2 items in the queue.
2023-09-15 17:48:33 [    INFO] Received an events_api request
2023-09-15 17:48:33 [    INFO] Ignoring hidden message.
2023-09-15 17:48:33 [    INFO] Received an events_api request
2023-09-15 17:48:33 [    INFO] Ignoring an event triggered by a bot.

@rchan26
Copy link
Collaborator

rchan26 commented Sep 15, 2023

ah no wait, sorry. it did eventually acknowledge and do it. After 2/3 minutes, it acknowledged that there was another thing in the queue and processed it.

2023-09-15 17:50:45 [    INFO] There are currently 1 items in the queue.
2023-09-15 17:50:45 [    INFO] Received an events_api request
2023-09-15 17:50:45 [    INFO] Processing message 'what should a new starter in reg do?' from user 'U05BM5KRRK9'.
2023-09-15 17:50:45 [    INFO] Reacting with emoji llama.
Batches: 100%|█████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  9.02it/s]
Llama.generate: prefix-match hit

llama_print_timings:        load time = 18572.72 ms
llama_print_timings:      sample time =   117.67 ms /   165 runs   (    0.71 ms per token,  1402.27 tokens per second)
llama_print_timings: prompt eval time = 75226.43 ms /  3886 tokens (   19.36 ms per token,    51.66 tokens per second)
llama_print_timings:        eval time = 19653.66 ms /   164 runs   (  119.84 ms per token,     8.34 tokens per second)
llama_print_timings:       total time = 95227.72 ms
2023-09-15 17:52:20 [ WARNING] Was expecting a backend response with a regular expression but couldn't find a match.
2023-09-15 17:52:20 [    INFO] Posting reply   As a new starter in REG, there are several things you can do to get started and settle into your role:

1. Familiarize yourself with the REG handbook and repository: These provide information on how we work, our practices, tools, and languages.
2. Set up your machine and coding environment: Please see the "Systems Setup" page in the REG handbook for instructions on setting up your machine and coding environment.
3. Introduce yourself to your buddies: Your buddies can answer any questions you may have, provide guidance, and offer support during your onboarding process.
4. Attend the welcome coffee: This is a great opportunity to meet other members of the group and get a feel for how we work


I read the following documents to compose this answer:
https://alan-turing-institute.github.io/REG-handbook/docs/onboarding/new_joiners/ (similarity: 0.452)

https://alan-turing-institute.github.io/REG-handbook/docs/onboarding/new_joiners/checklist/ (similarity: 0.441)

https://alan-turing-institute.github.io/REG-handbook/docs/onboarding/new_joiners/first_few_days/ (similarity: 0.405).
2023-09-15 17:52:20 [ WARNING] Failed to send a ping message (s_385456357): Cannot write to closing transport
2023-09-15 17:52:20 [   ERROR] Task exception was never retrieved
future: <Task finished name='Task-80' coro=<WebSocketWriter.ping() done, defined at /Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py:672> exception=ConnectionResetError('Cannot write to closing transport')>
Traceback (most recent call last):
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 676, in ping
    await self._send_frame(message, WSMsgType.PING)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 646, in _send_frame
    self._write(header + mask + message)
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/aiohttp/http_websocket.py", line 663, in _write
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport

@rchan26
Copy link
Collaborator

rchan26 commented Sep 15, 2023

I just tried this out with OpenAI model and tried to spam it, but it was able to reply to each message. Still some errors in the logs but it doesn't crash seemingly:

Payload: {'token': 'OEqSunrvpkP80Su25M5fdu4W', 'team_id': 'T057SAAS0K0', 'context_team_id': 'T057SAAS0K0', 'context_enterprise_id': None, 'api_app_id': 'A05RS0R47SA', 'event': {'client_msg_id': '8c11fd63-1ad5-4529-84d1-f91fc4657d59', 'type': 'message', 'text': 'hello, what is the project allocation process?', 'user': 'U05BM5KRRK9', 'ts': '1694797143.911529', 'blocks': [{'type': 'rich_text', 'block_id': '/Kqoo', 'elements': [{'type': 'rich_text_section', 'elements': [{'type': 'text', 'text': 'hello, what is the project allocation process?'}]}]}], 'team': 'T057SAAS0K0', 'channel': 'D05RJ4W5C79', 'event_ts': '1694797143.911529', 'channel_type': 'im'}, 'type': 'event_callback', 'event_id': 'Ev05SYECF7UH', 'event_time': 1694797143, 'authorizations': [{'enterprise_id': None, 'team_id': 'T057SAAS0K0', 'user_id': 'U05RJ4R1V1D', 'is_bot': True, 'is_enterprise_install': False}], 'is_ext_shared_channel': False, 'event_context': '4-eyJldCI6Im1lc3NhZ2UiLCJ0aWQiOiJUMDU3U0FBUzBLMCIsImFpZCI6IkEwNVJTMFI0N1NBIiwiY2lkIjoiRDA1Uko0VzVDNzkifQ'}.
The request to the Slack API failed. (url: https://www.slack.com/api/reactions.add)
The server responded with: {'ok': False, 'error': 'already_reacted'}
2023-09-15 18:00:18 [   ERROR] Task exception was never retrieved
future: <Task finished name='Task-76' coro=<Bot.worker() done, defined at /Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py:32> exception=SlackApiError("The request to the Slack API failed. (url: https://www.slack.com/api/reactions.add)\nThe server responded with: {'ok': False, 'error': 'already_reacted'}")>
Traceback (most recent call last):
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 36, in worker
    await coro
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 75, in _process_request
    await self.react(client, event["channel"], event["ts"])
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 113, in react
    await client.web_client.reactions_add(
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_client.py", line 3818, in reactions_add
    return await self.api_call("reactions.add", params=kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_base_client.py", line 161, in api_call
    return await self._send(
           ^^^^^^^^^^^^^^^^^
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_base_client.py", line 201, in _send
    return AsyncSlackResponse(**{**data, **res}).validate()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_slack_response.py", line 203, in validate
    raise e.SlackApiError(message=msg, response=self)
slack_sdk.errors.SlackApiError: The request to the Slack API failed. (url: https://www.slack.com/api/reactions.add)
The server responded with: {'ok': False, 'error': 'already_reacted'}
2023-09-15 18:00:19 [    INFO] There are currently 1 items in the queue.
2023-09-15 18:00:19 [    INFO] Received an events_api request
2023-09-15 18:00:19 [    INFO] Processing message 'hello, what is the project allocation process? 5' from user 'U05BM5KRRK9'.
2023-09-15 18:00:19 [    INFO] Reacting with emoji llama.
2023-09-15 18:00:19 [   ERROR] Something went wrong in processing a Slack request.
Payload: {'token': 'OEqSunrvpkP80Su25M5fdu4W', 'team_id': 'T057SAAS0K0', 'context_team_id': 'T057SAAS0K0', 'context_enterprise_id': None, 'api_app_id': 'A05RS0R47SA', 'event': {'client_msg_id': '1a1d5403-e820-4e8f-94c8-fc26eb6385f9', 'type': 'message', 'text': 'hello, what is the project allocation process? 5', 'user': 'U05BM5KRRK9', 'ts': '1694797146.619729', 'blocks': [{'type': 'rich_text', 'block_id': 'RABR', 'elements': [{'type': 'rich_text_section', 'elements': [{'type': 'text', 'text': 'hello, what is the project allocation process? 5'}]}]}], 'team': 'T057SAAS0K0', 'channel': 'D05RJ4W5C79', 'event_ts': '1694797146.619729', 'channel_type': 'im'}, 'type': 'event_callback', 'event_id': 'Ev05SYECT941', 'event_time': 1694797146, 'authorizations': [{'enterprise_id': None, 'team_id': 'T057SAAS0K0', 'user_id': 'U05RJ4R1V1D', 'is_bot': True, 'is_enterprise_install': False}], 'is_ext_shared_channel': False, 'event_context': '4-eyJldCI6Im1lc3NhZ2UiLCJ0aWQiOiJUMDU3U0FBUzBLMCIsImFpZCI6IkEwNVJTMFI0N1NBIiwiY2lkIjoiRDA1Uko0VzVDNzkifQ'}.
The request to the Slack API failed. (url: https://www.slack.com/api/reactions.add)
The server responded with: {'ok': False, 'error': 'already_reacted'}
2023-09-15 18:00:19 [   ERROR] Task exception was never retrieved
future: <Task finished name='Task-77' coro=<Bot.worker() done, defined at /Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py:32> exception=SlackApiError("The request to the Slack API failed. (url: https://www.slack.com/api/reactions.add)\nThe server responded with: {'ok': False, 'error': 'already_reacted'}")>
Traceback (most recent call last):
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 36, in worker
    await coro
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 75, in _process_request
    await self.react(client, event["channel"], event["ts"])
  File "/Users/rchan/reginald/slack_bot/slack_bot/bot/bot.py", line 113, in react
    await client.web_client.reactions_add(
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_client.py", line 3818, in reactions_add
    return await self.api_call("reactions.add", params=kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_base_client.py", line 161, in api_call
    return await self._send(
           ^^^^^^^^^^^^^^^^^
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_base_client.py", line 201, in _send
    return AsyncSlackResponse(**{**data, **res}).validate()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/rchan/opt/miniconda3/envs/reginald/lib/python3.11/site-packages/slack_sdk/web/async_slack_response.py", line 203, in validate
    raise e.SlackApiError(message=msg, response=self)
slack_sdk.errors.SlackApiError: The request to the Slack API failed. (url: https://www.slack.com/api/reactions.add)
The server responded with: {'ok': False, 'error': 'already_reacted'}
2023-09-15 18:00:19 [    INFO] There are currently 1 items in the queue.
2023-09-15 18:00:19 [    INFO] Received an events_api request
2023-09-15 18:00:19 [    INFO] Ignoring an event triggered by a bot.
2023-09-15 18:00:35 [    INFO] There are currently 1 items in the queue.
2023-09-15 18:00:35 [    INFO] Received an events_api request
2023-09-15 18:00:35 [    INFO] Ignoring an event triggered by a bot.
2023-09-15 18:00:35 [    INFO] There are currently 1 items in the queue.
2023-09-15 18:00:35 [    INFO] Received an events_api request
2023-09-15 18:00:35 [    INFO] Ignoring an event triggered by a bot.
2023-09-15 18:00:35 [    INFO] There are currently 1 items in the queue.
2023-09-15 18:00:35 [    INFO] Received an events_api request
2023-09-15 18:00:35 [    INFO] Ignoring an event triggered by a bot.
2023-09-15 18:00:36 [    INFO] There are currently 1 items in the queue.
2023-09-15 18:00:36 [    INFO] Received an events_api request
2023-09-15 18:00:36 [    INFO] Ignoring hidden message.

@rchan26 rchan26 merged commit 7039775 into main Sep 15, 2023
1 check passed
@rchan26 rchan26 deleted the llama2-query-queue branch September 15, 2023 17:08
@rwood-97
Copy link
Contributor Author

So I think, the "Cannot write to closing transport" is an error due to the ping interval being too fast. See #83

The code essentially does ping interval * 4 to decide whether the connection is stale and often our models are taking longer than that time to generate so then it things its stale and shuts it down:

    if self.last_ping_pong_time is None:
        return False
    disconnected_seconds = int(time.time() - self.last_ping_pong_time)
    return disconnected_seconds >= (self.ping_interval * 4)

I think this can be fixed by:

  • longer ping interval
  • better machine e.g. once we have a GPU going

Still not sure how to fix:

  • How to acknowledge message 2 whilst message 1 is being processed.

rchan26 added a commit that referenced this pull request Sep 27, 2023
rchan26 added a commit that referenced this pull request Nov 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants